offsetfetch.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/offsetfetch"
  9. )
  10. // OffsetFetchRequest represents a request sent to a kafka broker to read the
  11. // currently committed offsets of topic partitions.
  12. type OffsetFetchRequest struct {
  13. // Address of the kafka broker to send the request to.
  14. Addr net.Addr
  15. // ID of the consumer group to retrieve the offsets for.
  16. GroupID string
  17. // Set of topic partitions to retrieve the offsets for.
  18. Topics map[string][]int
  19. }
  20. // OffsetFetchResponse represents a response from a kafka broker to an offset
  21. // fetch request.
  22. type OffsetFetchResponse struct {
  23. // The amount of time that the broker throttled the request.
  24. Throttle time.Duration
  25. // Set of topic partitions that the kafka broker has returned offsets for.
  26. Topics map[string][]OffsetFetchPartition
  27. // An error that may have occurred while attempting to retrieve consumer
  28. // group offsets.
  29. //
  30. // The error contains both the kafka error code, and an error message
  31. // returned by the kafka broker. Programs may use the standard errors.Is
  32. // function to test the error against kafka error codes.
  33. Error error
  34. }
  35. // OffsetFetchPartition represents the state of a single partition in a consumer
  36. // group.
  37. type OffsetFetchPartition struct {
  38. // ID of the partition.
  39. Partition int
  40. // Last committed offsets on the partition when the request was served by
  41. // the kafka broker.
  42. CommittedOffset int64
  43. // Consumer group metadata for this partition.
  44. Metadata string
  45. // An error that may have occurred while attempting to retrieve consumer
  46. // group offsets for this partition.
  47. //
  48. // The error contains both the kafka error code, and an error message
  49. // returned by the kafka broker. Programs may use the standard errors.Is
  50. // function to test the error against kafka error codes.
  51. Error error
  52. }
  53. // OffsetFetch sends an offset fetch request to a kafka broker and returns the
  54. // response.
  55. func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  56. // Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API
  57. // which will return the result for all topics with the desired consumer group:
  58. // https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch
  59. // For Kafka version below 0.10.2.x this call will result in an error
  60. var topics []offsetfetch.RequestTopic
  61. if len(req.Topics) > 0 {
  62. topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics))
  63. for topicName, partitions := range req.Topics {
  64. indexes := make([]int32, len(partitions))
  65. for i, p := range partitions {
  66. indexes[i] = int32(p)
  67. }
  68. topics = append(topics, offsetfetch.RequestTopic{
  69. Name: topicName,
  70. PartitionIndexes: indexes,
  71. })
  72. }
  73. }
  74. m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
  75. GroupID: req.GroupID,
  76. Topics: topics,
  77. })
  78. if err != nil {
  79. return nil, fmt.Errorf("kafka.(*Client).OffsetFetch: %w", err)
  80. }
  81. res := m.(*offsetfetch.Response)
  82. ret := &OffsetFetchResponse{
  83. Throttle: makeDuration(res.ThrottleTimeMs),
  84. Topics: make(map[string][]OffsetFetchPartition, len(res.Topics)),
  85. Error: makeError(res.ErrorCode, ""),
  86. }
  87. for _, t := range res.Topics {
  88. partitions := make([]OffsetFetchPartition, len(t.Partitions))
  89. for i, p := range t.Partitions {
  90. partitions[i] = OffsetFetchPartition{
  91. Partition: int(p.PartitionIndex),
  92. CommittedOffset: p.CommittedOffset,
  93. Metadata: p.Metadata,
  94. Error: makeError(p.ErrorCode, ""),
  95. }
  96. }
  97. ret.Topics[t.Name] = partitions
  98. }
  99. return ret, nil
  100. }
  101. type offsetFetchRequestV1Topic struct {
  102. // Topic name
  103. Topic string
  104. // Partitions to fetch offsets
  105. Partitions []int32
  106. }
  107. func (t offsetFetchRequestV1Topic) size() int32 {
  108. return sizeofString(t.Topic) +
  109. sizeofInt32Array(t.Partitions)
  110. }
  111. func (t offsetFetchRequestV1Topic) writeTo(wb *writeBuffer) {
  112. wb.writeString(t.Topic)
  113. wb.writeInt32Array(t.Partitions)
  114. }
  115. type offsetFetchRequestV1 struct {
  116. // GroupID holds the unique group identifier
  117. GroupID string
  118. // Topics to fetch offsets.
  119. Topics []offsetFetchRequestV1Topic
  120. }
  121. func (t offsetFetchRequestV1) size() int32 {
  122. return sizeofString(t.GroupID) +
  123. sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() })
  124. }
  125. func (t offsetFetchRequestV1) writeTo(wb *writeBuffer) {
  126. wb.writeString(t.GroupID)
  127. wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
  128. }
  129. type offsetFetchResponseV1PartitionResponse struct {
  130. // Partition ID
  131. Partition int32
  132. // Offset of last committed message
  133. Offset int64
  134. // Metadata client wants to keep
  135. Metadata string
  136. // ErrorCode holds response error code
  137. ErrorCode int16
  138. }
  139. func (t offsetFetchResponseV1PartitionResponse) size() int32 {
  140. return sizeofInt32(t.Partition) +
  141. sizeofInt64(t.Offset) +
  142. sizeofString(t.Metadata) +
  143. sizeofInt16(t.ErrorCode)
  144. }
  145. func (t offsetFetchResponseV1PartitionResponse) writeTo(wb *writeBuffer) {
  146. wb.writeInt32(t.Partition)
  147. wb.writeInt64(t.Offset)
  148. wb.writeString(t.Metadata)
  149. wb.writeInt16(t.ErrorCode)
  150. }
  151. func (t *offsetFetchResponseV1PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  152. if remain, err = readInt32(r, size, &t.Partition); err != nil {
  153. return
  154. }
  155. if remain, err = readInt64(r, remain, &t.Offset); err != nil {
  156. return
  157. }
  158. if remain, err = readString(r, remain, &t.Metadata); err != nil {
  159. return
  160. }
  161. if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
  162. return
  163. }
  164. return
  165. }
  166. type offsetFetchResponseV1Response struct {
  167. // Topic name
  168. Topic string
  169. // PartitionResponses holds offsets by partition
  170. PartitionResponses []offsetFetchResponseV1PartitionResponse
  171. }
  172. func (t offsetFetchResponseV1Response) size() int32 {
  173. return sizeofString(t.Topic) +
  174. sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() })
  175. }
  176. func (t offsetFetchResponseV1Response) writeTo(wb *writeBuffer) {
  177. wb.writeString(t.Topic)
  178. wb.writeArray(len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(wb) })
  179. }
  180. func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  181. if remain, err = readString(r, size, &t.Topic); err != nil {
  182. return
  183. }
  184. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  185. item := offsetFetchResponseV1PartitionResponse{}
  186. if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
  187. return
  188. }
  189. t.PartitionResponses = append(t.PartitionResponses, item)
  190. return
  191. }
  192. if remain, err = readArrayWith(r, remain, fn); err != nil {
  193. return
  194. }
  195. return
  196. }
  197. type offsetFetchResponseV1 struct {
  198. // Responses holds topic partition offsets
  199. Responses []offsetFetchResponseV1Response
  200. }
  201. func (t offsetFetchResponseV1) size() int32 {
  202. return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() })
  203. }
  204. func (t offsetFetchResponseV1) writeTo(wb *writeBuffer) {
  205. wb.writeArray(len(t.Responses), func(i int) { t.Responses[i].writeTo(wb) })
  206. }
  207. func (t *offsetFetchResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  208. fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  209. item := offsetFetchResponseV1Response{}
  210. if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
  211. return
  212. }
  213. t.Responses = append(t.Responses, item)
  214. return
  215. }
  216. if remain, err = readArrayWith(r, size, fn); err != nil {
  217. return
  218. }
  219. return
  220. }