alterclientquotas.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/alterclientquotas"
  8. )
  9. // AlterClientQuotasRequest represents a request sent to a kafka broker to
  10. // alter client quotas.
  11. type AlterClientQuotasRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // List of client quotas entries to alter.
  15. Entries []AlterClientQuotaEntry
  16. // Whether the alteration should be validated, but not performed.
  17. ValidateOnly bool
  18. }
  19. type AlterClientQuotaEntry struct {
  20. // The quota entities to alter.
  21. Entities []AlterClientQuotaEntity
  22. // An individual quota configuration entry to alter.
  23. Ops []AlterClientQuotaOps
  24. }
  25. type AlterClientQuotaEntity struct {
  26. // The quota entity type.
  27. EntityType string
  28. // The name of the quota entity, or null if the default.
  29. EntityName string
  30. }
  31. type AlterClientQuotaOps struct {
  32. // The quota configuration key.
  33. Key string
  34. // The quota configuration value to set, otherwise ignored if the value is to be removed.
  35. Value float64
  36. // Whether the quota configuration value should be removed, otherwise set.
  37. Remove bool
  38. }
  39. type AlterClientQuotaResponseQuotas struct {
  40. // Error is set to a non-nil value including the code and message if a top-level
  41. // error was encountered when doing the update.
  42. Error error
  43. // The altered quota entities.
  44. Entities []AlterClientQuotaEntity
  45. }
  46. // AlterClientQuotasResponse represents a response from a kafka broker to an alter client
  47. // quotas request.
  48. type AlterClientQuotasResponse struct {
  49. // The amount of time that the broker throttled the request.
  50. Throttle time.Duration
  51. // List of altered client quotas responses.
  52. Entries []AlterClientQuotaResponseQuotas
  53. }
  54. // AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
  55. // the response.
  56. func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
  57. entries := make([]alterclientquotas.Entry, len(req.Entries))
  58. for entryIdx, entry := range req.Entries {
  59. entities := make([]alterclientquotas.Entity, len(entry.Entities))
  60. for entityIdx, entity := range entry.Entities {
  61. entities[entityIdx] = alterclientquotas.Entity{
  62. EntityType: entity.EntityType,
  63. EntityName: entity.EntityName,
  64. }
  65. }
  66. ops := make([]alterclientquotas.Ops, len(entry.Ops))
  67. for opsIdx, op := range entry.Ops {
  68. ops[opsIdx] = alterclientquotas.Ops{
  69. Key: op.Key,
  70. Value: op.Value,
  71. Remove: op.Remove,
  72. }
  73. }
  74. entries[entryIdx] = alterclientquotas.Entry{
  75. Entities: entities,
  76. Ops: ops,
  77. }
  78. }
  79. m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
  80. Entries: entries,
  81. ValidateOnly: req.ValidateOnly,
  82. })
  83. if err != nil {
  84. return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
  85. }
  86. res := m.(*alterclientquotas.Response)
  87. responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))
  88. for responseEntryIdx, responseEntry := range res.Results {
  89. responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
  90. for responseEntityIdx, responseEntity := range responseEntry.Entities {
  91. responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
  92. EntityType: responseEntity.EntityType,
  93. EntityName: responseEntity.EntityName,
  94. }
  95. }
  96. responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
  97. Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage),
  98. Entities: responseEntities,
  99. }
  100. }
  101. ret := &AlterClientQuotasResponse{
  102. Throttle: makeDuration(res.ThrottleTimeMs),
  103. Entries: responseEntries,
  104. }
  105. return ret, nil
  106. }