123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "strings"
- "time"
- "github.com/segmentio/kafka-go/protocol/createacls"
- )
- // CreateACLsRequest represents a request sent to a kafka broker to add
- // new ACLs.
- type CreateACLsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // List of ACL to create.
- ACLs []ACLEntry
- }
- // CreateACLsResponse represents a response from a kafka broker to an ACL
- // creation request.
- type CreateACLsResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // List of errors that occurred while attempting to create
- // the ACLs.
- //
- // The errors contain the kafka error code. Programs may use the standard
- // errors.Is function to test the error against kafka error codes.
- Errors []error
- }
- type ACLPermissionType int8
- const (
- ACLPermissionTypeUnknown ACLPermissionType = 0
- ACLPermissionTypeAny ACLPermissionType = 1
- ACLPermissionTypeDeny ACLPermissionType = 2
- ACLPermissionTypeAllow ACLPermissionType = 3
- )
- func (apt ACLPermissionType) String() string {
- mapping := map[ACLPermissionType]string{
- ACLPermissionTypeUnknown: "Unknown",
- ACLPermissionTypeAny: "Any",
- ACLPermissionTypeDeny: "Deny",
- ACLPermissionTypeAllow: "Allow",
- }
- s, ok := mapping[apt]
- if !ok {
- s = mapping[ACLPermissionTypeUnknown]
- }
- return s
- }
- // MarshalText transforms an ACLPermissionType into its string representation.
- func (apt ACLPermissionType) MarshalText() ([]byte, error) {
- return []byte(apt.String()), nil
- }
- // UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
- func (apt *ACLPermissionType) UnmarshalText(text []byte) error {
- normalized := strings.ToLower(string(text))
- mapping := map[string]ACLPermissionType{
- "unknown": ACLPermissionTypeUnknown,
- "any": ACLPermissionTypeAny,
- "deny": ACLPermissionTypeDeny,
- "allow": ACLPermissionTypeAllow,
- }
- parsed, ok := mapping[normalized]
- if !ok {
- *apt = ACLPermissionTypeUnknown
- return fmt.Errorf("cannot parse %s as an ACLPermissionType", normalized)
- }
- *apt = parsed
- return nil
- }
- type ACLOperationType int8
- const (
- ACLOperationTypeUnknown ACLOperationType = 0
- ACLOperationTypeAny ACLOperationType = 1
- ACLOperationTypeAll ACLOperationType = 2
- ACLOperationTypeRead ACLOperationType = 3
- ACLOperationTypeWrite ACLOperationType = 4
- ACLOperationTypeCreate ACLOperationType = 5
- ACLOperationTypeDelete ACLOperationType = 6
- ACLOperationTypeAlter ACLOperationType = 7
- ACLOperationTypeDescribe ACLOperationType = 8
- ACLOperationTypeClusterAction ACLOperationType = 9
- ACLOperationTypeDescribeConfigs ACLOperationType = 10
- ACLOperationTypeAlterConfigs ACLOperationType = 11
- ACLOperationTypeIdempotentWrite ACLOperationType = 12
- )
- func (aot ACLOperationType) String() string {
- mapping := map[ACLOperationType]string{
- ACLOperationTypeUnknown: "Unknown",
- ACLOperationTypeAny: "Any",
- ACLOperationTypeAll: "All",
- ACLOperationTypeRead: "Read",
- ACLOperationTypeWrite: "Write",
- ACLOperationTypeCreate: "Create",
- ACLOperationTypeDelete: "Delete",
- ACLOperationTypeAlter: "Alter",
- ACLOperationTypeDescribe: "Describe",
- ACLOperationTypeClusterAction: "ClusterAction",
- ACLOperationTypeDescribeConfigs: "DescribeConfigs",
- ACLOperationTypeAlterConfigs: "AlterConfigs",
- ACLOperationTypeIdempotentWrite: "IdempotentWrite",
- }
- s, ok := mapping[aot]
- if !ok {
- s = mapping[ACLOperationTypeUnknown]
- }
- return s
- }
- // MarshalText transforms an ACLOperationType into its string representation.
- func (aot ACLOperationType) MarshalText() ([]byte, error) {
- return []byte(aot.String()), nil
- }
- // UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
- func (aot *ACLOperationType) UnmarshalText(text []byte) error {
- normalized := strings.ToLower(string(text))
- mapping := map[string]ACLOperationType{
- "unknown": ACLOperationTypeUnknown,
- "any": ACLOperationTypeAny,
- "all": ACLOperationTypeAll,
- "read": ACLOperationTypeRead,
- "write": ACLOperationTypeWrite,
- "create": ACLOperationTypeCreate,
- "delete": ACLOperationTypeDelete,
- "alter": ACLOperationTypeAlter,
- "describe": ACLOperationTypeDescribe,
- "clusteraction": ACLOperationTypeClusterAction,
- "describeconfigs": ACLOperationTypeDescribeConfigs,
- "alterconfigs": ACLOperationTypeAlterConfigs,
- "idempotentwrite": ACLOperationTypeIdempotentWrite,
- }
- parsed, ok := mapping[normalized]
- if !ok {
- *aot = ACLOperationTypeUnknown
- return fmt.Errorf("cannot parse %s as an ACLOperationType", normalized)
- }
- *aot = parsed
- return nil
- }
- type ACLEntry struct {
- ResourceType ResourceType
- ResourceName string
- ResourcePatternType PatternType
- Principal string
- Host string
- Operation ACLOperationType
- PermissionType ACLPermissionType
- }
- // CreateACLs sends ACLs creation request to a kafka broker and returns the
- // response.
- func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) {
- acls := make([]createacls.RequestACLs, 0, len(req.ACLs))
- for _, acl := range req.ACLs {
- acls = append(acls, createacls.RequestACLs{
- ResourceType: int8(acl.ResourceType),
- ResourceName: acl.ResourceName,
- ResourcePatternType: int8(acl.ResourcePatternType),
- Principal: acl.Principal,
- Host: acl.Host,
- Operation: int8(acl.Operation),
- PermissionType: int8(acl.PermissionType),
- })
- }
- m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{
- Creations: acls,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err)
- }
- res := m.(*createacls.Response)
- ret := &CreateACLsResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Errors: make([]error, 0, len(res.Results)),
- }
- for _, t := range res.Results {
- ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage))
- }
- return ret, nil
- }
|