listgroups.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package listgroups
  2. import (
  3. "github.com/segmentio/kafka-go/protocol"
  4. )
  5. func init() {
  6. protocol.Register(&Request{}, &Response{})
  7. }
  8. // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListGroups
  9. type Request struct {
  10. _ struct{} `kafka:"min=v0,max=v2"`
  11. brokerID int32
  12. }
  13. func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListGroups }
  14. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  15. return cluster.Brokers[r.brokerID], nil
  16. }
  17. func (r *Request) Split(cluster protocol.Cluster) (
  18. []protocol.Message,
  19. protocol.Merger,
  20. error,
  21. ) {
  22. messages := []protocol.Message{}
  23. for _, broker := range cluster.Brokers {
  24. messages = append(messages, &Request{brokerID: broker.ID})
  25. }
  26. return messages, new(Response), nil
  27. }
  28. type Response struct {
  29. ThrottleTimeMs int32 `kafka:"min=v1,max=v2"`
  30. ErrorCode int16 `kafka:"min=v0,max=v2"`
  31. Groups []ResponseGroup `kafka:"min=v0,max=v2"`
  32. }
  33. type ResponseGroup struct {
  34. GroupID string `kafka:"min=v0,max=v2"`
  35. ProtocolType string `kafka:"min=v0,max=v2"`
  36. // Use this to store which broker returned the response
  37. BrokerID int32 `kafka:"-"`
  38. }
  39. func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListGroups }
  40. func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
  41. protocol.Message,
  42. error,
  43. ) {
  44. response := &Response{}
  45. for r, result := range results {
  46. brokerResp := result.(*Response)
  47. respGroups := []ResponseGroup{}
  48. for _, brokerResp := range brokerResp.Groups {
  49. respGroups = append(
  50. respGroups,
  51. ResponseGroup{
  52. GroupID: brokerResp.GroupID,
  53. ProtocolType: brokerResp.ProtocolType,
  54. BrokerID: requests[r].(*Request).brokerID,
  55. },
  56. )
  57. }
  58. response.Groups = append(response.Groups, brokerResp.Groups...)
  59. }
  60. return response, nil
  61. }