reader.go

package kafka

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)

const (
	// defaultCommitRetries holds the number of commit attempts to make
	// before giving up.
	defaultCommitRetries = 3
)

const (
	// defaultFetchMinBytes of 1 byte means that fetch requests are answered as
	// soon as a single byte of data is available or the fetch request times out
	// waiting for data to arrive.
	defaultFetchMinBytes = 1
)

var (
	errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
	errNotAvailableWithGroup  = errors.New("unavailable when GroupID is set")
)

const (
	// defaultReadBackoffMin/Max set the boundaries for how long the reader waits
	// before polling for new messages.
	defaultReadBackoffMin = 100 * time.Millisecond
	defaultReadBackoffMax = 1 * time.Second
)
// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
//
// Note that it is important to call `Close()` on a `Reader` when a process exits.
// The kafka server needs a graceful disconnect to stop it from continuing to
// attempt to send messages to the connected clients. The given example will not
// call `Close()` if the process is terminated with SIGINT (ctrl-c at the shell) or
// SIGTERM (as docker stop or a kubernetes restart does). This can result in a
// delay when a new reader on the same topic connects (e.g. new process started
// or new container running). Use a `signal.Notify` handler to close the reader on
// process shutdown.
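//
// A minimal sketch of such a shutdown handler is shown below; the broker
// address, topic, and signal set are illustrative assumptions:
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		Topic:   "my-topic",
//	})
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//	go func() {
//		<-sigs
//		r.Close() // graceful disconnect so other consumers are not delayed
//	}()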
type Reader struct {
	// immutable fields of the reader
	config ReaderConfig

	// communication channels between the parent reader and its subreaders
	msgs chan readerMessage

	// mutable fields of the reader (synchronized on the mutex)
	mutex   sync.Mutex
	join    sync.WaitGroup
	cancel  context.CancelFunc
	stop    context.CancelFunc
	done    chan struct{}
	commits chan commitRequest
	version int64 // version holds the generation of the spawned readers
	offset  int64
	lag     int64
	closed  bool

	// Without a group subscription (when Reader.config.GroupID == ""),
	// when errors occur, the Reader gets a synthetic readerMessage with
	// a non-nil err set. With group subscriptions however, when an error
	// occurs in Reader.run, there's no reader running (sic, cf. reader vs.
	// Reader) and there's no way to let the high-level methods like
	// FetchMessage know that an error indeed occurred. If an error in run
	// occurs, it will be non-block-sent to this unbuffered channel, where
	// the high-level methods can select{} on it and notify the caller.
	runError chan error

	// once guards one-time initialization (accessed atomically); stctx is the
	// context canceled when the reader is stopped.
	once  uint32
	stctx context.Context

	// reader stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *readerStats
}
// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }

func (r *Reader) getTopics() []string {
	if len(r.config.GroupTopics) > 0 {
		return r.config.GroupTopics[:]
	}

	return []string{r.config.Topic}
}

// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }

func (r *Reader) unsubscribe() {
	r.cancel()
	r.join.Wait()
	// it would be interesting to drain the r.msgs channel at this point since
	// it will contain buffered messages for partitions that may not be
	// re-assigned to this reader in the next consumer group generation.
	// however, draining the channel could race with the client calling
	// ReadMessage, which could result in messages delivered and/or committed
	// with gaps in the offset. for now, we will err on the side of caution and
	// potentially have those messages be reprocessed in the next generation by
	// another consumer to avoid such a race.
}

func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
	offsets := make(map[topicPartition]int64)
	for topic, assignments := range allAssignments {
		for _, assignment := range assignments {
			key := topicPartition{
				topic:     topic,
				partition: int32(assignment.ID),
			}
			offsets[key] = assignment.Offset
		}
	}

	r.mutex.Lock()
	r.start(offsets)
	r.mutex.Unlock()

	r.withLogger(func(l Logger) {
		l.Printf("subscribed to topics and partitions: %+v", offsets)
	})
}

// commitOffsetsWithRetry attempts to commit the specified offsets and retries
// up to the specified number of times.
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
	const (
		backoffDelayMin = 100 * time.Millisecond
		backoffDelayMax = 5 * time.Second
	)

	for attempt := 0; attempt < retries; attempt++ {
		if attempt != 0 {
			if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
				return
			}
		}

		if err = gen.CommitOffsets(offsetStash); err == nil {
			return
		}
	}

	return // err will not be nil
}

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64

// merge updates the offsetStash with the offsets from the provided messages.
func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		offsetsByPartition, ok := o[c.topic]
		if !ok {
			offsetsByPartition = map[int]int64{}
			o[c.topic] = offsetsByPartition
		}

		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
			offsetsByPartition[c.partition] = c.offset
		}
	}
}

// reset clears the contents of the offsetStash.
func (o offsetStash) reset() {
	for key := range o {
		delete(o, key)
	}
}

// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
	offsets := offsetStash{}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel and prepare a single, final commit.
			// the commit will combine any outstanding requests and the result
			// will be sent back to all the callers of CommitMessages so that
			// they can return.
			var errchs []chan<- error
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
					errchs = append(errchs, req.errch)
				default:
					hasCommits = false
				}
			}
			err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			for _, errch := range errchs {
				// NOTE : this will be a buffered channel and will not block.
				errch <- err
			}
			return

		case req := <-r.commits:
			offsets.merge(req.commits)
			req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			offsets.reset()
		}
	}
}

// commitLoopInterval handles each commit asynchronously with a period defined
// by ReaderConfig.CommitInterval.
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
	ticker := time.NewTicker(r.config.CommitInterval)
	defer ticker.Stop()

	// the offset stash should not survive rebalances b/c the consumer may
	// receive new assignments.
	offsets := offsetStash{}

	commit := func() {
		if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
			r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
		} else {
			offsets.reset()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel in order to prepare the final commit.
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
				default:
					hasCommits = false
				}
			}
			commit()
			return

		case <-ticker.C:
			commit()

		case req := <-r.commits:
			offsets.merge(req.commits)
		}
	}
}

// commitLoop processes commits off the commit chan.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
	r.withLogger(func(l Logger) {
		l.Printf("started commit for group %s\n", r.config.GroupID)
	})
	defer r.withLogger(func(l Logger) {
		l.Printf("stopped commit for group %s\n", r.config.GroupID)
	})

	if r.useSyncCommits() {
		r.commitLoopImmediate(ctx, gen)
	} else {
		r.commitLoopInterval(ctx, gen)
	}
}

// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.
//
// This function is responsible for closing the consumer group upon exit.
func (r *Reader) run(cg *ConsumerGroup) {
	defer close(r.done)
	defer cg.Close()

	r.withLogger(func(l Logger) {
		l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
	})

	for {
		// Limit the number of attempts at waiting for the next
		// consumer generation.
		var err error
		var gen *Generation
		for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
			gen, err = cg.Next(r.stctx)
			if err == nil {
				break
			}
			if errors.Is(err, r.stctx.Err()) {
				return
			}
			r.stats.errors.observe(1)
			r.withErrorLogger(func(l Logger) {
				l.Printf("%v", err)
			})
			// Continue with next attempt...
		}
		if err != nil {
			// All attempts have failed.
			select {
			case r.runError <- err:
				// If somebody's receiving on the runError, let
				// them know the error occurred.
			default:
				// Otherwise, don't block to allow healing.
			}
			continue
		}

		r.stats.rebalances.observe(1)

		r.subscribe(gen.Assignments)

		gen.Start(func(ctx context.Context) {
			r.commitLoop(ctx, gen)
		})
		gen.Start(func(ctx context.Context) {
			// wait for the generation to end and then unsubscribe.
			select {
			case <-ctx.Done():
				// continue to next generation
			case <-r.stctx.Done():
				// this will be the last loop because the reader is closed.
			}
			r.unsubscribe()
		})
	}
}
// ReaderConfig is a configuration object used to create new instances of
// Reader.
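//
// A minimal consumer-group configuration might look like the sketch below; the
// broker address, group id, and topic are illustrative assumptions:
//
//	cfg := kafka.ReaderConfig{
//		Brokers:  []string{"localhost:9092"},
//		GroupID:  "my-consumer-group",
//		Topic:    "my-topic",
//		MinBytes: 1,    // respond as soon as any data is available
//		MaxBytes: 10e6, // cap fetches at ~10MB
//	}
//	r := kafka.NewReader(cfg)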
type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id. If GroupID is specified, then
	// Partition should NOT be specified, e.g. 0
	GroupID string

	// GroupTopics allows specifying multiple topics, but can only be used in
	// combination with GroupID, as it is a consumer-group feature. As such, if
	// GroupID is set, then either Topic or GroupTopics must be defined.
	GroupTopics []string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from. Either Partition or GroupID may
	// be assigned, but not both.
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	//
	// Default: 1
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	//
	// Default: 1MB
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	//
	// Default: 10s
	MaxWait time.Duration

	// ReadBatchTimeout is the amount of time to wait to read a message from a
	// fetched batch of kafka messages.
	//
	// Default: 10s
	ReadBatchTimeout time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker. If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker.
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts to connect will be made before returning the error.
	//
	// The default is to try 3 times.
	MaxAttempts int

	// OffsetOutOfRangeError indicates that the reader should return an error in
	// the event of an OffsetOutOfRange error, rather than retrying indefinitely.
	// This flag is being added to retain backwards-compatibility, so it will be
	// removed in a future version of kafka-go.
	OffsetOutOfRangeError bool
}
// Validate method validates ReaderConfig properties.
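//
// A sketch of validating a configuration up front instead of letting NewReader
// panic on an invalid one; the field values are illustrative assumptions:
//
//	cfg := kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		Topic:   "my-topic",
//	}
//	if err := cfg.Validate(); err != nil {
//		log.Fatalf("invalid reader config: %v", err)
//	}
//	r := kafka.NewReader(cfg)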
func (config *ReaderConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
	}

	if config.Partition < 0 || config.Partition >= math.MaxInt32 {
		return fmt.Errorf("partition number out of bounds: %d", config.Partition)
	}

	if config.MinBytes < 0 {
		return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
	}

	if config.MaxBytes < 0 {
		return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
	}

	if config.GroupID != "" {
		if config.Partition != 0 {
			return errors.New("either Partition or GroupID may be specified, but not both")
		}

		if len(config.Topic) == 0 && len(config.GroupTopics) == 0 {
			return errors.New("either Topic or GroupTopics must be specified with GroupID")
		}
	} else if len(config.Topic) == 0 {
		return errors.New("cannot create a new kafka reader with an empty topic")
	}

	if config.MinBytes > config.MaxBytes {
		return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
	}

	if config.ReadBackoffMax < 0 {
		return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
	}

	if config.ReadBackoffMin < 0 {
		return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
	}

	return nil
}
// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count"      type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count"     type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count"   type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes"   type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count"   type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count"     type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset"          type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag"             type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max"  type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length"    type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity"  type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}

// readerStats is a struct that contains statistics on a reader.
type readerStats struct {
	dials      counter
	fetches    counter
	messages   counter
	bytes      counter
	rebalances counter
	timeouts   counter
	errors     counter
	dialTime   summary
	readTime   summary
	waitTime   summary
	fetchSize  summary
	fetchBytes summary
	offset     gauge
	lag        gauge
	partition  string
}

// NewReader creates and returns a new Reader configured with config.
// The offset is initialized to FirstOffset.
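//
// NewReader panics if the configuration is invalid (see ReaderConfig.Validate).
// A minimal sketch for reading a single partition, with illustrative broker and
// topic names:
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers:   []string{"localhost:9092"},
//		Topic:     "my-topic",
//		Partition: 0,
//	})
//	defer r.Close()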
func NewReader(config ReaderConfig) *Reader {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.GroupID != "" {
		if len(config.GroupBalancers) == 0 {
			config.GroupBalancers = []GroupBalancer{
				RangeGroupBalancer{},
				RoundRobinGroupBalancer{},
			}
		}
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.MaxBytes == 0 {
		config.MaxBytes = 1e6 // 1 MB
	}

	if config.MinBytes == 0 {
		config.MinBytes = defaultFetchMinBytes
	}

	if config.MaxWait == 0 {
		config.MaxWait = 10 * time.Second
	}

	if config.ReadBatchTimeout == 0 {
		config.ReadBatchTimeout = 10 * time.Second
	}

	if config.ReadLagInterval == 0 {
		config.ReadLagInterval = 1 * time.Minute
	}

	if config.ReadBackoffMin == 0 {
		config.ReadBackoffMin = defaultReadBackoffMin
	}

	if config.ReadBackoffMax == 0 {
		config.ReadBackoffMax = defaultReadBackoffMax
	}

	if config.ReadBackoffMax < config.ReadBackoffMin {
		panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 3
	}

	// when configured as a consumer group, stats should report a partition of -1
	readerStatsPartition := config.Partition
	if config.GroupID != "" {
		readerStatsPartition = -1
	}

	// when configured as a consumer group, start version as 1 to ensure that only
	// the rebalance function will start readers
	version := int64(0)
	if config.GroupID != "" {
		version = 1
	}

	stctx, stop := context.WithCancel(context.Background())
	r := &Reader{
		config:  config,
		msgs:    make(chan readerMessage, config.QueueCapacity),
		cancel:  func() {},
		commits: make(chan commitRequest, config.QueueCapacity),
		stop:    stop,
		offset:  FirstOffset,
		stctx:   stctx,
		stats: &readerStats{
			dialTime:   makeSummary(),
			readTime:   makeSummary(),
			waitTime:   makeSummary(),
			fetchSize:  makeSummary(),
			fetchBytes: makeSummary(),
			// Generate the string representation of the partition number only
			// once when the reader is created.
			partition: strconv.Itoa(readerStatsPartition),
		},
		version: version,
	}

	if r.useConsumerGroup() {
		r.done = make(chan struct{})
		r.runError = make(chan error)
		cg, err := NewConsumerGroup(ConsumerGroupConfig{
			ID:                     r.config.GroupID,
			Brokers:                r.config.Brokers,
			Dialer:                 r.config.Dialer,
			Topics:                 r.getTopics(),
			GroupBalancers:         r.config.GroupBalancers,
			HeartbeatInterval:      r.config.HeartbeatInterval,
			PartitionWatchInterval: r.config.PartitionWatchInterval,
			WatchPartitionChanges:  r.config.WatchPartitionChanges,
			SessionTimeout:         r.config.SessionTimeout,
			RebalanceTimeout:       r.config.RebalanceTimeout,
			JoinGroupBackoff:       r.config.JoinGroupBackoff,
			RetentionTime:          r.config.RetentionTime,
			StartOffset:            r.config.StartOffset,
			Logger:                 r.config.Logger,
			ErrorLogger:            r.config.ErrorLogger,
		})
		if err != nil {
			panic(err)
		}
		go r.run(cg)
	}

	return r
}
// Config returns the reader's configuration.
func (r *Reader) Config() ReaderConfig {
	return r.config
}

// Close closes the stream, preventing the program from reading any more
// messages from it.
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.once, 1)
	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait()

	if r.done != nil {
		<-r.done
	}

	if !closed {
		close(r.msgs)
	}

	return nil
}
// ReadMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
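//
// A typical consumption loop is sketched below (error handling reduced to a
// log call for brevity):
//
//	for {
//		m, err := r.ReadMessage(context.Background())
//		if err != nil {
//			log.Printf("read error: %v", err) // io.EOF once the reader is closed
//			break
//		}
//		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
//	}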
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		return Message{}, fmt.Errorf("fetching message: %w", err)
	}

	if r.useConsumerGroup() {
		if err := r.CommitMessages(ctx, m); err != nil {
			return Message{}, fmt.Errorf("committing message: %w", err)
		}
	}

	return m, nil
}
// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
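//
// A sketch of the fetch-then-commit pattern, committing only after the message
// has been processed; ctx is the caller's context and process is a placeholder
// for application logic:
//
//	for {
//		m, err := r.FetchMessage(ctx)
//		if err != nil {
//			break
//		}
//		if err := process(m); err != nil {
//			break // do not commit a message that was not handled
//		}
//		if err := r.CommitMessages(ctx, m); err != nil {
//			break
//		}
//	}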
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
	r.activateReadLag()

	for {
		r.mutex.Lock()

		if !r.closed && r.version == 0 {
			r.start(r.getTopicPartitionOffset())
		}

		version := r.version
		r.mutex.Unlock()

		select {
		case <-ctx.Done():
			return Message{}, ctx.Err()

		case err := <-r.runError:
			return Message{}, err

		case m, ok := <-r.msgs:
			if !ok {
				return Message{}, io.EOF
			}

			if m.version >= version {
				r.mutex.Lock()

				switch {
				case m.error != nil:
				case version == r.version:
					r.offset = m.message.Offset + 1
					r.lag = m.watermark - r.offset
				}

				r.mutex.Unlock()

				if errors.Is(m.error, io.EOF) {
					// io.EOF is used as a marker to indicate that the stream
					// has been closed, in case it was received from the inner
					// reader we don't want to confuse the program and replace
					// the error with io.ErrUnexpectedEOF.
					m.error = io.ErrUnexpectedEOF
				}

				return m.message, m.error
			}
		}
	}
}
// CommitMessages commits the list of messages passed as argument. The program
// may pass a context to asynchronously cancel the commit operation when it was
// configured to be blocking.
//
// Because kafka consumer groups track a single offset per partition, the
// highest message offset passed to CommitMessages will cause all previous
// messages to be committed. Applications need to account for these Kafka
// limitations when committing messages, and maintain message ordering if they
// need strong delivery guarantees. This property makes it valid to pass only
// the last message seen to CommitMessages in order to move the offset of the
// topic/partition it belonged to forward, effectively committing all previous
// messages in the partition.
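//
// For example, after processing a batch in order, committing only the last
// message advances the partition offset past the whole batch; batch and
// process are illustrative placeholders:
//
//	for _, m := range batch {
//		if err := process(m); err != nil {
//			return err
//		}
//	}
//	// commits every message in the batch up to and including the last one
//	if err := r.CommitMessages(ctx, batch[len(batch)-1]); err != nil {
//		return err
//	}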
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if !r.useConsumerGroup() {
		return errOnlyAvailableWithGroup
	}

	var errch <-chan error
	creq := commitRequest{
		commits: makeCommits(msgs...),
	}

	if r.useSyncCommits() {
		ch := make(chan error, 1)
		errch, creq.errch = ch, ch
	}

	select {
	case r.commits <- creq:
	case <-ctx.Done():
		return ctx.Err()
	case <-r.stctx.Done():
		// This context is used to ensure we don't allow commits after the
		// reader was closed.
		return io.ErrClosedPipe
	}

	if !r.useSyncCommits() {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errch:
		return err
	}
}
// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
//
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs to get
// an up to date estimation of how far behind the reader is. For example when
// the consumer is not ready to process the next message.
//
// The function returns a lag of zero when the reader's current offset is
// negative.
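//
// A sketch of querying the lag with a bounded timeout (the timeout value is an
// illustrative choice):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	lag, err := r.ReadLag(ctx)
//	if err != nil {
//		log.Printf("failed to read lag: %v", err)
//	} else if lag > 0 {
//		log.Printf("reader is %d messages behind", lag)
//	}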
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
	if r.useConsumerGroup() {
		return 0, errNotAvailableWithGroup
	}

	type offsets struct {
		first int64
		last  int64
	}

	offch := make(chan offsets, 1)
	errch := make(chan error, 1)

	go func() {
		var off offsets
		var err error

		for _, broker := range r.config.Brokers {
			var conn *Conn

			if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
				continue
			}

			deadline, _ := ctx.Deadline()
			conn.SetDeadline(deadline)

			off.first, off.last, err = conn.ReadOffsets()
			conn.Close()

			if err == nil {
				break
			}
		}

		if err != nil {
			errch <- err
		} else {
			offch <- off
		}
	}()

	select {
	case off := <-offch:
		switch cur := r.Offset(); {
		case cur == FirstOffset:
			lag = off.last - off.first

		case cur == LastOffset:
			lag = 0

		default:
			lag = off.last - cur
		}
	case err = <-errch:
	case <-ctx.Done():
		err = ctx.Err()
	}

	return
}
// Offset returns the current absolute offset of the reader, or -1
// if r is backed by a consumer group.
func (r *Reader) Offset() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	offset := r.offset
	r.mutex.Unlock()
	r.withLogger(func(log Logger) {
		log.Printf("looking up offset of kafka reader for partition %d of %s: %s", r.config.Partition, r.config.Topic, toHumanOffset(offset))
	})
	return offset
}

// Lag returns the lag of the last message returned by ReadMessage, or -1
// if r is backed by a consumer group.
func (r *Reader) Lag() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	lag := r.lag
	r.mutex.Unlock()
	return lag
}
// SetOffset changes the offset from which the next batch of messages will be
// read. The method fails with io.ErrClosedPipe if the reader has already been closed.
//
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
// or last available offset in the partition. Please note while -1 and -2 were accepted
// to indicate the first or last offset in previous versions, the meanings of the numbers
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
// specification.
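//
// For example, a sketch of skipping directly to the newest messages on the
// partition before reading:
//
//	if err := r.SetOffset(kafka.LastOffset); err != nil {
//		log.Printf("failed to set offset: %v", err)
//	}
//	m, err := r.ReadMessage(ctx)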
func (r *Reader) SetOffset(offset int64) error {
	if r.useConsumerGroup() {
		return errNotAvailableWithGroup
	}

	var err error
	r.mutex.Lock()

	if r.closed {
		err = io.ErrClosedPipe
	} else if offset != r.offset {
		r.withLogger(func(log Logger) {
			log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s",
				r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset))
		})
		r.offset = offset

		if r.version != 0 {
			r.start(r.getTopicPartitionOffset())
		}

		r.activateReadLag()
	}

	r.mutex.Unlock()
	return err
}
// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if it is unable to connect to the partition leader, unable to
// read the offset for the given timestamp, or if the reader has been closed.
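//
// A sketch of rewinding the reader to the messages produced over roughly the
// last hour (the duration is an illustrative choice):
//
//	if err := r.SetOffsetAt(ctx, time.Now().Add(-1*time.Hour)); err != nil {
//		log.Printf("failed to seek by timestamp: %v", err)
//	}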
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
	r.mutex.Lock()
	if r.closed {
		r.mutex.Unlock()
		return io.ErrClosedPipe
	}
	r.mutex.Unlock()

	if len(r.config.Brokers) < 1 {
		return errors.New("no brokers in config")
	}
	var conn *Conn
	var err error
	for _, broker := range r.config.Brokers {
		conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
		if err != nil {
			continue
		}

		deadline, _ := ctx.Deadline()
		conn.SetDeadline(deadline)
		offset, err := conn.ReadOffset(t)
		conn.Close()
		if err != nil {
			return err
		}

		return r.SetOffset(offset)
	}

	return fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}
// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka reader and report the metrics to a stats collection
// system.
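//
// For example, a sketch of such a reporting goroutine (the interval and the
// use of the standard log package are illustrative):
//
//	go func() {
//		ticker := time.NewTicker(30 * time.Second)
//		defer ticker.Stop()
//		for range ticker.C {
//			s := r.Stats()
//			log.Printf("kafka reader stats: messages=%d bytes=%d lag=%d errors=%d",
//				s.Messages, s.Bytes, s.Lag, s.Errors)
//		}
//	}()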
func (r *Reader) Stats() ReaderStats {
	stats := ReaderStats{
		Dials:         r.stats.dials.snapshot(),
		Fetches:       r.stats.fetches.snapshot(),
		Messages:      r.stats.messages.snapshot(),
		Bytes:         r.stats.bytes.snapshot(),
		Rebalances:    r.stats.rebalances.snapshot(),
		Timeouts:      r.stats.timeouts.snapshot(),
		Errors:        r.stats.errors.snapshot(),
		DialTime:      r.stats.dialTime.snapshotDuration(),
		ReadTime:      r.stats.readTime.snapshotDuration(),
		WaitTime:      r.stats.waitTime.snapshotDuration(),
		FetchSize:     r.stats.fetchSize.snapshot(),
		FetchBytes:    r.stats.fetchBytes.snapshot(),
		Offset:        r.stats.offset.snapshot(),
		Lag:           r.stats.lag.snapshot(),
		MinBytes:      int64(r.config.MinBytes),
		MaxBytes:      int64(r.config.MaxBytes),
		MaxWait:       r.config.MaxWait,
		QueueLength:   int64(len(r.msgs)),
		QueueCapacity: int64(cap(r.msgs)),
		ClientID:      r.config.Dialer.ClientID,
		Topic:         r.config.Topic,
		Partition:     r.stats.partition,
	}
	// TODO: remove when we get rid of the deprecated field.
	stats.DeprecatedFetchesWithTypo = stats.Fetches
	return stats
}

func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 {
	key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)}
	return map[topicPartition]int64{key: r.offset}
}

func (r *Reader) withLogger(do func(Logger)) {
	if r.config.Logger != nil {
		do(r.config.Logger)
	}
}

func (r *Reader) withErrorLogger(do func(Logger)) {
	if r.config.ErrorLogger != nil {
		do(r.config.ErrorLogger)
	} else {
		r.withLogger(do)
	}
}

func (r *Reader) activateReadLag() {
	if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
		// read lag will only be calculated when not using consumer groups
		// todo discuss how capturing read lag should interact with rebalancing
		if !r.useConsumerGroup() {
			go r.readLag(r.stctx)
		}
	}
}

func (r *Reader) readLag(ctx context.Context) {
	ticker := time.NewTicker(r.config.ReadLagInterval)
	defer ticker.Stop()

	for {
		timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
		lag, err := r.ReadLag(timeout)
		cancel()

		if err != nil {
			r.stats.errors.observe(1)
			r.withErrorLogger(func(log Logger) {
				log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
			})
		} else {
			r.stats.lag.observe(lag)
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}
func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
	if r.closed {
		// don't start child reader if parent Reader is closed
		return
	}

	ctx, cancel := context.WithCancel(context.Background())

	r.cancel() // always cancel the previous reader
	r.cancel = cancel
	r.version++
	r.join.Add(len(offsetsByPartition))

	for key, offset := range offsetsByPartition {
		go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
			defer join.Done()

			(&reader{
				dialer:           r.config.Dialer,
				logger:           r.config.Logger,
				errorLogger:      r.config.ErrorLogger,
				brokers:          r.config.Brokers,
				topic:            key.topic,
				partition:        int(key.partition),
				minBytes:         r.config.MinBytes,
				maxBytes:         r.config.MaxBytes,
				maxWait:          r.config.MaxWait,
				readBatchTimeout: r.config.ReadBatchTimeout,
				backoffDelayMin:  r.config.ReadBackoffMin,
				backoffDelayMax:  r.config.ReadBackoffMax,
				version:          r.version,
				msgs:             r.msgs,
				stats:            r.stats,
				isolationLevel:   r.config.IsolationLevel,
				maxAttempts:      r.config.MaxAttempts,
				// backwards-compatibility flags
				offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
			}).run(ctx, offset)
		}(ctx, key, offset, &r.join)
	}
}
// A reader reads messages from kafka and produces them on its channels; it's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
type reader struct {
	dialer           *Dialer
	logger           Logger
	errorLogger      Logger
	brokers          []string
	topic            string
	partition        int
	minBytes         int
	maxBytes         int
	maxWait          time.Duration
	readBatchTimeout time.Duration
	backoffDelayMin  time.Duration
	backoffDelayMax  time.Duration
	version          int64
	msgs             chan<- readerMessage
	stats            *readerStats
	isolationLevel   IsolationLevel
	maxAttempts      int

	offsetOutOfRangeError bool
}

type readerMessage struct {
	version   int64
	message   Message
	watermark int64
	error     error
}
func (r *reader) run(ctx context.Context, offset int64) {
	// This is the reader's main loop, it only ends if the context is canceled
	// and will keep attempting to read messages otherwise.
	//
	// Retrying indefinitely has the nice side effect of preventing Read calls
	// on the parent reader to block if connection to the kafka server fails,
	// the reader keeps reporting errors on the error channel which will then
	// be surfaced to the program.
	// If the reader wasn't retrying then the program would block indefinitely
	// on a Read call after reading the first error.
	for attempt := 0; true; attempt++ {
		if attempt != 0 {
			if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
				return
			}
		}

		r.withLogger(func(log Logger) {
			log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset))
		})

		conn, start, err := r.initialize(ctx, offset)
		if err != nil {
			if errors.Is(err, OffsetOutOfRange) {
				if r.offsetOutOfRangeError {
					r.sendError(ctx, err)
					return
				}

				// This would happen if the requested offset is past the last
				// offset on the partition leader. In that case we're just going
				// to retry later hoping that enough data has been produced.
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})

				continue
			}

			// Perform a configured number of attempts before
			// reporting first errors, this helps mitigate
			// situations where the kafka server is temporarily
			// unavailable.
			if attempt >= r.maxAttempts {
				r.sendError(ctx, err)
			} else {
				r.stats.errors.observe(1)
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})
			}
			continue
		}

		// Resetting the attempt counter ensures that if a failure occurs after
		// a successful initialization we don't keep increasing the backoff
		// timeout.
		attempt = 0

		// Now we're sure to have an absolute offset number, should anything
		// happen to the connection we know we'll want to restart from this
		// offset.
		offset = start

		errcount := 0
	readLoop:
		for {
			if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
				conn.Close()
				return
			}

			offset, err = r.read(ctx, offset, conn)
			switch {
			case err == nil:
				errcount = 0
				continue

			case errors.Is(err, io.EOF):
				// done with this batch of messages...carry on. note that this
				// block relies on the batch repackaging real io.EOF errors as
				// io.ErrUnexpectedEOF. otherwise, we would end up swallowing real
				// errors here.
				errcount = 0
				continue

			case errors.Is(err, io.ErrNoProgress):
				// This error is returned by the Conn when it believes the connection
				// has been corrupted, so we need to explicitly close it. Since we are
				// explicitly handling it and a retry will pick up, we can suppress the
				// error metrics and logs for this case.
				conn.Close()
				break readLoop

			case errors.Is(err, UnknownTopicOrPartition):
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// topic/partition broker combo.
				r.stats.rebalances.observe(1)
				break readLoop

			case errors.Is(err, NotLeaderForPartition):
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// partition leader.
				r.stats.rebalances.observe(1)
				break readLoop

			case errors.Is(err, RequestTimedOut):
				// Timeout on the kafka side, this can be safely retried.
				errcount = 0
				r.withLogger(func(log Logger) {
					log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
				})
				r.stats.timeouts.observe(1)
				continue

			case errors.Is(err, OffsetOutOfRange):
				first, last, err := r.readOffsets(conn)
				if err != nil {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
					})
					conn.Close()
					break readLoop
				}

				switch {
				case offset < first:
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, toHumanOffset(offset), first, first-offset)
					})
					offset, errcount = first, 0
					continue // retry immediately so we don't keep falling behind due to the backoff

				case offset < last:
					errcount = 0
					continue // more messages have already become available, retry immediately

				default:
					// We may be reading past the last offset, will retry later.
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading past the last offset for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
					})
				}

			case errors.Is(err, context.Canceled):
				// Another reader has taken over, we can safely quit.
				conn.Close()
				return

			case errors.Is(err, errUnknownCodec):
				// The compression codec is either unsupported or has not been
				// imported. This is a fatal error b/c the reader cannot
				// proceed.
				r.sendError(ctx, err)
				break readLoop

			default:
				var kafkaError Error
				if errors.As(err, &kafkaError) {
					r.sendError(ctx, err)
				} else {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
					})
					r.stats.errors.observe(1)
					conn.Close()
					break readLoop
				}
			}

			errcount++
		}
	}
}
func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
	for i := 0; i != len(r.brokers) && conn == nil; i++ {
		broker := r.brokers[i]
		var first, last int64

		t0 := time.Now()
		conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
		t1 := time.Now()
		r.stats.dials.observe(1)
		r.stats.dialTime.observeDuration(t1.Sub(t0))

		if err != nil {
			continue
		}

		if first, last, err = r.readOffsets(conn); err != nil {
			conn.Close()
			conn = nil
			break
		}

		switch {
		case offset == FirstOffset:
			offset = first

		case offset == LastOffset:
			offset = last

		case offset < first:
			offset = first
		}

		r.withLogger(func(log Logger) {
			log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset))
		})

		if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
			conn.Close()
			conn = nil
			break
		}

		conn.SetDeadline(time.Time{})
	}

	return
}
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	for {
		conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)
	return offset, err
}

func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
	conn.SetDeadline(time.Now().Add(10 * time.Second))
	return conn.ReadOffsets()
}

func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
	select {
	case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) sendError(ctx context.Context, err error) error {
	select {
	case r.msgs <- readerMessage{version: r.version, error: err}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
func (r *reader) withLogger(do func(Logger)) {
	if r.logger != nil {
		do(r.logger)
	}
}

func (r *reader) withErrorLogger(do func(Logger)) {
	if r.errorLogger != nil {
		do(r.errorLogger)
	} else {
		r.withLogger(do)
	}
}

// extractTopics returns the unique list of topics represented by the set of
// provided members.
func extractTopics(members []GroupMember) []string {
	visited := map[string]struct{}{}
	var topics []string

	for _, member := range members {
		for _, topic := range member.Topics {
			if _, seen := visited[topic]; seen {
				continue
			}

			topics = append(topics, topic)
			visited[topic] = struct{}{}
		}
	}

	sort.Strings(topics)

	return topics
}
type humanOffset int64

func toHumanOffset(v int64) humanOffset {
	return humanOffset(v)
}

func (offset humanOffset) Format(w fmt.State, _ rune) {
	v := int64(offset)
	switch v {
	case FirstOffset:
		fmt.Fprint(w, "first offset")
	case LastOffset:
		fmt.Fprint(w, "last offset")
	default:
		fmt.Fprint(w, strconv.FormatInt(v, 10))
	}
}