package kafka

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

// ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
// been closed.
var ErrGroupClosed = errors.New("consumer group is closed")

// ErrGenerationEnded is returned by the context.Context issued by the
// Generation's Start function when the context has been closed.
var ErrGenerationEnded = errors.New("consumer group generation has ended")

const (
	// defaultProtocolType holds the default protocol type documented in the
	// kafka protocol
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
	defaultProtocolType = "consumer"

	// defaultHeartbeatInterval contains the default time between heartbeats. If
	// the coordinator does not receive a heartbeat within the session timeout interval,
	// the consumer will be considered dead and the coordinator will rebalance the
	// group.
	//
	// As a rule, the heartbeat interval should be no greater than 1/3 the session timeout.
	defaultHeartbeatInterval = 3 * time.Second

	// defaultSessionTimeout contains the default interval the coordinator will wait
	// for a heartbeat before marking a consumer as dead.
	defaultSessionTimeout = 30 * time.Second

	// defaultRebalanceTimeout contains the amount of time the coordinator will wait
	// for consumers to issue a join group once a rebalance has been requested.
	defaultRebalanceTimeout = 30 * time.Second

	// defaultJoinGroupBackoff is the amount of time to wait after a failed
	// consumer group generation before attempting to re-join.
	defaultJoinGroupBackoff = 5 * time.Second

	// defaultRetentionTime holds the length of time the consumer group will be
	// saved by kafka. This value tells the broker to use its configured value.
	defaultRetentionTime = -1 * time.Millisecond

	// defaultPartitionWatchTime contains the amount of time kafka-go will wait to
	// query the brokers looking for partition changes.
	defaultPartitionWatchTime = 5 * time.Second

	// defaultTimeout is the deadline to set when interacting with the
	// consumer group coordinator.
	defaultTimeout = 5 * time.Second
)

// ConsumerGroupConfig is a configuration object used to create new instances of
// ConsumerGroup.
type ConsumerGroupConfig struct {
	// ID is the consumer group ID. It must not be empty.
	ID string

	// The list of broker addresses used to connect to the kafka cluster. It
	// must not be empty.
	Brokers []string

	// A dialer used to open connections to the kafka server. This field is
	// optional; if nil, the default dialer is used instead.
	Dialer *Dialer

	// Topics is the list of topics that will be consumed by this group. It
	// will usually have a single value, but it is permitted to have multiple
	// for more complex use cases.
	Topics []string

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	HeartbeatInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait before re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will
	// be saved by the broker. -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property. By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
	// 2.0.
	//
	// Default: -1
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	StartOffset int64

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// Timeout is the network timeout used when communicating with the consumer
	// group coordinator. This value should not be too small since errors
	// communicating with the broker will generally cause a consumer group
	// rebalance, and it's undesirable that a transient network error introduce
	// that overhead. Similarly, it should not be too large or the consumer
	// group may be slow to respond to the coordinator failing over to another
	// broker.
	//
	// Default: 5s
	Timeout time.Duration

	// connect is a function for dialing the coordinator. This is provided for
	// unit testing to mock broker connections.
	connect func(dialer *Dialer, brokers ...string) (coordinator, error)
}

// Validate method validates ConsumerGroupConfig properties and sets relevant
// defaults.
func (config *ConsumerGroupConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a consumer group with an empty list of broker addresses")
	}

	if len(config.Topics) == 0 {
		return errors.New("cannot create a consumer group without a topic")
	}

	if config.ID == "" {
		return errors.New("cannot create a consumer group without an ID")
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if len(config.GroupBalancers) == 0 {
		config.GroupBalancers = []GroupBalancer{
			RangeGroupBalancer{},
			RoundRobinGroupBalancer{},
		}
	}

	if config.HeartbeatInterval == 0 {
		config.HeartbeatInterval = defaultHeartbeatInterval
	}

	if config.SessionTimeout == 0 {
		config.SessionTimeout = defaultSessionTimeout
	}

	if config.PartitionWatchInterval == 0 {
		config.PartitionWatchInterval = defaultPartitionWatchTime
	}

	if config.RebalanceTimeout == 0 {
		config.RebalanceTimeout = defaultRebalanceTimeout
	}

	if config.JoinGroupBackoff == 0 {
		config.JoinGroupBackoff = defaultJoinGroupBackoff
	}

	if config.RetentionTime == 0 {
		config.RetentionTime = defaultRetentionTime
	}

	if config.HeartbeatInterval < 0 || (config.HeartbeatInterval/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("HeartbeatInterval out of bounds: %d", config.HeartbeatInterval)
	}

	if config.SessionTimeout < 0 || (config.SessionTimeout/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("SessionTimeout out of bounds: %d", config.SessionTimeout)
	}

	if config.RebalanceTimeout < 0 || (config.RebalanceTimeout/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("RebalanceTimeout out of bounds: %d", config.RebalanceTimeout)
	}

	if config.JoinGroupBackoff < 0 || (config.JoinGroupBackoff/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)
	}

	if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
		return fmt.Errorf("RetentionTime out of bounds: %d", config.RetentionTime)
	}

	if config.PartitionWatchInterval < 0 || (config.PartitionWatchInterval/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("PartitionWatchInterval out of bounds: %d", config.PartitionWatchInterval)
	}

	if config.StartOffset == 0 {
		config.StartOffset = FirstOffset
	}

	if config.StartOffset != FirstOffset && config.StartOffset != LastOffset {
		return fmt.Errorf("StartOffset is not valid: %d", config.StartOffset)
	}

	if config.Timeout == 0 {
		config.Timeout = defaultTimeout
	}

	if config.connect == nil {
		config.connect = makeConnect(*config)
	}

	return nil
}

// PartitionAssignment represents the starting state of a partition that has
// been assigned to a consumer.
type PartitionAssignment struct {
	// ID is the partition ID.
	ID int

	// Offset is the initial offset at which this assignment begins. It will
	// either be an absolute offset if one has previously been committed for
	// the consumer group or a relative offset such as FirstOffset when this
	// is the first time the partition has been assigned to a member of the
	// group.
	Offset int64
}

// genCtx adapts the done channel of the generation to a context.Context. This
// is used by Generation.Start so that we can pass a context to go routines
// instead of passing around channels.
type genCtx struct {
	gen *Generation
}

func (c genCtx) Done() <-chan struct{} {
	return c.gen.done
}

func (c genCtx) Err() error {
	select {
	case <-c.gen.done:
		return ErrGenerationEnded
	default:
		return nil
	}
}

func (c genCtx) Deadline() (time.Time, bool) {
	return time.Time{}, false
}

func (c genCtx) Value(interface{}) interface{} {
	return nil
}

// Generation represents a single consumer group generation. The generation
// carries the topic+partition assignments for the given member. It also
// provides facilities for committing offsets and for running functions whose
// lifecycles are bound to the generation.
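//
// A typical caller iterates the generation's Assignments and launches one
// worker per assigned partition via Start. A minimal sketch (the read loop is
// a placeholder and error handling is elided):
//
//	for topic, partitions := range gen.Assignments {
//		topic := topic // capture per-iteration copies for the goroutine
//		for _, assignment := range partitions {
//			partition, offset := assignment.ID, assignment.Offset
//			gen.Start(func(ctx context.Context) {
//				// read messages from topic/partition starting at
//				// offset until ctx is done...
//				_, _, _ = topic, partition, offset
//			})
//		}
//	}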
type Generation struct {
	// ID is the generation ID as assigned by the consumer group coordinator.
	ID int32

	// GroupID is the name of the consumer group.
	GroupID string

	// MemberID is the ID assigned to this consumer by the consumer group
	// coordinator.
	MemberID string

	// Assignments is the initial state of this Generation. The partition
	// assignments are grouped by topic.
	Assignments map[string][]PartitionAssignment

	conn coordinator

	// the following fields are used for process accounting to synchronize
	// between Start and close. lock protects all of them. done is closed
	// when the generation is ending in order to signal that the generation
	// should start self-destructing. closed protects against double-closing
	// the done chan. routines is a count of running go routines that have been
	// launched by Start. joined will be closed by the last go routine to exit.
	lock     sync.Mutex
	done     chan struct{}
	closed   bool
	routines int
	joined   chan struct{}

	retentionMillis int64
	log             func(func(Logger))
	logError        func(func(Logger))
}

// close stops the generation and waits for all functions launched via Start to
// terminate.
func (g *Generation) close() {
	g.lock.Lock()
	if !g.closed {
		close(g.done)
		g.closed = true
	}
	// determine whether any go routines are running that we need to wait for.
	// waiting needs to happen outside of the critical section.
	r := g.routines
	g.lock.Unlock()

	// NOTE: r will be zero if no go routines were ever launched. no need to
	// wait in that case.
	if r > 0 {
		<-g.joined
	}
}

// Start launches the provided function in a go routine and adds accounting such
// that when the function exits, it stops the current generation (if not
// already in the process of doing so).
//
// The provided function MUST support cancellation via the ctx argument and exit
// in a timely manner once the ctx is complete. When the context is closed, the
// context's Err() function will return ErrGenerationEnded.
//
// When closing out a generation, the consumer group will wait for all functions
// launched by Start to exit before the group can move on and join the next
// generation. If the function does not exit promptly, it will stop forward
// progress for this consumer and potentially cause consumer group membership
// churn.
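//
// For example, a long-running function should select on ctx.Done() so the
// generation can end promptly (a sketch; the work performed per tick is a
// placeholder):
//
//	gen.Start(func(ctx context.Context) {
//		ticker := time.NewTicker(time.Second)
//		defer ticker.Stop()
//		for {
//			select {
//			case <-ctx.Done():
//				return // ctx.Err() reports ErrGenerationEnded
//			case <-ticker.C:
//				// perform one unit of work...
//			}
//		}
//	})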
func (g *Generation) Start(fn func(ctx context.Context)) {
	g.lock.Lock()
	defer g.lock.Unlock()

	// this is an edge case: if the generation has already closed, then it's
	// possible that the close func has already waited on outstanding go
	// routines and exited.
	//
	// nonetheless, it's important to honor that the fn is invoked in case the
	// calling function is waiting e.g. on a channel send or a WaitGroup. in
	// such a case, fn should immediately exit because ctx.Err() will return
	// ErrGenerationEnded.
	if g.closed {
		go fn(genCtx{g})
		return
	}

	// register that there is one more go routine that's part of this gen.
	g.routines++

	go func() {
		fn(genCtx{g})
		g.lock.Lock()
		// shut down the generation as soon as one function exits. this is
		// different from close() in that it doesn't wait for all go routines in
		// the generation to exit.
		if !g.closed {
			close(g.done)
			g.closed = true
		}
		g.routines--
		// if this was the last go routine in the generation, close the joined
		// chan so that close() can exit if it's waiting.
		if g.routines == 0 {
			close(g.joined)
		}
		g.lock.Unlock()
	}()
}

// CommitOffsets commits the provided topic+partition+offset combos to the
// consumer group coordinator. This can be used to reset the consumer to
// explicit offsets.
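//
// For example, to commit offset 43 for partition 0 of topic "my-topic"
// (conventionally the offset of the next message to be read; the names here
// are placeholders):
//
//	err := gen.CommitOffsets(map[string]map[int]int64{
//		"my-topic": {0: 43},
//	})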
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
	if len(offsets) == 0 {
		return nil
	}

	topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
	for topic, partitions := range offsets {
		t := offsetCommitRequestV2Topic{Topic: topic}
		for partition, offset := range partitions {
			t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
				Partition: int32(partition),
				Offset:    offset,
			})
		}
		topics = append(topics, t)
	}

	request := offsetCommitRequestV2{
		GroupID:       g.GroupID,
		GenerationID:  g.ID,
		MemberID:      g.MemberID,
		RetentionTime: g.retentionMillis,
		Topics:        topics,
	}

	_, err := g.conn.offsetCommit(request)
	if err == nil {
		// if logging is enabled, print out the partitions that were committed.
		g.log(func(l Logger) {
			var report []string
			for _, t := range request.Topics {
				report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic))
				for _, p := range t.Partitions {
					report = append(report, fmt.Sprintf("\t\tpartition %d: %d", p.Partition, p.Offset))
				}
			}
			l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(report, "\n"))
		})
	}

	return err
}

// heartbeatLoop checks in with the consumer group coordinator at the provided
// interval. It exits if it ever encounters an error, which would signal the
// end of the generation.
func (g *Generation) heartbeatLoop(interval time.Duration) {
	g.Start(func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped heartbeat for group %s\n", g.GroupID)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				_, err := g.conn.heartbeat(heartbeatRequestV0{
					GroupID:      g.GroupID,
					GenerationID: g.ID,
					MemberID:     g.MemberID,
				})
				if err != nil {
					return
				}
			}
		}
	})
}

// partitionWatcher queries kafka and watches for partition changes, triggering
// a rebalance if changes are found. Similar to heartbeat, it's okay to return
// on error here, because if you are unable to ask a broker for basic metadata
// you're in a bad spot and should rebalance. Commonly you will see an error
// here if there is a problem with the connection to the coordinator, and a
// rebalance will establish a new connection to the coordinator.
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
	g.Start(func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		ops, err := g.conn.readPartitions(topic)
		if err != nil {
			g.logError(func(l Logger) {
				l.Printf("Problem getting partitions during startup, %v. Returning and setting up nextGeneration", err)
			})
			return
		}
		oParts := len(ops)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				ops, err := g.conn.readPartitions(topic)
				switch {
				case err == nil, errors.Is(err, UnknownTopicOrPartition):
					if len(ops) != oParts {
						g.log(func(l Logger) {
							l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
						})
						return
					}
				default:
					g.logError(func(l Logger) {
						l.Printf("Problem getting partitions while checking for changes, %v", err)
					})
					var kafkaError Error
					if errors.As(err, &kafkaError) {
						continue
					}
					// other errors imply that we lost the connection to the coordinator, so we
					// should abort and reconnect.
					return
				}
			}
		}
	})
}

// coordinator is a subset of the functionality in Conn in order to facilitate
// testing the consumer group...especially for error conditions that are
// difficult to instigate with a live broker running in docker.
type coordinator interface {
	io.Closer
	findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
	joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
	syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
	leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
	offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
	offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
	readPartitions(...string) ([]Partition, error)
}

// timeoutCoordinator wraps the Conn to ensure that every operation has a
// deadline. Otherwise, it would be possible for requests to block indefinitely
// if the remote server never responds. There are many spots where the consumer
// group needs to interact with the broker, so it feels less error prone to
// factor all of the deadline management into this shared location as opposed to
// peppering it all through where the code actually interacts with the broker.
type timeoutCoordinator struct {
	timeout          time.Duration
	sessionTimeout   time.Duration
	rebalanceTimeout time.Duration
	conn             *Conn
}

func (t *timeoutCoordinator) Close() error {
	return t.conn.Close()
}

func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return findCoordinatorResponseV0{}, err
	}
	return t.conn.findCoordinator(req)
}

func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
	// in the case of join group, the consumer group coordinator may wait up
	// to rebalance timeout in order to wait for all members to join.
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
		return joinGroupResponseV1{}, err
	}
	return t.conn.joinGroup(req)
}

func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
	// in the case of sync group, the consumer group leader is given up to
	// the session timeout to respond before the coordinator will give up.
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.sessionTimeout)); err != nil {
		return syncGroupResponseV0{}, err
	}
	return t.conn.syncGroup(req)
}

func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return leaveGroupResponseV0{}, err
	}
	return t.conn.leaveGroup(req)
}

func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return heartbeatResponseV0{}, err
	}
	return t.conn.heartbeat(req)
}

func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return offsetFetchResponseV1{}, err
	}
	return t.conn.offsetFetch(req)
}

func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return offsetCommitResponseV2{}, err
	}
	return t.conn.offsetCommit(req)
}

func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return nil, err
	}
	return t.conn.ReadPartitions(topics...)
}

// NewConsumerGroup creates a new ConsumerGroup. It returns an error if the
// provided configuration is invalid. It does not attempt to connect to the
// Kafka cluster. That happens asynchronously, and any errors will be reported
// by Next.
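//
// A minimal sketch (the broker address, group ID, and topic are placeholders):
//
//	group, err := NewConsumerGroup(ConsumerGroupConfig{
//		ID:      "my-group",
//		Brokers: []string{"localhost:9092"},
//		Topics:  []string{"my-topic"},
//	})
//	if err != nil {
//		// handle the configuration error...
//	}
//	defer group.Close()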
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}

	cg := &ConsumerGroup{
		config: config,
		next:   make(chan *Generation),
		errs:   make(chan error),
		done:   make(chan struct{}),
	}
	cg.wg.Add(1)
	go func() {
		cg.run()
		cg.wg.Done()
	}()
	return cg, nil
}

// ConsumerGroup models a Kafka consumer group. A caller doesn't interact with
// the group directly. Rather, they interact with a Generation. Every time a
// member enters or exits the group, it results in a new Generation. The
// Generation is where partition assignments and offset management occur.
// Callers will use Next to get a handle to the Generation.
type ConsumerGroup struct {
	config ConsumerGroupConfig
	next   chan *Generation
	errs   chan error

	closeOnce sync.Once
	wg        sync.WaitGroup
	done      chan struct{}
}

// Close terminates the current generation by causing this member to leave and
// releases all local resources used to participate in the consumer group.
// Close will also end the current generation if it is still active.
func (cg *ConsumerGroup) Close() error {
	cg.closeOnce.Do(func() {
		close(cg.done)
	})
	cg.wg.Wait()
	return nil
}

// Next waits for the next consumer group generation. There will never be two
// active generations. Next will never return a new generation until the
// previous one has completed.
//
// If there are errors setting up the next generation, they will be surfaced
// here.
//
// If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.
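//
// Callers typically loop over Next, treating ErrGroupClosed as the signal to
// stop (a sketch; the per-generation work is elided):
//
//	for {
//		gen, err := group.Next(context.TODO())
//		if errors.Is(err, ErrGroupClosed) {
//			return
//		}
//		if err != nil {
//			// an error joining or syncing the group; Next can be retried.
//			continue
//		}
//		// launch work for this generation via gen.Start(...)
//	}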
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-cg.done:
		return nil, ErrGroupClosed
	case err := <-cg.errs:
		return nil, err
	case next := <-cg.next:
		return next, nil
	}
}

func (cg *ConsumerGroup) run() {
	// the memberID is the only piece of information that is maintained across
	// generations. it starts empty and will be assigned on the first nextGeneration
	// when the joinGroup request is processed. it may change again later if
	// the CG coordinator fails over or if the member is evicted. otherwise, it
	// will be constant for the lifetime of this group.
	var memberID string
	var err error
	for {
		memberID, err = cg.nextGeneration(memberID)

		// backoff will be set if this go routine should sleep before continuing
		// to the next generation. it will be non-nil in the case of an error
		// joining or syncing the group.
		var backoff <-chan time.Time

		switch {
		case err == nil:
			// no error...the previous generation finished normally.
			continue

		case errors.Is(err, ErrGroupClosed):
			// the CG has been closed...leave the group and exit loop.
			_ = cg.leaveGroup(memberID)
			return

		case errors.Is(err, RebalanceInProgress):
			// in case of a RebalanceInProgress, don't leave the group or
			// change the member ID, but report the error. the next attempt
			// to join the group will then be subject to the rebalance
			// timeout, so the broker will be responsible for throttling
			// this loop.

		default:
			// leave the group and report the error if we had gotten far
			// enough so as to have a member ID. also clear the member id
			// so we don't attempt to use it again. in order to avoid
			// a tight error loop, backoff before the next attempt to join
			// the group.
			_ = cg.leaveGroup(memberID)
			memberID = ""
			backoff = time.After(cg.config.JoinGroupBackoff)
		}

		// ensure that we exit cleanly in case the CG is done and no one is
		// waiting to receive on the unbuffered error channel.
		select {
		case <-cg.done:
			return
		case cg.errs <- err:
		}

		// backoff if needed, being sure to exit cleanly if the CG is done.
		if backoff != nil {
			select {
			case <-cg.done:
				// exit cleanly if the group is closed.
				return
			case <-backoff:
			}
		}
	}
}

func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
	// get a new connection to the coordinator on each loop. the previous
	// generation could have exited due to losing the connection, so this
	// ensures that we always have a clean starting point. it means we will
	// re-connect in certain cases, but that shouldn't be an issue given that
	// rebalances are relatively infrequent under normal operating
	// conditions.
	conn, err := cg.coordinator()
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
		})
		return memberID, err // a prior memberID may still be valid, so don't return ""
	}
	defer conn.Close()

	var generationID int32
	var groupAssignments GroupMemberAssignments
	var assignments map[string][]int32

	// join group. this will join the group and prepare assignments if our
	// consumer is elected leader. it may also change or assign the member ID.
	memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to join group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}
	cg.withLogger(func(log Logger) {
		log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	// sync group
	assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// fetch initial offsets.
	var offsets map[string]map[int]int64
	offsets, err = cg.fetchOffsets(conn, assignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// create the generation.
	gen := Generation{
		ID:              generationID,
		GroupID:         cg.config.ID,
		MemberID:        memberID,
		Assignments:     cg.makeAssignments(assignments, offsets),
		conn:            conn,
		done:            make(chan struct{}),
		joined:          make(chan struct{}),
		retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
		log:             cg.withLogger,
		logError:        cg.withErrorLogger,
	}

	// spawn all of the go routines required to facilitate this generation. if
	// any of these functions exit, then the generation is determined to be
	// complete.
	gen.heartbeatLoop(cg.config.HeartbeatInterval)
	if cg.config.WatchPartitionChanges {
		for _, topic := range cg.config.Topics {
			gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
		}
	}

	// make this generation available for retrieval. if the CG is closed before
	// we can send it on the channel, exit. that case is required b/c the next
	// channel is unbuffered. if the caller to Next has already bailed because
	// its own teardown logic has been invoked, this would deadlock otherwise.
	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case cg.next <- &gen:
	}

	// wait for generation to complete. if the CG is closed before the
	// generation is finished, exit and leave the group.
	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case <-gen.done:
		// time for next generation! make sure all the current go routines exit
		// before continuing onward.
		gen.close()
		return memberID, nil
	}
}

// makeConnect returns a connect function that returns a connection to ANY broker.
func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
	return func(dialer *Dialer, brokers ...string) (coordinator, error) {
		var err error
		for _, broker := range brokers {
			var conn *Conn
			if conn, err = dialer.Dial("tcp", broker); err == nil {
				return &timeoutCoordinator{
					conn:             conn,
					timeout:          config.Timeout,
					sessionTimeout:   config.SessionTimeout,
					rebalanceTimeout: config.RebalanceTimeout,
				}, nil
			}
		}
		return nil, err // err will be non-nil
	}
}

// coordinator establishes a connection to the coordinator for this consumer
// group.
func (cg *ConsumerGroup) coordinator() (coordinator, error) {
	// NOTE : could try to cache the coordinator to avoid the double connect
	// here. since consumer group balances happen infrequently and are
	// an expensive operation, we're not currently optimizing that case
	// in order to keep the code simpler.
	conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	out, err := conn.findCoordinator(findCoordinatorRequestV0{
		CoordinatorKey: cg.config.ID,
	})
	if err == nil && out.ErrorCode != 0 {
		err = Error(out.ErrorCode)
	}
	if err != nil {
		return nil, err
	}

	address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
	return cg.config.connect(cg.config.Dialer, address)
}

// joinGroup attempts to join the reader to the consumer group.
// Returns GroupMemberAssignments if this Reader was selected as
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
//  * GroupLoadInProgress
//  * GroupCoordinatorNotAvailable
//  * NotCoordinatorForGroup
//  * InconsistentGroupProtocol
//  * InvalidSessionTimeout
//  * GroupAuthorizationFailed
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
	request, err := cg.makeJoinGroupRequestV1(memberID)
	if err != nil {
		return "", 0, nil, err
	}

	response, err := conn.joinGroup(request)
	if err == nil && response.ErrorCode != 0 {
		err = Error(response.ErrorCode)
	}
	if err != nil {
		return "", 0, nil, err
	}

	memberID = response.MemberID
	generationID := response.GenerationID

	cg.withLogger(func(l Logger) {
		l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	var assignments GroupMemberAssignments
	if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
		v, err := cg.assignTopicPartitions(conn, response)
		if err != nil {
			return memberID, 0, nil, err
		}
		assignments = v

		cg.withLogger(func(l Logger) {
			for memberID, assignment := range assignments {
				for topic, partitions := range assignment {
					l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
				}
			}
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
	})

	return memberID, generationID, assignments, nil
}

// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
// request.
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
	request := joinGroupRequestV1{
		GroupID:          cg.config.ID,
		MemberID:         memberID,
		SessionTimeout:   int32(cg.config.SessionTimeout / time.Millisecond),
		RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond),
		ProtocolType:     defaultProtocolType,
	}

	for _, balancer := range cg.config.GroupBalancers {
		userData, err := balancer.UserData()
		if err != nil {
			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
		}
		request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
			ProtocolName: balancer.ProtocolName(),
			ProtocolMetadata: groupMetadata{
				Version:  1,
				Topics:   cg.config.Topics,
				UserData: userData,
			}.bytes(),
		})
	}

	return request, nil
}

// assignTopicPartitions uses the selected GroupBalancer to assign members to
// their various partitions.
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
	cg.withLogger(func(l Logger) {
		l.Printf("selected as leader for group, %s\n", cg.config.ID)
	})

	balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
	if !ok {
		// NOTE : this shouldn't happen in practice...the broker should not
		// return successfully from joinGroup unless all members support
		// at least one common protocol.
		return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
	}

	members, err := cg.makeMemberProtocolMetadata(group.Members)
	if err != nil {
		return nil, err
	}

	topics := extractTopics(members)
	partitions, err := conn.readPartitions(topics...)

	// it's not a failure if the topic doesn't exist yet. it results in no
	// assignments for the topic. this matches the behavior of the official
	// clients: java, python, and librdkafka.
	// a topic watcher can trigger a rebalance when the topic comes into being.
	if err != nil && !errors.Is(err, UnknownTopicOrPartition) {
		return nil, err
	}

	cg.withLogger(func(l Logger) {
		l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
		for _, member := range members {
			l.Printf("found member: %v/%#v", member.ID, member.UserData)
		}
		for _, partition := range partitions {
			l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID)
		}
	})

	return balancer.AssignGroups(members, partitions), nil
}

// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
	members := make([]GroupMember, 0, len(in))
	for _, item := range in {
		metadata := groupMetadata{}
		reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
		if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
			return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
		}

		members = append(members, GroupMember{
			ID:       item.MemberID,
			Topics:   metadata.Topics,
			UserData: metadata.UserData,
		})
	}
	return members, nil
}

// syncGroup completes the consumer group nextGeneration by accepting the
// memberAssignments (if this Reader is the leader) and returning this
// Reader's subscriptions as topic => partitions.
//
// Possible kafka error codes returned:
//  * GroupCoordinatorNotAvailable
//  * NotCoordinatorForGroup
//  * IllegalGeneration
//  * RebalanceInProgress
//  * GroupAuthorizationFailed
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
	request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
	response, err := conn.syncGroup(request)
	if err == nil && response.ErrorCode != 0 {
		err = Error(response.ErrorCode)
	}
	if err != nil {
		return nil, err
	}

	assignments := groupAssignment{}
	reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
	if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
		return nil, err
	}

	if len(assignments.Topics) == 0 {
		cg.withLogger(func(l Logger) {
			l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("sync group finished for group, %v", cg.config.ID)
	})

	return assignments.Topics, nil
}

func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
	request := syncGroupRequestV0{
		GroupID:      cg.config.ID,
		GenerationID: generationID,
		MemberID:     memberID,
	}

	if memberAssignments != nil {
		request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)

		for memberID, topics := range memberAssignments {
			topics32 := make(map[string][]int32)
			for topic, partitions := range topics {
				partitions32 := make([]int32, len(partitions))
				for i := range partitions {
					partitions32[i] = int32(partitions[i])
				}
				topics32[topic] = partitions32
			}
			request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
				MemberID: memberID,
				MemberAssignments: groupAssignment{
					Version: 1,
					Topics:  topics32,
				}.bytes(),
			})
		}

		cg.withLogger(func(logger Logger) {
			logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
		})
	}

	return request
}

func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
	req := offsetFetchRequestV1{
		GroupID: cg.config.ID,
		Topics:  make([]offsetFetchRequestV1Topic, 0, len(cg.config.Topics)),
	}
	for _, topic := range cg.config.Topics {
		req.Topics = append(req.Topics, offsetFetchRequestV1Topic{
			Topic:      topic,
			Partitions: subs[topic],
		})
	}
	offsets, err := conn.offsetFetch(req)
	if err != nil {
		return nil, err
	}

	offsetsByTopic := make(map[string]map[int]int64)
	for _, res := range offsets.Responses {
		offsetsByPartition := map[int]int64{}
		offsetsByTopic[res.Topic] = offsetsByPartition
		for _, pr := range res.PartitionResponses {
			for _, partition := range subs[res.Topic] {
				if partition == pr.Partition {
					offset := pr.Offset
					if offset < 0 {
						// a negative offset means no offset was committed
						// for this partition, so fall back to the
						// configured StartOffset.
						offset = cg.config.StartOffset
					}
					offsetsByPartition[int(partition)] = offset
				}
			}
		}
	}

	return offsetsByTopic, nil
}

func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment {
	topicAssignments := make(map[string][]PartitionAssignment)
	for _, topic := range cg.config.Topics {
		topicPartitions := assignments[topic]
		topicAssignments[topic] = make([]PartitionAssignment, 0, len(topicPartitions))
		for _, partition := range topicPartitions {
			var offset int64
			partitionOffsets, ok := offsets[topic]
			if ok {
				offset, ok = partitionOffsets[int(partition)]
			}
			if !ok {
				offset = cg.config.StartOffset
			}
			topicAssignments[topic] = append(topicAssignments[topic], PartitionAssignment{
				ID:     int(partition),
				Offset: offset,
			})
		}
	}
	return topicAssignments
}

func (cg *ConsumerGroup) leaveGroup(memberID string) error {
	// don't attempt to leave the group if no memberID was ever assigned.
	if memberID == "" {
		return nil
	}

	cg.withLogger(func(log Logger) {
		log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
	})

	// IMPORTANT : leaveGroup establishes its own connection to the coordinator
	//             because it is often called after some other operation failed.
	//             said failure could be the result of connection-level issues,
	//             so we want to re-establish the connection to ensure that we
	//             are able to process the cleanup step.
	coordinator, err := cg.coordinator()
	if err != nil {
		return err
	}

	_, err = coordinator.leaveGroup(leaveGroupRequestV0{
		GroupID:  cg.config.ID,
		MemberID: memberID,
	})
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
		})
	}

	_ = coordinator.Close()

	return err
}

func (cg *ConsumerGroup) withLogger(do func(Logger)) {
	if cg.config.Logger != nil {
		do(cg.config.Logger)
	}
}

func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) {
	if cg.config.ErrorLogger != nil {
		do(cg.config.ErrorLogger)
	} else {
		cg.withLogger(do)
	}
}