123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/addpartitionstotxn"
- )
// AddPartitionToTxn identifies a single partition that should be added
// to a transaction.
type AddPartitionToTxn struct {
	// Partition is the ID of the partition to add to the transaction.
	Partition int
}
- // AddPartitionsToTxnRequest is the request structure fo the AddPartitionsToTxn function.
- type AddPartitionsToTxnRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // The transactional id key
- TransactionalID string
- // The Producer ID (PID) for the current producer session;
- // received from an InitProducerID request.
- ProducerID int
- // The epoch associated with the current producer session for the given PID
- ProducerEpoch int
- // Mappings of topic names to lists of partitions.
- Topics map[string][]AddPartitionToTxn
- }
- // AddPartitionsToTxnResponse is the response structure for the AddPartitionsToTxn function.
- type AddPartitionsToTxnResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Mappings of topic names to partitions being added to a transactions.
- Topics map[string][]AddPartitionToTxnPartition
- }
// AddPartitionToTxnPartition describes the outcome for a single partition
// after attempting to add it to a transaction.
type AddPartitionToTxnPartition struct {
	// Partition is the ID of the partition.
	Partition int

	// Error is an error that may have occurred when attempting to add the
	// partition to a transaction.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Error error
}
- // AddPartitionsToTnx sends an add partitions to txn request to a kafka broker and returns the response.
- func (c *Client) AddPartitionsToTxn(
- ctx context.Context,
- req *AddPartitionsToTxnRequest,
- ) (*AddPartitionsToTxnResponse, error) {
- protoReq := &addpartitionstotxn.Request{
- TransactionalID: req.TransactionalID,
- ProducerID: int64(req.ProducerID),
- ProducerEpoch: int16(req.ProducerEpoch),
- }
- protoReq.Topics = make([]addpartitionstotxn.RequestTopic, 0, len(req.Topics))
- for topic, partitions := range req.Topics {
- reqTopic := addpartitionstotxn.RequestTopic{
- Name: topic,
- Partitions: make([]int32, len(partitions)),
- }
- for i, partition := range partitions {
- reqTopic.Partitions[i] = int32(partition.Partition)
- }
- protoReq.Topics = append(protoReq.Topics, reqTopic)
- }
- m, err := c.roundTrip(ctx, req.Addr, protoReq)
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err)
- }
- r := m.(*addpartitionstotxn.Response)
- res := &AddPartitionsToTxnResponse{
- Throttle: makeDuration(r.ThrottleTimeMs),
- Topics: make(map[string][]AddPartitionToTxnPartition, len(r.Results)),
- }
- for _, result := range r.Results {
- partitions := make([]AddPartitionToTxnPartition, 0, len(result.Results))
- for _, rp := range result.Results {
- partitions = append(partitions, AddPartitionToTxnPartition{
- Partition: int(rp.PartitionIndex),
- Error: makeError(rp.ErrorCode, ""),
- })
- }
- res.Topics[result.Name] = partitions
- }
- return res, nil
- }
|