describeconfigs.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package describeconfigs
import (
	"fmt"
	"strconv"

	"github.com/segmentio/kafka-go/protocol"
)
  6. const (
  7. resourceTypeBroker int8 = 4
  8. )
  9. func init() {
  10. protocol.Register(&Request{}, &Response{})
  11. }
  12. // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeConfigs
  13. type Request struct {
  14. Resources []RequestResource `kafka:"min=v0,max=v3"`
  15. IncludeSynonyms bool `kafka:"min=v1,max=v3"`
  16. IncludeDocumentation bool `kafka:"min=v3,max=v3"`
  17. }
  18. func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }
  19. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  20. // Broker metadata requests must be sent to the associated broker
  21. for _, resource := range r.Resources {
  22. if resource.ResourceType == resourceTypeBroker {
  23. brokerID, err := strconv.Atoi(resource.ResourceName)
  24. if err != nil {
  25. return protocol.Broker{}, err
  26. }
  27. return cluster.Brokers[int32(brokerID)], nil
  28. }
  29. }
  30. return cluster.Brokers[cluster.Controller], nil
  31. }
  32. func (r *Request) Split(cluster protocol.Cluster) (
  33. []protocol.Message,
  34. protocol.Merger,
  35. error,
  36. ) {
  37. messages := []protocol.Message{}
  38. topicsMessage := Request{}
  39. for _, resource := range r.Resources {
  40. // Split out broker requests to separate brokers
  41. if resource.ResourceType == resourceTypeBroker {
  42. messages = append(messages, &Request{
  43. Resources: []RequestResource{resource},
  44. })
  45. } else {
  46. topicsMessage.Resources = append(
  47. topicsMessage.Resources, resource,
  48. )
  49. }
  50. }
  51. if len(topicsMessage.Resources) > 0 {
  52. messages = append(messages, &topicsMessage)
  53. }
  54. return messages, new(Response), nil
  55. }
  56. type RequestResource struct {
  57. ResourceType int8 `kafka:"min=v0,max=v3"`
  58. ResourceName string `kafka:"min=v0,max=v3"`
  59. ConfigNames []string `kafka:"min=v0,max=v3,nullable"`
  60. }
  61. type Response struct {
  62. ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
  63. Resources []ResponseResource `kafka:"min=v0,max=v3"`
  64. }
  65. func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }
  66. func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
  67. protocol.Message,
  68. error,
  69. ) {
  70. response := &Response{}
  71. for _, result := range results {
  72. m, err := protocol.Result(result)
  73. if err != nil {
  74. return nil, err
  75. }
  76. response.Resources = append(
  77. response.Resources,
  78. m.(*Response).Resources...,
  79. )
  80. }
  81. return response, nil
  82. }
  83. type ResponseResource struct {
  84. ErrorCode int16 `kafka:"min=v0,max=v3"`
  85. ErrorMessage string `kafka:"min=v0,max=v3,nullable"`
  86. ResourceType int8 `kafka:"min=v0,max=v3"`
  87. ResourceName string `kafka:"min=v0,max=v3"`
  88. ConfigEntries []ResponseConfigEntry `kafka:"min=v0,max=v3"`
  89. }
  90. type ResponseConfigEntry struct {
  91. ConfigName string `kafka:"min=v0,max=v3"`
  92. ConfigValue string `kafka:"min=v0,max=v3,nullable"`
  93. ReadOnly bool `kafka:"min=v0,max=v3"`
  94. IsDefault bool `kafka:"min=v0,max=v0"`
  95. ConfigSource int8 `kafka:"min=v1,max=v3"`
  96. IsSensitive bool `kafka:"min=v0,max=v3"`
  97. ConfigSynonyms []ResponseConfigSynonym `kafka:"min=v1,max=v3"`
  98. ConfigType int8 `kafka:"min=v3,max=v3"`
  99. ConfigDocumentation string `kafka:"min=v3,max=v3,nullable"`
  100. }
  101. type ResponseConfigSynonym struct {
  102. ConfigName string `kafka:"min=v1,max=v3"`
  103. ConfigValue string `kafka:"min=v1,max=v3,nullable"`
  104. ConfigSource int8 `kafka:"min=v1,max=v3"`
  105. }
  106. var _ protocol.BrokerMessage = (*Request)(nil)