addpartitionstotxn.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/addpartitionstotxn"
  8. )
  9. // AddPartitionToTxn represents a partition to be added
  10. // to a transaction.
  11. type AddPartitionToTxn struct {
  12. // Partition is the ID of a partition to add to the transaction.
  13. Partition int
  14. }
  15. // AddPartitionsToTxnRequest is the request structure fo the AddPartitionsToTxn function.
  16. type AddPartitionsToTxnRequest struct {
  17. // Address of the kafka broker to send the request to.
  18. Addr net.Addr
  19. // The transactional id key
  20. TransactionalID string
  21. // The Producer ID (PID) for the current producer session;
  22. // received from an InitProducerID request.
  23. ProducerID int
  24. // The epoch associated with the current producer session for the given PID
  25. ProducerEpoch int
  26. // Mappings of topic names to lists of partitions.
  27. Topics map[string][]AddPartitionToTxn
  28. }
  29. // AddPartitionsToTxnResponse is the response structure for the AddPartitionsToTxn function.
  30. type AddPartitionsToTxnResponse struct {
  31. // The amount of time that the broker throttled the request.
  32. Throttle time.Duration
  33. // Mappings of topic names to partitions being added to a transactions.
  34. Topics map[string][]AddPartitionToTxnPartition
  35. }
  36. // AddPartitionToTxnPartition represents the state of a single partition
  37. // in response to adding to a transaction.
  38. type AddPartitionToTxnPartition struct {
  39. // The ID of the partition.
  40. Partition int
  41. // An error that may have occurred when attempting to add the partition
  42. // to a transaction.
  43. //
  44. // The errors contain the kafka error code. Programs may use the standard
  45. // errors.Is function to test the error against kafka error codes.
  46. Error error
  47. }
  48. // AddPartitionsToTnx sends an add partitions to txn request to a kafka broker and returns the response.
  49. func (c *Client) AddPartitionsToTxn(
  50. ctx context.Context,
  51. req *AddPartitionsToTxnRequest,
  52. ) (*AddPartitionsToTxnResponse, error) {
  53. protoReq := &addpartitionstotxn.Request{
  54. TransactionalID: req.TransactionalID,
  55. ProducerID: int64(req.ProducerID),
  56. ProducerEpoch: int16(req.ProducerEpoch),
  57. }
  58. protoReq.Topics = make([]addpartitionstotxn.RequestTopic, 0, len(req.Topics))
  59. for topic, partitions := range req.Topics {
  60. reqTopic := addpartitionstotxn.RequestTopic{
  61. Name: topic,
  62. Partitions: make([]int32, len(partitions)),
  63. }
  64. for i, partition := range partitions {
  65. reqTopic.Partitions[i] = int32(partition.Partition)
  66. }
  67. protoReq.Topics = append(protoReq.Topics, reqTopic)
  68. }
  69. m, err := c.roundTrip(ctx, req.Addr, protoReq)
  70. if err != nil {
  71. return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err)
  72. }
  73. r := m.(*addpartitionstotxn.Response)
  74. res := &AddPartitionsToTxnResponse{
  75. Throttle: makeDuration(r.ThrottleTimeMs),
  76. Topics: make(map[string][]AddPartitionToTxnPartition, len(r.Results)),
  77. }
  78. for _, result := range r.Results {
  79. partitions := make([]AddPartitionToTxnPartition, 0, len(result.Results))
  80. for _, rp := range result.Results {
  81. partitions = append(partitions, AddPartitionToTxnPartition{
  82. Partition: int(rp.PartitionIndex),
  83. Error: makeError(rp.ErrorCode, ""),
  84. })
  85. }
  86. res.Topics[result.Name] = partitions
  87. }
  88. return res, nil
  89. }