addoffsetstotxn.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/addoffsetstotxn"
  8. )
  9. // AddOffsetsToTxnRequest is the request structure for the AddOffsetsToTxn function.
  10. type AddOffsetsToTxnRequest struct {
  11. // Address of the kafka broker to send the request to.
  12. Addr net.Addr
  13. // The transactional id key
  14. TransactionalID string
  15. // The Producer ID (PID) for the current producer session;
  16. // received from an InitProducerID request.
  17. ProducerID int
  18. // The epoch associated with the current producer session for the given PID
  19. ProducerEpoch int
  20. // The unique group identifier.
  21. GroupID string
  22. }
  23. // AddOffsetsToTxnResponse is the response structure for the AddOffsetsToTxn function.
  24. type AddOffsetsToTxnResponse struct {
  25. // The amount of time that the broker throttled the request.
  26. Throttle time.Duration
  27. // An error that may have occurred when attempting to add the offsets
  28. // to a transaction.
  29. //
  30. // The errors contain the kafka error code. Programs may use the standard
  31. // errors.Is function to test the error against kafka error codes.
  32. Error error
  33. }
  34. // AddOffsetsToTnx sends an add offsets to txn request to a kafka broker and returns the response.
  35. func (c *Client) AddOffsetsToTxn(
  36. ctx context.Context,
  37. req *AddOffsetsToTxnRequest,
  38. ) (*AddOffsetsToTxnResponse, error) {
  39. m, err := c.roundTrip(ctx, req.Addr, &addoffsetstotxn.Request{
  40. TransactionalID: req.TransactionalID,
  41. ProducerID: int64(req.ProducerID),
  42. ProducerEpoch: int16(req.ProducerEpoch),
  43. GroupID: req.GroupID,
  44. })
  45. if err != nil {
  46. return nil, fmt.Errorf("kafka.(*Client).AddOffsetsToTxn: %w", err)
  47. }
  48. r := m.(*addoffsetstotxn.Response)
  49. res := &AddOffsetsToTxnResponse{
  50. Throttle: makeDuration(r.ThrottleTimeMs),
  51. Error: makeError(r.ErrorCode, ""),
  52. }
  53. return res, nil
  54. }