- package kafka
- import (
- "bytes"
- "context"
- "errors"
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
- )
- // The Writer type provides the implementation of a producer of kafka messages
- // that automatically distributes messages across partitions of a single topic
- // using a configurable balancing policy.
- //
- // Writers manage the dispatch of messages across partitions of the topic they
- // are configured to write to using a Balancer, and aggregate batches to
- // optimize the writes to kafka.
- //
- // Writers may be configured to be used synchronously or asynchronously. When
- // used synchronously, calls to WriteMessages block until the messages have been
- // written to kafka. In this mode, the program should inspect the error returned
- // by the function and test if it is an instance of kafka.WriteErrors in order to
- // identify which messages have succeeded or failed, for example:
- //
- // // Construct a synchronous writer (the default mode).
- // w := &kafka.Writer{
- // Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- // Topic: "topic-A",
- // RequiredAcks: kafka.RequireAll,
- // }
- //
- // ...
- //
- // // Passing a context can prevent the operation from blocking indefinitely.
- // switch err := w.WriteMessages(ctx, msgs...).(type) {
- // case nil:
- // case kafka.WriteErrors:
- // for i := range msgs {
- // if err[i] != nil {
- // // handle the error writing msgs[i]
- // ...
- // }
- // }
- // default:
- // // handle other errors
- // ...
- // }
- //
- // In asynchronous mode, the program may configure a completion handler on the
- // writer to receive notifications of messages being written to kafka:
- //
- // w := &kafka.Writer{
- // Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- // Topic: "topic-A",
- // RequiredAcks: kafka.RequireAll,
- // Async: true, // make the writer asynchronous
- // Completion: func(messages []kafka.Message, err error) {
- // ...
- // },
- // }
- //
- // ...
- //
- // // Because the writer is asynchronous, there is no need for the context to
- // // be cancelled; the call will never block.
- // if err := w.WriteMessages(context.Background(), msgs...); err != nil {
- // // Only validation errors would be reported in this case.
- // ...
- // }
- //
- // Methods of Writer are safe to use concurrently from multiple goroutines,
- // however the writer configuration should not be modified after first use.
- type Writer struct {
- // Address of the kafka cluster that this writer is configured to send
- // messages to.
- //
- // This field is required, attempting to write messages to a writer with a
- // nil address will error.
- Addr net.Addr
- // Topic is the name of the topic that the writer will produce messages to.
- //
- // This field and the Topic field of individual Messages are mutually
- // exclusive: if you set Topic here, you must not set Topic on any produced
- // Message; if you leave it empty, every Message must have Topic specified.
- // See the sketch after this type definition.
- Topic string
- // The balancer used to distribute messages across partitions.
- //
- // The default is to use a round-robin distribution.
- Balancer Balancer
- // Limit on how many attempts will be made to deliver a message.
- //
- // The default is to try at most 10 times.
- MaxAttempts int
- // WriteBackoffMin optionally sets the smallest amount of time the writer waits before
- // it attempts to write a batch of messages.
- //
- // Default: 100ms
- WriteBackoffMin time.Duration
- // WriteBackoffMax optionally sets the maximum amount of time the writer waits before
- // it attempts to write a batch of messages.
- //
- // Default: 1s
- WriteBackoffMax time.Duration
- // Limit on how many messages will be buffered before being sent to a
- // partition.
- //
- // The default is to use a target batch size of 100 messages.
- BatchSize int
- // Limit on the maximum size of a request in bytes before being sent to
- // a partition.
- //
- // The default is to use a kafka default value of 1048576.
- BatchBytes int64
- // Time limit on how often incomplete message batches will be flushed to
- // kafka.
- //
- // The default is to flush at least every second.
- BatchTimeout time.Duration
- // Timeout for read operations performed by the Writer.
- //
- // Defaults to 10 seconds.
- ReadTimeout time.Duration
- // Timeout for write operation performed by the Writer.
- //
- // Defaults to 10 seconds.
- WriteTimeout time.Duration
- // Number of acknowledgements from partition replicas required before
- // receiving a response to a produce request. The following values are
- // supported:
- //
- // RequireNone (0) fire-and-forget, do not wait for acknowledgements from the cluster
- // RequireOne (1) wait for the leader to acknowledge the writes
- // RequireAll (-1) wait for the full ISR to acknowledge the writes
- //
- // Defaults to RequireNone.
- RequiredAcks RequiredAcks
- // Setting this flag to true causes the WriteMessages method to never block.
- // It also means that errors are ignored since the caller will not receive
- // the returned value. Use this only if you don't care about guarantees of
- // whether the messages were written to kafka.
- //
- // Defaults to false.
- Async bool
- // An optional function called when the writer succeeds or fails the
- // delivery of messages to a kafka partition. When writing the messages
- // fails, the `err` parameter will be non-nil.
- //
- // The messages that the Completion function is called with have their
- // topic, partition, offset, and time set based on the Produce responses
- // received from kafka. All messages passed to a call to the function have
- // been written to the same partition. The keys and values of messages are
- // referencing the original byte slices carried by messages in the calls to
- // WriteMessages.
- //
- // The function is called from goroutines started by the writer. Calls to
- // Close will block on the Completion function calls. When the Writer is
- // not writing asynchronously, the WriteMessages call will also block on
- // the Completion function, which is a useful guarantee if the byte slices
- // for the message keys and values are intended to be reused after the
- // WriteMessages call returns.
- //
- // If a completion function panics, the program terminates because the
- // panic is not recovered by the writer and bubbles up to the top of the
- // goroutine's call stack.
- Completion func(messages []Message, err error)
- // Compression sets the compression codec to be used to compress messages.
- Compression Compression
- // If not nil, specifies a logger used to report internal changes within the
- // writer.
- Logger Logger
- // ErrorLogger is the logger used to report errors. If nil, the writer falls
- // back to using Logger instead.
- ErrorLogger Logger
- // A transport used to send messages to kafka clusters.
- //
- // If nil, DefaultTransport is used.
- Transport RoundTripper
- // AllowAutoTopicCreation notifies the writer to create the topic if it is
- // missing.
- AllowAutoTopicCreation bool
- // Manages the current set of partition-topic writers.
- group sync.WaitGroup
- mutex sync.Mutex
- closed bool
- writers map[topicPartition]*partitionWriter
- // writer stats are all made of atomic values, no need for synchronization.
- // Use a pointer to ensure 64-bit alignment of the values. The once value is
- // used to lazily create the value when first used, allowing programs to use
- // the zero value of Writer.
- once sync.Once
- *writerStats
- // If no balancer is configured, the writer uses this one. RoundRobin values
- // are safe to use concurrently from multiple goroutines; there is no need
- // for extra synchronization to access this field.
- roundRobin RoundRobin
- // non-nil when a transport was created by NewWriter, remove in 1.0.
- transport *Transport
- }
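- // The following sketch (illustrative only; the broker address and topic
- // names are placeholders) shows the two mutually exclusive ways of
- // specifying topics described on the Topic field above:
- //
- // // Option 1: one topic configured on the writer, none on the messages.
- // w := &kafka.Writer{
- //     Addr:  kafka.TCP("localhost:9092"),
- //     Topic: "topic-A",
- // }
- // err := w.WriteMessages(ctx, kafka.Message{Value: []byte("hello")})
- //
- // // Option 2: no topic on the writer, every message carries its own.
- // w = &kafka.Writer{Addr: kafka.TCP("localhost:9092")}
- // err = w.WriteMessages(ctx,
- //     kafka.Message{Topic: "topic-A", Value: []byte("hello")},
- //     kafka.Message{Topic: "topic-B", Value: []byte("world")},
- // )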
- // WriterConfig is a configuration type used to create new instances of Writer.
- //
- // DEPRECATED: writer values should be configured directly by assigning their
- // exported fields. This type is kept for backward compatibility, and will be
- // removed in version 1.0.
- type WriterConfig struct {
- // The list of brokers used to discover the partitions available on the
- // kafka cluster.
- //
- // This field is required, attempting to create a writer with an empty list
- // of brokers will panic.
- Brokers []string
- // The topic that the writer will produce messages to.
- //
- // If provided, this will be used to set the topic for all produced messages.
- // If not provided, each Message must specify a topic for itself. The two
- // options are mutually exclusive; if both are set, the Writer will return
- // an error.
- Topic string
- // The dialer used by the writer to establish connections to the kafka
- // cluster.
- //
- // If nil, the default dialer is used instead.
- Dialer *Dialer
- // The balancer used to distribute messages across partitions.
- //
- // The default is to use a round-robin distribution.
- Balancer Balancer
- // Limit on how many attempts will be made to deliver a message.
- //
- // The default is to try at most 10 times.
- MaxAttempts int
- // DEPRECATED: in versions prior to 0.4, the writer used channels internally
- // to dispatch messages to partitions. This has been replaced by an in-memory
- // aggregation of batches which uses shared state instead of message passing,
- // making this option unnecessary.
- QueueCapacity int
- // Limit on how many messages will be buffered before being sent to a
- // partition.
- //
- // The default is to use a target batch size of 100 messages.
- BatchSize int
- // Limit on the maximum size of a request in bytes before being sent to
- // a partition.
- //
- // The default is to use a kafka default value of 1048576.
- BatchBytes int
- // Time limit on how often incomplete message batches will be flushed to
- // kafka.
- //
- // The default is to flush at least every second.
- BatchTimeout time.Duration
- // Timeout for read operations performed by the Writer.
- //
- // Defaults to 10 seconds.
- ReadTimeout time.Duration
- // Timeout for write operation performed by the Writer.
- //
- // Defaults to 10 seconds.
- WriteTimeout time.Duration
- // DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
- // of the topic layout. With the change to use a transport to manage
- // connections, the responsibility of syncing the cluster layout has been
- // delegated to the transport.
- RebalanceInterval time.Duration
- // DEPRECATED: in versions prior to 0.4, the writer used to manage connections
- // to the kafka cluster directly. With the change to use a transport to manage
- // connections, the writer has no connections to manage directly anymore.
- IdleConnTimeout time.Duration
- // Number of acknowledgements from partition replicas required before
- // receiving a response to a produce request. The default is -1, which means
- // to wait for all replicas, and a value above 0 indicates how many replicas
- // should acknowledge a message for it to be considered successful.
- RequiredAcks int
- // Setting this flag to true causes the WriteMessages method to never block.
- // It also means that errors are ignored since the caller will not receive
- // the returned value. Use this only if you don't care about guarantees of
- // whether the messages were written to kafka.
- Async bool
- // CompressionCodec sets the codec to be used to compress Kafka messages.
- CompressionCodec
- // If not nil, specifies a logger used to report internal changes within the
- // writer.
- Logger Logger
- // ErrorLogger is the logger used to report errors. If nil, the writer falls
- // back to using Logger instead.
- ErrorLogger Logger
- }
- type topicPartition struct {
- topic string
- partition int32
- }
- // Validate validates the WriterConfig properties.
- func (config *WriterConfig) Validate() error {
- if len(config.Brokers) == 0 {
- return errors.New("cannot create a kafka writer with an empty list of brokers")
- }
- return nil
- }
- // WriterStats is a data structure returned by a call to Writer.Stats that
- // exposes details about the behavior of the writer.
- type WriterStats struct {
- Writes int64 `metric:"kafka.writer.write.count" type:"counter"`
- Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
- Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"`
- Errors int64 `metric:"kafka.writer.error.count" type:"counter"`
- BatchTime DurationStats `metric:"kafka.writer.batch.seconds"`
- BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"`
- WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
- WaitTime DurationStats `metric:"kafka.writer.wait.seconds"`
- Retries int64 `metric:"kafka.writer.retries.count" type:"counter"`
- BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
- BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`
- MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
- WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
- WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"`
- MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
- BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
- ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
- WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
- RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
- Async bool `metric:"kafka.writer.async" type:"gauge"`
- Topic string `tag:"topic"`
- // DEPRECATED: these fields will only be reported for backward compatibility
- // if the Writer was constructed with NewWriter.
- Dials int64 `metric:"kafka.writer.dial.count" type:"counter"`
- DialTime DurationStats `metric:"kafka.writer.dial.seconds"`
- // DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes
- // to the internal implementation and the introduction of the transport type
- // made them unnecessary.
- //
- // The values will be zero but are left for backward compatibility to avoid
- // breaking programs that used these fields.
- Rebalances int64
- RebalanceInterval time.Duration
- QueueLength int64
- QueueCapacity int64
- ClientID string
- }
- // writerStats is a struct that contains statistics on a writer.
- //
- // Since atomic is used to mutate the statistics, the values must be 64-bit
- // aligned. This is easily accomplished by always allocating this struct on
- // the heap (i.e. using a pointer to the struct).
- // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
- type writerStats struct {
- dials counter
- writes counter
- messages counter
- bytes counter
- errors counter
- dialTime summary
- batchTime summary
- batchQueueTime summary
- writeTime summary
- waitTime summary
- retries counter
- batchSize summary
- batchSizeBytes summary
- }
- // NewWriter creates and returns a new Writer configured with config.
- //
- // DEPRECATED: Writer value can be instantiated and configured directly,
- // this function is retained for backward compatibility and will be removed
- // in version 1.0.
- func NewWriter(config WriterConfig) *Writer {
- if err := config.Validate(); err != nil {
- panic(err)
- }
- if config.Dialer == nil {
- config.Dialer = DefaultDialer
- }
- if config.Balancer == nil {
- config.Balancer = &RoundRobin{}
- }
- // Converts the pre-0.4 Dialer API into a Transport. At this point
- // config.Dialer is guaranteed to be non-nil by the check above.
- kafkaDialer := config.Dialer
- dialer := (&net.Dialer{
- Timeout: kafkaDialer.Timeout,
- Deadline: kafkaDialer.Deadline,
- LocalAddr: kafkaDialer.LocalAddr,
- DualStack: kafkaDialer.DualStack,
- FallbackDelay: kafkaDialer.FallbackDelay,
- KeepAlive: kafkaDialer.KeepAlive,
- })
- var resolver Resolver
- if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
- dialer.Resolver = r
- } else {
- resolver = kafkaDialer.Resolver
- }
- stats := new(writerStats)
- // For backward compatibility with the pre-0.4 APIs, support custom
- // resolvers by wrapping the dial function.
- dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
- start := time.Now()
- defer func() {
- stats.dials.observe(1)
- stats.dialTime.observe(int64(time.Since(start)))
- }()
- address, err := lookupHost(ctx, addr, resolver)
- if err != nil {
- return nil, err
- }
- return dialer.DialContext(ctx, network, address)
- }
- idleTimeout := config.IdleConnTimeout
- if idleTimeout == 0 {
- // Historical default value of WriterConfig.IdleConnTimeout; 9 minutes seems
- // way too long when there is no ping mechanism in the kafka protocol.
- idleTimeout = 9 * time.Minute
- }
- metadataTTL := config.RebalanceInterval
- if metadataTTL == 0 {
- // Historical default value of WriterConfig.RebalanceInterval.
- metadataTTL = 15 * time.Second
- }
- transport := &Transport{
- Dial: dial,
- SASL: kafkaDialer.SASLMechanism,
- TLS: kafkaDialer.TLS,
- ClientID: kafkaDialer.ClientID,
- IdleTimeout: idleTimeout,
- MetadataTTL: metadataTTL,
- }
- w := &Writer{
- Addr: TCP(config.Brokers...),
- Topic: config.Topic,
- MaxAttempts: config.MaxAttempts,
- BatchSize: config.BatchSize,
- Balancer: config.Balancer,
- BatchBytes: int64(config.BatchBytes),
- BatchTimeout: config.BatchTimeout,
- ReadTimeout: config.ReadTimeout,
- WriteTimeout: config.WriteTimeout,
- RequiredAcks: RequiredAcks(config.RequiredAcks),
- Async: config.Async,
- Logger: config.Logger,
- ErrorLogger: config.ErrorLogger,
- Transport: transport,
- transport: transport,
- writerStats: stats,
- }
- if config.RequiredAcks == 0 {
- // Historically the writers created by NewWriter have used "all" as the
- // default value when 0 was specified.
- w.RequiredAcks = RequireAll
- }
- if config.CompressionCodec != nil {
- w.Compression = Compression(config.CompressionCodec.Code())
- }
- return w
- }
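- // A minimal usage sketch for this deprecated constructor (the broker
- // address is a placeholder):
- //
- // w := kafka.NewWriter(kafka.WriterConfig{
- //     Brokers: []string{"localhost:9092"},
- //     Topic:   "topic-A",
- // })
- // defer w.Close()
- //
- // New programs should instead configure a kafka.Writer value directly, as
- // shown in the documentation of the Writer type.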
- // enter is called by WriteMessages to indicate that a new inflight operation
- // has started, which helps synchronize with Close and ensures that Close
- // does not return until all inflight operations have completed.
- func (w *Writer) enter() bool {
- w.mutex.Lock()
- defer w.mutex.Unlock()
- if w.closed {
- return false
- }
- w.group.Add(1)
- return true
- }
- // leave is called by WriteMessages to indicate that the inflight operation has
- // completed.
- func (w *Writer) leave() { w.group.Done() }
- // spawn starts a new asynchronous operation on the writer. This method is used
- // instead of starting goroutines inline to help manage the state of the
- // writer's wait group. The wait group is used to block Close calls until all
- // inflight operations have completed, therefore automatically including those
- // started with calls to spawn.
- func (w *Writer) spawn(f func()) {
- w.group.Add(1)
- go func() {
- defer w.group.Done()
- f()
- }()
- }
- // Close flushes pending writes, and waits for all writes to complete before
- // returning. Calling Close also prevents new writes from being submitted to
- // the writer; further calls to WriteMessages and the like will fail with
- // io.ErrClosedPipe.
- func (w *Writer) Close() error {
- w.mutex.Lock()
- // Marking the writer as closed here causes future calls to WriteMessages to
- // fail with io.ErrClosedPipe. Mutation of this field is synchronized on the
- // writer's mutex to ensure that no more increments of the wait group are
- // performed afterwards (which could otherwise race with the Wait below).
- w.closed = true
- // close all writers to trigger any pending batches
- for _, writer := range w.writers {
- writer.close()
- }
- for partition := range w.writers {
- delete(w.writers, partition)
- }
- w.mutex.Unlock()
- w.group.Wait()
- if w.transport != nil {
- w.transport.CloseIdleConnections()
- }
- return nil
- }
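- // Typical usage (sketch): ensure Close runs when the program is done
- // producing, so buffered batches are flushed before exit:
- //
- // w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "topic-A"}
- // defer func() {
- //     if err := w.Close(); err != nil {
- //         log.Println("failed to close kafka writer:", err)
- //     }
- // }()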
- // WriteMessages writes a batch of messages to the kafka topic configured on this
- // writer.
- //
- // Unless the writer was configured to write messages asynchronously, the method
- // blocks until all messages have been written, or until the maximum number of
- // attempts was reached.
- //
- // When sending synchronously and the writer's batch size is configured to be
- // greater than 1, this method blocks until either a full batch can be assembled
- // or the batch timeout is reached. The batch size and timeouts are evaluated
- // per partition, so the choice of Balancer can also influence the flushing
- // behavior. For example, the Hash balancer will require on average N * batch
- // size messages to trigger a flush where N is the number of partitions. The
- // best way to achieve good batching behavior is to share one Writer amongst
- // multiple goroutines.
- //
- // When the method returns an error, it may be of type kafka.WriteErrors to allow
- // the caller to determine the status of each message.
- //
- // The context passed as first argument may also be used to asynchronously
- // cancel the operation. Note that in this case there are no guarantees made on
- // whether messages were written to kafka: they might also still be written
- // after this method has already returned, so it is important not to modify
- // the byte slices of passed messages if WriteMessages returned early due to
- // a canceled context.
- // The program should assume that the whole batch failed and re-write the
- // messages later (which could then cause duplicates).
- func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
- if w.Addr == nil {
- return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
- }
- if !w.enter() {
- return io.ErrClosedPipe
- }
- defer w.leave()
- if len(msgs) == 0 {
- return nil
- }
- balancer := w.balancer()
- batchBytes := w.batchBytes()
- for i := range msgs {
- n := int64(msgs[i].totalSize())
- if n > batchBytes {
- // This error is left for backward compatibility with historical
- // behavior, but it can yield O(N^2) behavior. The expectation is
- // that the program will check whether WriteMessages returned a
- // MessageTooLargeError, discard the message that exceeded the
- // maximum size, and try again (see the sketch after this method).
- return messageTooLarge(msgs, i)
- }
- }
- // We use int32 here to halve the memory footprint (compared to using int
- // on 64-bit architectures). We map lists of message indexes instead of
- // message values for the same reason: an int32 is 4 bytes, whereas a full
- // Message value is 100+ bytes, contains pointers, and contributes to
- // increasing GC work.
- assignments := make(map[topicPartition][]int32)
- for i, msg := range msgs {
- topic, err := w.chooseTopic(msg)
- if err != nil {
- return err
- }
- numPartitions, err := w.partitions(ctx, topic)
- if err != nil {
- return err
- }
- partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)
- key := topicPartition{
- topic: topic,
- partition: int32(partition),
- }
- assignments[key] = append(assignments[key], int32(i))
- }
- batches := w.batchMessages(msgs, assignments)
- if w.Async {
- return nil
- }
- done := ctx.Done()
- hasErrors := false
- for batch := range batches {
- select {
- case <-done:
- return ctx.Err()
- case <-batch.done:
- if batch.err != nil {
- hasErrors = true
- }
- }
- }
- if !hasErrors {
- return nil
- }
- werr := make(WriteErrors, len(msgs))
- for batch, indexes := range batches {
- for _, i := range indexes {
- werr[i] = batch.err
- }
- }
- return werr
- }
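- // The sketch below (illustrative only) shows the retry pattern suggested in
- // the comment above for oversized messages. It assumes the
- // MessageTooLargeError type defined elsewhere in this package, whose
- // Remaining field carries the messages other than the oversized one:
- //
- // for {
- //     err := w.WriteMessages(ctx, msgs...)
- //     if mtl, ok := err.(MessageTooLargeError); ok && len(mtl.Remaining) > 0 {
- //         msgs = mtl.Remaining // drop the oversized message and retry
- //         continue
- //     }
- //     return err // nil, WriteErrors, or another error
- // }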
- func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
- var batches map[*writeBatch][]int32
- if !w.Async {
- batches = make(map[*writeBatch][]int32, len(assignments))
- }
- w.mutex.Lock()
- defer w.mutex.Unlock()
- if w.writers == nil {
- w.writers = map[topicPartition]*partitionWriter{}
- }
- for key, indexes := range assignments {
- writer := w.writers[key]
- if writer == nil {
- writer = newPartitionWriter(w, key)
- w.writers[key] = writer
- }
- wbatches := writer.writeMessages(messages, indexes)
- for batch, idxs := range wbatches {
- batches[batch] = idxs
- }
- }
- return batches
- }
- func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
- timeout := w.writeTimeout()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- return w.client(timeout).Produce(ctx, &ProduceRequest{
- Partition: int(key.partition),
- Topic: key.topic,
- RequiredAcks: w.RequiredAcks,
- Compression: w.Compression,
- Records: &writerRecords{
- msgs: batch.msgs,
- },
- })
- }
- func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
- client := w.client(w.readTimeout())
- // Here we use the transport directly as an optimization to avoid the
- // construction of temporary request and response objects made by the
- // (*Client).Metadata API.
- //
- // It is expected that the transport will optimize this request by
- // caching recent results (the kafka.Transport type does).
- r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
- TopicNames: []string{topic},
- AllowAutoTopicCreation: w.AllowAutoTopicCreation,
- })
- if err != nil {
- return 0, err
- }
- for _, t := range r.(*metadataAPI.Response).Topics {
- if t.Name == topic {
- // This should always match, unless kafka has a bug.
- if t.ErrorCode != 0 {
- return 0, Error(t.ErrorCode)
- }
- return len(t.Partitions), nil
- }
- }
- return 0, UnknownTopicOrPartition
- }
- func (w *Writer) client(timeout time.Duration) *Client {
- return &Client{
- Addr: w.Addr,
- Transport: w.Transport,
- Timeout: timeout,
- }
- }
- func (w *Writer) balancer() Balancer {
- if w.Balancer != nil {
- return w.Balancer
- }
- return &w.roundRobin
- }
- func (w *Writer) maxAttempts() int {
- if w.MaxAttempts > 0 {
- return w.MaxAttempts
- }
- // TODO: this is a very high default, if something has failed 9 times it
- // seems unlikely it will succeed on the 10th attempt. However, it does
- // carry the risk to greatly increase the volume of requests sent to the
- // kafka cluster. We should consider reducing this default (3?).
- return 10
- }
- func (w *Writer) writeBackoffMin() time.Duration {
- if w.WriteBackoffMin > 0 {
- return w.WriteBackoffMin
- }
- return 100 * time.Millisecond
- }
- func (w *Writer) writeBackoffMax() time.Duration {
- if w.WriteBackoffMax > 0 {
- return w.WriteBackoffMax
- }
- return 1 * time.Second
- }
- func (w *Writer) batchSize() int {
- if w.BatchSize > 0 {
- return w.BatchSize
- }
- return 100
- }
- func (w *Writer) batchBytes() int64 {
- if w.BatchBytes > 0 {
- return w.BatchBytes
- }
- return 1048576
- }
- func (w *Writer) batchTimeout() time.Duration {
- if w.BatchTimeout > 0 {
- return w.BatchTimeout
- }
- return 1 * time.Second
- }
- func (w *Writer) readTimeout() time.Duration {
- if w.ReadTimeout > 0 {
- return w.ReadTimeout
- }
- return 10 * time.Second
- }
- func (w *Writer) writeTimeout() time.Duration {
- if w.WriteTimeout > 0 {
- return w.WriteTimeout
- }
- return 10 * time.Second
- }
- func (w *Writer) withLogger(do func(Logger)) {
- if w.Logger != nil {
- do(w.Logger)
- }
- }
- func (w *Writer) withErrorLogger(do func(Logger)) {
- if w.ErrorLogger != nil {
- do(w.ErrorLogger)
- } else {
- w.withLogger(do)
- }
- }
- func (w *Writer) stats() *writerStats {
- w.once.Do(func() {
- // This field is not nil when the writer was constructed with NewWriter
- // to share the value with the dial function and count dials.
- if w.writerStats == nil {
- w.writerStats = new(writerStats)
- }
- })
- return w.writerStats
- }
- // Stats returns a snapshot of the writer stats since the last time the method
- // was called, or since the writer was created if it is called for the first
- // time.
- //
- // A typical use of this method is to spawn a goroutine that will periodically
- // call Stats on a kafka writer and report the metrics to a stats collection
- // system.
- func (w *Writer) Stats() WriterStats {
- stats := w.stats()
- return WriterStats{
- Dials: stats.dials.snapshot(),
- Writes: stats.writes.snapshot(),
- Messages: stats.messages.snapshot(),
- Bytes: stats.bytes.snapshot(),
- Errors: stats.errors.snapshot(),
- DialTime: stats.dialTime.snapshotDuration(),
- BatchTime: stats.batchTime.snapshotDuration(),
- BatchQueueTime: stats.batchQueueTime.snapshotDuration(),
- WriteTime: stats.writeTime.snapshotDuration(),
- WaitTime: stats.waitTime.snapshotDuration(),
- Retries: stats.retries.snapshot(),
- BatchSize: stats.batchSize.snapshot(),
- BatchBytes: stats.batchSizeBytes.snapshot(),
- MaxAttempts: int64(w.maxAttempts()),
- WriteBackoffMin: w.writeBackoffMin(),
- WriteBackoffMax: w.writeBackoffMax(),
- MaxBatchSize: int64(w.batchSize()),
- BatchTimeout: w.batchTimeout(),
- ReadTimeout: w.readTimeout(),
- WriteTimeout: w.writeTimeout(),
- RequiredAcks: int64(w.RequiredAcks),
- Async: w.Async,
- Topic: w.Topic,
- }
- }
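- // A typical polling loop, per the comment above (sketch; the report
- // function is hypothetical):
- //
- // go func() {
- //     ticker := time.NewTicker(10 * time.Second)
- //     defer ticker.Stop()
- //     for range ticker.C {
- //         report(w.Stats()) // e.g. push the snapshot to a metrics system
- //     }
- // }()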
- func (w *Writer) chooseTopic(msg Message) (string, error) {
- // w.Topic and msg.Topic are mutually exclusive, meaning exactly one of them
- // must be set; otherwise we return an error.
- if w.Topic != "" && msg.Topic != "" {
- return "", errors.New("kafka.(*Writer): Topic must not be specified for both Writer and Message")
- } else if w.Topic == "" && msg.Topic == "" {
- return "", errors.New("kafka.(*Writer): Topic must be specified for Writer or Message")
- }
- // now we choose the topic, depending on which one is not empty
- if msg.Topic != "" {
- return msg.Topic, nil
- }
- return w.Topic, nil
- }
- type batchQueue struct {
- queue []*writeBatch
- // Pointers are used here to make `go vet` happy, and avoid copying mutexes.
- // It may be better to revert these to non-pointers and avoid the copies in
- // a different way.
- mutex *sync.Mutex
- cond *sync.Cond
- closed bool
- }
- func (b *batchQueue) Put(batch *writeBatch) bool {
- b.cond.L.Lock()
- defer b.cond.L.Unlock()
- defer b.cond.Broadcast()
- if b.closed {
- return false
- }
- b.queue = append(b.queue, batch)
- return true
- }
- func (b *batchQueue) Get() *writeBatch {
- b.cond.L.Lock()
- defer b.cond.L.Unlock()
- for len(b.queue) == 0 && !b.closed {
- b.cond.Wait()
- }
- if len(b.queue) == 0 {
- return nil
- }
- batch := b.queue[0]
- b.queue[0] = nil
- b.queue = b.queue[1:]
- return batch
- }
- func (b *batchQueue) Close() {
- b.cond.L.Lock()
- defer b.cond.L.Unlock()
- defer b.cond.Broadcast()
- b.closed = true
- }
- func newBatchQueue(initialSize int) batchQueue {
- bq := batchQueue{
- queue: make([]*writeBatch, 0, initialSize),
- mutex: &sync.Mutex{},
- cond: &sync.Cond{},
- }
- bq.cond.L = bq.mutex
- return bq
- }
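- // Usage sketch (illustrative only): Put appends a batch and wakes waiters,
- // Get blocks until a batch is available, and after Close, Get drains any
- // remaining batches before returning nil:
- //
- // q := newBatchQueue(10)
- // go func() {
- //     for b := q.Get(); b != nil; b = q.Get() {
- //         // write the batch
- //     }
- // }()
- // q.Put(newWriteBatch(time.Now(), time.Second))
- // q.Close()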
- // partitionWriter is a writer for a topic-partition pair. It maintains
- // message ordering across batches of messages.
- type partitionWriter struct {
- meta topicPartition
- queue batchQueue
- mutex sync.Mutex
- currBatch *writeBatch
- // reference to the writer that owns this partition writer. Used for the
- // produce logic as well as stat tracking.
- w *Writer
- }
- func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter {
- writer := &partitionWriter{
- meta: key,
- queue: newBatchQueue(10),
- w: w,
- }
- w.spawn(writer.writeBatches)
- return writer
- }
- func (ptw *partitionWriter) writeBatches() {
- for {
- batch := ptw.queue.Get()
- // The only time Get returns nil is when the queue is closed
- // and empty. A closed queue means the Writer is closed, so
- // once we're here it's time to exit.
- if batch == nil {
- return
- }
- ptw.writeBatch(batch)
- }
- }
- func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
- ptw.mutex.Lock()
- defer ptw.mutex.Unlock()
- batchSize := ptw.w.batchSize()
- batchBytes := ptw.w.batchBytes()
- var batches map[*writeBatch][]int32
- if !ptw.w.Async {
- batches = make(map[*writeBatch][]int32, 1)
- }
- for _, i := range indexes {
- assignMessage:
- batch := ptw.currBatch
- if batch == nil {
- batch = ptw.newWriteBatch()
- ptw.currBatch = batch
- }
- if !batch.add(msgs[i], batchSize, batchBytes) {
- batch.trigger()
- ptw.queue.Put(batch)
- ptw.currBatch = nil
- goto assignMessage
- }
- if batch.full(batchSize, batchBytes) {
- batch.trigger()
- ptw.queue.Put(batch)
- ptw.currBatch = nil
- }
- if !ptw.w.Async {
- batches[batch] = append(batches[batch], i)
- }
- }
- return batches
- }
- // ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
- func (ptw *partitionWriter) newWriteBatch() *writeBatch {
- batch := newWriteBatch(time.Now(), ptw.w.batchTimeout())
- ptw.w.spawn(func() { ptw.awaitBatch(batch) })
- return batch
- }
- // awaitBatch waits for a batch to either fill up or time out.
- // If the batch is full it only stops the timer; if the timer
- // expires it will queue the batch for writing if needed.
- func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
- select {
- case <-batch.timer.C:
- ptw.mutex.Lock()
- // Detach the batch from the writer if we're still attached, and queue it
- // for writing. Only the current batch can expire; all previous batches
- // were already written to the queue.
- // If writeMessages locks ptw.mutex after the timer fires but before this
- // goroutine can lock ptw.mutex, it will either have filled the batch and
- // enqueued it, in which case ptw.currBatch != batch and we just move on.
- // Otherwise, we detach the batch from the partitionWriter and enqueue it
- // for writing.
- if ptw.currBatch == batch {
- ptw.queue.Put(batch)
- ptw.currBatch = nil
- }
- ptw.mutex.Unlock()
- case <-batch.ready:
- // The batch became full; it was removed from the partitionWriter and
- // its ready channel was closed. We need to stop the timer to avoid
- // having it leak until it expires.
- batch.timer.Stop()
- }
- stats := ptw.w.stats()
- stats.batchQueueTime.observe(int64(time.Since(batch.time)))
- }
- func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
- stats := ptw.w.stats()
- stats.batchTime.observe(int64(time.Since(batch.time)))
- stats.batchSize.observe(int64(len(batch.msgs)))
- stats.batchSizeBytes.observe(batch.bytes)
- var res *ProduceResponse
- var err error
- key := ptw.meta
- for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ {
- if attempt != 0 {
- stats.retries.observe(1)
- // TODO: should there be a way to asynchronously cancel this
- // operation?
- //
- // * If all goroutines that added message to this batch have stopped
- // waiting for it, should we abort?
- //
- // * If the writer has been closed? It reduces the durability
- // guarantees to abort, but may be better to avoid long wait times
- // on close.
- //
- delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax())
- ptw.w.withLogger(func(log Logger) {
- log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
- })
- time.Sleep(delay)
- }
- ptw.w.withLogger(func(log Logger) {
- log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
- })
- start := time.Now()
- res, err = ptw.w.produce(key, batch)
- stats.writes.observe(1)
- stats.messages.observe(int64(len(batch.msgs)))
- stats.bytes.observe(batch.bytes)
- // stats.writeTime used to report the duration of WriteMessages, but the
- // implementation was broken and reported values in the nanoseconds
- // range. In kafka-go 0.4, we recycled this value to instead report the
- // duration of produce requests, and changed the stats.waitTime value to
- // report the time that kafka has throttled the requests for.
- stats.writeTime.observe(int64(time.Since(start)))
- if res != nil {
- err = res.Error
- stats.waitTime.observe(int64(res.Throttle))
- }
- if err == nil {
- break
- }
- stats.errors.observe(1)
- ptw.w.withErrorLogger(func(log Logger) {
- log.Printf("error writing messages to %s (partition %d, attempt %d): %s", key.topic, key.partition, attempt, err)
- })
- if !isTemporary(err) && !isTransientNetworkError(err) {
- break
- }
- }
- if res != nil {
- for i := range batch.msgs {
- m := &batch.msgs[i]
- m.Topic = key.topic
- m.Partition = int(key.partition)
- m.Offset = res.BaseOffset + int64(i)
- if m.Time.IsZero() {
- m.Time = res.LogAppendTime
- }
- }
- }
- if ptw.w.Completion != nil {
- ptw.w.Completion(batch.msgs, err)
- }
- batch.complete(err)
- }
- func (ptw *partitionWriter) close() {
- ptw.mutex.Lock()
- defer ptw.mutex.Unlock()
- if ptw.currBatch != nil {
- batch := ptw.currBatch
- ptw.queue.Put(batch)
- ptw.currBatch = nil
- batch.trigger()
- }
- ptw.queue.Close()
- }
- type writeBatch struct {
- time time.Time
- msgs []Message
- size int
- bytes int64
- ready chan struct{}
- done chan struct{}
- timer *time.Timer
- err error // result of the batch completion
- }
- func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
- return &writeBatch{
- time: now,
- ready: make(chan struct{}),
- done: make(chan struct{}),
- timer: time.NewTimer(timeout),
- }
- }
- func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
- bytes := int64(msg.totalSize())
- if b.size > 0 && (b.bytes+bytes) > maxBytes {
- return false
- }
- if cap(b.msgs) == 0 {
- b.msgs = make([]Message, 0, maxSize)
- }
- b.msgs = append(b.msgs, msg)
- b.size++
- b.bytes += bytes
- return true
- }
- func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
- return b.size >= maxSize || b.bytes >= maxBytes
- }
- func (b *writeBatch) trigger() {
- close(b.ready)
- }
- func (b *writeBatch) complete(err error) {
- b.err = err
- close(b.done)
- }
- type writerRecords struct {
- msgs []Message
- index int
- record Record
- key bytesReadCloser
- value bytesReadCloser
- }
- func (r *writerRecords) ReadRecord() (*Record, error) {
- if r.index >= 0 && r.index < len(r.msgs) {
- m := &r.msgs[r.index]
- r.index++
- r.record = Record{
- Time: m.Time,
- Headers: m.Headers,
- }
- if m.Key != nil {
- r.key.Reset(m.Key)
- r.record.Key = &r.key
- }
- if m.Value != nil {
- r.value.Reset(m.Value)
- r.record.Value = &r.value
- }
- return &r.record, nil
- }
- return nil, io.EOF
- }
- type bytesReadCloser struct{ bytes.Reader }
- func (*bytesReadCloser) Close() error { return nil }
- // A cache of []int values passed to balancers of writers, used to amortize the
- // heap allocation of the partition index lists.
- //
- // With hindsight, the use of `...int` to pass the partition list to Balancers
- // was not the best design choice: kafka partition numbers are monotonically
- // increasing, so we could have simply passed the number of partitions instead.
- // If we ever revisit this API, we can hopefully remove this cache.
- var partitionsCache atomic.Value
- func loadCachedPartitions(numPartitions int) []int {
- partitions, ok := partitionsCache.Load().([]int)
- if ok && len(partitions) >= numPartitions {
- return partitions[:numPartitions]
- }
- const alignment = 128
- n := ((numPartitions / alignment) + 1) * alignment
- partitions = make([]int, n)
- for i := range partitions {
- partitions[i] = i
- }
- partitionsCache.Store(partitions)
- return partitions[:numPartitions]
- }