- package kafka
- import (
- "bufio"
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "math"
- "net"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- // ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
- // been closed.
- var ErrGroupClosed = errors.New("consumer group is closed")
- // ErrGenerationEnded is returned by the Err method of the context.Context
- // passed to functions launched by Generation's Start once the generation
- // has ended.
- var ErrGenerationEnded = errors.New("consumer group generation has ended")
- const (
- // defaultProtocolType holds the default protocol type documented in the
- // kafka protocol
- //
- // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
- defaultProtocolType = "consumer"
- // defaultHeartbeatInterval contains the default time between heartbeats. If
- // the coordinator does not receive a heartbeat within the session timeout interval,
- // the consumer will be considered dead and the coordinator will rebalance the
- // group.
- //
- // As a rule, the heartbeat interval should be no greater than 1/3 the session timeout.
- defaultHeartbeatInterval = 3 * time.Second
- // defaultSessionTimeout contains the default interval the coordinator will wait
- // for a heartbeat before marking a consumer as dead.
- defaultSessionTimeout = 30 * time.Second
- // defaultRebalanceTimeout contains the amount of time the coordinator will wait
- // for consumers to issue a join group once a rebalance has been requested.
- defaultRebalanceTimeout = 30 * time.Second
- // defaultJoinGroupBackoff is the amount of time to wait after a failed
- // consumer group generation before attempting to re-join.
- defaultJoinGroupBackoff = 5 * time.Second
- // defaultRetentionTime holds the length of time the consumer group will be
- // saved by kafka. This default (-1) tells the broker to use its configured
- // value.
- defaultRetentionTime = -1 * time.Millisecond
- // defaultPartitionWatchTime contains the interval at which kafka-go will
- // query the brokers looking for partition changes.
- defaultPartitionWatchTime = 5 * time.Second
- // defaultTimeout is the deadline to set when interacting with the
- // consumer group coordinator.
- defaultTimeout = 5 * time.Second
- )
- // ConsumerGroupConfig is a configuration object used to create new instances of
- // ConsumerGroup.
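- //
- // A minimal configuration sketch; the broker address and names below are
- // illustrative placeholders, and all omitted fields fall back to the
- // defaults applied by Validate:
- //
- //	config := ConsumerGroupConfig{
- //		ID:                "example-group",
- //		Brokers:           []string{"localhost:9092"},
- //		Topics:            []string{"example-topic"},
- //		HeartbeatInterval: 3 * time.Second, // keep at or below 1/3 of SessionTimeout.
- //		SessionTimeout:    30 * time.Second,
- //	}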
- type ConsumerGroupConfig struct {
- // ID is the consumer group ID. It must not be empty.
- ID string
- // The list of broker addresses used to connect to the kafka cluster. It
- // must not be empty.
- Brokers []string
- // A dialer used to open connections to the kafka server. This field is
- // optional; if nil, the default dialer is used instead.
- Dialer *Dialer
- // Topics is the list of topics that will be consumed by this group. It
- // will usually have a single value, but it is permitted to have multiple
- // for more complex use cases.
- Topics []string
- // GroupBalancers is the priority-ordered list of client-side consumer group
- // balancing strategies that will be offered to the coordinator. The first
- // strategy that all group members support will be chosen by the leader.
- //
- // Default: [Range, RoundRobin]
- GroupBalancers []GroupBalancer
- // HeartbeatInterval sets the optional frequency at which the reader sends the consumer
- // group heartbeat update.
- //
- // Default: 3s
- HeartbeatInterval time.Duration
- // PartitionWatchInterval indicates how often a reader checks for partition changes.
- // If a reader sees a partition change (such as a partition add) it will rebalance the group
- // picking up new partitions.
- //
- // Default: 5s
- PartitionWatchInterval time.Duration
- // WatchPartitionChanges is used to inform kafka-go that a consumer group
- // should be polling the brokers and rebalancing if any partition changes
- // happen to the topic.
- WatchPartitionChanges bool
- // SessionTimeout optionally sets the length of time that may pass without a heartbeat
- // before the coordinator considers the consumer dead and initiates a rebalance.
- //
- // Default: 30s
- SessionTimeout time.Duration
- // RebalanceTimeout optionally sets the length of time the coordinator will wait
- // for members to join as part of a rebalance. For kafka servers under higher
- // load, it may be useful to set this value higher.
- //
- // Default: 30s
- RebalanceTimeout time.Duration
- // JoinGroupBackoff optionally sets the length of time to wait before re-joining
- // the consumer group after an error.
- //
- // Default: 5s
- JoinGroupBackoff time.Duration
- // RetentionTime optionally sets the length of time the consumer group will
- // be saved by the broker. -1 will disable the setting and leave the
- // retention up to the broker's offsets.retention.minutes property. By
- // default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
- // 2.0.
- //
- // Default: -1
- RetentionTime time.Duration
- // StartOffset determines from where the consumer group should begin
- // consuming when it finds a partition without a committed offset. If
- // non-zero, it must be set to one of FirstOffset or LastOffset.
- //
- // Default: FirstOffset
- StartOffset int64
- // If not nil, specifies a logger used to report internal changes within the
- // reader.
- Logger Logger
- // ErrorLogger is the logger used to report errors. If nil, the reader falls
- // back to using Logger instead.
- ErrorLogger Logger
- // Timeout is the network timeout used when communicating with the consumer
- // group coordinator. This value should not be too small since errors
- // communicating with the broker will generally cause a consumer group
- // rebalance, and it's undesirable that a transient network error introduce
- // that overhead. Similarly, it should not be too large or the consumer
- // group may be slow to respond to the coordinator failing over to another
- // broker.
- //
- // Default: 5s
- Timeout time.Duration
- // connect is a function for dialing the coordinator. This is provided for
- // unit testing to mock broker connections.
- connect func(dialer *Dialer, brokers ...string) (coordinator, error)
- }
- // Validate validates ConsumerGroupConfig properties and sets relevant
- // defaults.
- func (config *ConsumerGroupConfig) Validate() error {
- if len(config.Brokers) == 0 {
- return errors.New("cannot create a consumer group with an empty list of broker addresses")
- }
- if len(config.Topics) == 0 {
- return errors.New("cannot create a consumer group without a topic")
- }
- if config.ID == "" {
- return errors.New("cannot create a consumer group without an ID")
- }
- if config.Dialer == nil {
- config.Dialer = DefaultDialer
- }
- if len(config.GroupBalancers) == 0 {
- config.GroupBalancers = []GroupBalancer{
- RangeGroupBalancer{},
- RoundRobinGroupBalancer{},
- }
- }
- if config.HeartbeatInterval == 0 {
- config.HeartbeatInterval = defaultHeartbeatInterval
- }
- if config.SessionTimeout == 0 {
- config.SessionTimeout = defaultSessionTimeout
- }
- if config.PartitionWatchInterval == 0 {
- config.PartitionWatchInterval = defaultPartitionWatchTime
- }
- if config.RebalanceTimeout == 0 {
- config.RebalanceTimeout = defaultRebalanceTimeout
- }
- if config.JoinGroupBackoff == 0 {
- config.JoinGroupBackoff = defaultJoinGroupBackoff
- }
- if config.RetentionTime == 0 {
- config.RetentionTime = defaultRetentionTime
- }
- if config.HeartbeatInterval < 0 || (config.HeartbeatInterval/time.Millisecond) >= math.MaxInt32 {
- return fmt.Errorf("HeartbeatInterval out of bounds: %d", config.HeartbeatInterval)
- }
- if config.SessionTimeout < 0 || (config.SessionTimeout/time.Millisecond) >= math.MaxInt32 {
- return fmt.Errorf("SessionTimeout out of bounds: %d", config.SessionTimeout)
- }
- if config.RebalanceTimeout < 0 || (config.RebalanceTimeout/time.Millisecond) >= math.MaxInt32 {
- return fmt.Errorf("RebalanceTimeout out of bounds: %d", config.RebalanceTimeout)
- }
- if config.JoinGroupBackoff < 0 || (config.JoinGroupBackoff/time.Millisecond) >= math.MaxInt32 {
- return fmt.Errorf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)
- }
- if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
- return fmt.Errorf("RetentionTime out of bounds: %d", config.RetentionTime)
- }
- if config.PartitionWatchInterval < 0 || (config.PartitionWatchInterval/time.Millisecond) >= math.MaxInt32 {
- return fmt.Errorf("PartitionWachInterval out of bounds %d", config.PartitionWatchInterval)
- }
- if config.StartOffset == 0 {
- config.StartOffset = FirstOffset
- }
- if config.StartOffset != FirstOffset && config.StartOffset != LastOffset {
- return fmt.Errorf("StartOffset is not valid %d", config.StartOffset)
- }
- if config.Timeout == 0 {
- config.Timeout = defaultTimeout
- }
- if config.connect == nil {
- config.connect = makeConnect(*config)
- }
- return nil
- }
- // PartitionAssignment represents the starting state of a partition that has
- // been assigned to a consumer.
- type PartitionAssignment struct {
- // ID is the partition ID.
- ID int
- // Offset is the initial offset at which this assignment begins. It will
- // either be an absolute offset if one has previously been committed for
- // the consumer group or a relative offset such as FirstOffset when this
- // is the first time the partition has been assigned to a member of the
- // group.
- Offset int64
- }
- // genCtx adapts the done channel of the generation to a context.Context. This
- // is used by Generation.Start so that we can pass a context to goroutines
- // instead of passing around channels.
- type genCtx struct {
- gen *Generation
- }
- func (c genCtx) Done() <-chan struct{} {
- return c.gen.done
- }
- func (c genCtx) Err() error {
- select {
- case <-c.gen.done:
- return ErrGenerationEnded
- default:
- return nil
- }
- }
- func (c genCtx) Deadline() (time.Time, bool) {
- return time.Time{}, false
- }
- func (c genCtx) Value(interface{}) interface{} {
- return nil
- }
- // Generation represents a single consumer group generation. The generation
- // carries the topic+partition assignments for the given member. It also provides
- // facilities for committing offsets and for running functions whose lifecycles
- // are bound to the generation.
- type Generation struct {
- // ID is the generation ID as assigned by the consumer group coordinator.
- ID int32
- // GroupID is the name of the consumer group.
- GroupID string
- // MemberID is the ID assigned to this consumer by the consumer group
- // coordinator.
- MemberID string
- // Assignments is the initial state of this Generation. The partition
- // assignments are grouped by topic.
- Assignments map[string][]PartitionAssignment
- conn coordinator
- // the following fields are used for process accounting to synchronize
- // between Start and close. lock protects all of them. done is closed
- // when the generation is ending in order to signal that the generation
- // should start self-destructing. closed protects against double-closing
- // the done chan. routines is a count of running goroutines that have been
- // launched by Start. joined will be closed by the last goroutine to exit.
- lock sync.Mutex
- done chan struct{}
- closed bool
- routines int
- joined chan struct{}
- retentionMillis int64
- log func(func(Logger))
- logError func(func(Logger))
- }
- // close stops the generation and waits for all functions launched via Start to
- // terminate.
- func (g *Generation) close() {
- g.lock.Lock()
- if !g.closed {
- close(g.done)
- g.closed = true
- }
- // determine whether any goroutines are running that we need to wait for.
- // waiting needs to happen outside of the critical section.
- r := g.routines
- g.lock.Unlock()
- // NOTE: r will be zero if no goroutines were ever launched. no need to
- // wait in that case.
- if r > 0 {
- <-g.joined
- }
- }
- // Start launches the provided function in a goroutine and adds accounting such
- // that when the function exits, it stops the current generation (if not
- // already in the process of doing so).
- //
- // The provided function MUST support cancellation via the ctx argument and exit
- // in a timely manner once the ctx is complete. When the context is closed, the
- // context's Err() method will return ErrGenerationEnded.
- //
- // When closing out a generation, the consumer group will wait for all functions
- // launched by Start to exit before the group can move on and join the next
- // generation. If the function does not exit promptly, it will stop forward
- // progress for this consumer and potentially cause consumer group membership
- // churn.
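- //
- // A sketch of the contract expected of fn, assuming gen was obtained from
- // Next; the loop body is illustrative:
- //
- //	gen.Start(func(ctx context.Context) {
- //		for {
- //			select {
- //			case <-ctx.Done():
- //				// ctx.Err() now reports ErrGenerationEnded.
- //				return
- //			default:
- //				// do one bounded unit of work, e.g. process a message.
- //			}
- //		}
- //	})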
- func (g *Generation) Start(fn func(ctx context.Context)) {
- g.lock.Lock()
- defer g.lock.Unlock()
- // this is an edge case: if the generation has already closed, then it's
- // possible that the close func has already waited on outstanding
- // goroutines and exited.
- //
- // nonetheless, it's important to honor that the fn is invoked in case the
- // calling function is waiting e.g. on a channel send or a WaitGroup. in
- // such a case, fn should immediately exit because ctx.Err() will return
- // ErrGenerationEnded.
- if g.closed {
- go fn(genCtx{g})
- return
- }
- // register that there is one more goroutine that's part of this gen.
- g.routines++
- go func() {
- fn(genCtx{g})
- g.lock.Lock()
- // shut down the generation as soon as one function exits. this is
- // different from close() in that it doesn't wait for all goroutines in
- // the generation to exit.
- if !g.closed {
- close(g.done)
- g.closed = true
- }
- g.routines--
- // if this was the last goroutine in the generation, close the joined
- // chan so that close() can exit if it's waiting.
- if g.routines == 0 {
- close(g.joined)
- }
- g.lock.Unlock()
- }()
- }
- // CommitOffsets commits the provided topic+partition+offset combos to the
- // consumer group coordinator. This can be used to reset the consumer to
- // explicit offsets.
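- //
- // A usage sketch, assuming gen was obtained from Next: offsets are keyed by
- // topic and then by partition, and the values below are purely illustrative:
- //
- //	err := gen.CommitOffsets(map[string]map[int]int64{
- //		"example-topic": {0: 42, 1: 17},
- //	})
- //	if err != nil {
- //		// the commit failed; the generation may have ended.
- //	}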
- func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
- if len(offsets) == 0 {
- return nil
- }
- topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
- for topic, partitions := range offsets {
- t := offsetCommitRequestV2Topic{Topic: topic}
- for partition, offset := range partitions {
- t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
- Partition: int32(partition),
- Offset: offset,
- })
- }
- topics = append(topics, t)
- }
- request := offsetCommitRequestV2{
- GroupID: g.GroupID,
- GenerationID: g.ID,
- MemberID: g.MemberID,
- RetentionTime: g.retentionMillis,
- Topics: topics,
- }
- _, err := g.conn.offsetCommit(request)
- if err == nil {
- // if logging is enabled, print out the partitions that were committed.
- g.log(func(l Logger) {
- var report []string
- for _, t := range request.Topics {
- report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic))
- for _, p := range t.Partitions {
- report = append(report, fmt.Sprintf("\t\tpartition %d: %d", p.Partition, p.Offset))
- }
- }
- l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(report, "\n"))
- })
- }
- return err
- }
- // heartbeatLoop checks in with the consumer group coordinator at the provided
- // interval. It exits if it ever encounters an error, which would signal the
- // end of the generation.
- func (g *Generation) heartbeatLoop(interval time.Duration) {
- g.Start(func(ctx context.Context) {
- g.log(func(l Logger) {
- l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
- })
- defer g.log(func(l Logger) {
- l.Printf("stopped heartbeat for group %s\n", g.GroupID)
- })
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- _, err := g.conn.heartbeat(heartbeatRequestV0{
- GroupID: g.GroupID,
- GenerationID: g.ID,
- MemberID: g.MemberID,
- })
- if err != nil {
- return
- }
- }
- }
- })
- }
- // partitionWatcher queries kafka and watches for partition changes, triggering
- // a rebalance if changes are found. Similar to the heartbeat loop, it's okay
- // to return on error here: if you are unable to ask a broker for basic
- // metadata, you're in a bad spot and should rebalance. Commonly you will see
- // an error here if there is a problem with the connection to the coordinator;
- // a rebalance will establish a new connection to the coordinator.
- func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
- g.Start(func(ctx context.Context) {
- g.log(func(l Logger) {
- l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
- })
- defer g.log(func(l Logger) {
- l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
- })
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- ops, err := g.conn.readPartitions(topic)
- if err != nil {
- g.logError(func(l Logger) {
- l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
- })
- return
- }
- oParts := len(ops)
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- ops, err := g.conn.readPartitions(topic)
- switch {
- case err == nil, errors.Is(err, UnknownTopicOrPartition):
- if len(ops) != oParts {
- g.log(func(l Logger) {
- l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
- })
- return
- }
- default:
- g.logError(func(l Logger) {
- l.Printf("Problem getting partitions while checking for changes, %v", err)
- })
- var kafkaError Error
- if errors.As(err, &kafkaError) {
- continue
- }
- // other errors imply that we lost the connection to the coordinator, so we
- // should abort and reconnect.
- return
- }
- }
- }
- })
- }
- // coordinator is a subset of the functionality in Conn in order to facilitate
- // testing the consumer group...especially for error conditions that are
- // difficult to instigate with a live broker running in docker.
- type coordinator interface {
- io.Closer
- findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
- joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
- syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
- leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
- heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
- offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
- offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
- readPartitions(...string) ([]Partition, error)
- }
- // timeoutCoordinator wraps the Conn to ensure that every operation has a
- // deadline. Otherwise, it would be possible for requests to block indefinitely
- // if the remote server never responds. There are many spots where the consumer
- // group needs to interact with the broker, so it feels less error prone to
- // factor all of the deadline management into this shared location as opposed to
- // peppering it all through where the code actually interacts with the broker.
- type timeoutCoordinator struct {
- timeout time.Duration
- sessionTimeout time.Duration
- rebalanceTimeout time.Duration
- conn *Conn
- }
- func (t *timeoutCoordinator) Close() error {
- return t.conn.Close()
- }
- func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
- return findCoordinatorResponseV0{}, err
- }
- return t.conn.findCoordinator(req)
- }
- func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
- // in the case of join group, the consumer group coordinator may wait up
- // to the rebalance timeout for all members to join.
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
- return joinGroupResponseV1{}, err
- }
- return t.conn.joinGroup(req)
- }
- func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
- // in the case of sync group, the consumer group leader is given up to
- // the session timeout to respond before the coordinator will give up.
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.sessionTimeout)); err != nil {
- return syncGroupResponseV0{}, err
- }
- return t.conn.syncGroup(req)
- }
- func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
- return leaveGroupResponseV0{}, err
- }
- return t.conn.leaveGroup(req)
- }
- func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
- return heartbeatResponseV0{}, err
- }
- return t.conn.heartbeat(req)
- }
- func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
- return offsetFetchResponseV1{}, err
- }
- return t.conn.offsetFetch(req)
- }
- func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
- return offsetCommitResponseV2{}, err
- }
- return t.conn.offsetCommit(req)
- }
- func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
- if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
- return nil, err
- }
- return t.conn.ReadPartitions(topics...)
- }
- // NewConsumerGroup creates a new ConsumerGroup. It returns an error if the
- // provided configuration is invalid. It does not attempt to connect to the
- // Kafka cluster. That happens asynchronously, and any errors will be reported
- // by Next.
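- //
- // A construction sketch; the broker address and names are illustrative
- // placeholders:
- //
- //	group, err := NewConsumerGroup(ConsumerGroupConfig{
- //		ID:      "example-group",
- //		Brokers: []string{"localhost:9092"},
- //		Topics:  []string{"example-topic"},
- //	})
- //	if err != nil {
- //		// the configuration is invalid.
- //	}
- //	defer group.Close()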
- func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
- if err := config.Validate(); err != nil {
- return nil, err
- }
- cg := &ConsumerGroup{
- config: config,
- next: make(chan *Generation),
- errs: make(chan error),
- done: make(chan struct{}),
- }
- cg.wg.Add(1)
- go func() {
- cg.run()
- cg.wg.Done()
- }()
- return cg, nil
- }
- // ConsumerGroup models a Kafka consumer group. A caller doesn't interact with
- // the group directly. Rather, they interact with a Generation. Every time a
- // member enters or exits the group, it results in a new Generation. The
- // Generation is where partition assignments and offset management occur.
- // Callers will use Next to get a handle to the Generation.
- type ConsumerGroup struct {
- config ConsumerGroupConfig
- next chan *Generation
- errs chan error
- closeOnce sync.Once
- wg sync.WaitGroup
- done chan struct{}
- }
- // Close causes this member to leave the consumer group and releases all local
- // resources used to participate in it. Close will also end the current
- // generation if one is still active.
- func (cg *ConsumerGroup) Close() error {
- cg.closeOnce.Do(func() {
- close(cg.done)
- })
- cg.wg.Wait()
- return nil
- }
- // Next waits for the next consumer group generation. There will never be two
- // active generations. Next will never return a new generation until the
- // previous one has completed.
- //
- // If there are errors setting up the next generation, they will be surfaced
- // here.
- //
- // If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.
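- //
- // A typical consumption loop sketch built around Next, assuming group was
- // created with NewConsumerGroup; the error handling and per-partition work
- // shown are illustrative, not prescriptive:
- //
- //	for {
- //		gen, err := group.Next(context.Background())
- //		if err != nil {
- //			if errors.Is(err, ErrGroupClosed) {
- //				return
- //			}
- //			continue // transient setup errors are surfaced here too.
- //		}
- //		for topic, assignments := range gen.Assignments {
- //			for _, pa := range assignments {
- //				// consume topic starting at pa.ID/pa.Offset, typically
- //				// via gen.Start so the work ends with the generation.
- //			}
- //		}
- //	}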
- func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-cg.done:
- return nil, ErrGroupClosed
- case err := <-cg.errs:
- return nil, err
- case next := <-cg.next:
- return next, nil
- }
- }
- func (cg *ConsumerGroup) run() {
- // the memberID is the only piece of information that is maintained across
- // generations. it starts empty and will be assigned on the first nextGeneration
- // when the joinGroup request is processed. it may change again later if
- // the CG coordinator fails over or if the member is evicted. otherwise, it
- // will be constant for the lifetime of this group.
- var memberID string
- var err error
- for {
- memberID, err = cg.nextGeneration(memberID)
- // backoff will be set if this goroutine should sleep before continuing
- // to the next generation. it will be non-nil in the case of an error
- // joining or syncing the group.
- var backoff <-chan time.Time
- switch {
- case err == nil:
- // no error...the previous generation finished normally.
- continue
- case errors.Is(err, ErrGroupClosed):
- // the CG has been closed...leave the group and exit loop.
- _ = cg.leaveGroup(memberID)
- return
- case errors.Is(err, RebalanceInProgress):
- // in case of a RebalanceInProgress, don't leave the group or
- // change the member ID, but report the error. the next attempt
- // to join the group will then be subject to the rebalance
- // timeout, so the broker will be responsible for throttling
- // this loop.
- default:
- // leave the group and report the error if we had gotten far
- // enough so as to have a member ID. also clear the member ID
- // so we don't attempt to use it again. in order to avoid
- // a tight error loop, back off before the next attempt to join
- // the group.
- _ = cg.leaveGroup(memberID)
- memberID = ""
- backoff = time.After(cg.config.JoinGroupBackoff)
- }
- // ensure that we exit cleanly in case the CG is done and no one is
- // waiting to receive on the unbuffered error channel.
- select {
- case <-cg.done:
- return
- case cg.errs <- err:
- }
- // back off if needed, being sure to exit cleanly if the CG is done.
- if backoff != nil {
- select {
- case <-cg.done:
- // exit cleanly if the group is closed.
- return
- case <-backoff:
- }
- }
- }
- }
- func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
- // get a new connection to the coordinator on each loop. the previous
- // generation could have exited due to losing the connection, so this
- // ensures that we always have a clean starting point. it means we will
- // re-connect in certain cases, but that shouldn't be an issue given that
- // rebalances are relatively infrequent under normal operating
- // conditions.
- conn, err := cg.coordinator()
- if err != nil {
- cg.withErrorLogger(func(log Logger) {
- log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
- })
- return memberID, err // a prior memberID may still be valid, so don't return ""
- }
- defer conn.Close()
- var generationID int32
- var groupAssignments GroupMemberAssignments
- var assignments map[string][]int32
- // join group. this will join the group and prepare assignments if our
- // consumer is elected leader. it may also change or assign the member ID.
- memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
- if err != nil {
- cg.withErrorLogger(func(log Logger) {
- log.Printf("Failed to join group %s: %v", cg.config.ID, err)
- })
- return memberID, err
- }
- cg.withLogger(func(log Logger) {
- log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
- })
- // sync group
- assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
- if err != nil {
- cg.withErrorLogger(func(log Logger) {
- log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
- })
- return memberID, err
- }
- // fetch initial offsets.
- var offsets map[string]map[int]int64
- offsets, err = cg.fetchOffsets(conn, assignments)
- if err != nil {
- cg.withErrorLogger(func(log Logger) {
- log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
- })
- return memberID, err
- }
- // create the generation.
- gen := Generation{
- ID: generationID,
- GroupID: cg.config.ID,
- MemberID: memberID,
- Assignments: cg.makeAssignments(assignments, offsets),
- conn: conn,
- done: make(chan struct{}),
- joined: make(chan struct{}),
- retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
- log: cg.withLogger,
- logError: cg.withErrorLogger,
- }
- // spawn all of the goroutines required to facilitate this generation. if
- // any of these functions exit, then the generation is determined to be
- // complete.
- gen.heartbeatLoop(cg.config.HeartbeatInterval)
- if cg.config.WatchPartitionChanges {
- for _, topic := range cg.config.Topics {
- gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
- }
- }
- // make this generation available for retrieval. if the CG is closed before
- // we can send it on the channel, exit. that case is required because the next
- // channel is unbuffered. if the caller to Next has already bailed because
- // its own teardown logic has been invoked, this would deadlock otherwise.
- select {
- case <-cg.done:
- gen.close()
- return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
- case cg.next <- &gen:
- }
- // wait for generation to complete. if the CG is closed before the
- // generation is finished, exit and leave the group.
- select {
- case <-cg.done:
- gen.close()
- return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
- case <-gen.done:
- // time for next generation! make sure all the current goroutines exit
- // before continuing onward.
- gen.close()
- return memberID, nil
- }
- }
- // makeConnect returns a connect function that opens a connection to ANY of
- // the given brokers.
- func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
- return func(dialer *Dialer, brokers ...string) (coordinator, error) {
- var err error
- for _, broker := range brokers {
- var conn *Conn
- if conn, err = dialer.Dial("tcp", broker); err == nil {
- return &timeoutCoordinator{
- conn: conn,
- timeout: config.Timeout,
- sessionTimeout: config.SessionTimeout,
- rebalanceTimeout: config.RebalanceTimeout,
- }, nil
- }
- }
- return nil, err // err will be non-nil
- }
- }
- // coordinator establishes a connection to the coordinator for this consumer
- // group.
- func (cg *ConsumerGroup) coordinator() (coordinator, error) {
- // NOTE : could try to cache the coordinator to avoid the double connect
- // here. since consumer group balances happen infrequently and are
- // an expensive operation, we're not currently optimizing that case
- // in order to keep the code simpler.
- conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
- if err != nil {
- return nil, err
- }
- defer conn.Close()
- out, err := conn.findCoordinator(findCoordinatorRequestV0{
- CoordinatorKey: cg.config.ID,
- })
- if err == nil && out.ErrorCode != 0 {
- err = Error(out.ErrorCode)
- }
- if err != nil {
- return nil, err
- }
- address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
- return cg.config.connect(cg.config.Dialer, address)
- }
- // joinGroup attempts to join the reader to the consumer group.
- // Returns GroupMemberAssignments if this Reader was selected as
- // the leader. Otherwise, GroupMemberAssignments will be nil.
- //
- // Possible kafka error codes returned:
- // * GroupLoadInProgress
- // * GroupCoordinatorNotAvailable
- // * NotCoordinatorForGroup
- // * InconsistentGroupProtocol
- // * InvalidSessionTimeout
- // * GroupAuthorizationFailed
- func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
- request, err := cg.makeJoinGroupRequestV1(memberID)
- if err != nil {
- return "", 0, nil, err
- }
- response, err := conn.joinGroup(request)
- if err == nil && response.ErrorCode != 0 {
- err = Error(response.ErrorCode)
- }
- if err != nil {
- return "", 0, nil, err
- }
- memberID = response.MemberID
- generationID := response.GenerationID
- cg.withLogger(func(l Logger) {
- l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
- })
- var assignments GroupMemberAssignments
- if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
- v, err := cg.assignTopicPartitions(conn, response)
- if err != nil {
- return memberID, 0, nil, err
- }
- assignments = v
- cg.withLogger(func(l Logger) {
- for memberID, assignment := range assignments {
- for topic, partitions := range assignment {
- l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
- }
- }
- })
- }
- cg.withLogger(func(l Logger) {
- l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
- })
- return memberID, generationID, assignments, nil
- }
- // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
- // request.
- func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
- request := joinGroupRequestV1{
- GroupID: cg.config.ID,
- MemberID: memberID,
- SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond),
- RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond),
- ProtocolType: defaultProtocolType,
- }
- for _, balancer := range cg.config.GroupBalancers {
- userData, err := balancer.UserData()
- if err != nil {
- return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
- }
- request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
- ProtocolName: balancer.ProtocolName(),
- ProtocolMetadata: groupMetadata{
- Version: 1,
- Topics: cg.config.Topics,
- UserData: userData,
- }.bytes(),
- })
- }
- return request, nil
- }
- // assignTopicPartitions uses the selected GroupBalancer to assign members to
- // their various partitions.
- func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
- cg.withLogger(func(l Logger) {
- l.Printf("selected as leader for group, %s\n", cg.config.ID)
- })
- balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
- if !ok {
- // NOTE : this shouldn't happen in practice...the broker should not
- // return successfully from joinGroup unless all members support
- // at least one common protocol.
- return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
- }
- members, err := cg.makeMemberProtocolMetadata(group.Members)
- if err != nil {
- return nil, err
- }
- topics := extractTopics(members)
- partitions, err := conn.readPartitions(topics...)
- // it's not a failure if the topic doesn't exist yet. it results in no
- // assignments for the topic. this matches the behavior of the official
- // clients: java, python, and librdkafka.
- // a topic watcher can trigger a rebalance when the topic comes into being.
- if err != nil && !errors.Is(err, UnknownTopicOrPartition) {
- return nil, err
- }
- cg.withLogger(func(l Logger) {
- l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
- for _, member := range members {
- l.Printf("found member: %v/%#v", member.ID, member.UserData)
- }
- for _, partition := range partitions {
- l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID)
- }
- })
- return balancer.AssignGroups(members, partitions), nil
- }
- // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
- func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
- members := make([]GroupMember, 0, len(in))
- for _, item := range in {
- metadata := groupMetadata{}
- reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
- if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
- return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
- }
- members = append(members, GroupMember{
- ID: item.MemberID,
- Topics: metadata.Topics,
- UserData: metadata.UserData,
- })
- }
- return members, nil
- }
- // syncGroup completes the consumer group nextGeneration by accepting the
- // memberAssignments (if this Reader is the leader) and returning this
- // Reader's subscription as topic => partitions.
- //
- // Possible kafka error codes returned:
- // * GroupCoordinatorNotAvailable
- // * NotCoordinatorForGroup
- // * IllegalGeneration
- // * RebalanceInProgress
- // * GroupAuthorizationFailed
- func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
- request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
- response, err := conn.syncGroup(request)
- if err == nil && response.ErrorCode != 0 {
- err = Error(response.ErrorCode)
- }
- if err != nil {
- return nil, err
- }
- assignments := groupAssignment{}
- reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
- if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
- return nil, err
- }
- if len(assignments.Topics) == 0 {
- cg.withLogger(func(l Logger) {
- l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
- })
- }
- cg.withLogger(func(l Logger) {
- l.Printf("sync group finished for group, %v", cg.config.ID)
- })
- return assignments.Topics, nil
- }
- func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
- request := syncGroupRequestV0{
- GroupID: cg.config.ID,
- GenerationID: generationID,
- MemberID: memberID,
- }
- if memberAssignments != nil {
- request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)
- for memberID, topics := range memberAssignments {
- topics32 := make(map[string][]int32)
- for topic, partitions := range topics {
- partitions32 := make([]int32, len(partitions))
- for i := range partitions {
- partitions32[i] = int32(partitions[i])
- }
- topics32[topic] = partitions32
- }
- request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
- MemberID: memberID,
- MemberAssignments: groupAssignment{
- Version: 1,
- Topics: topics32,
- }.bytes(),
- })
- }
- cg.withLogger(func(logger Logger) {
- logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
- })
- }
- return request
- }
- func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
- req := offsetFetchRequestV1{
- GroupID: cg.config.ID,
- Topics: make([]offsetFetchRequestV1Topic, 0, len(cg.config.Topics)),
- }
- for _, topic := range cg.config.Topics {
- req.Topics = append(req.Topics, offsetFetchRequestV1Topic{
- Topic: topic,
- Partitions: subs[topic],
- })
- }
- offsets, err := conn.offsetFetch(req)
- if err != nil {
- return nil, err
- }
- offsetsByTopic := make(map[string]map[int]int64)
- for _, res := range offsets.Responses {
- offsetsByPartition := map[int]int64{}
- offsetsByTopic[res.Topic] = offsetsByPartition
- for _, pr := range res.PartitionResponses {
- for _, partition := range subs[res.Topic] {
- if partition == pr.Partition {
- offset := pr.Offset
- if offset < 0 {
- offset = cg.config.StartOffset
- }
- offsetsByPartition[int(partition)] = offset
- }
- }
- }
- }
- return offsetsByTopic, nil
- }
- func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment {
- topicAssignments := make(map[string][]PartitionAssignment)
- for _, topic := range cg.config.Topics {
- topicPartitions := assignments[topic]
- topicAssignments[topic] = make([]PartitionAssignment, 0, len(topicPartitions))
- for _, partition := range topicPartitions {
- var offset int64
- partitionOffsets, ok := offsets[topic]
- if ok {
- offset, ok = partitionOffsets[int(partition)]
- }
- if !ok {
- offset = cg.config.StartOffset
- }
- topicAssignments[topic] = append(topicAssignments[topic], PartitionAssignment{
- ID: int(partition),
- Offset: offset,
- })
- }
- }
- return topicAssignments
- }
- func (cg *ConsumerGroup) leaveGroup(memberID string) error {
- // don't attempt to leave the group if no memberID was ever assigned.
- if memberID == "" {
- return nil
- }
- cg.withLogger(func(log Logger) {
- log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
- })
- // IMPORTANT : leaveGroup establishes its own connection to the coordinator
- // because it is often called after some other operation failed.
- // said failure could be the result of connection-level issues,
- // so we want to re-establish the connection to ensure that we
- // are able to process the cleanup step.
- coordinator, err := cg.coordinator()
- if err != nil {
- return err
- }
- _, err = coordinator.leaveGroup(leaveGroupRequestV0{
- GroupID: cg.config.ID,
- MemberID: memberID,
- })
- if err != nil {
- cg.withErrorLogger(func(log Logger) {
- log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
- })
- }
- _ = coordinator.Close()
- return err
- }
- func (cg *ConsumerGroup) withLogger(do func(Logger)) {
- if cg.config.Logger != nil {
- do(cg.config.Logger)
- }
- }
- func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) {
- if cg.config.ErrorLogger != nil {
- do(cg.config.ErrorLogger)
- } else {
- cg.withLogger(do)
- }
- }