describeconfigs.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/describeconfigs"
  8. )
  9. // DescribeConfigsRequest represents a request sent to a kafka broker to describe configs.
  10. type DescribeConfigsRequest struct {
  11. // Address of the kafka broker to send the request to.
  12. Addr net.Addr
  13. // List of resources to get details for.
  14. Resources []DescribeConfigRequestResource
  15. // Ignored if API version is less than v1
  16. IncludeSynonyms bool
  17. // Ignored if API version is less than v3
  18. IncludeDocumentation bool
  19. }
  20. type DescribeConfigRequestResource struct {
  21. // Resource Type
  22. ResourceType ResourceType
  23. // Resource Name
  24. ResourceName string
  25. // ConfigNames is a list of configurations to update.
  26. ConfigNames []string
  27. }
  28. // DescribeConfigsResponse represents a response from a kafka broker to a describe config request.
  29. type DescribeConfigsResponse struct {
  30. // The amount of time that the broker throttled the request.
  31. Throttle time.Duration
  32. // Resources
  33. Resources []DescribeConfigResponseResource
  34. }
  35. // DescribeConfigResponseResource.
  36. type DescribeConfigResponseResource struct {
  37. // Resource Type
  38. ResourceType int8
  39. // Resource Name
  40. ResourceName string
  41. // Error
  42. Error error
  43. // ConfigEntries
  44. ConfigEntries []DescribeConfigResponseConfigEntry
  45. }
  46. // DescribeConfigResponseConfigEntry.
  47. type DescribeConfigResponseConfigEntry struct {
  48. ConfigName string
  49. ConfigValue string
  50. ReadOnly bool
  51. // Ignored if API version is greater than v0
  52. IsDefault bool
  53. // Ignored if API version is less than v1
  54. ConfigSource int8
  55. IsSensitive bool
  56. // Ignored if API version is less than v1
  57. ConfigSynonyms []DescribeConfigResponseConfigSynonym
  58. // Ignored if API version is less than v3
  59. ConfigType int8
  60. // Ignored if API version is less than v3
  61. ConfigDocumentation string
  62. }
  // DescribeConfigResponseConfigSynonym is one synonym attached to a
  // configuration entry in a describe configs response.
  //
  // Every field is ignored if API version is less than v1.
  type DescribeConfigResponseConfigSynonym struct {
  	// ConfigName is the name of the synonym.
  	// Ignored if API version is less than v1.
  	ConfigName string

  	// ConfigValue is the value of the synonym.
  	// Ignored if API version is less than v1.
  	ConfigValue string

  	// ConfigSource is the source code of the synonym.
  	// Ignored if API version is less than v1.
  	ConfigSource int8
  }
  72. // DescribeConfigs sends a config altering request to a kafka broker and returns the
  73. // response.
  74. func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  75. resources := make([]describeconfigs.RequestResource, len(req.Resources))
  76. for i, t := range req.Resources {
  77. resources[i] = describeconfigs.RequestResource{
  78. ResourceType: int8(t.ResourceType),
  79. ResourceName: t.ResourceName,
  80. ConfigNames: t.ConfigNames,
  81. }
  82. }
  83. m, err := c.roundTrip(ctx, req.Addr, &describeconfigs.Request{
  84. Resources: resources,
  85. IncludeSynonyms: req.IncludeSynonyms,
  86. IncludeDocumentation: req.IncludeDocumentation,
  87. })
  88. if err != nil {
  89. return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err)
  90. }
  91. res := m.(*describeconfigs.Response)
  92. ret := &DescribeConfigsResponse{
  93. Throttle: makeDuration(res.ThrottleTimeMs),
  94. Resources: make([]DescribeConfigResponseResource, len(res.Resources)),
  95. }
  96. for i, t := range res.Resources {
  97. configEntries := make([]DescribeConfigResponseConfigEntry, len(t.ConfigEntries))
  98. for j, v := range t.ConfigEntries {
  99. configSynonyms := make([]DescribeConfigResponseConfigSynonym, len(v.ConfigSynonyms))
  100. for k, cs := range v.ConfigSynonyms {
  101. configSynonyms[k] = DescribeConfigResponseConfigSynonym{
  102. ConfigName: cs.ConfigName,
  103. ConfigValue: cs.ConfigValue,
  104. ConfigSource: cs.ConfigSource,
  105. }
  106. }
  107. configEntries[j] = DescribeConfigResponseConfigEntry{
  108. ConfigName: v.ConfigName,
  109. ConfigValue: v.ConfigValue,
  110. ReadOnly: v.ReadOnly,
  111. ConfigSource: v.ConfigSource,
  112. IsDefault: v.IsDefault,
  113. IsSensitive: v.IsSensitive,
  114. ConfigSynonyms: configSynonyms,
  115. ConfigType: v.ConfigType,
  116. ConfigDocumentation: v.ConfigDocumentation,
  117. }
  118. }
  119. ret.Resources[i] = DescribeConfigResponseResource{
  120. ResourceType: t.ResourceType,
  121. ResourceName: t.ResourceName,
  122. Error: makeError(t.ErrorCode, t.ErrorMessage),
  123. ConfigEntries: configEntries,
  124. }
  125. }
  126. return ret, nil
  127. }