deletegroups.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/deletegroups"
  8. )
  9. // DeleteGroupsRequest represents a request sent to a kafka broker to delete
  10. // consumer groups.
  11. type DeleteGroupsRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // Identifiers of groups to delete.
  15. GroupIDs []string
  16. }
  17. // DeleteGroupsResponse represents a response from a kafka broker to a consumer group
  18. // deletion request.
  19. type DeleteGroupsResponse struct {
  20. // The amount of time that the broker throttled the request.
  21. Throttle time.Duration
  22. // Mapping of group ids to errors that occurred while attempting to delete those groups.
  23. //
  24. // The errors contain the kafka error code. Programs may use the standard
  25. // errors.Is function to test the error against kafka error codes.
  26. Errors map[string]error
  27. }
  28. // DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group
  29. // of the request. All deleted groups must be managed by the same group coordinator.
  30. func (c *Client) DeleteGroups(
  31. ctx context.Context,
  32. req *DeleteGroupsRequest,
  33. ) (*DeleteGroupsResponse, error) {
  34. m, err := c.roundTrip(ctx, req.Addr, &deletegroups.Request{
  35. GroupIDs: req.GroupIDs,
  36. })
  37. if err != nil {
  38. return nil, fmt.Errorf("kafka.(*Client).DeleteGroups: %w", err)
  39. }
  40. r := m.(*deletegroups.Response)
  41. ret := &DeleteGroupsResponse{
  42. Throttle: makeDuration(r.ThrottleTimeMs),
  43. Errors: make(map[string]error, len(r.Responses)),
  44. }
  45. for _, t := range r.Responses {
  46. ret.Errors[t.GroupID] = makeError(t.ErrorCode, "")
  47. }
  48. return ret, nil
  49. }