123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/offsetdelete"
- )
// OffsetDelete names a single topic partition whose committed offset is to be
// deleted for a consumer group.
type OffsetDelete struct {
	// Topic is the name of the topic the partition belongs to.
	Topic string

	// Partition is the ID of the partition within Topic.
	Partition int
}
// OffsetDeleteRequest is the request sent to a kafka broker to delete the
// committed offsets of a consumer group for a set of topic partitions.
type OffsetDeleteRequest struct {
	// Addr is the address of the kafka broker the request is sent to.
	Addr net.Addr

	// GroupID is the ID of the consumer group whose offsets are deleted.
	GroupID string

	// Topics maps each topic name to the IDs of the partitions whose offsets
	// should be deleted.
	Topics map[string][]int
}
- // OffsetDeleteResponse represents a response from a kafka broker to a delete
- // offset request.
- type OffsetDeleteResponse struct {
- // An error that may have occurred while attempting to delete an offset
- Error error
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Set of topic partitions that the kafka broker has additional info (error?)
- // for.
- Topics map[string][]OffsetDeletePartition
- }
// OffsetDeletePartition is the result reported for one partition in response
// to an offset delete request.
type OffsetDeletePartition struct {
	// Partition is the ID of the partition.
	Partition int

	// Error is the error, if any, that occurred while deleting the offset for
	// this partition.
	Error error
}
- // OffsetDelete sends a delete offset request to a kafka broker and returns the
- // response.
- func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
- topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))
- for topicName, partitionIndexes := range req.Topics {
- partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))
- for i, c := range partitionIndexes {
- partitions[i] = offsetdelete.RequestPartition{
- PartitionIndex: int32(c),
- }
- }
- topics = append(topics, offsetdelete.RequestTopic{
- Name: topicName,
- Partitions: partitions,
- })
- }
- m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
- GroupID: req.GroupID,
- Topics: topics,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
- }
- r := m.(*offsetdelete.Response)
- res := &OffsetDeleteResponse{
- Error: makeError(r.ErrorCode, ""),
- Throttle: makeDuration(r.ThrottleTimeMs),
- Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)),
- }
- for _, topic := range r.Topics {
- partitions := make([]OffsetDeletePartition, len(topic.Partitions))
- for i, p := range topic.Partitions {
- partitions[i] = OffsetDeletePartition{
- Partition: int(p.PartitionIndex),
- Error: makeError(p.ErrorCode, ""),
- }
- }
- res.Topics[topic.Name] = partitions
- }
- return res, nil
- }
|