package kafka

import (
	"bytes"
	"context"
	"errors"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
)
// The Writer type provides the implementation of a producer of kafka messages
// that automatically distributes messages across partitions of a single topic
// using a configurable balancing policy.
//
// Writers manage the dispatch of messages across partitions of the topic they
// are configured to write to using a Balancer, and aggregate batches to
// optimize the writes to kafka.
//
// Writers may be configured to be used synchronously or asynchronously. When
// used synchronously, calls to WriteMessages block until the messages have been
// written to kafka. In this mode, the program should inspect the error returned
// by the function and test if it is an instance of kafka.WriteErrors in order to
// identify which messages have succeeded or failed, for example:
//
//	// Construct a synchronous writer (the default mode).
//	w := &kafka.Writer{
//		Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
//		Topic:        "topic-A",
//		RequiredAcks: kafka.RequireAll,
//	}
//
//	...
//
//	// Passing a context can prevent the operation from blocking indefinitely.
//	switch err := w.WriteMessages(ctx, msgs...).(type) {
//	case nil:
//	case kafka.WriteErrors:
//		for i := range msgs {
//			if err[i] != nil {
//				// handle the error writing msgs[i]
//				...
//			}
//		}
//	default:
//		// handle other errors
//		...
//	}
//
// In asynchronous mode, the program may configure a completion handler on the
// writer to receive notifications of messages being written to kafka:
//
//	w := &kafka.Writer{
//		Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
//		Topic:        "topic-A",
//		RequiredAcks: kafka.RequireAll,
//		Async:        true, // make the writer asynchronous
//		Completion: func(messages []kafka.Message, err error) {
//			...
//		},
//	}
//
//	...
//
//	// Because the writer is asynchronous, there is no need for the context to
//	// be cancelled, the call will never block.
//	if err := w.WriteMessages(context.Background(), msgs...); err != nil {
//		// Only validation errors would be reported in this case.
//		...
//	}
//
// Methods of Writer are safe to use concurrently from multiple goroutines,
// however the writer configuration should not be modified after first use.
type Writer struct {
	// Address of the kafka cluster that this writer is configured to send
	// messages to.
	//
	// This field is required, attempting to write messages to a writer with a
	// nil address will error.
	Addr net.Addr

	// Topic is the name of the topic that the writer will produce messages to.
	//
	// Setting this field or not is a mutually exclusive option. If you set Topic
	// here, you must not set Topic for any produced Message. Otherwise, if you do
	// not set Topic, every Message must have Topic specified.
	Topic string

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// WriteBackoffMin optionally sets the smallest amount of time the writer
	// waits before it attempts to write a batch of messages.
	//
	// Default: 100ms
	WriteBackoffMin time.Duration

	// WriteBackoffMax optionally sets the maximum amount of time the writer
	// waits before it attempts to write a batch of messages.
	//
	// Default: 1s
	WriteBackoffMax time.Duration

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int64

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// Number of acknowledgements from partition replicas required before
	// receiving a response to a produce request, the following values are
	// supported:
	//
	//	RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the cluster
	//	RequireOne  (1)  wait for the leader to acknowledge the writes
	//	RequireAll  (-1) wait for the full ISR to acknowledge the writes
	//
	// Defaults to RequireNone.
	RequiredAcks RequiredAcks

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	//
	// Defaults to false.
	Async bool

	// An optional function called when the writer succeeds or fails the
	// delivery of messages to a kafka partition. When writing the messages
	// fails, the `err` parameter will be non-nil.
	//
	// The messages that the Completion function is called with have their
	// topic, partition, offset, and time set based on the Produce responses
	// received from kafka. All messages passed to a call to the function have
	// been written to the same partition. The keys and values of messages are
	// referencing the original byte slices carried by messages in the calls to
	// WriteMessages.
	//
	// The function is called from goroutines started by the writer. Calls to
	// Close will block on the Completion function calls. When the Writer is
	// not writing asynchronously, the WriteMessages call will also block on
	// the Completion function, which is a useful guarantee if the byte slices
	// for the message keys and values are intended to be reused after the
	// WriteMessages call returned.
	//
	// If a completion function panics, the program terminates because the
	// panic is not recovered by the writer and bubbles up to the top of the
	// goroutine's call stack.
	Completion func(messages []Message, err error)

	// Compression sets the compression codec to be used to compress messages.
	Compression Compression

	// If not nil, specifies a logger used to report internal changes within the
	// writer.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the writer falls
	// back to using Logger instead.
	ErrorLogger Logger

	// A transport used to send messages to kafka clusters.
	//
	// If nil, DefaultTransport is used.
	Transport RoundTripper

	// AllowAutoTopicCreation notifies writer to create topic if missing.
	AllowAutoTopicCreation bool

	// Manages the current set of partition-topic writers.
	group   sync.WaitGroup
	mutex   sync.Mutex
	closed  bool
	writers map[topicPartition]*partitionWriter

	// writer stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values. The once value is
	// used to lazily create the value when first used, allowing programs to use
	// the zero-value value of Writer.
	once sync.Once
	*writerStats

	// If no balancer is configured, the writer uses this one. RoundRobin values
	// are safe to use concurrently from multiple goroutines, there is no need
	// for extra synchronization to access this field.
	roundRobin RoundRobin

	// non-nil when a transport was created by NewWriter, remove in 1.0.
	transport *Transport
}
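
// A minimal usage sketch (assuming a broker at localhost:9092): because
// Writer.Topic and Message.Topic are mutually exclusive, a program writing to
// multiple topics leaves the writer's Topic empty and sets it per message:
//
//	w := &kafka.Writer{Addr: kafka.TCP("localhost:9092")}
//	err := w.WriteMessages(ctx,
//		kafka.Message{Topic: "topic-A", Value: []byte("a")},
//		kafka.Message{Topic: "topic-B", Value: []byte("b")},
//	)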

// WriterConfig is a configuration type used to create new instances of Writer.
//
// DEPRECATED: writer values should be configured directly by assigning their
// exported fields. This type is kept for backward compatibility, and will be
// removed in version 1.0.
type WriterConfig struct {
	// The list of brokers used to discover the partitions available on the
	// kafka cluster.
	//
	// This field is required, attempting to create a writer with an empty list
	// of brokers will panic.
	Brokers []string

	// The topic that the writer will produce messages to.
	//
	// If provided, this will be used to set the topic for all produced messages.
	// If not provided, each Message must specify a topic for itself. These
	// options are mutually exclusive; if both are set, the Writer will return
	// an error.
	Topic string

	// The dialer used by the writer to establish connections to the kafka
	// cluster.
	//
	// If nil, the default dialer is used instead.
	Dialer *Dialer

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// DEPRECATED: in versions prior to 0.4, the writer used channels internally
	// to dispatch messages to partitions. This has been replaced by an in-memory
	// aggregation of batches which uses shared state instead of message passing,
	// making this option unnecessary.
	QueueCapacity int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
	// of the topic layout. With the change to use a transport to manage
	// connections, the responsibility of syncing the cluster layout has been
	// delegated to the transport.
	RebalanceInterval time.Duration

	// DEPRECATED: in versions prior to 0.4, the writer used to manage connections
	// to the kafka cluster directly. With the change to use a transport to manage
	// connections, the writer has no connections to manage directly anymore.
	IdleConnTimeout time.Duration

	// Number of acknowledgements from partition replicas required before
	// receiving a response to a produce request. The default is -1, which means
	// to wait for all replicas, and a value above 0 is required to indicate how
	// many replicas should acknowledge a message to be considered successful.
	RequiredAcks int

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool

	// CompressionCodec sets the codec to be used to compress Kafka messages.
	CompressionCodec

	// If not nil, specifies a logger used to report internal changes within the
	// writer.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the writer falls
	// back to using Logger instead.
	ErrorLogger Logger
}

type topicPartition struct {
	topic     string
	partition int32
}

// Validate method validates WriterConfig properties.
func (config *WriterConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a kafka writer with an empty list of brokers")
	}
	return nil
}

// WriterStats is a data structure returned by a call to Writer.Stats that
// exposes details about the behavior of the writer.
type WriterStats struct {
	Writes   int64 `metric:"kafka.writer.write.count"   type:"counter"`
	Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
	Bytes    int64 `metric:"kafka.writer.message.bytes" type:"counter"`
	Errors   int64 `metric:"kafka.writer.error.count"   type:"counter"`

	BatchTime      DurationStats `metric:"kafka.writer.batch.seconds"`
	BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"`
	WriteTime      DurationStats `metric:"kafka.writer.write.seconds"`
	WaitTime       DurationStats `metric:"kafka.writer.wait.seconds"`
	Retries        int64         `metric:"kafka.writer.retries.count" type:"counter"`
	BatchSize      SummaryStats  `metric:"kafka.writer.batch.size"`
	BatchBytes     SummaryStats  `metric:"kafka.writer.batch.bytes"`

	MaxAttempts     int64         `metric:"kafka.writer.attempts.max"  type:"gauge"`
	WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min"   type:"gauge"`
	WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max"   type:"gauge"`
	MaxBatchSize    int64         `metric:"kafka.writer.batch.max"     type:"gauge"`
	BatchTimeout    time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
	ReadTimeout     time.Duration `metric:"kafka.writer.read.timeout"  type:"gauge"`
	WriteTimeout    time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
	RequiredAcks    int64         `metric:"kafka.writer.acks.required" type:"gauge"`
	Async           bool          `metric:"kafka.writer.async"         type:"gauge"`

	Topic string `tag:"topic"`

	// DEPRECATED: these fields will only be reported for backward compatibility
	// if the Writer was constructed with NewWriter.
	Dials    int64         `metric:"kafka.writer.dial.count" type:"counter"`
	DialTime DurationStats `metric:"kafka.writer.dial.seconds"`

	// DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes
	// to the internal implementation and the introduction of the transport type
	// made them unnecessary.
	//
	// The values will be zero but are left for backward compatibility to avoid
	// breaking programs that used these fields.
	Rebalances        int64
	RebalanceInterval time.Duration
	QueueLength       int64
	QueueCapacity     int64
	ClientID          string
}

// writerStats is a struct that contains statistics on a writer.
//
// Since atomic is used to mutate the statistics the values must be 64-bit aligned.
// This is easily accomplished by always allocating this struct directly (i.e.
// using a pointer to the struct).
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type writerStats struct {
	dials          counter
	writes         counter
	messages       counter
	bytes          counter
	errors         counter
	dialTime       summary
	batchTime      summary
	batchQueueTime summary
	writeTime      summary
	waitTime       summary
	retries        counter
	batchSize      summary
	batchSizeBytes summary
}

// NewWriter creates and returns a new Writer configured with config.
//
// DEPRECATED: Writer values can be instantiated and configured directly,
// this function is retained for backward compatibility and will be removed
// in version 1.0.
func NewWriter(config WriterConfig) *Writer {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.Balancer == nil {
		config.Balancer = &RoundRobin{}
	}

	// Converts the pre-0.4 Dialer API into a Transport.
	kafkaDialer := DefaultDialer
	if config.Dialer != nil {
		kafkaDialer = config.Dialer
	}

	dialer := (&net.Dialer{
		Timeout:       kafkaDialer.Timeout,
		Deadline:      kafkaDialer.Deadline,
		LocalAddr:     kafkaDialer.LocalAddr,
		DualStack:     kafkaDialer.DualStack,
		FallbackDelay: kafkaDialer.FallbackDelay,
		KeepAlive:     kafkaDialer.KeepAlive,
	})

	var resolver Resolver
	if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
		dialer.Resolver = r
	} else {
		resolver = kafkaDialer.Resolver
	}

	stats := new(writerStats)

	// For backward compatibility with the pre-0.4 APIs, support custom
	// resolvers by wrapping the dial function.
	dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
		start := time.Now()
		defer func() {
			stats.dials.observe(1)
			stats.dialTime.observe(int64(time.Since(start)))
		}()
		address, err := lookupHost(ctx, addr, resolver)
		if err != nil {
			return nil, err
		}
		return dialer.DialContext(ctx, network, address)
	}

	idleTimeout := config.IdleConnTimeout
	if idleTimeout == 0 {
		// Historical default value of WriterConfig.IdleConnTimeout, 9 minutes
		// seems like it is way too long when there is no ping mechanism in the
		// kafka protocol.
		idleTimeout = 9 * time.Minute
	}

	metadataTTL := config.RebalanceInterval
	if metadataTTL == 0 {
		// Historical default value of WriterConfig.RebalanceInterval.
		metadataTTL = 15 * time.Second
	}

	transport := &Transport{
		Dial:        dial,
		SASL:        kafkaDialer.SASLMechanism,
		TLS:         kafkaDialer.TLS,
		ClientID:    kafkaDialer.ClientID,
		IdleTimeout: idleTimeout,
		MetadataTTL: metadataTTL,
	}

	w := &Writer{
		Addr:         TCP(config.Brokers...),
		Topic:        config.Topic,
		MaxAttempts:  config.MaxAttempts,
		BatchSize:    config.BatchSize,
		Balancer:     config.Balancer,
		BatchBytes:   int64(config.BatchBytes),
		BatchTimeout: config.BatchTimeout,
		ReadTimeout:  config.ReadTimeout,
		WriteTimeout: config.WriteTimeout,
		RequiredAcks: RequiredAcks(config.RequiredAcks),
		Async:        config.Async,
		Logger:       config.Logger,
		ErrorLogger:  config.ErrorLogger,
		Transport:    transport,
		transport:    transport,
		writerStats:  stats,
	}

	if config.RequiredAcks == 0 {
		// Historically the writers created by NewWriter have used "all" as the
		// default value when 0 was specified.
		w.RequiredAcks = RequireAll
	}

	if config.CompressionCodec != nil {
		w.Compression = Compression(config.CompressionCodec.Code())
	}

	return w
}

// enter is called by WriteMessages to indicate that a new inflight operation
// has started, which helps synchronize with Close and ensure that the method
// does not return until all inflight operations were completed.
func (w *Writer) enter() bool {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if w.closed {
		return false
	}
	w.group.Add(1)
	return true
}

// leave is called by WriteMessages to indicate that the inflight operation has
// completed.
func (w *Writer) leave() { w.group.Done() }

// spawn starts a new asynchronous operation on the writer. This method is used
// instead of starting goroutines inline to help manage the state of the
// writer's wait group. The wait group is used to block Close calls until all
// inflight operations have completed, therefore automatically including those
// started with calls to spawn.
func (w *Writer) spawn(f func()) {
	w.group.Add(1)
	go func() {
		defer w.group.Done()
		f()
	}()
}

// Close flushes pending writes, and waits for all writes to complete before
// returning. Calling Close also prevents new writes from being submitted to
// the writer, further calls to WriteMessages and the like will fail with
// io.ErrClosedPipe.
func (w *Writer) Close() error {
	w.mutex.Lock()
	// Marking the writer as closed here causes future calls to WriteMessages to
	// fail with io.ErrClosedPipe. Mutation of this field is synchronized on the
	// writer's mutex to ensure that no more increments of the wait group are
	// performed afterwards (which could otherwise race with the Wait below).
	w.closed = true

	// close all writers to trigger any pending batches
	for _, writer := range w.writers {
		writer.close()
	}

	for partition := range w.writers {
		delete(w.writers, partition)
	}

	w.mutex.Unlock()
	w.group.Wait()

	if w.transport != nil {
		w.transport.CloseIdleConnections()
	}

	return nil
}
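
// A minimal shutdown sketch: programs that share a single Writer typically
// create it at startup and call Close on the way out so buffered batches are
// flushed before the process exits:
//
//	w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "topic-A"}
//	defer w.Close()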

// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.
//
// Unless the writer was configured to write messages asynchronously, the method
// blocks until all messages have been written, or until the maximum number of
// attempts was reached.
//
// When sending synchronously and the writer's batch size is configured to be
// greater than 1, this method blocks until either a full batch can be assembled
// or the batch timeout is reached. The batch size and timeouts are evaluated
// per partition, so the choice of Balancer can also influence the flushing
// behavior. For example, the Hash balancer will require on average N * batch
// size messages to trigger a flush where N is the number of partitions. The
// best way to achieve good batching behavior is to share one Writer amongst
// multiple goroutines.
//
// When the method returns an error, it may be of type kafka.WriteErrors to allow
// the caller to determine the status of each message.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka, they might also still be written
// after this method has already returned, therefore it is important to not
// modify byte slices of passed messages if WriteMessages returned early due
// to a canceled context.
// The program should assume that the whole batch failed and re-write the
// messages later (which could then cause duplicates).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	if w.Addr == nil {
		return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
	}

	if !w.enter() {
		return io.ErrClosedPipe
	}
	defer w.leave()

	if len(msgs) == 0 {
		return nil
	}

	balancer := w.balancer()
	batchBytes := w.batchBytes()

	for i := range msgs {
		n := int64(msgs[i].totalSize())
		if n > batchBytes {
			// This error is left for backward compatibility with historical
			// behavior, but it can yield O(N^2) behaviors. The expectation
			// is that the program will check if WriteMessages returned a
			// MessageTooLargeError, discard the message that was exceeding
			// the maximum size, and try again.
			return messageTooLarge(msgs, i)
		}
	}

	// We use int32 here to halve the memory footprint (compared to using int
	// on 64 bits architectures). We map lists of the message indexes instead
	// of the message values for the same reason, int32 is 4 bytes, vs a full
	// Message value which is 100+ bytes and contains pointers and contributes
	// to increasing GC work.
	assignments := make(map[topicPartition][]int32)

	for i, msg := range msgs {
		topic, err := w.chooseTopic(msg)
		if err != nil {
			return err
		}

		numPartitions, err := w.partitions(ctx, topic)
		if err != nil {
			return err
		}

		partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)

		key := topicPartition{
			topic:     topic,
			partition: int32(partition),
		}

		assignments[key] = append(assignments[key], int32(i))
	}

	batches := w.batchMessages(msgs, assignments)
	if w.Async {
		return nil
	}

	done := ctx.Done()
	hasErrors := false
	for batch := range batches {
		select {
		case <-done:
			return ctx.Err()
		case <-batch.done:
			if batch.err != nil {
				hasErrors = true
			}
		}
	}

	if !hasErrors {
		return nil
	}

	werr := make(WriteErrors, len(msgs))
	for batch, indexes := range batches {
		for _, i := range indexes {
			werr[i] = batch.err
		}
	}
	return werr
}
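
// A minimal retry sketch for the MessageTooLargeError case mentioned above
// (assuming, as in this package, that the error carries the remaining
// messages that were not oversized):
//
//	for len(msgs) > 0 {
//		err := w.WriteMessages(ctx, msgs...)
//		var tooLarge kafka.MessageTooLargeError
//		if errors.As(err, &tooLarge) {
//			// drop the oversized message and retry with the rest
//			msgs = tooLarge.Remaining
//			continue
//		}
//		break
//	}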

func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
	var batches map[*writeBatch][]int32
	if !w.Async {
		batches = make(map[*writeBatch][]int32, len(assignments))
	}

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.writers == nil {
		w.writers = map[topicPartition]*partitionWriter{}
	}

	for key, indexes := range assignments {
		writer := w.writers[key]
		if writer == nil {
			writer = newPartitionWriter(w, key)
			w.writers[key] = writer
		}
		wbatches := writer.writeMessages(messages, indexes)

		for batch, idxs := range wbatches {
			batches[batch] = idxs
		}
	}

	return batches
}

func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
	timeout := w.writeTimeout()

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	return w.client(timeout).Produce(ctx, &ProduceRequest{
		Partition:    int(key.partition),
		Topic:        key.topic,
		RequiredAcks: w.RequiredAcks,
		Compression:  w.Compression,
		Records: &writerRecords{
			msgs: batch.msgs,
		},
	})
}

func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
	client := w.client(w.readTimeout())
	// Here we use the transport directly as an optimization to avoid the
	// construction of temporary request and response objects made by the
	// (*Client).Metadata API.
	//
	// It is expected that the transport will optimize this request by
	// caching recent results (the kafka.Transport type does).
	r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
		TopicNames:             []string{topic},
		AllowAutoTopicCreation: w.AllowAutoTopicCreation,
	})
	if err != nil {
		return 0, err
	}
	for _, t := range r.(*metadataAPI.Response).Topics {
		if t.Name == topic {
			// This should always hit, unless kafka has a bug.
			if t.ErrorCode != 0 {
				return 0, Error(t.ErrorCode)
			}
			return len(t.Partitions), nil
		}
	}
	return 0, UnknownTopicOrPartition
}

func (w *Writer) client(timeout time.Duration) *Client {
	return &Client{
		Addr:      w.Addr,
		Transport: w.Transport,
		Timeout:   timeout,
	}
}

func (w *Writer) balancer() Balancer {
	if w.Balancer != nil {
		return w.Balancer
	}
	return &w.roundRobin
}

func (w *Writer) maxAttempts() int {
	if w.MaxAttempts > 0 {
		return w.MaxAttempts
	}
	// TODO: this is a very high default, if something has failed 9 times it
	// seems unlikely it will succeed on the 10th attempt. However, it does
	// carry the risk to greatly increase the volume of requests sent to the
	// kafka cluster. We should consider reducing this default (3?).
	return 10
}

func (w *Writer) writeBackoffMin() time.Duration {
	if w.WriteBackoffMin > 0 {
		return w.WriteBackoffMin
	}
	return 100 * time.Millisecond
}

func (w *Writer) writeBackoffMax() time.Duration {
	if w.WriteBackoffMax > 0 {
		return w.WriteBackoffMax
	}
	return 1 * time.Second
}

func (w *Writer) batchSize() int {
	if w.BatchSize > 0 {
		return w.BatchSize
	}
	return 100
}

func (w *Writer) batchBytes() int64 {
	if w.BatchBytes > 0 {
		return w.BatchBytes
	}
	return 1048576
}

func (w *Writer) batchTimeout() time.Duration {
	if w.BatchTimeout > 0 {
		return w.BatchTimeout
	}
	return 1 * time.Second
}

func (w *Writer) readTimeout() time.Duration {
	if w.ReadTimeout > 0 {
		return w.ReadTimeout
	}
	return 10 * time.Second
}

func (w *Writer) writeTimeout() time.Duration {
	if w.WriteTimeout > 0 {
		return w.WriteTimeout
	}
	return 10 * time.Second
}

func (w *Writer) withLogger(do func(Logger)) {
	if w.Logger != nil {
		do(w.Logger)
	}
}

func (w *Writer) withErrorLogger(do func(Logger)) {
	if w.ErrorLogger != nil {
		do(w.ErrorLogger)
	} else {
		w.withLogger(do)
	}
}

func (w *Writer) stats() *writerStats {
	w.once.Do(func() {
		// This field is not nil when the writer was constructed with NewWriter
		// to share the value with the dial function and count dials.
		if w.writerStats == nil {
			w.writerStats = new(writerStats)
		}
	})
	return w.writerStats
}

// Stats returns a snapshot of the writer stats since the last time the method
// was called, or since the writer was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka writer and report the metrics to a stats collection
// system.
func (w *Writer) Stats() WriterStats {
	stats := w.stats()
	return WriterStats{
		Dials:           stats.dials.snapshot(),
		Writes:          stats.writes.snapshot(),
		Messages:        stats.messages.snapshot(),
		Bytes:           stats.bytes.snapshot(),
		Errors:          stats.errors.snapshot(),
		DialTime:        stats.dialTime.snapshotDuration(),
		BatchTime:       stats.batchTime.snapshotDuration(),
		BatchQueueTime:  stats.batchQueueTime.snapshotDuration(),
		WriteTime:       stats.writeTime.snapshotDuration(),
		WaitTime:        stats.waitTime.snapshotDuration(),
		Retries:         stats.retries.snapshot(),
		BatchSize:       stats.batchSize.snapshot(),
		BatchBytes:      stats.batchSizeBytes.snapshot(),
		MaxAttempts:     int64(w.maxAttempts()),
		WriteBackoffMin: w.writeBackoffMin(),
		WriteBackoffMax: w.writeBackoffMax(),
		MaxBatchSize:    int64(w.batchSize()),
		BatchTimeout:    w.batchTimeout(),
		ReadTimeout:     w.readTimeout(),
		WriteTimeout:    w.writeTimeout(),
		RequiredAcks:    int64(w.RequiredAcks),
		Async:           w.Async,
		Topic:           w.Topic,
	}
}
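
// A minimal reporting sketch of the pattern described above (the log sink is
// an assumption; any metrics client would do):
//
//	go func() {
//		for range time.Tick(10 * time.Second) {
//			stats := w.Stats()
//			log.Printf("kafka writer: messages=%d errors=%d", stats.Messages, stats.Errors)
//		}
//	}()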

func (w *Writer) chooseTopic(msg Message) (string, error) {
	// w.Topic and msg.Topic are mutually exclusive, meaning exactly one of them
	// must be set, otherwise we will return an error.
	if w.Topic != "" && msg.Topic != "" {
		return "", errors.New("kafka.(*Writer): Topic must not be specified for both Writer and Message")
	} else if w.Topic == "" && msg.Topic == "" {
		return "", errors.New("kafka.(*Writer): Topic must be specified for Writer or Message")
	}

	// now we choose the topic, depending on which one is not empty
	if msg.Topic != "" {
		return msg.Topic, nil
	}

	return w.Topic, nil
}

type batchQueue struct {
	queue []*writeBatch

	// Pointers are used here to make `go vet` happy, and avoid copying mutexes.
	// It may be better to revert these to non-pointers and avoid the copies in
	// a different way.
	mutex *sync.Mutex
	cond  *sync.Cond

	closed bool
}

func (b *batchQueue) Put(batch *writeBatch) bool {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()
	defer b.cond.Broadcast()

	if b.closed {
		return false
	}
	b.queue = append(b.queue, batch)
	return true
}

func (b *batchQueue) Get() *writeBatch {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()

	for len(b.queue) == 0 && !b.closed {
		b.cond.Wait()
	}

	if len(b.queue) == 0 {
		return nil
	}

	batch := b.queue[0]
	b.queue[0] = nil
	b.queue = b.queue[1:]

	return batch
}

func (b *batchQueue) Close() {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()
	defer b.cond.Broadcast()

	b.closed = true
}

func newBatchQueue(initialSize int) batchQueue {
	bq := batchQueue{
		queue: make([]*writeBatch, 0, initialSize),
		mutex: &sync.Mutex{},
		cond:  &sync.Cond{},
	}

	bq.cond.L = bq.mutex

	return bq
}

// partitionWriter is a writer for a topic-partition pair. It maintains
// messaging order across batches of messages.
type partitionWriter struct {
	meta  topicPartition
	queue batchQueue

	mutex     sync.Mutex
	currBatch *writeBatch

	// reference to the writer that owns this batch. Used for the produce logic
	// as well as stat tracking
	w *Writer
}

func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter {
	writer := &partitionWriter{
		meta:  key,
		queue: newBatchQueue(10),
		w:     w,
	}
	w.spawn(writer.writeBatches)
	return writer
}

func (ptw *partitionWriter) writeBatches() {
	for {
		batch := ptw.queue.Get()

		// The only time we can return nil is when the queue is closed
		// and empty. If the queue is closed that means
		// the Writer is closed so once we're here it's time to exit.
		if batch == nil {
			return
		}

		ptw.writeBatch(batch)
	}
}

func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
	ptw.mutex.Lock()
	defer ptw.mutex.Unlock()

	batchSize := ptw.w.batchSize()
	batchBytes := ptw.w.batchBytes()

	var batches map[*writeBatch][]int32
	if !ptw.w.Async {
		batches = make(map[*writeBatch][]int32, 1)
	}

	for _, i := range indexes {
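		// Each message lands in the current batch when it fits. If adding it
		// would exceed the byte limit, the current batch is flushed to the
		// queue and the same message is retried against a fresh batch via the
		// assignMessage label below.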
	assignMessage:
		batch := ptw.currBatch
		if batch == nil {
			batch = ptw.newWriteBatch()
			ptw.currBatch = batch
		}
		if !batch.add(msgs[i], batchSize, batchBytes) {
			batch.trigger()
			ptw.queue.Put(batch)
			ptw.currBatch = nil
			goto assignMessage
		}

		if batch.full(batchSize, batchBytes) {
			batch.trigger()
			ptw.queue.Put(batch)
			ptw.currBatch = nil
		}

		if !ptw.w.Async {
			batches[batch] = append(batches[batch], i)
		}
	}
	return batches
}

// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
	batch := newWriteBatch(time.Now(), ptw.w.batchTimeout())
	ptw.w.spawn(func() { ptw.awaitBatch(batch) })
	return batch
}

// awaitBatch waits for a batch to either fill up or time out.
// If the batch is full it only stops the timer, if the timer
// expires it will queue the batch for writing if needed.
func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
	select {
	case <-batch.timer.C:
		ptw.mutex.Lock()
		// detach the batch from the writer if we're still attached
		// and queue for writing.
		// Only the current batch can expire, all previous batches were already
		// written to the queue.
		// If writeMessages locks ptw.mutex after the timer fires but before
		// this goroutine can lock ptw.mutex, it will either have filled the
		// batch and enqueued it, which means ptw.currBatch != batch, so we
		// just move on.
		// Otherwise, we detach the batch from the ptWriter and enqueue it for
		// writing.
		if ptw.currBatch == batch {
			ptw.queue.Put(batch)
			ptw.currBatch = nil
		}
		ptw.mutex.Unlock()
	case <-batch.ready:
		// The batch became full, it was removed from the ptwriter and its
		// ready channel was closed. We need to close the timer to avoid
		// having it leak until it expires.
		batch.timer.Stop()
	}
	stats := ptw.w.stats()
	stats.batchQueueTime.observe(int64(time.Since(batch.time)))
}

func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
	stats := ptw.w.stats()
	stats.batchTime.observe(int64(time.Since(batch.time)))
	stats.batchSize.observe(int64(len(batch.msgs)))
	stats.batchSizeBytes.observe(batch.bytes)

	var res *ProduceResponse
	var err error
	key := ptw.meta
	for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ {
		if attempt != 0 {
			stats.retries.observe(1)
			// TODO: should there be a way to asynchronously cancel this
			// operation?
			//
			// * If all goroutines that added messages to this batch have
			//   stopped waiting for it, should we abort?
			//
			// * If the writer has been closed? It reduces the durability
			//   guarantees to abort, but may be better to avoid long wait
			//   times on close.
			//
			delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax())
			ptw.w.withLogger(func(log Logger) {
				log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
			})
			time.Sleep(delay)
		}

		ptw.w.withLogger(func(log Logger) {
			log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
		})

		start := time.Now()
		res, err = ptw.w.produce(key, batch)

		stats.writes.observe(1)
		stats.messages.observe(int64(len(batch.msgs)))
		stats.bytes.observe(batch.bytes)

		// stats.writeTime used to report the duration of WriteMessages, but the
		// implementation was broken and reporting values in the nanoseconds
		// range. In kafka-go 0.4, we recycled this value to instead report the
		// duration of produce requests, and changed the stats.waitTime value to
		// report the time that kafka has throttled the requests for.
		stats.writeTime.observe(int64(time.Since(start)))

		if res != nil {
			err = res.Error
			stats.waitTime.observe(int64(res.Throttle))
		}

		if err == nil {
			break
		}

		stats.errors.observe(1)

		ptw.w.withErrorLogger(func(log Logger) {
			log.Printf("error writing messages to %s (partition %d, attempt %d): %s", key.topic, key.partition, attempt, err)
		})

		if !isTemporary(err) && !isTransientNetworkError(err) {
			break
		}
	}

	if res != nil {
		for i := range batch.msgs {
			m := &batch.msgs[i]
			m.Topic = key.topic
			m.Partition = int(key.partition)
			m.Offset = res.BaseOffset + int64(i)

			if m.Time.IsZero() {
				m.Time = res.LogAppendTime
			}
		}
	}

	if ptw.w.Completion != nil {
		ptw.w.Completion(batch.msgs, err)
	}

	batch.complete(err)
}

func (ptw *partitionWriter) close() {
	ptw.mutex.Lock()
	defer ptw.mutex.Unlock()

	if ptw.currBatch != nil {
		batch := ptw.currBatch
		ptw.queue.Put(batch)
		ptw.currBatch = nil
		batch.trigger()
	}

	ptw.queue.Close()
}

type writeBatch struct {
	time  time.Time
	msgs  []Message
	size  int
	bytes int64
	ready chan struct{}
	done  chan struct{}
	timer *time.Timer
	err   error // result of the batch completion
}

func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
	return &writeBatch{
		time:  now,
		ready: make(chan struct{}),
		done:  make(chan struct{}),
		timer: time.NewTimer(timeout),
	}
}

func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
	bytes := int64(msg.totalSize())

	if b.size > 0 && (b.bytes+bytes) > maxBytes {
		return false
	}

	if cap(b.msgs) == 0 {
		b.msgs = make([]Message, 0, maxSize)
	}

	b.msgs = append(b.msgs, msg)
	b.size++
	b.bytes += bytes
	return true
}

func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
	return b.size >= maxSize || b.bytes >= maxBytes
}

func (b *writeBatch) trigger() {
	close(b.ready)
}

func (b *writeBatch) complete(err error) {
	b.err = err
	close(b.done)
}

type writerRecords struct {
	msgs   []Message
	index  int
	record Record
	key    bytesReadCloser
	value  bytesReadCloser
}

func (r *writerRecords) ReadRecord() (*Record, error) {
	if r.index >= 0 && r.index < len(r.msgs) {
		m := &r.msgs[r.index]
		r.index++
		r.record = Record{
			Time:    m.Time,
			Headers: m.Headers,
		}
		if m.Key != nil {
			r.key.Reset(m.Key)
			r.record.Key = &r.key
		}
		if m.Value != nil {
			r.value.Reset(m.Value)
			r.record.Value = &r.value
		}
		return &r.record, nil
	}
	return nil, io.EOF
}

type bytesReadCloser struct{ bytes.Reader }

func (*bytesReadCloser) Close() error { return nil }

// A cache of []int values passed to balancers of writers, used to amortize the
// heap allocation of the partition index lists.
//
// With hindsight, the use of `...int` to pass the partition list to Balancers
// was not the best design choice: kafka partition numbers are monotonically
// increasing, we could have simply passed the number of partitions instead.
// If we ever revisit this API, we can hopefully remove this cache.
var partitionsCache atomic.Value

func loadCachedPartitions(numPartitions int) []int {
	partitions, ok := partitionsCache.Load().([]int)
	if ok && len(partitions) >= numPartitions {
		return partitions[:numPartitions]
	}

	const alignment = 128
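	// Round the allocation up to the next multiple of 128 so nearby partition
	// counts can reuse the same cached slice, e.g. numPartitions = 200 gives
	// n = ((200/128)+1)*128 = 256.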
	n := ((numPartitions / alignment) + 1) * alignment
	partitions = make([]int, n)
	for i := range partitions {
		partitions[i] = i
	}

	partitionsCache.Store(partitions)
	return partitions[:numPartitions]
}