package kafka

import (
    "bytes"
    "context"
    "errors"
    "io"
    "net"
    "sync"
    "sync/atomic"
    "time"

    metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
)

// The Writer type provides the implementation of a producer of kafka messages
// that automatically distributes messages across partitions of a single topic
// using a configurable balancing policy.
//
// Writers manage the dispatch of messages across partitions of the topic they
// are configured to write to using a Balancer, and aggregate batches to
// optimize the writes to kafka.
//
// Writers may be configured to be used synchronously or asynchronously. When
// used synchronously, calls to WriteMessages block until the messages have been
// written to kafka. In this mode, the program should inspect the error returned
// by the function and test whether it is an instance of kafka.WriteErrors in
// order to identify which messages have succeeded or failed, for example:
//
//    // Construct a synchronous writer (the default mode).
//    w := &kafka.Writer{
//        Addr:         kafka.TCP("localhost:9092"),
//        Topic:        "topic-A",
//        RequiredAcks: kafka.RequireAll,
//    }
//
//    ...
//
//    // Passing a context can prevent the operation from blocking indefinitely.
//    switch err := w.WriteMessages(ctx, msgs...).(type) {
//    case nil:
//    case kafka.WriteErrors:
//        for i := range msgs {
//            if err[i] != nil {
//                // handle the error writing msgs[i]
//                ...
//            }
//        }
//    default:
//        // handle other errors
//        ...
//    }
//
// In asynchronous mode, the program may configure a completion handler on the
// writer to receive notifications of messages being written to kafka:
//
//    w := &kafka.Writer{
//        Addr:         kafka.TCP("localhost:9092"),
//        Topic:        "topic-A",
//        RequiredAcks: kafka.RequireAll,
//        Async:        true, // make the writer asynchronous
//        Completion: func(messages []kafka.Message, err error) {
//            ...
//        },
//    }
//
//    ...
//
//    // Because the writer is asynchronous, there is no need for the context to
//    // be cancelled; the call will never block.
//    if err := w.WriteMessages(context.Background(), msgs...); err != nil {
//        // Only validation errors would be reported in this case.
//        ...
//    }
//
// Methods of Writer are safe to use concurrently from multiple goroutines;
// however, the writer configuration should not be modified after first use.
type Writer struct {
    // Address of the kafka cluster that this writer is configured to send
    // messages to.
    //
    // This field is required, attempting to write messages to a writer with a
    // nil address will error.
    Addr net.Addr

    // Topic is the name of the topic that the writer will produce messages to.
    //
    // Setting this field or not is a mutually exclusive option. If you set Topic
    // here, you must not set Topic for any produced Message. Otherwise, if you do
    // not set Topic, every Message must have Topic specified.
    Topic string

    // The balancer used to distribute messages across partitions.
    //
    // The default is to use a round-robin distribution.
    Balancer Balancer

    // Limit on how many attempts will be made to deliver a message.
    //
    // The default is to try at most 10 times.
    MaxAttempts int

    // Limit on how many messages will be buffered before being sent to a
    // partition.
    //
    // The default is to use a target batch size of 100 messages.
    BatchSize int

    // Limit the maximum size of a request in bytes before being sent to
    // a partition.
    //
    // The default is to use a kafka default value of 1048576.
    BatchBytes int64

    // Time limit on how often incomplete message batches will be flushed to
    // kafka.
    //
    // The default is to flush at least every second.
    BatchTimeout time.Duration

    // Timeout for read operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    ReadTimeout time.Duration

    // Timeout for write operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    WriteTimeout time.Duration

    // Number of acknowledgements from partition replicas required before
    // receiving a response to a produce request. The following values are
    // supported:
    //
    //    RequireNone (0)  fire-and-forget, do not wait for acknowledgements
    //    RequireOne  (1)  wait for the leader to acknowledge the writes
    //    RequireAll  (-1) wait for the full ISR to acknowledge the writes
    //
    // Defaults to RequireNone.
    RequiredAcks RequiredAcks

    // Setting this flag to true causes the WriteMessages method to never block.
    // It also means that errors are ignored since the caller will not receive
    // the returned value. Use this only if you don't care about guarantees of
    // whether the messages were written to kafka.
    //
    // Defaults to false.
    Async bool

    // An optional function called when the writer succeeds or fails the
    // delivery of messages to a kafka partition. When writing the messages
    // fails, the `err` parameter will be non-nil.
    //
    // The messages that the Completion function is called with have their
    // topic, partition, offset, and time set based on the Produce responses
    // received from kafka. All messages passed to a call to the function have
    // been written to the same partition. The keys and values of messages are
    // referencing the original byte slices carried by messages in the calls to
    // WriteMessages.
    //
    // The function is called from goroutines started by the writer. Calls to
    // Close will block on the Completion function calls. When the Writer is
    // not writing asynchronously, the WriteMessages call will also block on the
    // Completion function, which is a useful guarantee if the byte slices
    // for the message keys and values are intended to be reused after the
    // WriteMessages call returned.
    //
    // If a completion function panics, the program terminates because the
    // panic is not recovered by the writer and bubbles up to the top of the
    // goroutine's call stack.
    Completion func(messages []Message, err error)

    // Compression sets the compression codec to be used to compress messages.
    Compression Compression

    // If not nil, specifies a logger used to report internal changes within the
    // writer.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the writer falls
    // back to using Logger instead.
    ErrorLogger Logger

    // A transport used to send messages to kafka clusters.
    //
    // If nil, DefaultTransport is used.
    Transport RoundTripper

    // Atomic flag indicating whether the writer has been closed.
    closed uint32
    group  sync.WaitGroup

    // Manages the current batch being aggregated on the writer.
    mutex   sync.Mutex
    batches map[topicPartition]*writeBatch

    // writer stats are all made of atomic values, no need for synchronization.
    // Use a pointer to ensure 64-bit alignment of the values. The once value is
    // used to lazily create the value when first used, allowing programs to use
    // the zero value of Writer.
    once sync.Once
    *writerStats

    // If no balancer is configured, the writer uses this one. RoundRobin values
    // are safe to use concurrently from multiple goroutines, there is no need
    // for extra synchronization to access this field.
    roundRobin RoundRobin

    // non-nil when a transport was created by NewWriter, remove in 1.0.
    transport *Transport
}
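
// Illustrative sketch (not part of the original source): because Writer.Topic
// and Message.Topic are mutually exclusive, a writer producing to several
// topics leaves Writer.Topic empty and sets the topic on each message instead.
// The broker address and topic names below are assumptions made for the
// example; the Hash balancer is the one mentioned in the WriteMessages docs.
//
//    w := &kafka.Writer{
//        Addr:     kafka.TCP("localhost:9092"),
//        Balancer: &kafka.Hash{},
//    }
//
//    err := w.WriteMessages(context.Background(),
//        kafka.Message{Topic: "topic-A", Value: []byte("one")},
//        kafka.Message{Topic: "topic-B", Value: []byte("two")},
//    )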

// WriterConfig is a configuration type used to create new instances of Writer.
//
// DEPRECATED: writer values should be configured directly by assigning their
// exported fields. This type is kept for backward compatibility, and will be
// removed in version 1.0.
type WriterConfig struct {
    // The list of brokers used to discover the partitions available on the
    // kafka cluster.
    //
    // This field is required, attempting to create a writer with an empty list
    // of brokers will panic.
    Brokers []string

    // The topic that the writer will produce messages to.
    //
    // If provided, this will be used to set the topic for all produced messages.
    // If not provided, each Message must specify a topic for itself. These
    // options are mutually exclusive; if both are set, or neither is, the
    // Writer will return an error.
    Topic string

    // The dialer used by the writer to establish connections to the kafka
    // cluster.
    //
    // If nil, the default dialer is used instead.
    Dialer *Dialer

    // The balancer used to distribute messages across partitions.
    //
    // The default is to use a round-robin distribution.
    Balancer Balancer

    // Limit on how many attempts will be made to deliver a message.
    //
    // The default is to try at most 10 times.
    MaxAttempts int

    // DEPRECATED: in versions prior to 0.4, the writer used channels internally
    // to dispatch messages to partitions. This has been replaced by an in-memory
    // aggregation of batches which uses shared state instead of message passing,
    // making this option unnecessary.
    QueueCapacity int

    // Limit on how many messages will be buffered before being sent to a
    // partition.
    //
    // The default is to use a target batch size of 100 messages.
    BatchSize int

    // Limit the maximum size of a request in bytes before being sent to
    // a partition.
    //
    // The default is to use a kafka default value of 1048576.
    BatchBytes int

    // Time limit on how often incomplete message batches will be flushed to
    // kafka.
    //
    // The default is to flush at least every second.
    BatchTimeout time.Duration

    // Timeout for read operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    ReadTimeout time.Duration

    // Timeout for write operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    WriteTimeout time.Duration

    // DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
    // of the topic layout. With the change to use a transport to manage
    // connections, the responsibility of syncing the cluster layout has been
    // delegated to the transport.
    RebalanceInterval time.Duration

    // DEPRECATED: in versions prior to 0.4, the writer used to manage connections
    // to the kafka cluster directly. With the change to use a transport to manage
    // connections, the writer has no connections to manage directly anymore.
    IdleConnTimeout time.Duration

    // Number of acknowledgements from partition replicas required before receiving
    // a response to a produce request. The default is -1, which means to wait for
    // all replicas, and a value above 0 is required to indicate how many replicas
    // should acknowledge a message to be considered successful.
    //
    // This version of kafka-go (v0.3) does not support 0 required acks, due to
    // some internal complexity implementing this with the Kafka protocol. If you
    // need that functionality specifically, you'll need to upgrade to v0.4.
    RequiredAcks int

    // Setting this flag to true causes the WriteMessages method to never block.
    // It also means that errors are ignored since the caller will not receive
    // the returned value. Use this only if you don't care about guarantees of
    // whether the messages were written to kafka.
    Async bool

    // CompressionCodec sets the codec to be used to compress Kafka messages.
    CompressionCodec

    // If not nil, specifies a logger used to report internal changes within the
    // writer.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the writer falls
    // back to using Logger instead.
    ErrorLogger Logger
}

type topicPartition struct {
    topic     string
    partition int32
}

// Validate method validates WriterConfig properties.
func (config *WriterConfig) Validate() error {
    if len(config.Brokers) == 0 {
        return errors.New("cannot create a kafka writer with an empty list of brokers")
    }
    return nil
}

// WriterStats is a data structure returned by a call to Writer.Stats that
// exposes details about the behavior of the writer.
type WriterStats struct {
    Writes   int64 `metric:"kafka.writer.write.count"   type:"counter"`
    Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
    Bytes    int64 `metric:"kafka.writer.message.bytes" type:"counter"`
    Errors   int64 `metric:"kafka.writer.error.count"   type:"counter"`

    BatchTime  DurationStats `metric:"kafka.writer.batch.seconds"`
    WriteTime  DurationStats `metric:"kafka.writer.write.seconds"`
    WaitTime   DurationStats `metric:"kafka.writer.wait.seconds"`
    Retries    SummaryStats  `metric:"kafka.writer.retries.count"`
    BatchSize  SummaryStats  `metric:"kafka.writer.batch.size"`
    BatchBytes SummaryStats  `metric:"kafka.writer.batch.bytes"`

    MaxAttempts  int64         `metric:"kafka.writer.attempts.max"  type:"gauge"`
    MaxBatchSize int64         `metric:"kafka.writer.batch.max"     type:"gauge"`
    BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
    ReadTimeout  time.Duration `metric:"kafka.writer.read.timeout"  type:"gauge"`
    WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
    RequiredAcks int64         `metric:"kafka.writer.acks.required" type:"gauge"`
    Async        bool          `metric:"kafka.writer.async"         type:"gauge"`

    Topic string `tag:"topic"`

    // DEPRECATED: these fields will only be reported for backward compatibility
    // if the Writer was constructed with NewWriter.
    Dials    int64         `metric:"kafka.writer.dial.count" type:"counter"`
    DialTime DurationStats `metric:"kafka.writer.dial.seconds"`

    // DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes
    // to the internal implementation and the introduction of the transport type
    // made them unnecessary.
    //
    // The values will be zero but are left for backward compatibility to avoid
    // breaking programs that used these fields.
    Rebalances        int64
    RebalanceInterval time.Duration
    QueueLength       int64
    QueueCapacity     int64
    ClientID          string
}

// writerStats is a struct that contains statistics on a writer.
//
// Since atomic is used to mutate the statistics, the values must be 64-bit aligned.
// This is easily accomplished by always allocating this struct directly
// (i.e. using a pointer to the struct).
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type writerStats struct {
    dials          counter
    writes         counter
    messages       counter
    bytes          counter
    errors         counter
    dialTime       summary
    batchTime      summary
    writeTime      summary
    waitTime       summary
    retries        summary
    batchSize      summary
    batchSizeBytes summary
}

// NewWriter creates and returns a new Writer configured with config.
//
// DEPRECATED: Writer values can be instantiated and configured directly,
// this function is retained for backward compatibility and will be removed
// in version 1.0.
func NewWriter(config WriterConfig) *Writer {
    if err := config.Validate(); err != nil {
        panic(err)
    }

    if config.Dialer == nil {
        config.Dialer = DefaultDialer
    }

    if config.Balancer == nil {
        config.Balancer = &RoundRobin{}
    }

    // Converts the pre-0.4 Dialer API into a Transport.
    kafkaDialer := DefaultDialer
    if config.Dialer != nil {
        kafkaDialer = config.Dialer
    }

    dialer := (&net.Dialer{
        Timeout:       kafkaDialer.Timeout,
        Deadline:      kafkaDialer.Deadline,
        LocalAddr:     kafkaDialer.LocalAddr,
        DualStack:     kafkaDialer.DualStack,
        FallbackDelay: kafkaDialer.FallbackDelay,
        KeepAlive:     kafkaDialer.KeepAlive,
    })

    var resolver Resolver
    if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
        dialer.Resolver = r
    } else {
        resolver = kafkaDialer.Resolver
    }

    stats := new(writerStats)

    // For backward compatibility with the pre-0.4 APIs, support custom
    // resolvers by wrapping the dial function.
    dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
        start := time.Now()
        defer func() {
            stats.dials.observe(1)
            stats.dialTime.observe(int64(time.Since(start)))
        }()
        address, err := lookupHost(ctx, addr, resolver)
        if err != nil {
            return nil, err
        }
        return dialer.DialContext(ctx, network, address)
    }

    idleTimeout := config.IdleConnTimeout
    if idleTimeout == 0 {
        // Historical default value of WriterConfig.IdleConnTimeout; 9 minutes
        // seems like it is way too long when there is no ping mechanism in the
        // kafka protocol.
        idleTimeout = 9 * time.Minute
    }

    metadataTTL := config.RebalanceInterval
    if metadataTTL == 0 {
        // Historical default value of WriterConfig.RebalanceInterval.
        metadataTTL = 15 * time.Second
    }

    transport := &Transport{
        Dial:        dial,
        SASL:        kafkaDialer.SASLMechanism,
        TLS:         kafkaDialer.TLS,
        ClientID:    kafkaDialer.ClientID,
        IdleTimeout: idleTimeout,
        MetadataTTL: metadataTTL,
    }

    w := &Writer{
        Addr:         TCP(config.Brokers...),
        Topic:        config.Topic,
        MaxAttempts:  config.MaxAttempts,
        BatchSize:    config.BatchSize,
        Balancer:     config.Balancer,
        BatchBytes:   int64(config.BatchBytes),
        BatchTimeout: config.BatchTimeout,
        ReadTimeout:  config.ReadTimeout,
        WriteTimeout: config.WriteTimeout,
        RequiredAcks: RequiredAcks(config.RequiredAcks),
        Async:        config.Async,
        Logger:       config.Logger,
        ErrorLogger:  config.ErrorLogger,
        Transport:    transport,
        transport:    transport,
        writerStats:  stats,
    }

    if config.RequiredAcks == 0 {
        // Historically the writers created by NewWriter have used "all" as the
        // default value when 0 was specified.
        w.RequiredAcks = RequireAll
    }

    if config.CompressionCodec != nil {
        w.Compression = Compression(config.CompressionCodec.Code())
    }

    return w
}
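
// Illustrative sketch (not part of the original source): since NewWriter is
// deprecated, programs built on the pre-0.4 WriterConfig can usually migrate by
// assigning the Writer fields directly. Note the difference in defaults: when
// the struct is constructed directly, RequiredAcks defaults to RequireNone,
// whereas NewWriter substitutes RequireAll when the config leaves it at zero.
// Broker address and topic below are assumptions made for the example.
//
//    // Before (deprecated):
//    w := kafka.NewWriter(kafka.WriterConfig{
//        Brokers: []string{"localhost:9092"},
//        Topic:   "topic-A",
//    })
//
//    // After:
//    w := &kafka.Writer{
//        Addr:         kafka.TCP("localhost:9092"),
//        Topic:        "topic-A",
//        RequiredAcks: kafka.RequireAll, // NewWriter's historical default
//    }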

// Close flushes pending writes, and waits for all writes to complete before
// returning. Calling Close also prevents new writes from being submitted to
// the writer; further calls to WriteMessages and the like will fail with
// io.ErrClosedPipe.
func (w *Writer) Close() error {
    w.markClosed()

    // If batches are pending, trigger them so messages get sent.
    w.mutex.Lock()

    for _, batch := range w.batches {
        batch.trigger()
    }

    for partition := range w.batches {
        delete(w.batches, partition)
    }

    w.mutex.Unlock()
    w.group.Wait()

    if w.transport != nil {
        w.transport.CloseIdleConnections()
    }

    return nil
}
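
// Illustrative sketch (not part of the original source): because Close flushes
// pending batches and waits for in-flight writes, a typical program defers it
// so buffered messages are not lost on shutdown. The address and topic are
// assumptions made for the example.
//
//    w := &kafka.Writer{
//        Addr:  kafka.TCP("localhost:9092"),
//        Topic: "topic-A",
//    }
//    // Flush buffered batches and wait for in-flight writes on shutdown;
//    // any WriteMessages call made after Close returns io.ErrClosedPipe.
//    defer w.Close()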

// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.
//
// Unless the writer was configured to write messages asynchronously, the method
// blocks until all messages have been written, or until the maximum number of
// attempts was reached.
//
// When sending synchronously and the writer's batch size is configured to be
// greater than 1, this method blocks until either a full batch can be assembled
// or the batch timeout is reached. The batch size and timeouts are evaluated
// per partition, so the choice of Balancer can also influence the flushing
// behavior. For example, the Hash balancer will require on average N * batch
// size messages to trigger a flush where N is the number of partitions. The
// best way to achieve good batching behavior is to share one Writer amongst
// multiple goroutines.
//
// When the method returns an error, it may be of type kafka.WriteErrors to allow
// the caller to determine the status of each message.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka. The program should assume that the
// whole batch failed and re-write the messages later (which could then cause
// duplicates).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
    if w.Addr == nil {
        return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
    }

    w.group.Add(1)
    defer w.group.Done()

    if w.isClosed() {
        return io.ErrClosedPipe
    }

    if len(msgs) == 0 {
        return nil
    }

    balancer := w.balancer()
    batchBytes := w.batchBytes()

    for i := range msgs {
        n := int64(msgs[i].size())
        if n > batchBytes {
            // This error is left for backward compatibility with historical
            // behavior, but it can yield O(N^2) behaviors. The expectation
            // is that the program will check if WriteMessages returned a
            // MessageTooLargeError, discard the message that was exceeding
            // the maximum size, and try again.
            return messageTooLarge(msgs, i)
        }
    }

    // We use int32 here to halve the memory footprint (compared to using int
    // on 64 bits architectures). We map lists of the message indexes instead
    // of the message values for the same reason, int32 is 4 bytes, vs a full
    // Message value which is 100+ bytes and contains pointers and contributes
    // to increasing GC work.
    assignments := make(map[topicPartition][]int32)

    for i, msg := range msgs {
        topic, err := w.chooseTopic(msg)
        if err != nil {
            return err
        }

        numPartitions, err := w.partitions(ctx, topic)
        if err != nil {
            return err
        }

        partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)

        key := topicPartition{
            topic:     topic,
            partition: int32(partition),
        }

        assignments[key] = append(assignments[key], int32(i))
    }

    batches := w.batchMessages(msgs, assignments)
    if w.Async {
        return nil
    }

    done := ctx.Done()
    hasErrors := false
    for batch := range batches {
        select {
        case <-done:
            return ctx.Err()
        case <-batch.done:
            if batch.err != nil {
                hasErrors = true
            }
        }
    }

    if !hasErrors {
        return nil
    }

    werr := make(WriteErrors, len(msgs))

    for batch, indexes := range batches {
        for _, i := range indexes {
            werr[i] = batch.err
        }
    }
    return werr
}
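
// Illustrative sketch (not part of the original source): when a synchronous
// write returns kafka.WriteErrors, only the messages whose individual error is
// non-nil need to be retried. The writeWithRetry helper below is hypothetical
// application code, not part of the package, and a real program would cap the
// number of retry rounds instead of looping until everything succeeds.
//
//    func writeWithRetry(ctx context.Context, w *kafka.Writer, msgs []kafka.Message) error {
//        for len(msgs) > 0 {
//            err := w.WriteMessages(ctx, msgs...)
//            werr, ok := err.(kafka.WriteErrors)
//            if !ok {
//                return err // nil, a context error, or another fatal error
//            }
//            // Keep only the messages whose individual write failed.
//            retry := msgs[:0]
//            for i := range msgs {
//                if werr[i] != nil {
//                    retry = append(retry, msgs[i])
//                }
//            }
//            msgs = retry
//        }
//        return nil
//    }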

func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
    var batches map[*writeBatch][]int32
    if !w.Async {
        batches = make(map[*writeBatch][]int32, len(assignments))
    }

    batchSize := w.batchSize()
    batchBytes := w.batchBytes()

    w.mutex.Lock()
    defer w.mutex.Unlock()

    if w.batches == nil {
        w.batches = map[topicPartition]*writeBatch{}
    }

    for key, indexes := range assignments {
        for _, i := range indexes {
        assignMessage:
            batch := w.batches[key]
            if batch == nil {
                batch = w.newWriteBatch(key)
                w.batches[key] = batch
            }
            if !batch.add(messages[i], batchSize, batchBytes) {
                batch.trigger()
                delete(w.batches, key)
                goto assignMessage
            }
            if batch.full(batchSize, batchBytes) {
                batch.trigger()
                delete(w.batches, key)
            }
            if !w.Async {
                batches[batch] = append(batches[batch], i)
            }
        }
    }

    return batches
}

func (w *Writer) newWriteBatch(key topicPartition) *writeBatch {
    batch := newWriteBatch(time.Now(), w.batchTimeout())
    w.group.Add(1)
    go func() {
        defer w.group.Done()
        w.writeBatch(key, batch)
    }()
    return batch
}

func (w *Writer) writeBatch(key topicPartition, batch *writeBatch) {
    // This goroutine has taken ownership of the batch; it is responsible
    // for waiting for the batch to become ready (because it is full), or
    // to time out.
    select {
    case <-batch.timer.C:
        // The batch timed out, we want to detach it from the writer to
        // prevent more messages from being added.
        w.mutex.Lock()
        if batch == w.batches[key] {
            delete(w.batches, key)
        }
        w.mutex.Unlock()

    case <-batch.ready:
        // The batch became full, it was removed from the writer and its
        // ready channel was closed. We need to close the timer to avoid
        // having it leak until it expires.
        batch.timer.Stop()
    }

    stats := w.stats()
    stats.batchTime.observe(int64(time.Since(batch.time)))
    stats.batchSize.observe(int64(len(batch.msgs)))
    stats.batchSizeBytes.observe(batch.bytes)

    var res *ProduceResponse
    var err error

    for attempt, maxAttempts := 0, w.maxAttempts(); attempt < maxAttempts; attempt++ {
        if attempt != 0 {
            stats.retries.observe(1)
            // TODO: should there be a way to asynchronously cancel this
            // operation?
            //
            // * If all goroutines that added messages to this batch have stopped
            //   waiting for it, should we abort?
            //
            // * If the writer has been closed? It reduces the durability
            //   guarantees to abort, but may be better to avoid long wait times
            //   on close.
            //
            delay := backoff(attempt, 100*time.Millisecond, 1*time.Second)
            w.withLogger(func(log Logger) {
                log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
            })
            time.Sleep(delay)
        }

        w.withLogger(func(log Logger) {
            log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
        })

        start := time.Now()
        res, err = w.produce(key, batch)

        stats.writes.observe(1)
        stats.messages.observe(int64(len(batch.msgs)))
        stats.bytes.observe(batch.bytes)
        // stats.writeTime used to report the duration of WriteMessages, but the
        // implementation was broken and reporting values in the nanoseconds
        // range. In kafka-go 0.4, we recycled this value to instead report the
        // duration of produce requests, and changed the stats.waitTime value to
        // report the time that kafka has throttled the requests for.
        stats.writeTime.observe(int64(time.Since(start)))

        if res != nil {
            err = res.Error
            stats.waitTime.observe(int64(res.Throttle))
        }

        if err == nil {
            break
        }

        stats.errors.observe(1)

        w.withErrorLogger(func(log Logger) {
            log.Printf("error writing messages to %s (partition %d): %s", key.topic, key.partition, err)
        })

        if !isTemporary(err) && !isTransientNetworkError(err) {
            break
        }
    }

    if res != nil {
        for i := range batch.msgs {
            m := &batch.msgs[i]
            m.Topic = key.topic
            m.Partition = int(key.partition)
            m.Offset = res.BaseOffset + int64(i)

            if m.Time.IsZero() {
                m.Time = res.LogAppendTime
            }
        }
    }

    if w.Completion != nil {
        w.Completion(batch.msgs, err)
    }

    batch.complete(err)
}

func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
    timeout := w.writeTimeout()

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    return w.client(timeout).Produce(ctx, &ProduceRequest{
        Partition:    int(key.partition),
        Topic:        key.topic,
        RequiredAcks: w.RequiredAcks,
        Compression:  w.Compression,
        Records: &writerRecords{
            msgs: batch.msgs,
        },
    })
}

func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
    client := w.client(w.readTimeout())
    // Here we use the transport directly as an optimization to avoid the
    // construction of temporary request and response objects made by the
    // (*Client).Metadata API.
    //
    // It is expected that the transport will optimize this request by
    // caching recent results (the kafka.Transport type does).
    r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
        TopicNames: []string{topic},
    })
    if err != nil {
        return 0, err
    }
    for _, t := range r.(*metadataAPI.Response).Topics {
        if t.Name == topic {
            // This should always hit, unless kafka has a bug.
            if t.ErrorCode != 0 {
                return 0, Error(t.ErrorCode)
            }
            return len(t.Partitions), nil
        }
    }
    return 0, UnknownTopicOrPartition
}

func (w *Writer) markClosed() {
    atomic.StoreUint32(&w.closed, 1)
}

func (w *Writer) isClosed() bool {
    return atomic.LoadUint32(&w.closed) != 0
}

func (w *Writer) client(timeout time.Duration) *Client {
    return &Client{
        Addr:      w.Addr,
        Transport: w.Transport,
        Timeout:   timeout,
    }
}

func (w *Writer) balancer() Balancer {
    if w.Balancer != nil {
        return w.Balancer
    }
    return &w.roundRobin
}

func (w *Writer) maxAttempts() int {
    if w.MaxAttempts > 0 {
        return w.MaxAttempts
    }
    // TODO: this is a very high default, if something has failed 9 times it
    // seems unlikely it will succeed on the 10th attempt. However, it does
    // carry the risk of greatly increasing the volume of requests sent to the
    // kafka cluster. We should consider reducing this default (3?).
    return 10
}

func (w *Writer) batchSize() int {
    if w.BatchSize > 0 {
        return w.BatchSize
    }
    return 100
}

func (w *Writer) batchBytes() int64 {
    if w.BatchBytes > 0 {
        return w.BatchBytes
    }
    return 1048576
}

func (w *Writer) batchTimeout() time.Duration {
    if w.BatchTimeout > 0 {
        return w.BatchTimeout
    }
    return 1 * time.Second
}

func (w *Writer) readTimeout() time.Duration {
    if w.ReadTimeout > 0 {
        return w.ReadTimeout
    }
    return 10 * time.Second
}

func (w *Writer) writeTimeout() time.Duration {
    if w.WriteTimeout > 0 {
        return w.WriteTimeout
    }
    return 10 * time.Second
}

func (w *Writer) withLogger(do func(Logger)) {
    if w.Logger != nil {
        do(w.Logger)
    }
}

func (w *Writer) withErrorLogger(do func(Logger)) {
    if w.ErrorLogger != nil {
        do(w.ErrorLogger)
    } else {
        w.withLogger(do)
    }
}

func (w *Writer) stats() *writerStats {
    w.once.Do(func() {
        // This field is not nil when the writer was constructed with NewWriter
        // to share the value with the dial function and count dials.
        if w.writerStats == nil {
            w.writerStats = new(writerStats)
        }
    })
    return w.writerStats
}

// Stats returns a snapshot of the writer stats since the last time the method
// was called, or since the writer was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka writer and report the metrics to a stats collection
// system.
func (w *Writer) Stats() WriterStats {
    stats := w.stats()
    return WriterStats{
        Dials:        stats.dials.snapshot(),
        Writes:       stats.writes.snapshot(),
        Messages:     stats.messages.snapshot(),
        Bytes:        stats.bytes.snapshot(),
        Errors:       stats.errors.snapshot(),
        DialTime:     stats.dialTime.snapshotDuration(),
        BatchTime:    stats.batchTime.snapshotDuration(),
        WriteTime:    stats.writeTime.snapshotDuration(),
        WaitTime:     stats.waitTime.snapshotDuration(),
        Retries:      stats.retries.snapshot(),
        BatchSize:    stats.batchSize.snapshot(),
        BatchBytes:   stats.batchSizeBytes.snapshot(),
        MaxAttempts:  int64(w.MaxAttempts),
        MaxBatchSize: int64(w.BatchSize),
        BatchTimeout: w.BatchTimeout,
        ReadTimeout:  w.ReadTimeout,
        WriteTimeout: w.WriteTimeout,
        RequiredAcks: int64(w.RequiredAcks),
        Async:        w.Async,
        Topic:        w.Topic,
    }
}
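
// Illustrative sketch (not part of the original source): the doc comment above
// suggests polling Stats from a goroutine. A minimal version, assuming the
// program reports counters with the standard library log package and an
// arbitrary 30-second interval, could look like this:
//
//    go func() {
//        ticker := time.NewTicker(30 * time.Second)
//        defer ticker.Stop()
//        for range ticker.C {
//            s := w.Stats() // snapshot since the previous call
//            log.Printf("kafka writer: writes=%d messages=%d errors=%d",
//                s.Writes, s.Messages, s.Errors)
//        }
//    }()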

func (w *Writer) chooseTopic(msg Message) (string, error) {
    // w.Topic and msg.Topic are mutually exclusive, meaning exactly one of them
    // must be set; otherwise we return an error.
    if w.Topic != "" && msg.Topic != "" {
        return "", errors.New("kafka.(*Writer): Topic must not be specified for both Writer and Message")
    } else if w.Topic == "" && msg.Topic == "" {
        return "", errors.New("kafka.(*Writer): Topic must be specified for Writer or Message")
    }

    // Now we choose the topic, depending on which one is not empty.
    if msg.Topic != "" {
        return msg.Topic, nil
    }

    return w.Topic, nil
}

type writeBatch struct {
    time  time.Time
    msgs  []Message
    size  int
    bytes int64
    ready chan struct{}
    done  chan struct{}
    timer *time.Timer
    err   error // result of the batch completion
}

func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
    return &writeBatch{
        time:  now,
        ready: make(chan struct{}),
        done:  make(chan struct{}),
        timer: time.NewTimer(timeout),
    }
}

func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
    bytes := int64(msg.size())

    if b.size > 0 && (b.bytes+bytes) > maxBytes {
        return false
    }

    if cap(b.msgs) == 0 {
        b.msgs = make([]Message, 0, maxSize)
    }

    b.msgs = append(b.msgs, msg)
    b.size++
    b.bytes += bytes
    return true
}

func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
    return b.size >= maxSize || b.bytes >= maxBytes
}

func (b *writeBatch) trigger() {
    close(b.ready)
}

func (b *writeBatch) complete(err error) {
    b.err = err
    close(b.done)
}

type writerRecords struct {
    msgs   []Message
    index  int
    record Record
    key    bytesReadCloser
    value  bytesReadCloser
}

func (r *writerRecords) ReadRecord() (*Record, error) {
    if r.index >= 0 && r.index < len(r.msgs) {
        m := &r.msgs[r.index]
        r.index++
        r.record = Record{
            Time:    m.Time,
            Headers: m.Headers,
        }
        if m.Key != nil {
            r.key.Reset(m.Key)
            r.record.Key = &r.key
        }
        if m.Value != nil {
            r.value.Reset(m.Value)
            r.record.Value = &r.value
        }
        return &r.record, nil
    }
    return nil, io.EOF
}

type bytesReadCloser struct{ bytes.Reader }

func (*bytesReadCloser) Close() error { return nil }

// A cache of []int values passed to balancers of writers, used to amortize the
// heap allocation of the partition index lists.
//
// With hindsight, the use of `...int` to pass the partition list to Balancers
// was not the best design choice: kafka partition numbers are monotonically
// increasing, we could have simply passed the number of partitions instead.
// If we ever revisit this API, we can hopefully remove this cache.
var partitionsCache atomic.Value

func loadCachedPartitions(numPartitions int) []int {
    partitions, ok := partitionsCache.Load().([]int)
    if ok && len(partitions) >= numPartitions {
        return partitions[:numPartitions]
    }

    const alignment = 128
    n := ((numPartitions / alignment) + 1) * alignment

    partitions = make([]int, n)
    for i := range partitions {
        partitions[i] = i
    }

    partitionsCache.Store(partitions)
    return partitions[:numPartitions]
}