// File: describeacls.go (3.0 KB)

  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/describeacls"
  8. )
  9. // DescribeACLsRequest represents a request sent to a kafka broker to describe
  10. // existing ACLs.
  11. type DescribeACLsRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // Filter to filter ACLs on.
  15. Filter ACLFilter
  16. }
  17. type ACLFilter struct {
  18. ResourceTypeFilter ResourceType
  19. ResourceNameFilter string
  20. // ResourcePatternTypeFilter was added in v1 and is not available prior to that.
  21. ResourcePatternTypeFilter PatternType
  22. PrincipalFilter string
  23. HostFilter string
  24. Operation ACLOperationType
  25. PermissionType ACLPermissionType
  26. }
  27. // DescribeACLsResponse represents a response from a kafka broker to an ACL
  28. // describe request.
  29. type DescribeACLsResponse struct {
  30. // The amount of time that the broker throttled the request.
  31. Throttle time.Duration
  32. // Error that occurred while attempting to describe
  33. // the ACLs.
  34. Error error
  35. // ACL resources returned from the describe request.
  36. Resources []ACLResource
  37. }
  38. type ACLResource struct {
  39. ResourceType ResourceType
  40. ResourceName string
  41. PatternType PatternType
  42. ACLs []ACLDescription
  43. }
  44. type ACLDescription struct {
  45. Principal string
  46. Host string
  47. Operation ACLOperationType
  48. PermissionType ACLPermissionType
  49. }
  50. func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) {
  51. m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{
  52. Filter: describeacls.ACLFilter{
  53. ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter),
  54. ResourceNameFilter: req.Filter.ResourceNameFilter,
  55. ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter),
  56. PrincipalFilter: req.Filter.PrincipalFilter,
  57. HostFilter: req.Filter.HostFilter,
  58. Operation: int8(req.Filter.Operation),
  59. PermissionType: int8(req.Filter.PermissionType),
  60. },
  61. })
  62. if err != nil {
  63. return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err)
  64. }
  65. res := m.(*describeacls.Response)
  66. resources := make([]ACLResource, len(res.Resources))
  67. for resourceIdx, respResource := range res.Resources {
  68. descriptions := make([]ACLDescription, len(respResource.ACLs))
  69. for descriptionIdx, respDescription := range respResource.ACLs {
  70. descriptions[descriptionIdx] = ACLDescription{
  71. Principal: respDescription.Principal,
  72. Host: respDescription.Host,
  73. Operation: ACLOperationType(respDescription.Operation),
  74. PermissionType: ACLPermissionType(respDescription.PermissionType),
  75. }
  76. }
  77. resources[resourceIdx] = ACLResource{
  78. ResourceType: ResourceType(respResource.ResourceType),
  79. ResourceName: respResource.ResourceName,
  80. PatternType: PatternType(respResource.PatternType),
  81. ACLs: descriptions,
  82. }
  83. }
  84. ret := &DescribeACLsResponse{
  85. Throttle: makeDuration(res.ThrottleTimeMs),
  86. Error: makeError(res.ErrorCode, res.ErrorMessage),
  87. Resources: resources,
  88. }
  89. return ret, nil
  90. }