123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- package kafka
- import (
- "bufio"
- "bytes"
- "context"
- "fmt"
- "net"
- "github.com/segmentio/kafka-go/protocol/describegroups"
- )
- // DescribeGroupsRequest is a request to the DescribeGroups API.
- type DescribeGroupsRequest struct {
- // Addr is the address of the kafka broker to send the request to.
- Addr net.Addr
- // GroupIDs is a slice of groups to get details for.
- GroupIDs []string
- }
- // DescribeGroupsResponse is a response from the DescribeGroups API.
- type DescribeGroupsResponse struct {
- // Groups is a slice of details for the requested groups.
- Groups []DescribeGroupsResponseGroup
- }
- // DescribeGroupsResponseGroup contains the response details for a single group.
- type DescribeGroupsResponseGroup struct {
- // Error is set to a non-nil value if there was an error fetching the details
- // for this group.
- Error error
- // GroupID is the ID of the group.
- GroupID string
- // GroupState is a description of the group state.
- GroupState string
- // Members contains details about each member of the group.
- Members []DescribeGroupsResponseMember
- }
- // MemberInfo represents the membership information for a single group member.
- type DescribeGroupsResponseMember struct {
- // MemberID is the ID of the group member.
- MemberID string
- // ClientID is the ID of the client that the group member is using.
- ClientID string
- // ClientHost is the host of the client that the group member is connecting from.
- ClientHost string
- // MemberMetadata contains metadata about this group member.
- MemberMetadata DescribeGroupsResponseMemberMetadata
- // MemberAssignments contains the topic partitions that this member is assigned to.
- MemberAssignments DescribeGroupsResponseAssignments
- }
- // GroupMemberMetadata stores metadata associated with a group member.
- type DescribeGroupsResponseMemberMetadata struct {
- // Version is the version of the metadata.
- Version int
- // Topics is the list of topics that the member is assigned to.
- Topics []string
- // UserData is the user data for the member.
- UserData []byte
- // OwnedPartitions contains the partitions owned by this group member; only set if
- // consumers are using a cooperative rebalancing assignor protocol.
- OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition
- }
- type DescribeGroupsResponseMemberMetadataOwnedPartition struct {
- // Topic is the name of the topic.
- Topic string
- // Partitions is the partitions that are owned by the group in the topic.
- Partitions []int
- }
- // GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
- type DescribeGroupsResponseAssignments struct {
- // Version is the version of the assignments data.
- Version int
- // Topics contains the details of the partition assignments for each topic.
- Topics []GroupMemberTopic
- // UserData is the user data for the member.
- UserData []byte
- }
- // GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used
- // to represent the topic partitions that have been assigned to a group member.
- type GroupMemberTopic struct {
- // Topic is the name of the topic.
- Topic string
- // Partitions is a slice of partition IDs that this member is assigned to in the topic.
- Partitions []int
- }
- // DescribeGroups calls the Kafka DescribeGroups API to get information about one or more
- // consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for
- // more information.
- func (c *Client) DescribeGroups(
- ctx context.Context,
- req *DescribeGroupsRequest,
- ) (*DescribeGroupsResponse, error) {
- protoResp, err := c.roundTrip(
- ctx,
- req.Addr,
- &describegroups.Request{
- Groups: req.GroupIDs,
- },
- )
- if err != nil {
- return nil, err
- }
- apiResp := protoResp.(*describegroups.Response)
- resp := &DescribeGroupsResponse{}
- for _, apiGroup := range apiResp.Groups {
- group := DescribeGroupsResponseGroup{
- Error: makeError(apiGroup.ErrorCode, ""),
- GroupID: apiGroup.GroupID,
- GroupState: apiGroup.GroupState,
- }
- for _, member := range apiGroup.Members {
- decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
- if err != nil {
- return nil, err
- }
- decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
- if err != nil {
- return nil, err
- }
- group.Members = append(group.Members, DescribeGroupsResponseMember{
- MemberID: member.MemberID,
- ClientID: member.ClientID,
- ClientHost: member.ClientHost,
- MemberAssignments: decodedAssignments,
- MemberMetadata: decodedMetadata,
- })
- }
- resp.Groups = append(resp.Groups, group)
- }
- return resp, nil
- }
- // decodeMemberMetadata converts raw metadata bytes to a
- // DescribeGroupsResponseMemberMetadata struct.
- //
- // See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
- // for protocol details.
- func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
- mm := DescribeGroupsResponseMemberMetadata{}
- if len(rawMetadata) == 0 {
- return mm, nil
- }
- buf := bytes.NewBuffer(rawMetadata)
- bufReader := bufio.NewReader(buf)
- remain := len(rawMetadata)
- var err error
- var version16 int16
- if remain, err = readInt16(bufReader, remain, &version16); err != nil {
- return mm, err
- }
- mm.Version = int(version16)
- if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
- return mm, err
- }
- if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
- return mm, err
- }
- if mm.Version == 1 && remain > 0 {
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- op := DescribeGroupsResponseMemberMetadataOwnedPartition{}
- if fnRemain, fnErr = readString(r, size, &op.Topic); fnErr != nil {
- return
- }
- ps := []int32{}
- if fnRemain, fnErr = readInt32Array(r, fnRemain, &ps); fnErr != nil {
- return
- }
- for _, p := range ps {
- op.Partitions = append(op.Partitions, int(p))
- }
- mm.OwnedPartitions = append(mm.OwnedPartitions, op)
- return
- }
- if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
- return mm, err
- }
- }
- if remain != 0 {
- return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
- }
- return mm, nil
- }
- // decodeMemberAssignments converts raw assignment bytes to a DescribeGroupsResponseAssignments
- // struct.
- //
- // See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
- // for protocol details.
- func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssignments, error) {
- ma := DescribeGroupsResponseAssignments{}
- if len(rawAssignments) == 0 {
- return ma, nil
- }
- buf := bytes.NewBuffer(rawAssignments)
- bufReader := bufio.NewReader(buf)
- remain := len(rawAssignments)
- var err error
- var version16 int16
- if remain, err = readInt16(bufReader, remain, &version16); err != nil {
- return ma, err
- }
- ma.Version = int(version16)
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- item := GroupMemberTopic{}
- if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
- return
- }
- partitions := []int32{}
- if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
- return
- }
- for _, partition := range partitions {
- item.Partitions = append(item.Partitions, int(partition))
- }
- ma.Topics = append(ma.Topics, item)
- return
- }
- if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
- return ma, err
- }
- if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
- return ma, err
- }
- if remain != 0 {
- return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
- }
- return ma, nil
- }
- // readInt32Array reads an array of int32s. It's adapted from the implementation of
- // readStringArray.
- func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
- var content []int32
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- var value int32
- if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
- return
- }
- content = append(content, value)
- return
- }
- if remain, err = readArrayWith(r, sz, fn); err != nil {
- return
- }
- *v = content
- return
- }
|