syncgroup.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "net"
  8. "time"
  9. "github.com/segmentio/kafka-go/protocol"
  10. "github.com/segmentio/kafka-go/protocol/consumer"
  11. "github.com/segmentio/kafka-go/protocol/syncgroup"
  12. )
  13. // SyncGroupRequest is the request structure for the SyncGroup function.
  14. type SyncGroupRequest struct {
  15. // Address of the kafka broker to sent he request to.
  16. Addr net.Addr
  17. // GroupID of the group to sync.
  18. GroupID string
  19. // The generation of the group.
  20. GenerationID int
  21. // The member ID assigned by the group.
  22. MemberID string
  23. // The unique identifier for the consumer instance.
  24. GroupInstanceID string
  25. // The name for the class of protocols implemented by the group being joined.
  26. ProtocolType string
  27. // The group protocol name.
  28. ProtocolName string
  29. // The group member assignments.
  30. Assignments []SyncGroupRequestAssignment
  31. }
  32. // SyncGroupRequestAssignment represents an assignement for a goroup memeber.
  33. type SyncGroupRequestAssignment struct {
  34. // The ID of the member to assign.
  35. MemberID string
  36. // The member assignment.
  37. Assignment GroupProtocolAssignment
  38. }
  39. // SyncGroupResponse is the response structure for the SyncGroup function.
  40. type SyncGroupResponse struct {
  41. // An error that may have occurred when attempting to sync the group.
  42. //
  43. // The errors contain the kafka error code. Programs may use the standard
  44. // errors.Is function to test the error against kafka error codes.
  45. Error error
  46. // The amount of time that the broker throttled the request.
  47. Throttle time.Duration
  48. // The group protocol type.
  49. ProtocolType string
  50. // The group protocol name.
  51. ProtocolName string
  52. // The member assignment.
  53. Assignment GroupProtocolAssignment
  54. }
  55. // GroupProtocolAssignment represents an assignment of topics and partitions for a group memeber.
  56. type GroupProtocolAssignment struct {
  57. // The topics and partitions assigned to the group memeber.
  58. AssignedPartitions map[string][]int
  59. // UserData for the assignemnt.
  60. UserData []byte
  61. }
  62. // SyncGroup sends a sync group request to the coordinator and returns the response.
  63. func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error) {
  64. syncGroup := syncgroup.Request{
  65. GroupID: req.GroupID,
  66. GenerationID: int32(req.GenerationID),
  67. MemberID: req.MemberID,
  68. GroupInstanceID: req.GroupInstanceID,
  69. ProtocolType: req.ProtocolType,
  70. ProtocolName: req.ProtocolName,
  71. Assignments: make([]syncgroup.RequestAssignment, 0, len(req.Assignments)),
  72. }
  73. for _, assignment := range req.Assignments {
  74. assign := consumer.Assignment{
  75. Version: consumer.MaxVersionSupported,
  76. AssignedPartitions: make([]consumer.TopicPartition, 0, len(assignment.Assignment.AssignedPartitions)),
  77. UserData: assignment.Assignment.UserData,
  78. }
  79. for topic, partitions := range assignment.Assignment.AssignedPartitions {
  80. tp := consumer.TopicPartition{
  81. Topic: topic,
  82. Partitions: make([]int32, 0, len(partitions)),
  83. }
  84. for _, partition := range partitions {
  85. tp.Partitions = append(tp.Partitions, int32(partition))
  86. }
  87. assign.AssignedPartitions = append(assign.AssignedPartitions, tp)
  88. }
  89. assignBytes, err := protocol.Marshal(consumer.MaxVersionSupported, assign)
  90. if err != nil {
  91. return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
  92. }
  93. syncGroup.Assignments = append(syncGroup.Assignments, syncgroup.RequestAssignment{
  94. MemberID: assignment.MemberID,
  95. Assignment: assignBytes,
  96. })
  97. }
  98. m, err := c.roundTrip(ctx, req.Addr, &syncGroup)
  99. if err != nil {
  100. return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
  101. }
  102. r := m.(*syncgroup.Response)
  103. var assignment consumer.Assignment
  104. err = protocol.Unmarshal(r.Assignments, consumer.MaxVersionSupported, &assignment)
  105. if err != nil {
  106. return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
  107. }
  108. res := &SyncGroupResponse{
  109. Throttle: makeDuration(r.ThrottleTimeMS),
  110. Error: makeError(r.ErrorCode, ""),
  111. ProtocolType: r.ProtocolType,
  112. ProtocolName: r.ProtocolName,
  113. Assignment: GroupProtocolAssignment{
  114. AssignedPartitions: make(map[string][]int, len(assignment.AssignedPartitions)),
  115. UserData: assignment.UserData,
  116. },
  117. }
  118. partitions := map[string][]int{}
  119. for _, topicPartition := range assignment.AssignedPartitions {
  120. for _, partition := range topicPartition.Partitions {
  121. partitions[topicPartition.Topic] = append(partitions[topicPartition.Topic], int(partition))
  122. }
  123. }
  124. res.Assignment.AssignedPartitions = partitions
  125. return res, nil
  126. }
  127. type groupAssignment struct {
  128. Version int16
  129. Topics map[string][]int32
  130. UserData []byte
  131. }
  132. func (t groupAssignment) size() int32 {
  133. sz := sizeofInt16(t.Version) + sizeofInt16(int16(len(t.Topics)))
  134. for topic, partitions := range t.Topics {
  135. sz += sizeofString(topic) + sizeofInt32Array(partitions)
  136. }
  137. return sz + sizeofBytes(t.UserData)
  138. }
  139. func (t groupAssignment) writeTo(wb *writeBuffer) {
  140. wb.writeInt16(t.Version)
  141. wb.writeInt32(int32(len(t.Topics)))
  142. for topic, partitions := range t.Topics {
  143. wb.writeString(topic)
  144. wb.writeInt32Array(partitions)
  145. }
  146. wb.writeBytes(t.UserData)
  147. }
  148. func (t *groupAssignment) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  149. // I came across this case when testing for compatibility with bsm/sarama-cluster. It
  150. // appears in some cases, sarama-cluster can send a nil array entry. Admittedly, I
  151. // didn't look too closely at it.
  152. if size == 0 {
  153. t.Topics = map[string][]int32{}
  154. return 0, nil
  155. }
  156. if remain, err = readInt16(r, size, &t.Version); err != nil {
  157. return
  158. }
  159. if remain, err = readMapStringInt32(r, remain, &t.Topics); err != nil {
  160. return
  161. }
  162. if remain, err = readBytes(r, remain, &t.UserData); err != nil {
  163. return
  164. }
  165. return
  166. }
  167. func (t groupAssignment) bytes() []byte {
  168. buf := bytes.NewBuffer(nil)
  169. t.writeTo(&writeBuffer{w: buf})
  170. return buf.Bytes()
  171. }
  172. type syncGroupRequestGroupAssignmentV0 struct {
  173. // MemberID assigned by the group coordinator
  174. MemberID string
  175. // MemberAssignments holds client encoded assignments
  176. //
  177. // See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  178. MemberAssignments []byte
  179. }
  180. func (t syncGroupRequestGroupAssignmentV0) size() int32 {
  181. return sizeofString(t.MemberID) +
  182. sizeofBytes(t.MemberAssignments)
  183. }
  184. func (t syncGroupRequestGroupAssignmentV0) writeTo(wb *writeBuffer) {
  185. wb.writeString(t.MemberID)
  186. wb.writeBytes(t.MemberAssignments)
  187. }
  188. type syncGroupRequestV0 struct {
  189. // GroupID holds the unique group identifier
  190. GroupID string
  191. // GenerationID holds the generation of the group.
  192. GenerationID int32
  193. // MemberID assigned by the group coordinator
  194. MemberID string
  195. GroupAssignments []syncGroupRequestGroupAssignmentV0
  196. }
  197. func (t syncGroupRequestV0) size() int32 {
  198. return sizeofString(t.GroupID) +
  199. sizeofInt32(t.GenerationID) +
  200. sizeofString(t.MemberID) +
  201. sizeofArray(len(t.GroupAssignments), func(i int) int32 { return t.GroupAssignments[i].size() })
  202. }
  203. func (t syncGroupRequestV0) writeTo(wb *writeBuffer) {
  204. wb.writeString(t.GroupID)
  205. wb.writeInt32(t.GenerationID)
  206. wb.writeString(t.MemberID)
  207. wb.writeArray(len(t.GroupAssignments), func(i int) { t.GroupAssignments[i].writeTo(wb) })
  208. }
  209. type syncGroupResponseV0 struct {
  210. // ErrorCode holds response error code
  211. ErrorCode int16
  212. // MemberAssignments holds client encoded assignments
  213. //
  214. // See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  215. MemberAssignments []byte
  216. }
  217. func (t syncGroupResponseV0) size() int32 {
  218. return sizeofInt16(t.ErrorCode) +
  219. sizeofBytes(t.MemberAssignments)
  220. }
  221. func (t syncGroupResponseV0) writeTo(wb *writeBuffer) {
  222. wb.writeInt16(t.ErrorCode)
  223. wb.writeBytes(t.MemberAssignments)
  224. }
  225. func (t *syncGroupResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  226. if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
  227. return
  228. }
  229. if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
  230. return
  231. }
  232. return
  233. }