123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package listgroups
- import (
- "github.com/segmentio/kafka-go/protocol"
- )
- func init() {
- protocol.Register(&Request{}, &Response{})
- }
- // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListGroups
- type Request struct {
- _ struct{} `kafka:"min=v0,max=v2"`
- brokerID int32
- }
- func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListGroups }
- func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
- return cluster.Brokers[r.brokerID], nil
- }
- func (r *Request) Split(cluster protocol.Cluster) (
- []protocol.Message,
- protocol.Merger,
- error,
- ) {
- messages := []protocol.Message{}
- for _, broker := range cluster.Brokers {
- messages = append(messages, &Request{brokerID: broker.ID})
- }
- return messages, new(Response), nil
- }
- type Response struct {
- ThrottleTimeMs int32 `kafka:"min=v1,max=v2"`
- ErrorCode int16 `kafka:"min=v0,max=v2"`
- Groups []ResponseGroup `kafka:"min=v0,max=v2"`
- }
- type ResponseGroup struct {
- GroupID string `kafka:"min=v0,max=v2"`
- ProtocolType string `kafka:"min=v0,max=v2"`
- // Use this to store which broker returned the response
- BrokerID int32 `kafka:"-"`
- }
- func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListGroups }
- func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
- protocol.Message,
- error,
- ) {
- response := &Response{}
- for r, result := range results {
- brokerResp := result.(*Response)
- respGroups := []ResponseGroup{}
- for _, brokerResp := range brokerResp.Groups {
- respGroups = append(
- respGroups,
- ResponseGroup{
- GroupID: brokerResp.GroupID,
- ProtocolType: brokerResp.ProtocolType,
- BrokerID: requests[r].(*Request).brokerID,
- },
- )
- }
- response.Groups = append(response.Groups, brokerResp.Groups...)
- }
- return response, nil
- }
|