listgroups.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package listgroups
  2. import (
  3. "github.com/segmentio/kafka-go/protocol"
  4. )
  5. func init() {
  6. protocol.Register(&Request{}, &Response{})
  7. }
  // Request is the ListGroups request message. It carries no exported payload
  // fields; the only state it holds is the ID of the broker it is addressed to.
  //
  // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListGroups
  type Request struct {
  	// Blank marker field whose tag carries the version range (v0 through v2)
  	// for the message as a whole.
  	// NOTE(review): presumably read reflectively by the protocol package to
  	// decide which API versions are supported; confirm against that package.
  	_ struct{} `kafka:"min=v0,max=v2"`

  	// brokerID is the ID of the broker this request targets. It is set by
  	// Split (one request per broker) and read back by Broker and by
  	// Response.Merge. It is unexported and has no kafka tag.
  	brokerID int32
  }
  13. func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListGroups }
  14. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  15. return cluster.Brokers[r.brokerID], nil
  16. }
  17. func (r *Request) Split(cluster protocol.Cluster) (
  18. []protocol.Message,
  19. protocol.Merger,
  20. error,
  21. ) {
  22. messages := []protocol.Message{}
  23. for _, broker := range cluster.Brokers {
  24. messages = append(messages, &Request{brokerID: broker.ID})
  25. }
  26. return messages, new(Response), nil
  27. }
  // Response is the ListGroups response message. Request.Split also returns a
  // Response as the merger that combines per-broker results (see Merge).
  type Response struct {
  	// ThrottleTimeMs is tagged for versions v1 through v2 only, i.e. it is
  	// not part of a v0 response according to its tag.
  	ThrottleTimeMs int32 `kafka:"min=v1,max=v2"`

  	// ErrorCode is the response-level error code, tagged for v0 through v2.
  	// NOTE(review): presumably 0 means "no error", as elsewhere in the Kafka
  	// protocol; confirm against the protocol documentation.
  	ErrorCode int16 `kafka:"min=v0,max=v2"`

  	// Groups lists the groups contained in the response, tagged for v0
  	// through v2.
  	Groups []ResponseGroup `kafka:"min=v0,max=v2"`
  }
  // ResponseGroup describes a single group entry inside a ListGroups Response.
  type ResponseGroup struct {
  	// GroupID is the group's identifier, tagged for v0 through v2.
  	GroupID string `kafka:"min=v0,max=v2"`

  	// ProtocolType is the group's protocol type, tagged for v0 through v2.
  	ProtocolType string `kafka:"min=v0,max=v2"`

  	// Use this to store which broker returned the response. Response.Merge
  	// fills it in from the brokerID of the request that produced the group.
  	// NOTE(review): the "-" tag presumably excludes this field from the wire
  	// encoding; confirm against the protocol package's tag handling.
  	BrokerID int32 `kafka:"-"`
  }
  39. func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListGroups }
  40. func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
  41. protocol.Message,
  42. error,
  43. ) {
  44. response := &Response{}
  45. for r, result := range results {
  46. m, err := protocol.Result(result)
  47. if err != nil {
  48. return nil, err
  49. }
  50. brokerResp := m.(*Response)
  51. respGroups := []ResponseGroup{}
  52. for _, brokerResp := range brokerResp.Groups {
  53. respGroups = append(
  54. respGroups,
  55. ResponseGroup{
  56. GroupID: brokerResp.GroupID,
  57. ProtocolType: brokerResp.ProtocolType,
  58. BrokerID: requests[r].(*Request).brokerID,
  59. },
  60. )
  61. }
  62. response.Groups = append(response.Groups, respGroups...)
  63. }
  64. return response, nil
  65. }