produce.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "strconv"
  10. "time"
  11. "github.com/segmentio/kafka-go/protocol"
  12. produceAPI "github.com/segmentio/kafka-go/protocol/produce"
  13. )
// RequiredAcks represents the level of acknowledgement a produce request asks
// the kafka broker for. The values are sent verbatim as the acks field of the
// Produce API request (see Client.Produce).
type RequiredAcks int

const (
	// RequireNone (acks=0): the broker sends no response to the request.
	RequireNone RequiredAcks = 0
	// RequireOne (acks=1): presumably only the partition leader must
	// acknowledge the write (standard Kafka acks=1 semantics — confirm).
	RequireOne RequiredAcks = 1
	// RequireAll (acks=-1): presumably all in-sync replicas must
	// acknowledge the write (standard Kafka acks=-1 semantics — confirm).
	RequireAll RequiredAcks = -1
)
  20. func (acks RequiredAcks) String() string {
  21. switch acks {
  22. case RequireNone:
  23. return "none"
  24. case RequireOne:
  25. return "one"
  26. case RequireAll:
  27. return "all"
  28. default:
  29. return "unknown"
  30. }
  31. }
  32. func (acks RequiredAcks) MarshalText() ([]byte, error) {
  33. return []byte(acks.String()), nil
  34. }
  35. func (acks *RequiredAcks) UnmarshalText(b []byte) error {
  36. switch string(b) {
  37. case "none":
  38. *acks = RequireNone
  39. case "one":
  40. *acks = RequireOne
  41. case "all":
  42. *acks = RequireAll
  43. default:
  44. x, err := strconv.ParseInt(string(b), 10, 64)
  45. parsed := RequiredAcks(x)
  46. if err != nil || (parsed != RequireNone && parsed != RequireOne && parsed != RequireAll) {
  47. return fmt.Errorf("required acks must be one of none, one, or all, not %q", b)
  48. }
  49. *acks = parsed
  50. }
  51. return nil
  52. }
// Compile-time assertions that RequiredAcks satisfies the standard library's
// text marshaling interfaces.
var (
	_ encoding.TextMarshaler   = RequiredAcks(0)
	_ encoding.TextUnmarshaler = (*RequiredAcks)(nil)
)
// ProduceRequest represents a request sent to a kafka broker to produce records
// to a topic partition.
type ProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks RequiredAcks

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transaction id when producing to the kafka broker is part of
	// a transaction.
	TransactionalID string

	// The sequence of records to produce to the topic partition.
	Records RecordReader

	// An optional compression algorithm to apply to the batch of records sent
	// to the kafka broker.
	Compression Compression
}
// ProduceResponse represents a response from a kafka broker to a produce
// request.
type ProduceResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// An error that may have occurred while attempting to produce the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// Offset of the first record that was written to the topic partition.
	//
	// This field will be zero if the kafka broker did not support Produce API
	// version 3 or above.
	BaseOffset int64

	// Time at which the broker wrote the records to the topic partition.
	//
	// This field will be zero if the kafka broker did not support Produce API
	// version 2 or above.
	LogAppendTime time.Time

	// First offset in the topic partition that the records were written to.
	//
	// This field will be zero if the kafka broker did not support Produce
	// API version 5 or above (or if the first offset is zero).
	LogStartOffset int64

	// If errors occurred writing specific records, they will be reported in
	// this map, keyed by the index of the record within the produced batch.
	//
	// This field will always be empty if the kafka broker did not support the
	// Produce API in version 8 or above.
	RecordErrors map[int]error
}
  115. // Produce sends a produce request to a kafka broker and returns the response.
  116. //
  117. // If the request contained no records, an error wrapping protocol.ErrNoRecord
  118. // is returned.
  119. //
  120. // When the request is configured with RequiredAcks=none, both the response and
  121. // the error will be nil on success.
  122. func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) {
  123. attributes := protocol.Attributes(req.Compression) & 0x7
  124. m, err := c.roundTrip(ctx, req.Addr, &produceAPI.Request{
  125. TransactionalID: req.TransactionalID,
  126. Acks: int16(req.RequiredAcks),
  127. Timeout: c.timeoutMs(ctx, defaultProduceTimeout),
  128. Topics: []produceAPI.RequestTopic{{
  129. Topic: req.Topic,
  130. Partitions: []produceAPI.RequestPartition{{
  131. Partition: int32(req.Partition),
  132. RecordSet: protocol.RecordSet{
  133. Attributes: attributes,
  134. Records: req.Records,
  135. },
  136. }},
  137. }},
  138. })
  139. switch {
  140. case err == nil:
  141. case errors.Is(err, protocol.ErrNoRecord):
  142. return new(ProduceResponse), nil
  143. default:
  144. return nil, fmt.Errorf("kafka.(*Client).Produce: %w", err)
  145. }
  146. if req.RequiredAcks == RequireNone {
  147. return nil, nil
  148. }
  149. res := m.(*produceAPI.Response)
  150. if len(res.Topics) == 0 {
  151. return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoTopic)
  152. }
  153. topic := &res.Topics[0]
  154. if len(topic.Partitions) == 0 {
  155. return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoPartition)
  156. }
  157. partition := &topic.Partitions[0]
  158. ret := &ProduceResponse{
  159. Throttle: makeDuration(res.ThrottleTimeMs),
  160. Error: makeError(partition.ErrorCode, partition.ErrorMessage),
  161. BaseOffset: partition.BaseOffset,
  162. LogAppendTime: makeTime(partition.LogAppendTime),
  163. LogStartOffset: partition.LogStartOffset,
  164. }
  165. if len(partition.RecordErrors) != 0 {
  166. ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))
  167. for _, recErr := range partition.RecordErrors {
  168. ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
  169. }
  170. }
  171. return ret, nil
  172. }
