123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/alterclientquotas"
- )
- // AlterClientQuotasRequest represents a request sent to a kafka broker to
- // alter client quotas.
- type AlterClientQuotasRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // List of client quotas entries to alter.
- Entries []AlterClientQuotaEntry
- // Whether the alteration should be validated, but not performed.
- ValidateOnly bool
- }
- type AlterClientQuotaEntry struct {
- // The quota entities to alter.
- Entities []AlterClientQuotaEntity
- // An individual quota configuration entry to alter.
- Ops []AlterClientQuotaOps
- }
// AlterClientQuotaEntity identifies a quota entity by its type and name. It is
// used both in alteration requests and in the entities echoed back in
// responses.
type AlterClientQuotaEntity struct {
	// EntityType is the quota entity type.
	EntityType string

	// EntityName is the name of the quota entity, or null if the default.
	// NOTE(review): the field is a plain string, so how "null" is expressed
	// here is not visible from this file; confirm against the protocol
	// encoding.
	EntityName string
}
// AlterClientQuotaOps describes a single quota configuration operation: a key
// that is either set to Value or removed.
type AlterClientQuotaOps struct {
	// Key is the quota configuration key.
	Key string

	// Value is the quota configuration value to set. It is ignored if the
	// value is to be removed.
	Value float64

	// Remove reports whether the quota configuration value should be
	// removed, otherwise it is set.
	Remove bool
}
- type AlterClientQuotaResponseQuotas struct {
- // Error is set to a non-nil value including the code and message if a top-level
- // error was encountered when doing the update.
- Error error
- // The altered quota entities.
- Entities []AlterClientQuotaEntity
- }
- // AlterClientQuotasResponse represents a response from a kafka broker to an alter client
- // quotas request.
- type AlterClientQuotasResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // List of altered client quotas responses.
- Entries []AlterClientQuotaResponseQuotas
- }
- // AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
- // the response.
- func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
- entries := make([]alterclientquotas.Entry, len(req.Entries))
- for entryIdx, entry := range req.Entries {
- entities := make([]alterclientquotas.Entity, len(entry.Entities))
- for entityIdx, entity := range entry.Entities {
- entities[entityIdx] = alterclientquotas.Entity{
- EntityType: entity.EntityType,
- EntityName: entity.EntityName,
- }
- }
- ops := make([]alterclientquotas.Ops, len(entry.Ops))
- for opsIdx, op := range entry.Ops {
- ops[opsIdx] = alterclientquotas.Ops{
- Key: op.Key,
- Value: op.Value,
- Remove: op.Remove,
- }
- }
- entries[entryIdx] = alterclientquotas.Entry{
- Entities: entities,
- Ops: ops,
- }
- }
- m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
- Entries: entries,
- ValidateOnly: req.ValidateOnly,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
- }
- res := m.(*alterclientquotas.Response)
- responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))
- for responseEntryIdx, responseEntry := range res.Results {
- responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
- for responseEntityIdx, responseEntity := range responseEntry.Entities {
- responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
- EntityType: responseEntity.EntityType,
- EntityName: responseEntity.EntityName,
- }
- }
- responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
- Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage),
- Entities: responseEntities,
- }
- }
- ret := &AlterClientQuotasResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Entries: responseEntries,
- }
- return ret, nil
- }
|