rawproduce.go

package kafka

import (
	"context"
	"errors"
	"fmt"
	"net"

	"github.com/segmentio/kafka-go/protocol"
	produceAPI "github.com/segmentio/kafka-go/protocol/produce"
	"github.com/segmentio/kafka-go/protocol/rawproduce"
)

// RawProduceRequest represents a request sent to a kafka broker to produce records
// to a topic partition. The request contains a pre-encoded/raw record set.
type RawProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks RequiredAcks

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transaction id when producing to the kafka broker is part of
	// a transaction.
	TransactionalID string

	// The sequence of records to produce to the topic partition.
	RawRecords protocol.RawRecordSet
}
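
// A minimal sketch of how a caller might populate a RawProduceRequest
// (illustrative only). The rawRecords value is assumed to be a
// protocol.RawRecordSet that already wraps a pre-encoded record batch; how to
// build that value is outside the scope of this sketch.
//
//	req := &RawProduceRequest{
//		Addr:         TCP("localhost:9092"),
//		Topic:        "example-topic",
//		Partition:    0,
//		RequiredAcks: RequireAll,
//		RawRecords:   rawRecords, // assumed pre-encoded record set
//	}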

// RawProduce sends a raw produce request to a kafka broker and returns the response.
//
// If the request contained no records, an error wrapping protocol.ErrNoRecord
// is returned.
//
// When the request is configured with RequiredAcks=none, both the response and
// the error will be nil on success.
func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) {
	m, err := c.roundTrip(ctx, req.Addr, &rawproduce.Request{
		TransactionalID: req.TransactionalID,
		Acks:            int16(req.RequiredAcks),
		Timeout:         c.timeoutMs(ctx, defaultProduceTimeout),
		Topics: []rawproduce.RequestTopic{{
			Topic: req.Topic,
			Partitions: []rawproduce.RequestPartition{{
				Partition: int32(req.Partition),
				RecordSet: req.RawRecords,
			}},
		}},
	})

	switch {
	case err == nil:
	case errors.Is(err, protocol.ErrNoRecord):
		return new(ProduceResponse), nil
	default:
		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", err)
	}

	if req.RequiredAcks == RequireNone {
		return nil, nil
	}

	res := m.(*produceAPI.Response)
	if len(res.Topics) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoTopic)
	}

	topic := &res.Topics[0]
	if len(topic.Partitions) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoPartition)
	}

	partition := &topic.Partitions[0]

	ret := &ProduceResponse{
		Throttle:       makeDuration(res.ThrottleTimeMs),
		Error:          makeError(partition.ErrorCode, partition.ErrorMessage),
		BaseOffset:     partition.BaseOffset,
		LogAppendTime:  makeTime(partition.LogAppendTime),
		LogStartOffset: partition.LogStartOffset,
	}

	if len(partition.RecordErrors) != 0 {
		ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))

		for _, recErr := range partition.RecordErrors {
			ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
		}
	}

	return ret, nil
}
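
// rawProduceAndCheck is an illustrative, hypothetical helper (not part of the
// public API) sketching how a caller might handle the result of RawProduce,
// assuming req was populated as shown in the sketch above. Note the nil
// response returned on success when req.RequiredAcks is RequireNone.
func rawProduceAndCheck(ctx context.Context, c *Client, req *RawProduceRequest) error {
	res, err := c.RawProduce(ctx, req)
	if err != nil {
		return err
	}

	// With RequiredAcks == RequireNone, RawProduce returns (nil, nil) on
	// success and there is no response to inspect.
	if res == nil {
		return nil
	}

	if res.Error != nil {
		return res.Error
	}

	// Per-record failures, if any, are keyed by batch index.
	for index, recordErr := range res.RecordErrors {
		return fmt.Errorf("record at batch index %d: %w", index, recordErr)
	}

	return nil
}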