// initproducerid.go (2.5 KB)

package kafka

import (
	"context"
	"fmt"
	"math"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/initproducerid"
)

  9. // InitProducerIDRequest is the request structure for the InitProducerId function.
  10. type InitProducerIDRequest struct {
  11. // Address of the kafka broker to send the request to.
  12. Addr net.Addr
  13. // The transactional id key.
  14. TransactionalID string
  15. // Time after which a transaction should time out
  16. TransactionTimeoutMs int
  17. // The Producer ID (PID).
  18. // This is used to disambiguate requests if a transactional id is reused following its expiration.
  19. // Only supported in version >=3 of the request, will be ignore otherwise.
  20. ProducerID int
  21. // The producer's current epoch.
  22. // This will be checked against the producer epoch on the broker,
  23. // and the request will return an error if they do not match.
  24. // Only supported in version >=3 of the request, will be ignore otherwise.
  25. ProducerEpoch int
  26. }
  27. // ProducerSession contains useful information about the producer session from the broker's response.
  28. type ProducerSession struct {
  29. // The Producer ID (PID) for the current producer session
  30. ProducerID int
  31. // The epoch associated with the current producer session for the given PID
  32. ProducerEpoch int
  33. }
  34. // InitProducerIDResponse is the response structure for the InitProducerId function.
  35. type InitProducerIDResponse struct {
  36. // The Transaction/Group Coordinator details
  37. Producer *ProducerSession
  38. // The amount of time that the broker throttled the request.
  39. Throttle time.Duration
  40. // An error that may have occurred while attempting to retrieve initProducerId
  41. //
  42. // The error contains both the kafka error code, and an error message
  43. // returned by the kafka broker.
  44. Error error
  45. }
  46. // InitProducerID sends a initProducerId request to a kafka broker and returns the
  47. // response.
  48. func (c *Client) InitProducerID(ctx context.Context, req *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  49. m, err := c.roundTrip(ctx, req.Addr, &initproducerid.Request{
  50. TransactionalID: req.TransactionalID,
  51. TransactionTimeoutMs: int32(req.TransactionTimeoutMs),
  52. ProducerID: int64(req.ProducerID),
  53. ProducerEpoch: int16(req.ProducerEpoch),
  54. })
  55. if err != nil {
  56. return nil, fmt.Errorf("kafka.(*Client).InitProducerId: %w", err)
  57. }
  58. res := m.(*initproducerid.Response)
  59. return &InitProducerIDResponse{
  60. Producer: &ProducerSession{
  61. ProducerID: int(res.ProducerID),
  62. ProducerEpoch: int(res.ProducerEpoch),
  63. },
  64. Throttle: makeDuration(res.ThrottleTimeMs),
  65. Error: makeError(res.ErrorCode, ""),
  66. }, nil
  67. }