describegroups.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "net"
  8. "github.com/segmentio/kafka-go/protocol/describegroups"
  9. )
  10. // DescribeGroupsRequest is a request to the DescribeGroups API.
  11. type DescribeGroupsRequest struct {
  12. // Addr is the address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // GroupIDs is a slice of groups to get details for.
  15. GroupIDs []string
  16. }
  17. // DescribeGroupsResponse is a response from the DescribeGroups API.
  18. type DescribeGroupsResponse struct {
  19. // Groups is a slice of details for the requested groups.
  20. Groups []DescribeGroupsResponseGroup
  21. }
  22. // DescribeGroupsResponseGroup contains the response details for a single group.
  23. type DescribeGroupsResponseGroup struct {
  24. // Error is set to a non-nil value if there was an error fetching the details
  25. // for this group.
  26. Error error
  27. // GroupID is the ID of the group.
  28. GroupID string
  29. // GroupState is a description of the group state.
  30. GroupState string
  31. // Members contains details about each member of the group.
  32. Members []DescribeGroupsResponseMember
  33. }
  34. // MemberInfo represents the membership information for a single group member.
  35. type DescribeGroupsResponseMember struct {
  36. // MemberID is the ID of the group member.
  37. MemberID string
  38. // ClientID is the ID of the client that the group member is using.
  39. ClientID string
  40. // ClientHost is the host of the client that the group member is connecting from.
  41. ClientHost string
  42. // MemberMetadata contains metadata about this group member.
  43. MemberMetadata DescribeGroupsResponseMemberMetadata
  44. // MemberAssignments contains the topic partitions that this member is assigned to.
  45. MemberAssignments DescribeGroupsResponseAssignments
  46. }
  47. // GroupMemberMetadata stores metadata associated with a group member.
  48. type DescribeGroupsResponseMemberMetadata struct {
  49. // Version is the version of the metadata.
  50. Version int
  51. // Topics is the list of topics that the member is assigned to.
  52. Topics []string
  53. // UserData is the user data for the member.
  54. UserData []byte
  55. // OwnedPartitions contains the partitions owned by this group member; only set if
  56. // consumers are using a cooperative rebalancing assignor protocol.
  57. OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition
  58. }
  59. type DescribeGroupsResponseMemberMetadataOwnedPartition struct {
  60. // Topic is the name of the topic.
  61. Topic string
  62. // Partitions is the partitions that are owned by the group in the topic.
  63. Partitions []int
  64. }
  65. // GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
  66. type DescribeGroupsResponseAssignments struct {
  67. // Version is the version of the assignments data.
  68. Version int
  69. // Topics contains the details of the partition assignments for each topic.
  70. Topics []GroupMemberTopic
  71. // UserData is the user data for the member.
  72. UserData []byte
  73. }
  74. // GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used
  75. // to represent the topic partitions that have been assigned to a group member.
  76. type GroupMemberTopic struct {
  77. // Topic is the name of the topic.
  78. Topic string
  79. // Partitions is a slice of partition IDs that this member is assigned to in the topic.
  80. Partitions []int
  81. }
  82. // DescribeGroups calls the Kafka DescribeGroups API to get information about one or more
  83. // consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for
  84. // more information.
  85. func (c *Client) DescribeGroups(
  86. ctx context.Context,
  87. req *DescribeGroupsRequest,
  88. ) (*DescribeGroupsResponse, error) {
  89. protoResp, err := c.roundTrip(
  90. ctx,
  91. req.Addr,
  92. &describegroups.Request{
  93. Groups: req.GroupIDs,
  94. },
  95. )
  96. if err != nil {
  97. return nil, err
  98. }
  99. apiResp := protoResp.(*describegroups.Response)
  100. resp := &DescribeGroupsResponse{}
  101. for _, apiGroup := range apiResp.Groups {
  102. group := DescribeGroupsResponseGroup{
  103. Error: makeError(apiGroup.ErrorCode, ""),
  104. GroupID: apiGroup.GroupID,
  105. GroupState: apiGroup.GroupState,
  106. }
  107. for _, member := range apiGroup.Members {
  108. decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
  109. if err != nil {
  110. return nil, err
  111. }
  112. decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
  113. if err != nil {
  114. return nil, err
  115. }
  116. group.Members = append(group.Members, DescribeGroupsResponseMember{
  117. MemberID: member.MemberID,
  118. ClientID: member.ClientID,
  119. ClientHost: member.ClientHost,
  120. MemberAssignments: decodedAssignments,
  121. MemberMetadata: decodedMetadata,
  122. })
  123. }
  124. resp.Groups = append(resp.Groups, group)
  125. }
  126. return resp, nil
  127. }
  128. // decodeMemberMetadata converts raw metadata bytes to a
  129. // DescribeGroupsResponseMemberMetadata struct.
  130. //
  131. // See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
  132. // for protocol details.
  133. func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
  134. mm := DescribeGroupsResponseMemberMetadata{}
  135. if len(rawMetadata) == 0 {
  136. return mm, nil
  137. }
  138. buf := bytes.NewBuffer(rawMetadata)
  139. bufReader := bufio.NewReader(buf)
  140. remain := len(rawMetadata)
  141. var err error
  142. var version16 int16
  143. if remain, err = readInt16(bufReader, remain, &version16); err != nil {
  144. return mm, err
  145. }
  146. mm.Version = int(version16)
  147. if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
  148. return mm, err
  149. }
  150. if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
  151. return mm, err
  152. }
  153. if mm.Version == 1 && remain > 0 {
  154. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  155. op := DescribeGroupsResponseMemberMetadataOwnedPartition{}
  156. if fnRemain, fnErr = readString(r, size, &op.Topic); fnErr != nil {
  157. return
  158. }
  159. ps := []int32{}
  160. if fnRemain, fnErr = readInt32Array(r, fnRemain, &ps); fnErr != nil {
  161. return
  162. }
  163. for _, p := range ps {
  164. op.Partitions = append(op.Partitions, int(p))
  165. }
  166. mm.OwnedPartitions = append(mm.OwnedPartitions, op)
  167. return
  168. }
  169. if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
  170. return mm, err
  171. }
  172. }
  173. if remain != 0 {
  174. return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
  175. }
  176. return mm, nil
  177. }
  178. // decodeMemberAssignments converts raw assignment bytes to a DescribeGroupsResponseAssignments
  179. // struct.
  180. //
  181. // See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
  182. // for protocol details.
  183. func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssignments, error) {
  184. ma := DescribeGroupsResponseAssignments{}
  185. if len(rawAssignments) == 0 {
  186. return ma, nil
  187. }
  188. buf := bytes.NewBuffer(rawAssignments)
  189. bufReader := bufio.NewReader(buf)
  190. remain := len(rawAssignments)
  191. var err error
  192. var version16 int16
  193. if remain, err = readInt16(bufReader, remain, &version16); err != nil {
  194. return ma, err
  195. }
  196. ma.Version = int(version16)
  197. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  198. item := GroupMemberTopic{}
  199. if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
  200. return
  201. }
  202. partitions := []int32{}
  203. if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
  204. return
  205. }
  206. for _, partition := range partitions {
  207. item.Partitions = append(item.Partitions, int(partition))
  208. }
  209. ma.Topics = append(ma.Topics, item)
  210. return
  211. }
  212. if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
  213. return ma, err
  214. }
  215. if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
  216. return ma, err
  217. }
  218. if remain != 0 {
  219. return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
  220. }
  221. return ma, nil
  222. }
  223. // readInt32Array reads an array of int32s. It's adapted from the implementation of
  224. // readStringArray.
  225. func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
  226. var content []int32
  227. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  228. var value int32
  229. if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
  230. return
  231. }
  232. content = append(content, value)
  233. return
  234. }
  235. if remain, err = readArrayWith(r, sz, fn); err != nil {
  236. return
  237. }
  238. *v = content
  239. return
  240. }