alteruserscramcredentials.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/alteruserscramcredentials"
  8. )
  9. // AlterUserScramCredentialsRequest represents a request sent to a kafka broker to
  10. // alter user scram credentials.
  11. type AlterUserScramCredentialsRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // List of credentials to delete.
  15. Deletions []UserScramCredentialsDeletion
  16. // List of credentials to upsert.
  17. Upsertions []UserScramCredentialsUpsertion
  18. }
  19. type ScramMechanism int8
  20. const (
  21. ScramMechanismUnknown ScramMechanism = iota // 0
  22. ScramMechanismSha256 // 1
  23. ScramMechanismSha512 // 2
  24. )
  25. type UserScramCredentialsDeletion struct {
  26. Name string
  27. Mechanism ScramMechanism
  28. }
  29. type UserScramCredentialsUpsertion struct {
  30. Name string
  31. Mechanism ScramMechanism
  32. Iterations int
  33. Salt []byte
  34. SaltedPassword []byte
  35. }
  36. // AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user
  37. // credentials request.
  38. type AlterUserScramCredentialsResponse struct {
  39. // The amount of time that the broker throttled the request.
  40. Throttle time.Duration
  41. // List of altered user scram credentials.
  42. Results []AlterUserScramCredentialsResponseUser
  43. }
  44. type AlterUserScramCredentialsResponseUser struct {
  45. User string
  46. Error error
  47. }
  48. // AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns
  49. // the response.
  50. func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
  51. deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions))
  52. upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions))
  53. for deletionIdx, deletion := range req.Deletions {
  54. deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{
  55. Name: deletion.Name,
  56. Mechanism: int8(deletion.Mechanism),
  57. }
  58. }
  59. for upsertionIdx, upsertion := range req.Upsertions {
  60. upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{
  61. Name: upsertion.Name,
  62. Mechanism: int8(upsertion.Mechanism),
  63. Iterations: int32(upsertion.Iterations),
  64. Salt: upsertion.Salt,
  65. SaltedPassword: upsertion.SaltedPassword,
  66. }
  67. }
  68. m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{
  69. Deletions: deletions,
  70. Upsertions: upsertions,
  71. })
  72. if err != nil {
  73. return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err)
  74. }
  75. res := m.(*alteruserscramcredentials.Response)
  76. responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results))
  77. for responseIdx, responseResult := range res.Results {
  78. responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{
  79. User: responseResult.User,
  80. Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
  81. }
  82. }
  83. ret := &AlterUserScramCredentialsResponse{
  84. Throttle: makeDuration(res.ThrottleTimeMs),
  85. Results: responseEntries,
  86. }
  87. return ret, nil
  88. }