offsetdelete.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/offsetdelete"
  8. )
  9. // OffsetDelete deletes the offset for a consumer group on a particular topic
  10. // for a particular partition.
  11. type OffsetDelete struct {
  12. Topic string
  13. Partition int
  14. }
  15. // OffsetDeleteRequest represents a request sent to a kafka broker to delete
  16. // the offsets for a partition on a given topic associated with a consumer group.
  17. type OffsetDeleteRequest struct {
  18. // Address of the kafka broker to send the request to.
  19. Addr net.Addr
  20. // ID of the consumer group to delete the offsets for.
  21. GroupID string
  22. // Set of topic partitions to delete offsets for.
  23. Topics map[string][]int
  24. }
  25. // OffsetDeleteResponse represents a response from a kafka broker to a delete
  26. // offset request.
  27. type OffsetDeleteResponse struct {
  28. // An error that may have occurred while attempting to delete an offset
  29. Error error
  30. // The amount of time that the broker throttled the request.
  31. Throttle time.Duration
  32. // Set of topic partitions that the kafka broker has additional info (error?)
  33. // for.
  34. Topics map[string][]OffsetDeletePartition
  35. }
  36. // OffsetDeletePartition represents the state of a status of a partition in response
  37. // to deleting offsets.
  38. type OffsetDeletePartition struct {
  39. // ID of the partition.
  40. Partition int
  41. // An error that may have occurred while attempting to delete an offset for
  42. // this partition.
  43. Error error
  44. }
  45. // OffsetDelete sends a delete offset request to a kafka broker and returns the
  46. // response.
  47. func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
  48. topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))
  49. for topicName, partitionIndexes := range req.Topics {
  50. partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))
  51. for i, c := range partitionIndexes {
  52. partitions[i] = offsetdelete.RequestPartition{
  53. PartitionIndex: int32(c),
  54. }
  55. }
  56. topics = append(topics, offsetdelete.RequestTopic{
  57. Name: topicName,
  58. Partitions: partitions,
  59. })
  60. }
  61. m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
  62. GroupID: req.GroupID,
  63. Topics: topics,
  64. })
  65. if err != nil {
  66. return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
  67. }
  68. r := m.(*offsetdelete.Response)
  69. res := &OffsetDeleteResponse{
  70. Error: makeError(r.ErrorCode, ""),
  71. Throttle: makeDuration(r.ThrottleTimeMs),
  72. Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)),
  73. }
  74. for _, topic := range r.Topics {
  75. partitions := make([]OffsetDeletePartition, len(topic.Partitions))
  76. for i, p := range topic.Partitions {
  77. partitions[i] = OffsetDeletePartition{
  78. Partition: int(p.PartitionIndex),
  79. Error: makeError(p.ErrorCode, ""),
  80. }
  81. }
  82. res.Topics[topic.Name] = partitions
  83. }
  84. return res, nil
  85. }