// produceRequestV2 is the kafka Produce request in version 2 of the wire
// protocol, which carries message sets rather than record batches.
type produceRequestV2 struct {
	RequiredAcks int16
	Timeout      int32
	Topics       []produceRequestTopicV2
}
  178. func (r produceRequestV2) size() int32 {
  179. return 2 + 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  180. }
// writeTo serializes the request to wb. The write order (acks, timeout,
// topics) is the wire format and must stay in sync with size.
func (r produceRequestV2) writeTo(wb *writeBuffer) {
	wb.writeInt16(r.RequiredAcks)
	wb.writeInt32(r.Timeout)
	wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}
// produceRequestTopicV2 groups the partition entries of a single topic in a
// version 2 produce request.
type produceRequestTopicV2 struct {
	TopicName  string
	Partitions []produceRequestPartitionV2
}
  190. func (t produceRequestTopicV2) size() int32 {
  191. return sizeofString(t.TopicName) +
  192. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  193. }
// writeTo serializes the topic entry to wb: topic name first, then the
// partition array, matching the layout accounted for by size.
func (t produceRequestTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)
	wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}
// produceRequestPartitionV2 carries the message set produced to a single
// partition.
//
// NOTE(review): MessageSetSize is serialized as-is by writeTo; presumably
// callers pre-compute it to match MessageSet — confirm at the call sites.
type produceRequestPartitionV2 struct {
	Partition      int32
	MessageSetSize int32
	MessageSet     messageSet
}
  203. func (p produceRequestPartitionV2) size() int32 {
  204. return 4 + 4 + p.MessageSet.size()
  205. }
// writeTo serializes the partition entry to wb. MessageSetSize is written
// from the field value; it is not recomputed from MessageSet here.
func (p produceRequestPartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt32(p.MessageSetSize)
	p.MessageSet.writeTo(wb)
}
// produceResponsePartitionV2 is the per-partition payload of a version 2
// produce response.
type produceResponsePartitionV2 struct {
	Partition int32
	ErrorCode int16
	Offset    int64
	Timestamp int64
}
  217. func (p produceResponsePartitionV2) size() int32 {
  218. return 4 + 2 + 8 + 8
  219. }
// writeTo serializes the partition response in wire order: partition, error
// code, offset, timestamp. It must mirror readFrom.
func (p produceResponsePartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Offset)
	wb.writeInt64(p.Timestamp)
}
// readFrom deserializes the partition response from r, consuming at most sz
// bytes. Fields are decoded in wire order (partition, error code, offset,
// timestamp); it returns the number of bytes left unread and the first read
// error encountered, if any.
func (p *produceResponsePartitionV2) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	if remain, err = readInt32(r, sz, &p.Partition); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Offset); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
		return
	}
	return
}
// produceResponsePartitionV7 is the per-partition payload of a version 7
// produce response; it extends the v2 layout with a trailing start offset.
type produceResponsePartitionV7 struct {
	Partition   int32
	ErrorCode   int16
	Offset      int64
	Timestamp   int64
	StartOffset int64
}
  248. func (p produceResponsePartitionV7) size() int32 {
  249. return 4 + 2 + 8 + 8 + 8
  250. }
// writeTo serializes the partition response in wire order: partition, error
// code, offset, timestamp, start offset. It must mirror readFrom.
func (p produceResponsePartitionV7) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Offset)
	wb.writeInt64(p.Timestamp)
	wb.writeInt64(p.StartOffset)
}
// readFrom deserializes the partition response from r, consuming at most sz
// bytes. Fields are decoded in wire order (partition, error code, offset,
// timestamp, start offset); it returns the number of bytes left unread and
// the first read error encountered, if any.
func (p *produceResponsePartitionV7) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	if remain, err = readInt32(r, sz, &p.Partition); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Offset); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.StartOffset); err != nil {
		return
	}
	return
}