offsetcommit.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/offsetcommit"
  9. )
  10. // OffsetCommit represent the commit of an offset to a partition.
  11. //
  12. // The extra metadata is opaque to the kafka protocol, it is intended to hold
  13. // information like an identifier for the process that committed the offset,
  14. // or the time at which the commit was made.
  15. type OffsetCommit struct {
  16. Partition int
  17. Offset int64
  18. Metadata string
  19. }
  20. // OffsetCommitRequest represents a request sent to a kafka broker to commit
  21. // offsets for a partition.
  22. type OffsetCommitRequest struct {
  23. // Address of the kafka broker to send the request to.
  24. Addr net.Addr
  25. // ID of the consumer group to publish the offsets for.
  26. GroupID string
  27. // ID of the consumer group generation.
  28. GenerationID int
  29. // ID of the group member submitting the offsets.
  30. MemberID string
  31. // ID of the group instance.
  32. InstanceID string
  33. // Set of topic partitions to publish the offsets for.
  34. //
  35. // Not that offset commits need to be submitted to the broker acting as the
  36. // group coordinator. This will be automatically resolved by the transport.
  37. Topics map[string][]OffsetCommit
  38. }
  39. // OffsetFetchResponse represents a response from a kafka broker to an offset
  40. // commit request.
  41. type OffsetCommitResponse struct {
  42. // The amount of time that the broker throttled the request.
  43. Throttle time.Duration
  44. // Set of topic partitions that the kafka broker has accepted offset commits
  45. // for.
  46. Topics map[string][]OffsetCommitPartition
  47. }
  48. // OffsetFetchPartition represents the state of a single partition in responses
  49. // to committing offsets.
  50. type OffsetCommitPartition struct {
  51. // ID of the partition.
  52. Partition int
  53. // An error that may have occurred while attempting to publish consumer
  54. // group offsets for this partition.
  55. //
  56. // The error contains both the kafka error code, and an error message
  57. // returned by the kafka broker. Programs may use the standard errors.Is
  58. // function to test the error against kafka error codes.
  59. Error error
  60. }
  61. // OffsetCommit sends an offset commit request to a kafka broker and returns the
  62. // response.
  63. func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  64. now := time.Now().UnixNano() / int64(time.Millisecond)
  65. topics := make([]offsetcommit.RequestTopic, 0, len(req.Topics))
  66. for topicName, commits := range req.Topics {
  67. partitions := make([]offsetcommit.RequestPartition, len(commits))
  68. for i, c := range commits {
  69. partitions[i] = offsetcommit.RequestPartition{
  70. PartitionIndex: int32(c.Partition),
  71. CommittedOffset: c.Offset,
  72. CommittedMetadata: c.Metadata,
  73. // This field existed in v1 of the OffsetCommit API, setting it
  74. // to the current timestamp is probably a safe thing to do, but
  75. // it is hard to tell.
  76. CommitTimestamp: now,
  77. }
  78. }
  79. topics = append(topics, offsetcommit.RequestTopic{
  80. Name: topicName,
  81. Partitions: partitions,
  82. })
  83. }
  84. m, err := c.roundTrip(ctx, req.Addr, &offsetcommit.Request{
  85. GroupID: req.GroupID,
  86. GenerationID: int32(req.GenerationID),
  87. MemberID: req.MemberID,
  88. GroupInstanceID: req.InstanceID,
  89. Topics: topics,
  90. // Hardcoded retention; this field existed between v2 and v4 of the
  91. // OffsetCommit API, we would have to figure out a way to give the
  92. // client control over the API version being used to support configuring
  93. // it in the request object.
  94. RetentionTimeMs: int64((24 * time.Hour) / time.Millisecond),
  95. })
  96. if err != nil {
  97. return nil, fmt.Errorf("kafka.(*Client).OffsetCommit: %w", err)
  98. }
  99. r := m.(*offsetcommit.Response)
  100. res := &OffsetCommitResponse{
  101. Throttle: makeDuration(r.ThrottleTimeMs),
  102. Topics: make(map[string][]OffsetCommitPartition, len(r.Topics)),
  103. }
  104. for _, topic := range r.Topics {
  105. partitions := make([]OffsetCommitPartition, len(topic.Partitions))
  106. for i, p := range topic.Partitions {
  107. partitions[i] = OffsetCommitPartition{
  108. Partition: int(p.PartitionIndex),
  109. Error: makeError(p.ErrorCode, ""),
  110. }
  111. }
  112. res.Topics[topic.Name] = partitions
  113. }
  114. return res, nil
  115. }
  116. type offsetCommitRequestV2Partition struct {
  117. // Partition ID
  118. Partition int32
  119. // Offset to be committed
  120. Offset int64
  121. // Metadata holds any associated metadata the client wants to keep
  122. Metadata string
  123. }
  124. func (t offsetCommitRequestV2Partition) size() int32 {
  125. return sizeofInt32(t.Partition) +
  126. sizeofInt64(t.Offset) +
  127. sizeofString(t.Metadata)
  128. }
  129. func (t offsetCommitRequestV2Partition) writeTo(wb *writeBuffer) {
  130. wb.writeInt32(t.Partition)
  131. wb.writeInt64(t.Offset)
  132. wb.writeString(t.Metadata)
  133. }
  134. type offsetCommitRequestV2Topic struct {
  135. // Topic name
  136. Topic string
  137. // Partitions to commit offsets
  138. Partitions []offsetCommitRequestV2Partition
  139. }
  140. func (t offsetCommitRequestV2Topic) size() int32 {
  141. return sizeofString(t.Topic) +
  142. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  143. }
  144. func (t offsetCommitRequestV2Topic) writeTo(wb *writeBuffer) {
  145. wb.writeString(t.Topic)
  146. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  147. }
  148. type offsetCommitRequestV2 struct {
  149. // GroupID holds the unique group identifier
  150. GroupID string
  151. // GenerationID holds the generation of the group.
  152. GenerationID int32
  153. // MemberID assigned by the group coordinator
  154. MemberID string
  155. // RetentionTime holds the time period in ms to retain the offset.
  156. RetentionTime int64
  157. // Topics to commit offsets
  158. Topics []offsetCommitRequestV2Topic
  159. }
  160. func (t offsetCommitRequestV2) size() int32 {
  161. return sizeofString(t.GroupID) +
  162. sizeofInt32(t.GenerationID) +
  163. sizeofString(t.MemberID) +
  164. sizeofInt64(t.RetentionTime) +
  165. sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() })
  166. }
  167. func (t offsetCommitRequestV2) writeTo(wb *writeBuffer) {
  168. wb.writeString(t.GroupID)
  169. wb.writeInt32(t.GenerationID)
  170. wb.writeString(t.MemberID)
  171. wb.writeInt64(t.RetentionTime)
  172. wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
  173. }
  174. type offsetCommitResponseV2PartitionResponse struct {
  175. Partition int32
  176. // ErrorCode holds response error code
  177. ErrorCode int16
  178. }
  179. func (t offsetCommitResponseV2PartitionResponse) size() int32 {
  180. return sizeofInt32(t.Partition) +
  181. sizeofInt16(t.ErrorCode)
  182. }
  183. func (t offsetCommitResponseV2PartitionResponse) writeTo(wb *writeBuffer) {
  184. wb.writeInt32(t.Partition)
  185. wb.writeInt16(t.ErrorCode)
  186. }
  187. func (t *offsetCommitResponseV2PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  188. if remain, err = readInt32(r, size, &t.Partition); err != nil {
  189. return
  190. }
  191. if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
  192. return
  193. }
  194. return
  195. }
  196. type offsetCommitResponseV2Response struct {
  197. Topic string
  198. PartitionResponses []offsetCommitResponseV2PartitionResponse
  199. }
  200. func (t offsetCommitResponseV2Response) size() int32 {
  201. return sizeofString(t.Topic) +
  202. sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() })
  203. }
  204. func (t offsetCommitResponseV2Response) writeTo(wb *writeBuffer) {
  205. wb.writeString(t.Topic)
  206. wb.writeArray(len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(wb) })
  207. }
  208. func (t *offsetCommitResponseV2Response) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  209. if remain, err = readString(r, size, &t.Topic); err != nil {
  210. return
  211. }
  212. fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  213. item := offsetCommitResponseV2PartitionResponse{}
  214. if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
  215. return
  216. }
  217. t.PartitionResponses = append(t.PartitionResponses, item)
  218. return
  219. }
  220. if remain, err = readArrayWith(r, remain, fn); err != nil {
  221. return
  222. }
  223. return
  224. }
  225. type offsetCommitResponseV2 struct {
  226. Responses []offsetCommitResponseV2Response
  227. }
  228. func (t offsetCommitResponseV2) size() int32 {
  229. return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() })
  230. }
  231. func (t offsetCommitResponseV2) writeTo(wb *writeBuffer) {
  232. wb.writeArray(len(t.Responses), func(i int) { t.Responses[i].writeTo(wb) })
  233. }
  234. func (t *offsetCommitResponseV2) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  235. fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  236. item := offsetCommitResponseV2Response{}
  237. if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
  238. return
  239. }
  240. t.Responses = append(t.Responses, item)
  241. return
  242. }
  243. if remain, err = readArrayWith(r, size, fn); err != nil {
  244. return
  245. }
  246. return
  247. }