deleteacls.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/deleteacls"
  8. )
  9. // DeleteACLsRequest represents a request sent to a kafka broker to delete
  10. // ACLs.
  11. type DeleteACLsRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // List of ACL filters to use for deletion.
  15. Filters []DeleteACLsFilter
  16. }
  17. type DeleteACLsFilter struct {
  18. ResourceTypeFilter ResourceType
  19. ResourceNameFilter string
  20. ResourcePatternTypeFilter PatternType
  21. PrincipalFilter string
  22. HostFilter string
  23. Operation ACLOperationType
  24. PermissionType ACLPermissionType
  25. }
  26. // DeleteACLsResponse represents a response from a kafka broker to an ACL
  27. // deletion request.
  28. type DeleteACLsResponse struct {
  29. // The amount of time that the broker throttled the request.
  30. Throttle time.Duration
  31. // List of the results from the deletion request.
  32. Results []DeleteACLsResult
  33. }
  34. type DeleteACLsResult struct {
  35. Error error
  36. MatchingACLs []DeleteACLsMatchingACLs
  37. }
  38. type DeleteACLsMatchingACLs struct {
  39. Error error
  40. ResourceType ResourceType
  41. ResourceName string
  42. ResourcePatternType PatternType
  43. Principal string
  44. Host string
  45. Operation ACLOperationType
  46. PermissionType ACLPermissionType
  47. }
  48. // DeleteACLs sends ACLs deletion request to a kafka broker and returns the
  49. // response.
  50. func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
  51. filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))
  52. for _, filter := range req.Filters {
  53. filters = append(filters, deleteacls.RequestFilter{
  54. ResourceTypeFilter: int8(filter.ResourceTypeFilter),
  55. ResourceNameFilter: filter.ResourceNameFilter,
  56. ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
  57. PrincipalFilter: filter.PrincipalFilter,
  58. HostFilter: filter.HostFilter,
  59. Operation: int8(filter.Operation),
  60. PermissionType: int8(filter.PermissionType),
  61. })
  62. }
  63. m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
  64. Filters: filters,
  65. })
  66. if err != nil {
  67. return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
  68. }
  69. res := m.(*deleteacls.Response)
  70. results := make([]DeleteACLsResult, 0, len(res.FilterResults))
  71. for _, result := range res.FilterResults {
  72. matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))
  73. for _, matchingACL := range result.MatchingACLs {
  74. matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
  75. Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
  76. ResourceType: ResourceType(matchingACL.ResourceType),
  77. ResourceName: matchingACL.ResourceName,
  78. ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
  79. Principal: matchingACL.Principal,
  80. Host: matchingACL.Host,
  81. Operation: ACLOperationType(matchingACL.Operation),
  82. PermissionType: ACLPermissionType(matchingACL.PermissionType),
  83. })
  84. }
  85. results = append(results, DeleteACLsResult{
  86. Error: makeError(result.ErrorCode, result.ErrorMessage),
  87. MatchingACLs: matchingACLs,
  88. })
  89. }
  90. ret := &DeleteACLsResponse{
  91. Throttle: makeDuration(res.ThrottleTimeMs),
  92. Results: results,
  93. }
  94. return ret, nil
  95. }