describeuserscramcredentials.go 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/describeuserscramcredentials"
  8. )
  9. // DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to
  10. // describe user scram credentials.
  11. type DescribeUserScramCredentialsRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // List of Scram users to describe
  15. Users []UserScramCredentialsUser
  16. }
  17. type UserScramCredentialsUser struct {
  18. Name string
  19. }
  20. // DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user
  21. // credentials request.
  22. type DescribeUserScramCredentialsResponse struct {
  23. // The amount of time that the broker throttled the request.
  24. Throttle time.Duration
  25. // Top level error that occurred while attempting to describe
  26. // the user scram credentials.
  27. //
  28. // The errors contain the kafka error code. Programs may use the standard
  29. // errors.Is function to test the error against kafka error codes.
  30. Error error
  31. // List of described user scram credentials.
  32. Results []DescribeUserScramCredentialsResponseResult
  33. }
  34. type DescribeUserScramCredentialsResponseResult struct {
  35. User string
  36. CredentialInfos []DescribeUserScramCredentialsCredentialInfo
  37. Error error
  38. }
  39. type DescribeUserScramCredentialsCredentialInfo struct {
  40. Mechanism ScramMechanism
  41. Iterations int
  42. }
  43. // DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns
  44. // the response.
  45. func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
  46. users := make([]describeuserscramcredentials.RequestUser, len(req.Users))
  47. for userIdx, user := range req.Users {
  48. users[userIdx] = describeuserscramcredentials.RequestUser{
  49. Name: user.Name,
  50. }
  51. }
  52. m, err := c.roundTrip(ctx, req.Addr, &describeuserscramcredentials.Request{
  53. Users: users,
  54. })
  55. if err != nil {
  56. return nil, fmt.Errorf("kafka.(*Client).DescribeUserScramCredentials: %w", err)
  57. }
  58. res := m.(*describeuserscramcredentials.Response)
  59. responseResults := make([]DescribeUserScramCredentialsResponseResult, len(res.Results))
  60. for responseIdx, responseResult := range res.Results {
  61. credentialInfos := make([]DescribeUserScramCredentialsCredentialInfo, len(responseResult.CredentialInfos))
  62. for credentialInfoIdx, credentialInfo := range responseResult.CredentialInfos {
  63. credentialInfos[credentialInfoIdx] = DescribeUserScramCredentialsCredentialInfo{
  64. Mechanism: ScramMechanism(credentialInfo.Mechanism),
  65. Iterations: int(credentialInfo.Iterations),
  66. }
  67. }
  68. responseResults[responseIdx] = DescribeUserScramCredentialsResponseResult{
  69. User: responseResult.User,
  70. CredentialInfos: credentialInfos,
  71. Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
  72. }
  73. }
  74. ret := &DescribeUserScramCredentialsResponse{
  75. Throttle: makeDuration(res.ThrottleTimeMs),
  76. Error: makeError(res.ErrorCode, res.ErrorMessage),
  77. Results: responseResults,
  78. }
  79. return ret, nil
  80. }