txnoffsetcommit.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/txnoffsetcommit"
  8. )
  // TxnOffsetCommitRequest represents a request sent to a kafka broker to commit
  // offsets for one or more topic partitions within a transaction.
  type TxnOffsetCommitRequest struct {
  	// Address of the kafka broker to send the request to.
  	Addr net.Addr

  	// The transactional id key.
  	TransactionalID string

  	// ID of the consumer group to publish the offsets for.
  	GroupID string

  	// The Producer ID (PID) for the current producer session;
  	// received from an InitProducerID request.
  	//
  	// Converted to int64 when the protocol request is built.
  	ProducerID int

  	// The epoch associated with the current producer session for the given PID.
  	//
  	// Converted to int16 when the protocol request is built, so values that
  	// do not fit in an int16 are not preserved.
  	ProducerEpoch int

  	// GenerationID is the current generation for the group.
  	//
  	// Converted to int32 when the protocol request is built.
  	GenerationID int

  	// ID of the group member submitting the offsets.
  	MemberID string

  	// GroupInstanceID is a unique identifier for the consumer.
  	GroupInstanceID string

  	// Set of topic partitions to publish the offsets for, keyed by topic name.
  	//
  	// Note that offset commits need to be submitted to the broker acting as the
  	// group coordinator. This will be automatically resolved by the transport.
  	Topics map[string][]TxnOffsetCommit
  }
  // TxnOffsetCommit represents the commit of an offset to a partition within a
  // transaction.
  //
  // The extra metadata is opaque to the kafka protocol, it is intended to hold
  // information like an identifier for the process that committed the offset,
  // or the time at which the commit was made.
  type TxnOffsetCommit struct {
  	// ID of the partition the offset is committed for. Sent to the broker
  	// as an int32.
  	Partition int

  	// Offset to commit for the partition.
  	Offset int64

  	// Free-form metadata sent alongside the committed offset.
  	Metadata string
  }
  // TxnOffsetCommitResponse represents a response from a kafka broker to an offset
  // commit request within a transaction.
  type TxnOffsetCommitResponse struct {
  	// The amount of time that the broker throttled the request.
  	Throttle time.Duration

  	// Set of topic partitions that the kafka broker has accepted offset commits
  	// for, keyed by topic name.
  	Topics map[string][]TxnOffsetCommitPartition
  }
  // TxnOffsetCommitPartition represents the state of a single partition in responses
  // to committing offsets within a transaction.
  type TxnOffsetCommitPartition struct {
  	// ID of the partition.
  	Partition int

  	// An error that may have occurred while attempting to publish consumer
  	// group offsets for this partition.
  	//
  	// The error is built from the kafka error code reported for the
  	// partition; Client.TxnOffsetCommit attaches no broker-supplied message
  	// to it. Programs may use the standard errors.Is function to test the
  	// error against kafka error codes.
  	//
  	// NOTE(review): presumably nil when the broker reports no error for the
  	// partition; confirm against makeError.
  	Error error
  }
  67. // TxnOffsetCommit sends an txn offset commit request to a kafka broker and returns the
  68. // response.
  69. func (c *Client) TxnOffsetCommit(
  70. ctx context.Context,
  71. req *TxnOffsetCommitRequest,
  72. ) (*TxnOffsetCommitResponse, error) {
  73. protoReq := &txnoffsetcommit.Request{
  74. TransactionalID: req.TransactionalID,
  75. GroupID: req.GroupID,
  76. ProducerID: int64(req.ProducerID),
  77. ProducerEpoch: int16(req.ProducerEpoch),
  78. GenerationID: int32(req.GenerationID),
  79. MemberID: req.MemberID,
  80. GroupInstanceID: req.GroupInstanceID,
  81. Topics: make([]txnoffsetcommit.RequestTopic, 0, len(req.Topics)),
  82. }
  83. for topic, partitions := range req.Topics {
  84. parts := make([]txnoffsetcommit.RequestPartition, len(partitions))
  85. for i, partition := range partitions {
  86. parts[i] = txnoffsetcommit.RequestPartition{
  87. Partition: int32(partition.Partition),
  88. CommittedOffset: int64(partition.Offset),
  89. CommittedMetadata: partition.Metadata,
  90. }
  91. }
  92. t := txnoffsetcommit.RequestTopic{
  93. Name: topic,
  94. Partitions: parts,
  95. }
  96. protoReq.Topics = append(protoReq.Topics, t)
  97. }
  98. m, err := c.roundTrip(ctx, req.Addr, protoReq)
  99. if err != nil {
  100. return nil, fmt.Errorf("kafka.(*Client).TxnOffsetCommit: %w", err)
  101. }
  102. r := m.(*txnoffsetcommit.Response)
  103. res := &TxnOffsetCommitResponse{
  104. Throttle: makeDuration(r.ThrottleTimeMs),
  105. Topics: make(map[string][]TxnOffsetCommitPartition, len(r.Topics)),
  106. }
  107. for _, topic := range r.Topics {
  108. partitions := make([]TxnOffsetCommitPartition, 0, len(topic.Partitions))
  109. for _, partition := range topic.Partitions {
  110. partitions = append(partitions, TxnOffsetCommitPartition{
  111. Partition: int(partition.Partition),
  112. Error: makeError(partition.ErrorCode, ""),
  113. })
  114. }
  115. res.Topics[topic.Name] = partitions
  116. }
  117. return res, nil
  118. }