alterconfigs.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/alterconfigs"
  8. )
  9. // AlterConfigsRequest represents a request sent to a kafka broker to alter configs.
  10. type AlterConfigsRequest struct {
  11. // Address of the kafka broker to send the request to.
  12. Addr net.Addr
  13. // List of resources to update.
  14. Resources []AlterConfigRequestResource
  15. // When set to true, topics are not created but the configuration is
  16. // validated as if they were.
  17. ValidateOnly bool
  18. }
  19. type AlterConfigRequestResource struct {
  20. // Resource Type
  21. ResourceType ResourceType
  22. // Resource Name
  23. ResourceName string
  24. // Configs is a list of configuration updates.
  25. Configs []AlterConfigRequestConfig
  26. }
  27. type AlterConfigRequestConfig struct {
  28. // Configuration key name
  29. Name string
  30. // The value to set for the configuration key.
  31. Value string
  32. }
  33. // AlterConfigsResponse represents a response from a kafka broker to an alter config request.
  34. type AlterConfigsResponse struct {
  35. // Duration for which the request was throttled due to a quota violation.
  36. Throttle time.Duration
  37. // Mapping of topic names to errors that occurred while attempting to create
  38. // the topics.
  39. //
  40. // The errors contain the kafka error code. Programs may use the standard
  41. // errors.Is function to test the error against kafka error codes.
  42. Errors map[AlterConfigsResponseResource]error
  43. }
  44. // AlterConfigsResponseResource helps map errors to specific resources in an
  45. // alter config response.
  46. type AlterConfigsResponseResource struct {
  47. Type int8
  48. Name string
  49. }
  50. // AlterConfigs sends a config altering request to a kafka broker and returns the
  51. // response.
  52. func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  53. resources := make([]alterconfigs.RequestResources, len(req.Resources))
  54. for i, t := range req.Resources {
  55. configs := make([]alterconfigs.RequestConfig, len(t.Configs))
  56. for j, v := range t.Configs {
  57. configs[j] = alterconfigs.RequestConfig{
  58. Name: v.Name,
  59. Value: v.Value,
  60. }
  61. }
  62. resources[i] = alterconfigs.RequestResources{
  63. ResourceType: int8(t.ResourceType),
  64. ResourceName: t.ResourceName,
  65. Configs: configs,
  66. }
  67. }
  68. m, err := c.roundTrip(ctx, req.Addr, &alterconfigs.Request{
  69. Resources: resources,
  70. ValidateOnly: req.ValidateOnly,
  71. })
  72. if err != nil {
  73. return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
  74. }
  75. res := m.(*alterconfigs.Response)
  76. ret := &AlterConfigsResponse{
  77. Throttle: makeDuration(res.ThrottleTimeMs),
  78. Errors: make(map[AlterConfigsResponseResource]error, len(res.Responses)),
  79. }
  80. for _, t := range res.Responses {
  81. ret.Errors[AlterConfigsResponseResource{
  82. Type: t.ResourceType,
  83. Name: t.ResourceName,
  84. }] = makeError(t.ErrorCode, t.ErrorMessage)
  85. }
  86. return ret, nil
  87. }