package kafka

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

// ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
// been closed.
var ErrGroupClosed = errors.New("consumer group is closed")

// ErrGenerationEnded is returned by the context.Context issued by the
// Generation's Start function when the context has been closed.
var ErrGenerationEnded = errors.New("consumer group generation has ended")

const (
	// defaultProtocolType holds the default protocol type documented in the
	// kafka protocol.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
	defaultProtocolType = "consumer"

	// defaultHeartbeatInterval contains the default time between heartbeats. If
	// the coordinator does not receive a heartbeat within the session timeout interval,
	// the consumer will be considered dead and the coordinator will rebalance the
	// group.
	//
	// As a rule, the heartbeat interval should be no greater than 1/3 of the session timeout.
	defaultHeartbeatInterval = 3 * time.Second

	// defaultSessionTimeout contains the default interval the coordinator will wait
	// for a heartbeat before marking a consumer as dead.
	defaultSessionTimeout = 30 * time.Second

	// defaultRebalanceTimeout contains the amount of time the coordinator will wait
	// for consumers to issue a join group once a rebalance has been requested.
	defaultRebalanceTimeout = 30 * time.Second

	// defaultJoinGroupBackoff is the amount of time to wait after a failed
	// consumer group generation before attempting to re-join.
	defaultJoinGroupBackoff = 5 * time.Second

	// defaultRetentionTime holds the length of time the consumer group will be
	// saved by kafka. This value tells the broker to use its configured value.
	defaultRetentionTime = -1 * time.Millisecond

	// defaultPartitionWatchTime contains the amount of time kafka-go will wait to
	// query the brokers looking for partition changes.
	defaultPartitionWatchTime = 5 * time.Second

	// defaultTimeout is the deadline to set when interacting with the
	// consumer group coordinator.
	defaultTimeout = 5 * time.Second
)

// ConsumerGroupConfig is a configuration object used to create new instances of
// ConsumerGroup.
type ConsumerGroupConfig struct {
	// ID is the consumer group ID. It must not be empty.
	ID string

	// The list of broker addresses used to connect to the kafka cluster. It
	// must not be empty.
	Brokers []string

	// A dialer used to open connections to the kafka server. This field is
	// optional; if nil, the default dialer is used instead.
	Dialer *Dialer

	// Topics is the list of topics that will be consumed by this group. It
	// will usually have a single value, but it is permitted to have multiple
	// for more complex use cases.
	Topics []string

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	HeartbeatInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait before re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will
	// be saved by the broker. -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property. By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
	// 2.0.
	//
	// Default: -1
	RetentionTime time.Duration

	// StartOffset determines from where the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	StartOffset int64

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// Timeout is the network timeout used when communicating with the consumer
	// group coordinator. This value should not be too small since errors
	// communicating with the broker will generally cause a consumer group
	// rebalance, and it's undesirable that a transient network error introduce
	// that overhead. Similarly, it should not be too large or the consumer
	// group may be slow to respond to the coordinator failing over to another
	// broker.
	//
	// Default: 5s
	Timeout time.Duration

	// connect is a function for dialing the coordinator. This is provided for
	// unit testing to mock broker connections.
	connect func(dialer *Dialer, brokers ...string) (coordinator, error)
}

// Validate method validates ConsumerGroupConfig properties and sets relevant
// defaults.
func (config *ConsumerGroupConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a consumer group with an empty list of broker addresses")
	}

	if len(config.Topics) == 0 {
		return errors.New("cannot create a consumer group without a topic")
	}

	if config.ID == "" {
		return errors.New("cannot create a consumer group without an ID")
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if len(config.GroupBalancers) == 0 {
		config.GroupBalancers = []GroupBalancer{
			RangeGroupBalancer{},
			RoundRobinGroupBalancer{},
		}
	}

	if config.HeartbeatInterval == 0 {
		config.HeartbeatInterval = defaultHeartbeatInterval
	}

	if config.SessionTimeout == 0 {
		config.SessionTimeout = defaultSessionTimeout
	}

	if config.PartitionWatchInterval == 0 {
		config.PartitionWatchInterval = defaultPartitionWatchTime
	}

	if config.RebalanceTimeout == 0 {
		config.RebalanceTimeout = defaultRebalanceTimeout
	}

	if config.JoinGroupBackoff == 0 {
		config.JoinGroupBackoff = defaultJoinGroupBackoff
	}

	if config.RetentionTime == 0 {
		config.RetentionTime = defaultRetentionTime
	}

	if config.HeartbeatInterval < 0 || (config.HeartbeatInterval/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("HeartbeatInterval out of bounds: %d", config.HeartbeatInterval)
	}

	if config.SessionTimeout < 0 || (config.SessionTimeout/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("SessionTimeout out of bounds: %d", config.SessionTimeout)
	}

	if config.RebalanceTimeout < 0 || (config.RebalanceTimeout/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("RebalanceTimeout out of bounds: %d", config.RebalanceTimeout)
	}

	if config.JoinGroupBackoff < 0 || (config.JoinGroupBackoff/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)
	}

	if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
		return fmt.Errorf("RetentionTime out of bounds: %d", config.RetentionTime)
	}

	if config.PartitionWatchInterval < 0 || (config.PartitionWatchInterval/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("PartitionWatchInterval out of bounds: %d", config.PartitionWatchInterval)
	}

	if config.StartOffset == 0 {
		config.StartOffset = FirstOffset
	}

	if config.StartOffset != FirstOffset && config.StartOffset != LastOffset {
		return fmt.Errorf("StartOffset is not valid: %d", config.StartOffset)
	}

	if config.Timeout == 0 {
		config.Timeout = defaultTimeout
	}

	if config.connect == nil {
		config.connect = makeConnect(*config)
	}

	return nil
}
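
// The sketch below illustrates how Validate fills in defaults: only ID,
// Brokers, and Topics are required, and the remaining fields are defaulted
// in place. It is illustrative only, not part of the package API; the group
// ID, broker address, and topic are placeholders.
func exampleConsumerGroupConfig() {
	config := ConsumerGroupConfig{
		ID:      "my-group",                 // hypothetical group ID
		Brokers: []string{"localhost:9092"}, // hypothetical broker address
		Topics:  []string{"my-topic"},       // hypothetical topic
	}
	if err := config.Validate(); err != nil {
		panic(err)
	}
	// after Validate, optional fields hold their documented defaults, e.g.
	// config.HeartbeatInterval == 3*time.Second and
	// config.StartOffset == FirstOffset.
}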

// PartitionAssignment represents the starting state of a partition that has
// been assigned to a consumer.
type PartitionAssignment struct {
	// ID is the partition ID.
	ID int

	// Offset is the initial offset at which this assignment begins. It will
	// either be an absolute offset if one has previously been committed for
	// the consumer group or a relative offset such as FirstOffset when this
	// is the first time the partition has been assigned to a member of the
	// group.
	Offset int64
}

// genCtx adapts the done channel of the generation to a context.Context. This
// is used by Generation.Start so that we can pass a context to goroutines
// instead of passing around channels.
type genCtx struct {
	gen *Generation
}

func (c genCtx) Done() <-chan struct{} {
	return c.gen.done
}

func (c genCtx) Err() error {
	select {
	case <-c.gen.done:
		return ErrGenerationEnded
	default:
		return nil
	}
}

func (c genCtx) Deadline() (time.Time, bool) {
	return time.Time{}, false
}

func (c genCtx) Value(interface{}) interface{} {
	return nil
}

// Generation represents a single consumer group generation. The generation
// carries the topic+partition assignments for the given member. It also
// provides facilities for committing offsets and for running functions whose
// lifecycles are bound to the generation.
type Generation struct {
	// ID is the generation ID as assigned by the consumer group coordinator.
	ID int32

	// GroupID is the name of the consumer group.
	GroupID string

	// MemberID is the ID assigned to this consumer by the consumer group
	// coordinator.
	MemberID string

	// Assignments is the initial state of this Generation. The partition
	// assignments are grouped by topic.
	Assignments map[string][]PartitionAssignment

	conn coordinator

	once sync.Once
	done chan struct{}
	wg   sync.WaitGroup

	retentionMillis int64
	log             func(func(Logger))
	logError        func(func(Logger))
}

// close stops the generation and waits for all functions launched via Start to
// terminate.
func (g *Generation) close() {
	g.once.Do(func() {
		close(g.done)
	})
	g.wg.Wait()
}

// Start launches the provided function in a goroutine and adds accounting such
// that when the function exits, it stops the current generation (if not
// already in the process of doing so).
//
// The provided function MUST support cancellation via the ctx argument and exit
// in a timely manner once the ctx is complete. When the context is closed, the
// context's Err() function will return ErrGenerationEnded.
//
// When closing out a generation, the consumer group will wait for all functions
// launched by Start to exit before the group can move on and join the next
// generation. If the function does not exit promptly, it will stop forward
// progress for this consumer and potentially cause consumer group membership
// churn.
func (g *Generation) Start(fn func(ctx context.Context)) {
	g.wg.Add(1)
	go func() {
		fn(genCtx{g})
		// shut down the generation as soon as one function exits. this is
		// different from close() in that it doesn't wait on the wg.
		g.once.Do(func() {
			close(g.done)
		})
		g.wg.Done()
	}()
}
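
// The sketch below illustrates the cancellation contract that Start expects:
// the function selects on ctx.Done and returns promptly once the generation
// ends. It is illustrative only; the ticker work is a placeholder.
func exampleGenerationStart(gen *Generation) {
	gen.Start(func(ctx context.Context) {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				// ctx.Err() now returns ErrGenerationEnded; exit promptly
				// so the group can move on to the next generation.
				return
			case <-ticker.C:
				// do one unit of generation-scoped work here.
			}
		}
	})
}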

// CommitOffsets commits the provided topic+partition+offset combos to the
// consumer group coordinator. This can be used to reset the consumer to
// explicit offsets.
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
	if len(offsets) == 0 {
		return nil
	}

	topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
	for topic, partitions := range offsets {
		t := offsetCommitRequestV2Topic{Topic: topic}
		for partition, offset := range partitions {
			t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
				Partition: int32(partition),
				Offset:    offset,
			})
		}
		topics = append(topics, t)
	}

	request := offsetCommitRequestV2{
		GroupID:       g.GroupID,
		GenerationID:  g.ID,
		MemberID:      g.MemberID,
		RetentionTime: g.retentionMillis,
		Topics:        topics,
	}

	_, err := g.conn.offsetCommit(request)
	if err == nil {
		// if logging is enabled, print out the partitions that were committed.
		g.log(func(l Logger) {
			var report []string
			for _, t := range request.Topics {
				report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic))
				for _, p := range t.Partitions {
					report = append(report, fmt.Sprintf("\t\tpartition %d: %d", p.Partition, p.Offset))
				}
			}
			l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(report, "\n"))
		})
	}

	return err
}
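
// The sketch below shows the shape of the offsets argument: topic =>
// partition => offset. Per the usual Kafka convention, the committed value
// is the offset of the next message to consume (last processed offset + 1).
// The topic name and offsets are placeholders.
func exampleCommitOffsets(gen *Generation) error {
	return gen.CommitOffsets(map[string]map[int]int64{
		"my-topic": {
			0: 43, // partition 0: last processed message had offset 42
			1: 18, // partition 1: last processed message had offset 17
		},
	})
}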

// heartbeatLoop checks in with the consumer group coordinator at the provided
// interval. It exits if it ever encounters an error, which would signal the
// end of the generation.
func (g *Generation) heartbeatLoop(interval time.Duration) {
	g.Start(func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped heartbeat for group %s\n", g.GroupID)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				_, err := g.conn.heartbeat(heartbeatRequestV0{
					GroupID:      g.GroupID,
					GenerationID: g.ID,
					MemberID:     g.MemberID,
				})
				if err != nil {
					return
				}
			}
		}
	})
}

// partitionWatcher queries kafka and watches for partition changes, triggering
// a rebalance if changes are found. As with the heartbeat loop, it's okay to
// return on error here: if we are unable to ask a broker for basic metadata,
// we're in a bad spot and should rebalance. Commonly an error here indicates a
// problem with the connection to the coordinator, and the ensuing rebalance
// will establish a new connection to the coordinator.
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
	g.Start(func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		ops, err := g.conn.readPartitions(topic)
		if err != nil {
			g.logError(func(l Logger) {
				l.Printf("Problem getting partitions during startup: %v. Returning and setting up nextGeneration", err)
			})
			return
		}
		oParts := len(ops)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				ops, err := g.conn.readPartitions(topic)
				switch err {
				case nil, UnknownTopicOrPartition:
					if len(ops) != oParts {
						g.log(func(l Logger) {
							l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
						})
						return
					}
				default:
					g.logError(func(l Logger) {
						l.Printf("Problem getting partitions while checking for changes, %v", err)
					})
					if _, ok := err.(Error); ok {
						continue
					}
					// other errors imply that we lost the connection to the coordinator, so we
					// should abort and reconnect.
					return
				}
			}
		}
	})
}

// coordinator is a subset of the functionality in Conn in order to facilitate
// testing the consumer group, especially for error conditions that are
// difficult to instigate with a live broker running in docker.
type coordinator interface {
	io.Closer
	findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
	joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
	syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
	leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
	offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
	offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
	readPartitions(...string) ([]Partition, error)
}
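
// For tests, a stub like the sketch below (hypothetical, not part of this
// file's API) can stand in for a live broker: embedding the coordinator
// interface provides panicking defaults for every method, so a test overrides
// only the calls it cares about, e.g. forcing heartbeat to fail.
type stubCoordinator struct {
	coordinator // embedded interface; unoverridden methods panic if called

	heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error)
}

func (s stubCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
	return s.heartbeatFunc(req)
}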

// timeoutCoordinator wraps the Conn to ensure that every operation has a
// deadline. Otherwise, it would be possible for requests to block indefinitely
// if the remote server never responds. There are many spots where the consumer
// group needs to interact with the broker, so it is less error-prone to
// factor all of the deadline management into this shared location as opposed to
// peppering it all through where the code actually interacts with the broker.
type timeoutCoordinator struct {
	timeout          time.Duration
	sessionTimeout   time.Duration
	rebalanceTimeout time.Duration
	conn             *Conn
}

func (t *timeoutCoordinator) Close() error {
	return t.conn.Close()
}

func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return findCoordinatorResponseV0{}, err
	}
	return t.conn.findCoordinator(req)
}

func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
	// in the case of join group, the consumer group coordinator may wait up
	// to the rebalance timeout in order for all members to join.
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
		return joinGroupResponseV1{}, err
	}
	return t.conn.joinGroup(req)
}

func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
	// in the case of sync group, the consumer group leader is given up to
	// the session timeout to respond before the coordinator will give up.
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.sessionTimeout)); err != nil {
		return syncGroupResponseV0{}, err
	}
	return t.conn.syncGroup(req)
}

func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return leaveGroupResponseV0{}, err
	}
	return t.conn.leaveGroup(req)
}

func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return heartbeatResponseV0{}, err
	}
	return t.conn.heartbeat(req)
}

func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return offsetFetchResponseV1{}, err
	}
	return t.conn.offsetFetch(req)
}

func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return offsetCommitResponseV2{}, err
	}
	return t.conn.offsetCommit(req)
}

func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return nil, err
	}
	return t.conn.ReadPartitions(topics...)
}

// NewConsumerGroup creates a new ConsumerGroup. It returns an error if the
// provided configuration is invalid. It does not attempt to connect to the
// Kafka cluster. That happens asynchronously, and any errors will be reported
// by Next.
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}

	cg := &ConsumerGroup{
		config: config,
		next:   make(chan *Generation),
		errs:   make(chan error),
		done:   make(chan struct{}),
	}
	cg.wg.Add(1)
	go func() {
		cg.run()
		cg.wg.Done()
	}()
	return cg, nil
}

// ConsumerGroup models a Kafka consumer group. A caller doesn't interact with
// the group directly. Rather, they interact with a Generation. Every time a
// member enters or exits the group, it results in a new Generation. The
// Generation is where partition assignments and offset management occur.
// Callers will use Next to get a handle to the Generation.
type ConsumerGroup struct {
	config ConsumerGroupConfig
	next   chan *Generation
	errs   chan error

	closeOnce sync.Once
	wg        sync.WaitGroup
	done      chan struct{}
}

// Close terminates the current generation, if one is still active, by causing
// this member to leave the group, and releases all local resources used to
// participate in the consumer group.
func (cg *ConsumerGroup) Close() error {
	cg.closeOnce.Do(func() {
		close(cg.done)
	})
	cg.wg.Wait()
	return nil
}

// Next waits for the next consumer group generation. There will never be two
// active generations. Next will never return a new generation until the
// previous one has completed.
//
// If there are errors setting up the next generation, they will be surfaced
// here.
//
// If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-cg.done:
		return nil, ErrGroupClosed
	case err := <-cg.errs:
		return nil, err
	case next := <-cg.next:
		return next, nil
	}
}
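
// Tying the pieces together, the sketch below shows the typical lifecycle of
// a consumer group client: create the group, pull generations with Next, and
// for each assigned partition launch a partition reader whose lifetime is
// bound to the generation via Start. This is a minimal, illustrative sketch,
// not part of the package API; the broker address, topic name, and message
// handling are placeholders.
func exampleConsumerGroupLoop() {
	group, err := NewConsumerGroup(ConsumerGroupConfig{
		ID:      "my-group",                 // hypothetical group ID
		Brokers: []string{"localhost:9092"}, // hypothetical broker address
		Topics:  []string{"my-topic"},       // hypothetical topic
	})
	if err != nil {
		panic(err)
	}
	defer group.Close()

	for {
		gen, err := group.Next(context.Background())
		if err != nil {
			// ErrGroupClosed after Close, or a fatal setup error.
			return
		}

		for _, assignment := range gen.Assignments["my-topic"] {
			partition, offset := assignment.ID, assignment.Offset
			gen.Start(func(ctx context.Context) {
				// read this partition starting at the assigned offset.
				reader := NewReader(ReaderConfig{
					Brokers:   []string{"localhost:9092"},
					Topic:     "my-topic",
					Partition: partition,
				})
				defer reader.Close()

				// SetOffset accepts both absolute offsets and sentinels
				// such as FirstOffset.
				if err := reader.SetOffset(offset); err != nil {
					return
				}
				for {
					msg, err := reader.ReadMessage(ctx)
					if err != nil {
						// the ctx ends with ErrGenerationEnded when the
						// generation is over. commit the offset of the next
						// message to consume before exiting; a real
						// application would commit periodically and guard
						// against committing a sentinel start offset.
						_ = gen.CommitOffsets(map[string]map[int]int64{
							"my-topic": {partition: offset},
						})
						return
					}
					// process msg here, then advance the offset to commit.
					offset = msg.Offset + 1
				}
			})
		}
	}
}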

func (cg *ConsumerGroup) run() {
	// the memberID is the only piece of information that is maintained across
	// generations. it starts empty and will be assigned on the first nextGeneration
	// when the joinGroup request is processed. it may change again later if
	// the CG coordinator fails over or if the member is evicted. otherwise, it
	// will be constant for the lifetime of this group.
	var memberID string
	var err error
	for {
		memberID, err = cg.nextGeneration(memberID)

		// backoff will be set if this goroutine should sleep before continuing
		// to the next generation. it will be non-nil in the case of an error
		// joining or syncing the group.
		var backoff <-chan time.Time

		switch err {
		case nil:
			// no error...the previous generation finished normally.
			continue

		case ErrGroupClosed:
			// the CG has been closed...leave the group and exit loop.
			_ = cg.leaveGroup(memberID)
			return

		case RebalanceInProgress:
			// in case of a RebalanceInProgress, don't leave the group or
			// change the member ID, but report the error. the next attempt
			// to join the group will then be subject to the rebalance
			// timeout, so the broker will be responsible for throttling
			// this loop.

		default:
			// leave the group and report the error if we had gotten far
			// enough so as to have a member ID. also clear the member ID
			// so we don't attempt to use it again. in order to avoid
			// a tight error loop, backoff before the next attempt to join
			// the group.
			_ = cg.leaveGroup(memberID)
			memberID = ""
			backoff = time.After(cg.config.JoinGroupBackoff)
		}

		// ensure that we exit cleanly in case the CG is done and no one is
		// waiting to receive on the unbuffered error channel.
		select {
		case <-cg.done:
			return
		case cg.errs <- err:
		}

		// backoff if needed, being sure to exit cleanly if the CG is done.
		if backoff != nil {
			select {
			case <-cg.done:
				// exit cleanly if the group is closed.
				return
			case <-backoff:
			}
		}
	}
}

func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
	// get a new connection to the coordinator on each loop. the previous
	// generation could have exited due to losing the connection, so this
	// ensures that we always have a clean starting point. it means we will
	// re-connect in certain cases, but that shouldn't be an issue given that
	// rebalances are relatively infrequent under normal operating
	// conditions.
	conn, err := cg.coordinator()
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
		})
		return memberID, err // a prior memberID may still be valid, so don't return ""
	}
	defer conn.Close()

	var generationID int32
	var groupAssignments GroupMemberAssignments
	var assignments map[string][]int32

	// join group. this will join the group and prepare assignments if our
	// consumer is elected leader. it may also change or assign the member ID.
	memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to join group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}
	cg.withLogger(func(log Logger) {
		log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	// sync group
	assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// fetch initial offsets.
	var offsets map[string]map[int]int64
	offsets, err = cg.fetchOffsets(conn, assignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// create the generation.
	gen := Generation{
		ID:              generationID,
		GroupID:         cg.config.ID,
		MemberID:        memberID,
		Assignments:     cg.makeAssignments(assignments, offsets),
		conn:            conn,
		done:            make(chan struct{}),
		retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
		log:             cg.withLogger,
		logError:        cg.withErrorLogger,
	}

	// spawn all of the goroutines required to facilitate this generation. if
	// any of these functions exit, then the generation is determined to be
	// complete.
	gen.heartbeatLoop(cg.config.HeartbeatInterval)
	if cg.config.WatchPartitionChanges {
		for _, topic := range cg.config.Topics {
			gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
		}
	}

	// make this generation available for retrieval. if the CG is closed before
	// we can send it on the channel, exit. that case is required because the next
	// channel is unbuffered. if the caller to Next has already bailed because
	// its own teardown logic has been invoked, this would deadlock otherwise.
	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case cg.next <- &gen:
	}

	// wait for the generation to complete. if the CG is closed before the
	// generation is finished, exit and leave the group.
	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case <-gen.done:
		// time for the next generation! make sure all the current goroutines
		// exit before continuing onward.
		gen.close()
		return memberID, nil
	}
}

// makeConnect returns a connect function that dials and returns a connection
// to ANY broker from the provided list.
func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
	return func(dialer *Dialer, brokers ...string) (coordinator, error) {
		var err error
		for _, broker := range brokers {
			var conn *Conn
			if conn, err = dialer.Dial("tcp", broker); err == nil {
				return &timeoutCoordinator{
					conn:             conn,
					timeout:          config.Timeout,
					sessionTimeout:   config.SessionTimeout,
					rebalanceTimeout: config.RebalanceTimeout,
				}, nil
			}
		}
		return nil, err // err will be non-nil
	}
}

// coordinator establishes a connection to the coordinator for this consumer
// group.
func (cg *ConsumerGroup) coordinator() (coordinator, error) {
	// NOTE : could try to cache the coordinator to avoid the double connect
	// here. since consumer group balances happen infrequently and are
	// an expensive operation, we're not currently optimizing that case
	// in order to keep the code simpler.
	conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	out, err := conn.findCoordinator(findCoordinatorRequestV0{
		CoordinatorKey: cg.config.ID,
	})
	if err == nil && out.ErrorCode != 0 {
		err = Error(out.ErrorCode)
	}
	if err != nil {
		return nil, err
	}

	address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
	return cg.config.connect(cg.config.Dialer, address)
}

// joinGroup attempts to join the reader to the consumer group.
// Returns GroupMemberAssignments if this Reader was selected as
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
//  * GroupLoadInProgress:
//  * GroupCoordinatorNotAvailable:
//  * NotCoordinatorForGroup:
//  * InconsistentGroupProtocol:
//  * InvalidSessionTimeout:
//  * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
	request, err := cg.makeJoinGroupRequestV1(memberID)
	if err != nil {
		return "", 0, nil, err
	}

	response, err := conn.joinGroup(request)
	if err == nil && response.ErrorCode != 0 {
		err = Error(response.ErrorCode)
	}
	if err != nil {
		return "", 0, nil, err
	}

	memberID = response.MemberID
	generationID := response.GenerationID

	cg.withLogger(func(l Logger) {
		l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	var assignments GroupMemberAssignments
	if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
		v, err := cg.assignTopicPartitions(conn, response)
		if err != nil {
			return memberID, 0, nil, err
		}
		assignments = v

		cg.withLogger(func(l Logger) {
			for memberID, assignment := range assignments {
				for topic, partitions := range assignment {
					l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
				}
			}
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("joinGroup succeeded for group, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
	})

	return memberID, generationID, assignments, nil
}

// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
// request.
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
	request := joinGroupRequestV1{
		GroupID:          cg.config.ID,
		MemberID:         memberID,
		SessionTimeout:   int32(cg.config.SessionTimeout / time.Millisecond),
		RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond),
		ProtocolType:     defaultProtocolType,
	}

	for _, balancer := range cg.config.GroupBalancers {
		userData, err := balancer.UserData()
		if err != nil {
			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v", balancer.ProtocolName(), err)
		}
		request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
			ProtocolName: balancer.ProtocolName(),
			ProtocolMetadata: groupMetadata{
				Version:  1,
				Topics:   cg.config.Topics,
				UserData: userData,
			}.bytes(),
		})
	}

	return request, nil
}

// assignTopicPartitions uses the selected GroupBalancer to assign members to
// their various partitions.
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
	cg.withLogger(func(l Logger) {
		l.Printf("selected as leader for group, %s\n", cg.config.ID)
	})

	balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
	if !ok {
		// NOTE : this shouldn't happen in practice...the broker should not
		// return successfully from joinGroup unless all members support
		// at least one common protocol.
		return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
	}

	members, err := cg.makeMemberProtocolMetadata(group.Members)
	if err != nil {
		return nil, err
	}

	topics := extractTopics(members)
	partitions, err := conn.readPartitions(topics...)

	// it's not a failure if the topic doesn't exist yet. it results in no
	// assignments for the topic. this matches the behavior of the official
	// clients: java, python, and librdkafka.
	// a topic watcher can trigger a rebalance when the topic comes into being.
	if err != nil && err != UnknownTopicOrPartition {
		return nil, err
	}

	cg.withLogger(func(l Logger) {
		l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
		for _, member := range members {
			l.Printf("found member: %v/%#v", member.ID, member.UserData)
		}
		for _, partition := range partitions {
			l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID)
		}
	})

	return balancer.AssignGroups(members, partitions), nil
}

// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
	members := make([]GroupMember, 0, len(in))
	for _, item := range in {
		metadata := groupMetadata{}
		reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
		if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
			return nil, fmt.Errorf("unable to read metadata for member, %v: %v", item.MemberID, err)
		}

		members = append(members, GroupMember{
			ID:       item.MemberID,
			Topics:   metadata.Topics,
			UserData: metadata.UserData,
		})
	}
	return members, nil
}

// syncGroup completes the consumer group nextGeneration by accepting the
// memberAssignments (if this Reader is the leader) and returning this
// Reader's subscription as a map of topic => partitions.
//
// Possible kafka error codes returned:
//  * GroupCoordinatorNotAvailable:
//  * NotCoordinatorForGroup:
//  * IllegalGeneration:
//  * RebalanceInProgress:
//  * GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
	request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
	response, err := conn.syncGroup(request)
	if err == nil && response.ErrorCode != 0 {
		err = Error(response.ErrorCode)
	}
	if err != nil {
		return nil, err
	}

	assignments := groupAssignment{}
	reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
	if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
		return nil, err
	}

	if len(assignments.Topics) == 0 {
		cg.withLogger(func(l Logger) {
			l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("sync group finished for group, %v", cg.config.ID)
	})

	return assignments.Topics, nil
}

func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
	request := syncGroupRequestV0{
		GroupID:      cg.config.ID,
		GenerationID: generationID,
		MemberID:     memberID,
	}

	if memberAssignments != nil {
		request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)

		for memberID, topics := range memberAssignments {
			topics32 := make(map[string][]int32)
			for topic, partitions := range topics {
				partitions32 := make([]int32, len(partitions))
				for i := range partitions {
					partitions32[i] = int32(partitions[i])
				}
				topics32[topic] = partitions32
			}
			request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
				MemberID: memberID,
				MemberAssignments: groupAssignment{
					Version: 1,
					Topics:  topics32,
				}.bytes(),
			})
		}

		cg.withLogger(func(logger Logger) {
			logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
		})
	}

	return request
}

func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
	req := offsetFetchRequestV1{
		GroupID: cg.config.ID,
		Topics:  make([]offsetFetchRequestV1Topic, 0, len(cg.config.Topics)),
	}
	for _, topic := range cg.config.Topics {
		req.Topics = append(req.Topics, offsetFetchRequestV1Topic{
			Topic:      topic,
			Partitions: subs[topic],
		})
	}
	offsets, err := conn.offsetFetch(req)
	if err != nil {
		return nil, err
	}

	offsetsByTopic := make(map[string]map[int]int64)
	for _, res := range offsets.Responses {
		offsetsByPartition := map[int]int64{}
		offsetsByTopic[res.Topic] = offsetsByPartition
		for _, pr := range res.PartitionResponses {
			for _, partition := range subs[res.Topic] {
				if partition == pr.Partition {
					offset := pr.Offset

					// a negative offset means no offset has been committed
					// for this partition, so fall back to the configured
					// StartOffset.
					if offset < 0 {
						offset = cg.config.StartOffset
					}
					offsetsByPartition[int(partition)] = offset
				}
			}
		}
	}

	return offsetsByTopic, nil
}

// makeAssignments combines the partition assignments and the fetched offsets
// into the PartitionAssignment structures that seed the Generation.
func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment {
	topicAssignments := make(map[string][]PartitionAssignment)
	for _, topic := range cg.config.Topics {
		topicPartitions := assignments[topic]
		topicAssignments[topic] = make([]PartitionAssignment, 0, len(topicPartitions))
		for _, partition := range topicPartitions {
			var offset int64
			partitionOffsets, ok := offsets[topic]
			if ok {
				offset, ok = partitionOffsets[int(partition)]
			}
			if !ok {
				offset = cg.config.StartOffset
			}
			topicAssignments[topic] = append(topicAssignments[topic], PartitionAssignment{
				ID:     int(partition),
				Offset: offset,
			})
		}
	}
	return topicAssignments
}

func (cg *ConsumerGroup) leaveGroup(memberID string) error {
	// don't attempt to leave the group if no memberID was ever assigned.
	if memberID == "" {
		return nil
	}

	cg.withLogger(func(log Logger) {
		log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
	})

	// IMPORTANT : leaveGroup establishes its own connection to the coordinator
	//             because it is often called after some other operation failed.
	//             said failure could be the result of connection-level issues,
	//             so we want to re-establish the connection to ensure that we
	//             are able to process the cleanup step.
	coordinator, err := cg.coordinator()
	if err != nil {
		return err
	}

	_, err = coordinator.leaveGroup(leaveGroupRequestV0{
		GroupID:  cg.config.ID,
		MemberID: memberID,
	})
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
		})
	}

	_ = coordinator.Close()

	return err
}

func (cg *ConsumerGroup) withLogger(do func(Logger)) {
	if cg.config.Logger != nil {
		do(cg.config.Logger)
	}
}

func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) {
	if cg.config.ErrorLogger != nil {
		do(cg.config.ErrorLogger)
	} else {
		cg.withLogger(do)
	}
}