joingroup.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "net"
  8. "time"
  9. "github.com/segmentio/kafka-go/protocol"
  10. "github.com/segmentio/kafka-go/protocol/consumer"
  11. "github.com/segmentio/kafka-go/protocol/joingroup"
  12. )
  13. // JoinGroupRequest is the request structure for the JoinGroup function.
  14. type JoinGroupRequest struct {
  15. // Address of the kafka broker to send the request to.
  16. Addr net.Addr
  17. // GroupID of the group to join.
  18. GroupID string
  19. // The duration after which the coordinator considers the consumer dead
  20. // if it has not received a heartbeat.
  21. SessionTimeout time.Duration
  22. // The duration the coordination will wait for each member to rejoin when rebalancing the group.
  23. RebalanceTimeout time.Duration
  24. // The ID assigned by the group coordinator.
  25. MemberID string
  26. // The unique identifier for the consumer instance.
  27. GroupInstanceID string
  28. // The name for the class of protocols implemented by the group being joined.
  29. ProtocolType string
  30. // The list of protocols the member supports.
  31. Protocols []GroupProtocol
  32. }
  33. // GroupProtocol represents a consumer group protocol.
  34. type GroupProtocol struct {
  35. // The protocol name.
  36. Name string
  37. // The protocol metadata.
  38. Metadata GroupProtocolSubscription
  39. }
  40. type GroupProtocolSubscription struct {
  41. // The Topics to subscribe to.
  42. Topics []string
  43. // UserData assosiated with the subscription for the given protocol
  44. UserData []byte
  45. // Partitions owned by this consumer.
  46. OwnedPartitions map[string][]int
  47. }
  48. // JoinGroupResponse is the response structure for the JoinGroup function.
  49. type JoinGroupResponse struct {
  50. // An error that may have occurred when attempting to join the group.
  51. //
  52. // The errors contain the kafka error code. Programs may use the standard
  53. // errors.Is function to test the error against kafka error codes.
  54. Error error
  55. // The amount of time that the broker throttled the request.
  56. Throttle time.Duration
  57. // The generation ID of the group.
  58. GenerationID int
  59. // The group protocol selected by the coordinatior.
  60. ProtocolName string
  61. // The group protocol name.
  62. ProtocolType string
  63. // The leader of the group.
  64. LeaderID string
  65. // The group member ID.
  66. MemberID string
  67. // The members of the group.
  68. Members []JoinGroupResponseMember
  69. }
  70. // JoinGroupResponseMember represents a group memmber in a reponse to a JoinGroup request.
  71. type JoinGroupResponseMember struct {
  72. // The group memmber ID.
  73. ID string
  74. // The unique identifier of the consumer instance.
  75. GroupInstanceID string
  76. // The group member metadata.
  77. Metadata GroupProtocolSubscription
  78. }
  79. // JoinGroup sends a join group request to the coordinator and returns the response.
  80. func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) {
  81. joinGroup := joingroup.Request{
  82. GroupID: req.GroupID,
  83. SessionTimeoutMS: int32(req.SessionTimeout.Milliseconds()),
  84. RebalanceTimeoutMS: int32(req.RebalanceTimeout.Milliseconds()),
  85. MemberID: req.MemberID,
  86. GroupInstanceID: req.GroupInstanceID,
  87. ProtocolType: req.ProtocolType,
  88. Protocols: make([]joingroup.RequestProtocol, 0, len(req.Protocols)),
  89. }
  90. for _, proto := range req.Protocols {
  91. protoMeta := consumer.Subscription{
  92. Version: consumer.MaxVersionSupported,
  93. Topics: proto.Metadata.Topics,
  94. UserData: proto.Metadata.UserData,
  95. OwnedPartitions: make([]consumer.TopicPartition, 0, len(proto.Metadata.OwnedPartitions)),
  96. }
  97. for topic, partitions := range proto.Metadata.OwnedPartitions {
  98. tp := consumer.TopicPartition{
  99. Topic: topic,
  100. Partitions: make([]int32, 0, len(partitions)),
  101. }
  102. for _, partition := range partitions {
  103. tp.Partitions = append(tp.Partitions, int32(partition))
  104. }
  105. protoMeta.OwnedPartitions = append(protoMeta.OwnedPartitions, tp)
  106. }
  107. metaBytes, err := protocol.Marshal(consumer.MaxVersionSupported, protoMeta)
  108. if err != nil {
  109. return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
  110. }
  111. joinGroup.Protocols = append(joinGroup.Protocols, joingroup.RequestProtocol{
  112. Name: proto.Name,
  113. Metadata: metaBytes,
  114. })
  115. }
  116. m, err := c.roundTrip(ctx, req.Addr, &joinGroup)
  117. if err != nil {
  118. return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
  119. }
  120. r := m.(*joingroup.Response)
  121. res := &JoinGroupResponse{
  122. Error: makeError(r.ErrorCode, ""),
  123. Throttle: makeDuration(r.ThrottleTimeMS),
  124. GenerationID: int(r.GenerationID),
  125. ProtocolName: r.ProtocolName,
  126. ProtocolType: r.ProtocolType,
  127. LeaderID: r.LeaderID,
  128. MemberID: r.MemberID,
  129. Members: make([]JoinGroupResponseMember, 0, len(r.Members)),
  130. }
  131. for _, member := range r.Members {
  132. var meta consumer.Subscription
  133. err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta)
  134. if err != nil {
  135. return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
  136. }
  137. subscription := GroupProtocolSubscription{
  138. Topics: meta.Topics,
  139. UserData: meta.UserData,
  140. OwnedPartitions: make(map[string][]int, len(meta.OwnedPartitions)),
  141. }
  142. for _, owned := range meta.OwnedPartitions {
  143. subscription.OwnedPartitions[owned.Topic] = make([]int, 0, len(owned.Partitions))
  144. for _, partition := range owned.Partitions {
  145. subscription.OwnedPartitions[owned.Topic] = append(subscription.OwnedPartitions[owned.Topic], int(partition))
  146. }
  147. }
  148. res.Members = append(res.Members, JoinGroupResponseMember{
  149. ID: member.MemberID,
  150. GroupInstanceID: member.GroupInstanceID,
  151. Metadata: subscription,
  152. })
  153. }
  154. return res, nil
  155. }
  156. type groupMetadata struct {
  157. Version int16
  158. Topics []string
  159. UserData []byte
  160. }
  161. func (t groupMetadata) size() int32 {
  162. return sizeofInt16(t.Version) +
  163. sizeofStringArray(t.Topics) +
  164. sizeofBytes(t.UserData)
  165. }
  166. func (t groupMetadata) writeTo(wb *writeBuffer) {
  167. wb.writeInt16(t.Version)
  168. wb.writeStringArray(t.Topics)
  169. wb.writeBytes(t.UserData)
  170. }
  171. func (t groupMetadata) bytes() []byte {
  172. buf := bytes.NewBuffer(nil)
  173. t.writeTo(&writeBuffer{w: buf})
  174. return buf.Bytes()
  175. }
  176. func (t *groupMetadata) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  177. if remain, err = readInt16(r, size, &t.Version); err != nil {
  178. return
  179. }
  180. if remain, err = readStringArray(r, remain, &t.Topics); err != nil {
  181. return
  182. }
  183. if remain, err = readBytes(r, remain, &t.UserData); err != nil {
  184. return
  185. }
  186. return
  187. }
  188. type joinGroupRequestGroupProtocolV1 struct {
  189. ProtocolName string
  190. ProtocolMetadata []byte
  191. }
  192. func (t joinGroupRequestGroupProtocolV1) size() int32 {
  193. return sizeofString(t.ProtocolName) +
  194. sizeofBytes(t.ProtocolMetadata)
  195. }
  196. func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
  197. wb.writeString(t.ProtocolName)
  198. wb.writeBytes(t.ProtocolMetadata)
  199. }
  200. type joinGroupRequestV1 struct {
  201. // GroupID holds the unique group identifier
  202. GroupID string
  203. // SessionTimeout holds the coordinator considers the consumer dead if it
  204. // receives no heartbeat after this timeout in ms.
  205. SessionTimeout int32
  206. // RebalanceTimeout holds the maximum time that the coordinator will wait
  207. // for each member to rejoin when rebalancing the group in ms
  208. RebalanceTimeout int32
  209. // MemberID assigned by the group coordinator or the zero string if joining
  210. // for the first time.
  211. MemberID string
  212. // ProtocolType holds the unique name for class of protocols implemented by group
  213. ProtocolType string
  214. // GroupProtocols holds the list of protocols that the member supports
  215. GroupProtocols []joinGroupRequestGroupProtocolV1
  216. }
  217. func (t joinGroupRequestV1) size() int32 {
  218. return sizeofString(t.GroupID) +
  219. sizeofInt32(t.SessionTimeout) +
  220. sizeofInt32(t.RebalanceTimeout) +
  221. sizeofString(t.MemberID) +
  222. sizeofString(t.ProtocolType) +
  223. sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
  224. }
  225. func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
  226. wb.writeString(t.GroupID)
  227. wb.writeInt32(t.SessionTimeout)
  228. wb.writeInt32(t.RebalanceTimeout)
  229. wb.writeString(t.MemberID)
  230. wb.writeString(t.ProtocolType)
  231. wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
  232. }
  233. type joinGroupResponseMemberV1 struct {
  234. // MemberID assigned by the group coordinator
  235. MemberID string
  236. MemberMetadata []byte
  237. }
  238. func (t joinGroupResponseMemberV1) size() int32 {
  239. return sizeofString(t.MemberID) +
  240. sizeofBytes(t.MemberMetadata)
  241. }
  242. func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
  243. wb.writeString(t.MemberID)
  244. wb.writeBytes(t.MemberMetadata)
  245. }
  246. func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  247. if remain, err = readString(r, size, &t.MemberID); err != nil {
  248. return
  249. }
  250. if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
  251. return
  252. }
  253. return
  254. }
  255. type joinGroupResponseV1 struct {
  256. // ErrorCode holds response error code
  257. ErrorCode int16
  258. // GenerationID holds the generation of the group.
  259. GenerationID int32
  260. // GroupProtocol holds the group protocol selected by the coordinator
  261. GroupProtocol string
  262. // LeaderID holds the leader of the group
  263. LeaderID string
  264. // MemberID assigned by the group coordinator
  265. MemberID string
  266. Members []joinGroupResponseMemberV1
  267. }
  268. func (t joinGroupResponseV1) size() int32 {
  269. return sizeofInt16(t.ErrorCode) +
  270. sizeofInt32(t.GenerationID) +
  271. sizeofString(t.GroupProtocol) +
  272. sizeofString(t.LeaderID) +
  273. sizeofString(t.MemberID) +
  274. sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
  275. }
  276. func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
  277. wb.writeInt16(t.ErrorCode)
  278. wb.writeInt32(t.GenerationID)
  279. wb.writeString(t.GroupProtocol)
  280. wb.writeString(t.LeaderID)
  281. wb.writeString(t.MemberID)
  282. wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
  283. }
  284. func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  285. if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
  286. return
  287. }
  288. if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
  289. return
  290. }
  291. if remain, err = readString(r, remain, &t.GroupProtocol); err != nil {
  292. return
  293. }
  294. if remain, err = readString(r, remain, &t.LeaderID); err != nil {
  295. return
  296. }
  297. if remain, err = readString(r, remain, &t.MemberID); err != nil {
  298. return
  299. }
  300. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  301. var item joinGroupResponseMemberV1
  302. if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
  303. return
  304. }
  305. t.Members = append(t.Members, item)
  306. return
  307. }
  308. if remain, err = readArrayWith(r, remain, fn); err != nil {
  309. return
  310. }
  311. return
  312. }