123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/deletegroups"
- )
- // DeleteGroupsRequest represents a request sent to a kafka broker to delete
- // consumer groups.
- type DeleteGroupsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // Identifiers of groups to delete.
- GroupIDs []string
- }
- // DeleteGroupsResponse represents a response from a kafka broker to a consumer group
- // deletion request.
- type DeleteGroupsResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Mapping of group ids to errors that occurred while attempting to delete those groups.
- //
- // The errors contain the kafka error code. Programs may use the standard
- // errors.Is function to test the error against kafka error codes.
- Errors map[string]error
- }
- // DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group
- // of the request. All deleted groups must be managed by the same group coordinator.
- func (c *Client) DeleteGroups(
- ctx context.Context,
- req *DeleteGroupsRequest,
- ) (*DeleteGroupsResponse, error) {
- m, err := c.roundTrip(ctx, req.Addr, &deletegroups.Request{
- GroupIDs: req.GroupIDs,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).DeleteGroups: %w", err)
- }
- r := m.(*deletegroups.Response)
- ret := &DeleteGroupsResponse{
- Throttle: makeDuration(r.ThrottleTimeMs),
- Errors: make(map[string]error, len(r.Responses)),
- }
- for _, t := range r.Responses {
- ret.Errors[t.GroupID] = makeError(t.ErrorCode, "")
- }
- return ret, nil
- }
|