endtxn.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/endtxn"
  8. )
  9. // EndTxnRequest represets a request sent to a kafka broker to end a transaction.
  10. type EndTxnRequest struct {
  11. // Address of the kafka broker to send the request to.
  12. Addr net.Addr
  13. // The transactional id key.
  14. TransactionalID string
  15. // The Producer ID (PID) for the current producer session
  16. ProducerID int
  17. // The epoch associated with the current producer session for the given PID
  18. ProducerEpoch int
  19. // Committed should be set to true if the transaction was committed, false otherwise.
  20. Committed bool
  21. }
  22. // EndTxnResponse represents a resposne from a kafka broker to an end transaction request.
  23. type EndTxnResponse struct {
  24. // The amount of time that the broker throttled the request.
  25. Throttle time.Duration
  26. // Error is non-nil if an error occureda and contains the kafka error code.
  27. // Programs may use the standard errors.Is function to test the error
  28. // against kafka error codes.
  29. Error error
  30. }
  31. // EndTxn sends an EndTxn request to a kafka broker and returns its response.
  32. func (c *Client) EndTxn(ctx context.Context, req *EndTxnRequest) (*EndTxnResponse, error) {
  33. m, err := c.roundTrip(ctx, req.Addr, &endtxn.Request{
  34. TransactionalID: req.TransactionalID,
  35. ProducerID: int64(req.ProducerID),
  36. ProducerEpoch: int16(req.ProducerEpoch),
  37. Committed: req.Committed,
  38. })
  39. if err != nil {
  40. return nil, fmt.Errorf("kafka.(*Client).EndTxn: %w", err)
  41. }
  42. r := m.(*endtxn.Response)
  43. res := &EndTxnResponse{
  44. Throttle: makeDuration(r.ThrottleTimeMs),
  45. Error: makeError(r.ErrorCode, ""),
  46. }
  47. return res, nil
  48. }