initproducerid.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/initproducerid"
  8. )
  9. // InitProducerIDRequest is the request structure for the InitProducerId function
  10. type InitProducerIDRequest struct {
  11. // Address of the kafka broker to send the request to.
  12. Addr net.Addr
  13. // The transactional id key.
  14. TransactionalID string
  15. // Time after which a transaction should time out
  16. TransactionTimeoutMs int
  17. }
  18. // ProducerSession contains useful information about the producer session from the broker's response
  19. type ProducerSession struct {
  20. // The Producer ID (PID) for the current producer session
  21. ProducerID int
  22. // The epoch associated with the current producer session for the given PID
  23. ProducerEpoch int
  24. }
  25. // InitProducerIDResponse is the response structure for the InitProducerId function
  26. type InitProducerIDResponse struct {
  27. // The Transaction/Group Coordinator details
  28. Producer *ProducerSession
  29. // The amount of time that the broker throttled the request.
  30. Throttle time.Duration
  31. // An error that may have occurred while attempting to retrieve initProducerId
  32. //
  33. // The error contains both the kafka error code, and an error message
  34. // returned by the kafka broker.
  35. Error error
  36. }
  37. // InitProducerID sends a initProducerId request to a kafka broker and returns the
  38. // response.
  39. func (c *Client) InitProducerID(ctx context.Context, req *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  40. m, err := c.roundTrip(ctx, req.Addr, &initproducerid.Request{
  41. TransactionalID: req.TransactionalID,
  42. TransactionTimeoutMs: int32(req.TransactionTimeoutMs),
  43. })
  44. if err != nil {
  45. return nil, fmt.Errorf("kafka.(*Client).InitProducerId: %w", err)
  46. }
  47. res := m.(*initproducerid.Response)
  48. return &InitProducerIDResponse{
  49. Producer: &ProducerSession{
  50. ProducerID: int(res.ProducerID),
  51. ProducerEpoch: int(res.ProducerEpoch),
  52. },
  53. Throttle: makeDuration(res.ThrottleTimeMs),
  54. Error: makeError(res.ErrorCode, ""),
  55. }, nil
  56. }