createacls.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "strings"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/createacls"
  9. )
  10. // CreateACLsRequest represents a request sent to a kafka broker to add
  11. // new ACLs.
  12. type CreateACLsRequest struct {
  13. // Address of the kafka broker to send the request to.
  14. Addr net.Addr
  15. // List of ACL to create.
  16. ACLs []ACLEntry
  17. }
  18. // CreateACLsResponse represents a response from a kafka broker to an ACL
  19. // creation request.
  20. type CreateACLsResponse struct {
  21. // The amount of time that the broker throttled the request.
  22. Throttle time.Duration
  23. // List of errors that occurred while attempting to create
  24. // the ACLs.
  25. //
  26. // The errors contain the kafka error code. Programs may use the standard
  27. // errors.Is function to test the error against kafka error codes.
  28. Errors []error
  29. }
  30. type ACLPermissionType int8
  31. const (
  32. ACLPermissionTypeUnknown ACLPermissionType = 0
  33. ACLPermissionTypeAny ACLPermissionType = 1
  34. ACLPermissionTypeDeny ACLPermissionType = 2
  35. ACLPermissionTypeAllow ACLPermissionType = 3
  36. )
  37. func (apt ACLPermissionType) String() string {
  38. mapping := map[ACLPermissionType]string{
  39. ACLPermissionTypeUnknown: "Unknown",
  40. ACLPermissionTypeAny: "Any",
  41. ACLPermissionTypeDeny: "Deny",
  42. ACLPermissionTypeAllow: "Allow",
  43. }
  44. s, ok := mapping[apt]
  45. if !ok {
  46. s = mapping[ACLPermissionTypeUnknown]
  47. }
  48. return s
  49. }
  50. // MarshalText transforms an ACLPermissionType into its string representation.
  51. func (apt ACLPermissionType) MarshalText() ([]byte, error) {
  52. return []byte(apt.String()), nil
  53. }
  54. // UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
  55. func (apt *ACLPermissionType) UnmarshalText(text []byte) error {
  56. normalized := strings.ToLower(string(text))
  57. mapping := map[string]ACLPermissionType{
  58. "unknown": ACLPermissionTypeUnknown,
  59. "any": ACLPermissionTypeAny,
  60. "deny": ACLPermissionTypeDeny,
  61. "allow": ACLPermissionTypeAllow,
  62. }
  63. parsed, ok := mapping[normalized]
  64. if !ok {
  65. *apt = ACLPermissionTypeUnknown
  66. return fmt.Errorf("cannot parse %s as an ACLPermissionType", normalized)
  67. }
  68. *apt = parsed
  69. return nil
  70. }
  71. type ACLOperationType int8
  72. const (
  73. ACLOperationTypeUnknown ACLOperationType = 0
  74. ACLOperationTypeAny ACLOperationType = 1
  75. ACLOperationTypeAll ACLOperationType = 2
  76. ACLOperationTypeRead ACLOperationType = 3
  77. ACLOperationTypeWrite ACLOperationType = 4
  78. ACLOperationTypeCreate ACLOperationType = 5
  79. ACLOperationTypeDelete ACLOperationType = 6
  80. ACLOperationTypeAlter ACLOperationType = 7
  81. ACLOperationTypeDescribe ACLOperationType = 8
  82. ACLOperationTypeClusterAction ACLOperationType = 9
  83. ACLOperationTypeDescribeConfigs ACLOperationType = 10
  84. ACLOperationTypeAlterConfigs ACLOperationType = 11
  85. ACLOperationTypeIdempotentWrite ACLOperationType = 12
  86. )
  87. func (aot ACLOperationType) String() string {
  88. mapping := map[ACLOperationType]string{
  89. ACLOperationTypeUnknown: "Unknown",
  90. ACLOperationTypeAny: "Any",
  91. ACLOperationTypeAll: "All",
  92. ACLOperationTypeRead: "Read",
  93. ACLOperationTypeWrite: "Write",
  94. ACLOperationTypeCreate: "Create",
  95. ACLOperationTypeDelete: "Delete",
  96. ACLOperationTypeAlter: "Alter",
  97. ACLOperationTypeDescribe: "Describe",
  98. ACLOperationTypeClusterAction: "ClusterAction",
  99. ACLOperationTypeDescribeConfigs: "DescribeConfigs",
  100. ACLOperationTypeAlterConfigs: "AlterConfigs",
  101. ACLOperationTypeIdempotentWrite: "IdempotentWrite",
  102. }
  103. s, ok := mapping[aot]
  104. if !ok {
  105. s = mapping[ACLOperationTypeUnknown]
  106. }
  107. return s
  108. }
  109. // MarshalText transforms an ACLOperationType into its string representation.
  110. func (aot ACLOperationType) MarshalText() ([]byte, error) {
  111. return []byte(aot.String()), nil
  112. }
  113. // UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
  114. func (aot *ACLOperationType) UnmarshalText(text []byte) error {
  115. normalized := strings.ToLower(string(text))
  116. mapping := map[string]ACLOperationType{
  117. "unknown": ACLOperationTypeUnknown,
  118. "any": ACLOperationTypeAny,
  119. "all": ACLOperationTypeAll,
  120. "read": ACLOperationTypeRead,
  121. "write": ACLOperationTypeWrite,
  122. "create": ACLOperationTypeCreate,
  123. "delete": ACLOperationTypeDelete,
  124. "alter": ACLOperationTypeAlter,
  125. "describe": ACLOperationTypeDescribe,
  126. "clusteraction": ACLOperationTypeClusterAction,
  127. "describeconfigs": ACLOperationTypeDescribeConfigs,
  128. "alterconfigs": ACLOperationTypeAlterConfigs,
  129. "idempotentwrite": ACLOperationTypeIdempotentWrite,
  130. }
  131. parsed, ok := mapping[normalized]
  132. if !ok {
  133. *aot = ACLOperationTypeUnknown
  134. return fmt.Errorf("cannot parse %s as an ACLOperationType", normalized)
  135. }
  136. *aot = parsed
  137. return nil
  138. }
  139. type ACLEntry struct {
  140. ResourceType ResourceType
  141. ResourceName string
  142. ResourcePatternType PatternType
  143. Principal string
  144. Host string
  145. Operation ACLOperationType
  146. PermissionType ACLPermissionType
  147. }
  148. // CreateACLs sends ACLs creation request to a kafka broker and returns the
  149. // response.
  150. func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) {
  151. acls := make([]createacls.RequestACLs, 0, len(req.ACLs))
  152. for _, acl := range req.ACLs {
  153. acls = append(acls, createacls.RequestACLs{
  154. ResourceType: int8(acl.ResourceType),
  155. ResourceName: acl.ResourceName,
  156. ResourcePatternType: int8(acl.ResourcePatternType),
  157. Principal: acl.Principal,
  158. Host: acl.Host,
  159. Operation: int8(acl.Operation),
  160. PermissionType: int8(acl.PermissionType),
  161. })
  162. }
  163. m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{
  164. Creations: acls,
  165. })
  166. if err != nil {
  167. return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err)
  168. }
  169. res := m.(*createacls.Response)
  170. ret := &CreateACLsResponse{
  171. Throttle: makeDuration(res.ThrottleTimeMs),
  172. Errors: make([]error, 0, len(res.Results)),
  173. }
  174. for _, t := range res.Results {
  175. ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage))
  176. }
  177. return ret, nil
  178. }