123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/describeacls"
- )
- // DescribeACLsRequest represents a request sent to a kafka broker to describe
- // existing ACLs.
- type DescribeACLsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // Filter to filter ACLs on.
- Filter ACLFilter
- }
- type ACLFilter struct {
- ResourceTypeFilter ResourceType
- ResourceNameFilter string
- // ResourcePatternTypeFilter was added in v1 and is not available prior to that.
- ResourcePatternTypeFilter PatternType
- PrincipalFilter string
- HostFilter string
- Operation ACLOperationType
- PermissionType ACLPermissionType
- }
- // DescribeACLsResponse represents a response from a kafka broker to an ACL
- // describe request.
- type DescribeACLsResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Error that occurred while attempting to describe
- // the ACLs.
- Error error
- // ACL resources returned from the describe request.
- Resources []ACLResource
- }
- type ACLResource struct {
- ResourceType ResourceType
- ResourceName string
- PatternType PatternType
- ACLs []ACLDescription
- }
- type ACLDescription struct {
- Principal string
- Host string
- Operation ACLOperationType
- PermissionType ACLPermissionType
- }
- func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) {
- m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{
- Filter: describeacls.ACLFilter{
- ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter),
- ResourceNameFilter: req.Filter.ResourceNameFilter,
- ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter),
- PrincipalFilter: req.Filter.PrincipalFilter,
- HostFilter: req.Filter.HostFilter,
- Operation: int8(req.Filter.Operation),
- PermissionType: int8(req.Filter.PermissionType),
- },
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err)
- }
- res := m.(*describeacls.Response)
- resources := make([]ACLResource, len(res.Resources))
- for resourceIdx, respResource := range res.Resources {
- descriptions := make([]ACLDescription, len(respResource.ACLs))
- for descriptionIdx, respDescription := range respResource.ACLs {
- descriptions[descriptionIdx] = ACLDescription{
- Principal: respDescription.Principal,
- Host: respDescription.Host,
- Operation: ACLOperationType(respDescription.Operation),
- PermissionType: ACLPermissionType(respDescription.PermissionType),
- }
- }
- resources[resourceIdx] = ACLResource{
- ResourceType: ResourceType(respResource.ResourceType),
- ResourceName: respResource.ResourceName,
- PatternType: PatternType(respResource.PatternType),
- ACLs: descriptions,
- }
- }
- ret := &DescribeACLsResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Error: makeError(res.ErrorCode, res.ErrorMessage),
- Resources: resources,
- }
- return ret, nil
- }
|