client.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package kafka
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol"
  9. )
  10. const (
  11. defaultCreateTopicsTimeout = 2 * time.Second
  12. defaultDeleteTopicsTimeout = 2 * time.Second
  13. defaultCreatePartitionsTimeout = 2 * time.Second
  14. defaultProduceTimeout = 500 * time.Millisecond
  15. defaultMaxWait = 500 * time.Millisecond
  16. )
  17. // Client is a high-level API to interract with kafka brokers.
  18. //
  19. // All methods of the Client type accept a context as first argument, which may
  20. // be used to asynchronously cancel the requests.
  21. //
  22. // Clients are safe to use concurrently from multiple goroutines, as long as
  23. // their configuration is not changed after first use.
  24. type Client struct {
  25. // Address of the kafka cluster (or specific broker) that the client will be
  26. // sending requests to.
  27. //
  28. // This field is optional, the address may be provided in each request
  29. // instead. The request address takes precedence if both were specified.
  30. Addr net.Addr
  31. // Time limit for requests sent by this client.
  32. //
  33. // If zero, no timeout is applied.
  34. Timeout time.Duration
  35. // A transport used to communicate with the kafka brokers.
  36. //
  37. // If nil, DefaultTransport is used.
  38. Transport RoundTripper
  39. }
  40. // A ConsumerGroup and Topic as these are both strings we define a type for
  41. // clarity when passing to the Client as a function argument
  42. //
  43. // N.B TopicAndGroup is currently experimental! Therefore, it is subject to
  44. // change, including breaking changes between MINOR and PATCH releases.
  45. //
  46. // DEPRECATED: this type will be removed in version 1.0, programs should
  47. // migrate to use kafka.(*Client).OffsetFetch instead.
  48. type TopicAndGroup struct {
  49. Topic string
  50. GroupId string
  51. }
  52. // ConsumerOffsets returns a map[int]int64 of partition to committed offset for
  53. // a consumer group id and topic.
  54. //
  55. // DEPRECATED: this method will be removed in version 1.0, programs should
  56. // migrate to use kafka.(*Client).OffsetFetch instead.
  57. func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
  58. metadata, err := c.Metadata(ctx, &MetadataRequest{
  59. Topics: []string{tg.Topic},
  60. })
  61. if err != nil {
  62. return nil, fmt.Errorf("failed to get topic metadata :%w", err)
  63. }
  64. topic := metadata.Topics[0]
  65. partitions := make([]int, len(topic.Partitions))
  66. for i := range topic.Partitions {
  67. partitions[i] = topic.Partitions[i].ID
  68. }
  69. offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
  70. GroupID: tg.GroupId,
  71. Topics: map[string][]int{
  72. tg.Topic: partitions,
  73. },
  74. })
  75. if err != nil {
  76. return nil, fmt.Errorf("failed to get offsets: %w", err)
  77. }
  78. topicOffsets := offsets.Topics[topic.Name]
  79. partitionOffsets := make(map[int]int64, len(topicOffsets))
  80. for _, off := range topicOffsets {
  81. partitionOffsets[off.Partition] = off.CommittedOffset
  82. }
  83. return partitionOffsets, nil
  84. }
  85. func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
  86. if c.Timeout > 0 {
  87. var cancel context.CancelFunc
  88. ctx, cancel = context.WithTimeout(ctx, c.Timeout)
  89. defer cancel()
  90. }
  91. if addr == nil {
  92. if addr = c.Addr; addr == nil {
  93. return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
  94. }
  95. }
  96. return c.transport().RoundTrip(ctx, addr, msg)
  97. }
  98. func (c *Client) transport() RoundTripper {
  99. if c.Transport != nil {
  100. return c.Transport
  101. }
  102. return DefaultTransport
  103. }
  104. func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
  105. timeout := c.Timeout
  106. if deadline, ok := ctx.Deadline(); ok {
  107. if remain := time.Until(deadline); remain < timeout {
  108. timeout = remain
  109. }
  110. }
  111. if timeout > 0 {
  112. // Half the timeout because it is communicated to kafka in multiple
  113. // requests (e.g. Fetch, Produce, etc...), this adds buffer to account
  114. // for network latency when waiting for the response from kafka.
  115. return timeout / 2
  116. }
  117. return defaultTimeout
  118. }
  119. func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
  120. return milliseconds(c.timeout(ctx, defaultTimeout))
  121. }