reader.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552
  1. package kafka
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "sort"
  9. "strconv"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
// Special offset values; ReaderConfig.StartOffset must be one of them when it
// is non-zero.
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)

const (
	// defaultCommitRetries holds the number of commit attempts to make
	// before giving up.
	defaultCommitRetries = 3
)

const (
	// defaultFetchMinBytes of 1 byte means that fetch requests are answered as
	// soon as a single byte of data is available or the fetch request times out
	// waiting for data to arrive.
	defaultFetchMinBytes = 1
)

// Sentinel errors returned by operations whose availability depends on
// whether ReaderConfig.GroupID is set.
var (
	errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
	errNotAvailableWithGroup  = errors.New("unavailable when GroupID is set")
)

const (
	// defaultReadBackoffMin/Max set the boundaries for how long the reader
	// waits before polling for new messages.
	defaultReadBackoffMin = 100 * time.Millisecond
	defaultReadBackoffMax = 1 * time.Second
)
// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
//
// Note that it is important to call `Close()` on a `Reader` when a process exits.
// The kafka server needs a graceful disconnect to stop it from continuing to
// attempt to send messages to the connected clients. The given example will not
// call `Close()` if the process is terminated with SIGINT (ctrl-c at the shell) or
// SIGTERM (as docker stop or a kubernetes restart does). This can result in a
// delay when a new reader on the same topic connects (e.g. new process started
// or new container running). Use a `signal.Notify` handler to close the reader on
// process shutdown.
type Reader struct {
	// immutable fields of the reader
	config ReaderConfig

	// communication channels between the parent reader and its subreaders
	msgs chan readerMessage

	// mutable fields of the reader (synchronized on the mutex)
	mutex   sync.Mutex
	join    sync.WaitGroup
	cancel  context.CancelFunc
	stop    context.CancelFunc
	done    chan struct{}
	commits chan commitRequest
	version int64 // version holds the generation of the spawned readers
	offset  int64
	lag     int64
	closed  bool

	// Without a group subscription (when Reader.config.GroupID == ""),
	// when errors occur, the Reader gets a synthetic readerMessage with
	// a non-nil err set. With group subscriptions however, when an error
	// occurs in Reader.run, there's no reader running (sic, cf. reader vs.
	// Reader) and there's no way to let the high-level methods like
	// FetchMessage know that an error indeed occurred. If an error in run
	// occurs, it will be non-block-sent to this unbuffered channel, where
	// the high-level methods can select{} on it and notify the caller.
	runError chan error

	// once is accessed atomically; Close stores 1 into it.
	// NOTE(review): presumably gates the one-time start of the read-lag
	// updater (see activateReadLag) — confirm against that method.
	once uint32

	// stctx is the context created in NewReader whose cancel function is kept
	// in stop; it is cancelled when the reader is closed.
	stctx context.Context

	// reader stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *readerStats
}
  84. // useConsumerGroup indicates whether the Reader is part of a consumer group.
  85. func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }
  86. func (r *Reader) getTopics() []string {
  87. if len(r.config.GroupTopics) > 0 {
  88. return r.config.GroupTopics[:]
  89. }
  90. return []string{r.config.Topic}
  91. }
  92. // useSyncCommits indicates whether the Reader is configured to perform sync or
  93. // async commits.
  94. func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }
  95. func (r *Reader) unsubscribe() {
  96. r.cancel()
  97. r.join.Wait()
  98. // it would be interesting to drain the r.msgs channel at this point since
  99. // it will contain buffered messages for partitions that may not be
  100. // re-assigned to this reader in the next consumer group generation.
  101. // however, draining the channel could race with the client calling
  102. // ReadMessage, which could result in messages delivered and/or committed
  103. // with gaps in the offset. for now, we will err on the side of caution and
  104. // potentially have those messages be reprocessed in the next generation by
  105. // another consumer to avoid such a race.
  106. }
  107. func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
  108. offsets := make(map[topicPartition]int64)
  109. for topic, assignments := range allAssignments {
  110. for _, assignment := range assignments {
  111. key := topicPartition{
  112. topic: topic,
  113. partition: int32(assignment.ID),
  114. }
  115. offsets[key] = assignment.Offset
  116. }
  117. }
  118. r.mutex.Lock()
  119. r.start(offsets)
  120. r.mutex.Unlock()
  121. r.withLogger(func(l Logger) {
  122. l.Printf("subscribed to topics and partitions: %+v", offsets)
  123. })
  124. }
  125. func (r *Reader) waitThrottleTime(throttleTimeMS int32) {
  126. if throttleTimeMS == 0 {
  127. return
  128. }
  129. t := time.NewTimer(time.Duration(throttleTimeMS) * time.Millisecond)
  130. defer t.Stop()
  131. select {
  132. case <-r.stctx.Done():
  133. return
  134. case <-t.C:
  135. }
  136. }
  137. // commitOffsetsWithRetry attempts to commit the specified offsets and retries
  138. // up to the specified number of times
  139. func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
  140. const (
  141. backoffDelayMin = 100 * time.Millisecond
  142. backoffDelayMax = 5 * time.Second
  143. )
  144. for attempt := 0; attempt < retries; attempt++ {
  145. if attempt != 0 {
  146. if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
  147. return
  148. }
  149. }
  150. if err = gen.CommitOffsets(offsetStash); err == nil {
  151. return
  152. }
  153. }
  154. return // err will not be nil
  155. }
  156. // offsetStash holds offsets by topic => partition => offset
  157. type offsetStash map[string]map[int]int64
  158. // merge updates the offsetStash with the offsets from the provided messages
  159. func (o offsetStash) merge(commits []commit) {
  160. for _, c := range commits {
  161. offsetsByPartition, ok := o[c.topic]
  162. if !ok {
  163. offsetsByPartition = map[int]int64{}
  164. o[c.topic] = offsetsByPartition
  165. }
  166. if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
  167. offsetsByPartition[c.partition] = c.offset
  168. }
  169. }
  170. }
  171. // reset clears the contents of the offsetStash
  172. func (o offsetStash) reset() {
  173. for key := range o {
  174. delete(o, key)
  175. }
  176. }
  177. // commitLoopImmediate handles each commit synchronously
  178. func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
  179. offsets := offsetStash{}
  180. for {
  181. select {
  182. case <-ctx.Done():
  183. return
  184. case req := <-r.commits:
  185. offsets.merge(req.commits)
  186. req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
  187. offsets.reset()
  188. }
  189. }
  190. }
  191. // commitLoopInterval handles each commit asynchronously with a period defined
  192. // by ReaderConfig.CommitInterval
  193. func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
  194. ticker := time.NewTicker(r.config.CommitInterval)
  195. defer ticker.Stop()
  196. // the offset stash should not survive rebalances b/c the consumer may
  197. // receive new assignments.
  198. offsets := offsetStash{}
  199. commit := func() {
  200. if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
  201. r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) })
  202. } else {
  203. offsets.reset()
  204. }
  205. }
  206. for {
  207. select {
  208. case <-ctx.Done():
  209. // drain the commit channel in order to prepare the final commit.
  210. for hasCommits := true; hasCommits; {
  211. select {
  212. case req := <-r.commits:
  213. offsets.merge(req.commits)
  214. default:
  215. hasCommits = false
  216. }
  217. }
  218. commit()
  219. return
  220. case <-ticker.C:
  221. commit()
  222. case req := <-r.commits:
  223. offsets.merge(req.commits)
  224. }
  225. }
  226. }
  227. // commitLoop processes commits off the commit chan
  228. func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
  229. r.withLogger(func(l Logger) {
  230. l.Printf("started commit for group %s\n", r.config.GroupID)
  231. })
  232. defer r.withLogger(func(l Logger) {
  233. l.Printf("stopped commit for group %s\n", r.config.GroupID)
  234. })
  235. if r.config.CommitInterval == 0 {
  236. r.commitLoopImmediate(ctx, gen)
  237. } else {
  238. r.commitLoopInterval(ctx, gen)
  239. }
  240. }
  241. // run provides the main consumer group management loop. Each iteration performs the
  242. // handshake to join the Reader to the consumer group.
  243. //
  244. // This function is responsible for closing the consumer group upon exit.
  245. func (r *Reader) run(cg *ConsumerGroup) {
  246. defer close(r.done)
  247. defer cg.Close()
  248. r.withLogger(func(l Logger) {
  249. l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
  250. })
  251. for {
  252. // Limit the number of attempts at waiting for the next
  253. // consumer generation.
  254. var err error
  255. var gen *Generation
  256. for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
  257. gen, err = cg.Next(r.stctx)
  258. if err == nil {
  259. break
  260. }
  261. if err == r.stctx.Err() {
  262. return
  263. }
  264. r.stats.errors.observe(1)
  265. r.withErrorLogger(func(l Logger) {
  266. l.Printf(err.Error())
  267. })
  268. // Continue with next attempt...
  269. }
  270. if err != nil {
  271. // All attempts have failed.
  272. select {
  273. case r.runError <- err:
  274. // If somebody's receiving on the runError, let
  275. // them know the error occurred.
  276. default:
  277. // Otherwise, don't block to allow healing.
  278. }
  279. continue
  280. }
  281. r.stats.rebalances.observe(1)
  282. r.subscribe(gen.Assignments)
  283. gen.Start(func(ctx context.Context) {
  284. r.commitLoop(ctx, gen)
  285. })
  286. gen.Start(func(ctx context.Context) {
  287. // wait for the generation to end and then unsubscribe.
  288. select {
  289. case <-ctx.Done():
  290. // continue to next generation
  291. case <-r.stctx.Done():
  292. // this will be the last loop because the reader is closed.
  293. }
  294. r.unsubscribe()
  295. })
  296. }
  297. }
// ReaderConfig is a configuration object used to create new instances of
// Reader.
type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id. If GroupID is specified, then
	// Partition should NOT be specified (i.e. it must be left at 0).
	GroupID string

	// GroupTopics allows specifying multiple topics, but can only be used in
	// combination with GroupID, as it is a consumer-group feature. As such, if
	// GroupID is set, then either Topic or GroupTopics must be defined.
	GroupTopics []string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from. Either Partition or GroupID may
	// be assigned, but not both.
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	//
	// Default: 1
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	//
	// Default: 1MB
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	//
	// Default: 10s
	MaxWait time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	//
	// Default: 1m
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker. If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker.
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int
}
  435. // Validate method validates ReaderConfig properties.
  436. func (config *ReaderConfig) Validate() error {
  437. if len(config.Brokers) == 0 {
  438. return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
  439. }
  440. if config.Partition < 0 || config.Partition >= math.MaxInt32 {
  441. return errors.New(fmt.Sprintf("partition number out of bounds: %d", config.Partition))
  442. }
  443. if config.MinBytes < 0 {
  444. return errors.New(fmt.Sprintf("invalid negative minimum batch size (min = %d)", config.MinBytes))
  445. }
  446. if config.MaxBytes < 0 {
  447. return errors.New(fmt.Sprintf("invalid negative maximum batch size (max = %d)", config.MaxBytes))
  448. }
  449. if config.GroupID != "" {
  450. if config.Partition != 0 {
  451. return errors.New("either Partition or GroupID may be specified, but not both")
  452. }
  453. if len(config.Topic) == 0 && len(config.GroupTopics) == 0 {
  454. return errors.New("either Topic or GroupTopics must be specified with GroupID")
  455. }
  456. } else if len(config.Topic) == 0 {
  457. return errors.New("cannot create a new kafka reader with an empty topic")
  458. }
  459. if config.MinBytes > config.MaxBytes {
  460. return errors.New(fmt.Sprintf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes))
  461. }
  462. if config.ReadBackoffMax < 0 {
  463. return errors.New(fmt.Sprintf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax))
  464. }
  465. if config.ReadBackoffMin < 0 {
  466. return errors.New(fmt.Sprintf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin))
  467. }
  468. return nil
  469. }
// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
//
// The struct tags carry the metric name and type (counter or gauge) of each
// field, or mark it as a tag.
type ReaderStats struct {
	// Counters.
	Dials      int64 `metric:"kafka.reader.dial.count" type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count" type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count" type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes" type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count" type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count" type:"counter"`

	// Duration and size distributions.
	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats `metric:"kafka.reader.fetch.bytes"`

	// Gauges.
	Offset        int64 `metric:"kafka.reader.offset" type:"gauge"`
	Lag           int64 `metric:"kafka.reader.lag" type:"gauge"`
	MinBytes      int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"`
	QueueLength   int64 `metric:"kafka.reader.queue.length" type:"gauge"`
	QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"`

	// Tags attached to the metrics above.
	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}
// readerStats is a struct that contains statistics on a reader. It is the
// internal accumulator stored in Reader.stats; its field names mirror the
// numeric fields of the exported ReaderStats.
type readerStats struct {
	dials      counter
	fetches    counter
	messages   counter
	bytes      counter
	rebalances counter
	timeouts   counter
	errors     counter
	dialTime   summary
	readTime   summary
	waitTime   summary
	fetchSize  summary
	fetchBytes summary
	offset     gauge
	lag        gauge
	// partition is the string form of the partition number, generated once
	// in NewReader ("-1" when the reader is part of a consumer group).
	partition string
}
  519. // NewReader creates and returns a new Reader configured with config.
  520. // The offset is initialized to FirstOffset.
  521. func NewReader(config ReaderConfig) *Reader {
  522. if err := config.Validate(); err != nil {
  523. panic(err)
  524. }
  525. if config.GroupID != "" {
  526. if len(config.GroupBalancers) == 0 {
  527. config.GroupBalancers = []GroupBalancer{
  528. RangeGroupBalancer{},
  529. RoundRobinGroupBalancer{},
  530. }
  531. }
  532. }
  533. if config.Dialer == nil {
  534. config.Dialer = DefaultDialer
  535. }
  536. if config.MaxBytes == 0 {
  537. config.MaxBytes = 1e6 // 1 MB
  538. }
  539. if config.MinBytes == 0 {
  540. config.MinBytes = defaultFetchMinBytes
  541. }
  542. if config.MaxWait == 0 {
  543. config.MaxWait = 10 * time.Second
  544. }
  545. if config.ReadLagInterval == 0 {
  546. config.ReadLagInterval = 1 * time.Minute
  547. }
  548. if config.ReadBackoffMin == 0 {
  549. config.ReadBackoffMin = defaultReadBackoffMin
  550. }
  551. if config.ReadBackoffMax == 0 {
  552. config.ReadBackoffMax = defaultReadBackoffMax
  553. }
  554. if config.ReadBackoffMax < config.ReadBackoffMin {
  555. panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
  556. }
  557. if config.QueueCapacity == 0 {
  558. config.QueueCapacity = 100
  559. }
  560. if config.MaxAttempts == 0 {
  561. config.MaxAttempts = 3
  562. }
  563. // when configured as a consumer group; stats should report a partition of -1
  564. readerStatsPartition := config.Partition
  565. if config.GroupID != "" {
  566. readerStatsPartition = -1
  567. }
  568. // when configured as a consume group, start version as 1 to ensure that only
  569. // the rebalance function will start readers
  570. version := int64(0)
  571. if config.GroupID != "" {
  572. version = 1
  573. }
  574. stctx, stop := context.WithCancel(context.Background())
  575. r := &Reader{
  576. config: config,
  577. msgs: make(chan readerMessage, config.QueueCapacity),
  578. cancel: func() {},
  579. commits: make(chan commitRequest, config.QueueCapacity),
  580. stop: stop,
  581. offset: FirstOffset,
  582. stctx: stctx,
  583. stats: &readerStats{
  584. dialTime: makeSummary(),
  585. readTime: makeSummary(),
  586. waitTime: makeSummary(),
  587. fetchSize: makeSummary(),
  588. fetchBytes: makeSummary(),
  589. // Generate the string representation of the partition number only
  590. // once when the reader is created.
  591. partition: strconv.Itoa(readerStatsPartition),
  592. },
  593. version: version,
  594. }
  595. if r.useConsumerGroup() {
  596. r.done = make(chan struct{})
  597. r.runError = make(chan error)
  598. cg, err := NewConsumerGroup(ConsumerGroupConfig{
  599. ID: r.config.GroupID,
  600. Brokers: r.config.Brokers,
  601. Dialer: r.config.Dialer,
  602. Topics: r.getTopics(),
  603. GroupBalancers: r.config.GroupBalancers,
  604. HeartbeatInterval: r.config.HeartbeatInterval,
  605. PartitionWatchInterval: r.config.PartitionWatchInterval,
  606. WatchPartitionChanges: r.config.WatchPartitionChanges,
  607. SessionTimeout: r.config.SessionTimeout,
  608. RebalanceTimeout: r.config.RebalanceTimeout,
  609. JoinGroupBackoff: r.config.JoinGroupBackoff,
  610. RetentionTime: r.config.RetentionTime,
  611. StartOffset: r.config.StartOffset,
  612. Logger: r.config.Logger,
  613. ErrorLogger: r.config.ErrorLogger,
  614. })
  615. if err != nil {
  616. panic(err)
  617. }
  618. go r.run(cg)
  619. }
  620. return r
  621. }
  622. // Config returns the reader's configuration.
  623. func (r *Reader) Config() ReaderConfig {
  624. return r.config
  625. }
  626. // Close closes the stream, preventing the program from reading any more
  627. // messages from it.
  628. func (r *Reader) Close() error {
  629. atomic.StoreUint32(&r.once, 1)
  630. r.mutex.Lock()
  631. closed := r.closed
  632. r.closed = true
  633. r.mutex.Unlock()
  634. r.cancel()
  635. r.stop()
  636. r.join.Wait()
  637. if r.done != nil {
  638. <-r.done
  639. }
  640. if !closed {
  641. close(r.msgs)
  642. }
  643. return nil
  644. }
  645. // ReadMessage reads and return the next message from the r. The method call
  646. // blocks until a message becomes available, or an error occurs. The program
  647. // may also specify a context to asynchronously cancel the blocking operation.
  648. //
  649. // The method returns io.EOF to indicate that the reader has been closed.
  650. //
  651. // If consumer groups are used, ReadMessage will automatically commit the
  652. // offset when called. Note that this could result in an offset being committed
  653. // before the message is fully processed.
  654. //
  655. // If more fine grained control of when offsets are committed is required, it
  656. // is recommended to use FetchMessage with CommitMessages instead.
  657. func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
  658. m, err := r.FetchMessage(ctx)
  659. if err != nil {
  660. return Message{}, err
  661. }
  662. if r.useConsumerGroup() {
  663. if err := r.CommitMessages(ctx, m); err != nil {
  664. return Message{}, err
  665. }
  666. }
  667. return m, nil
  668. }
  669. // FetchMessage reads and return the next message from the r. The method call
  670. // blocks until a message becomes available, or an error occurs. The program
  671. // may also specify a context to asynchronously cancel the blocking operation.
  672. //
  673. // The method returns io.EOF to indicate that the reader has been closed.
  674. //
  675. // FetchMessage does not commit offsets automatically when using consumer groups.
  676. // Use CommitMessages to commit the offset.
  677. func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
  678. r.activateReadLag()
  679. for {
  680. r.mutex.Lock()
  681. if !r.closed && r.version == 0 {
  682. r.start(r.getTopicPartitionOffset())
  683. }
  684. version := r.version
  685. r.mutex.Unlock()
  686. select {
  687. case <-ctx.Done():
  688. return Message{}, ctx.Err()
  689. case err := <-r.runError:
  690. return Message{}, err
  691. case m, ok := <-r.msgs:
  692. if !ok {
  693. return Message{}, io.EOF
  694. }
  695. if m.version >= version {
  696. r.mutex.Lock()
  697. switch {
  698. case m.error != nil:
  699. case version == r.version:
  700. r.offset = m.message.Offset + 1
  701. r.lag = m.watermark - r.offset
  702. }
  703. r.mutex.Unlock()
  704. switch m.error {
  705. case nil:
  706. case io.EOF:
  707. // io.EOF is used as a marker to indicate that the stream
  708. // has been closed, in case it was received from the inner
  709. // reader we don't want to confuse the program and replace
  710. // the error with io.ErrUnexpectedEOF.
  711. m.error = io.ErrUnexpectedEOF
  712. }
  713. return m.message, m.error
  714. }
  715. }
  716. }
  717. }
  718. // CommitMessages commits the list of messages passed as argument. The program
  719. // may pass a context to asynchronously cancel the commit operation when it was
  720. // configured to be blocking.
  721. func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
  722. if !r.useConsumerGroup() {
  723. return errOnlyAvailableWithGroup
  724. }
  725. var errch <-chan error
  726. var creq = commitRequest{
  727. commits: makeCommits(msgs...),
  728. }
  729. if r.useSyncCommits() {
  730. ch := make(chan error, 1)
  731. errch, creq.errch = ch, ch
  732. }
  733. select {
  734. case r.commits <- creq:
  735. case <-ctx.Done():
  736. return ctx.Err()
  737. case <-r.stctx.Done():
  738. // This context is used to ensure we don't allow commits after the
  739. // reader was closed.
  740. return io.ErrClosedPipe
  741. }
  742. if !r.useSyncCommits() {
  743. return nil
  744. }
  745. select {
  746. case <-ctx.Done():
  747. return ctx.Err()
  748. case err := <-errch:
  749. return err
  750. }
  751. }
  752. // ReadLag returns the current lag of the reader by fetching the last offset of
  753. // the topic and partition and computing the difference between that value and
  754. // the offset of the last message returned by ReadMessage.
  755. //
  756. // This method is intended to be used in cases where a program may be unable to
  757. // call ReadMessage to update the value returned by Lag, but still needs to get
  758. // an up to date estimation of how far behind the reader is. For example when
  759. // the consumer is not ready to process the next message.
  760. //
  761. // The function returns a lag of zero when the reader's current offset is
  762. // negative.
  763. func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
  764. if r.useConsumerGroup() {
  765. return 0, errNotAvailableWithGroup
  766. }
  767. type offsets struct {
  768. first int64
  769. last int64
  770. }
  771. offch := make(chan offsets, 1)
  772. errch := make(chan error, 1)
  773. go func() {
  774. var off offsets
  775. var err error
  776. for _, broker := range r.config.Brokers {
  777. var conn *Conn
  778. if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
  779. continue
  780. }
  781. deadline, _ := ctx.Deadline()
  782. conn.SetDeadline(deadline)
  783. off.first, off.last, err = conn.ReadOffsets()
  784. conn.Close()
  785. if err == nil {
  786. break
  787. }
  788. }
  789. if err != nil {
  790. errch <- err
  791. } else {
  792. offch <- off
  793. }
  794. }()
  795. select {
  796. case off := <-offch:
  797. switch cur := r.Offset(); {
  798. case cur == FirstOffset:
  799. lag = off.last - off.first
  800. case cur == LastOffset:
  801. lag = 0
  802. default:
  803. lag = off.last - cur
  804. }
  805. case err = <-errch:
  806. case <-ctx.Done():
  807. err = ctx.Err()
  808. }
  809. return
  810. }
  811. // Offset returns the current absolute offset of the reader, or -1
  812. // if r is backed by a consumer group.
  813. func (r *Reader) Offset() int64 {
  814. if r.useConsumerGroup() {
  815. return -1
  816. }
  817. r.mutex.Lock()
  818. offset := r.offset
  819. r.mutex.Unlock()
  820. r.withLogger(func(log Logger) {
  821. log.Printf("looking up offset of kafka reader for partition %d of %s: %d", r.config.Partition, r.config.Topic, offset)
  822. })
  823. return offset
  824. }
  825. // Lag returns the lag of the last message returned by ReadMessage, or -1
  826. // if r is backed by a consumer group.
  827. func (r *Reader) Lag() int64 {
  828. if r.useConsumerGroup() {
  829. return -1
  830. }
  831. r.mutex.Lock()
  832. lag := r.lag
  833. r.mutex.Unlock()
  834. return lag
  835. }
  836. // SetOffset changes the offset from which the next batch of messages will be
  837. // read. The method fails with io.ErrClosedPipe if the reader has already been closed.
  838. //
  839. // From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
  840. // or last available offset in the partition. Please note while -1 and -2 were accepted
  841. // to indicate the first or last offset in previous versions, the meanings of the numbers
  842. // were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
  843. // specification.
  844. func (r *Reader) SetOffset(offset int64) error {
  845. if r.useConsumerGroup() {
  846. return errNotAvailableWithGroup
  847. }
  848. var err error
  849. r.mutex.Lock()
  850. if r.closed {
  851. err = io.ErrClosedPipe
  852. } else if offset != r.offset {
  853. r.withLogger(func(log Logger) {
  854. log.Printf("setting the offset of the kafka reader for partition %d of %s from %d to %d",
  855. r.config.Partition, r.config.Topic, r.offset, offset)
  856. })
  857. r.offset = offset
  858. if r.version != 0 {
  859. r.start(r.getTopicPartitionOffset())
  860. }
  861. r.activateReadLag()
  862. }
  863. r.mutex.Unlock()
  864. return err
  865. }
  866. // SetOffsetAt changes the offset from which the next batch of messages will be
  867. // read given the timestamp t.
  868. //
  869. // The method fails if the unable to connect partition leader, or unable to read the offset
  870. // given the ts, or if the reader has been closed.
  871. func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
  872. r.mutex.Lock()
  873. if r.closed {
  874. r.mutex.Unlock()
  875. return io.ErrClosedPipe
  876. }
  877. r.mutex.Unlock()
  878. for _, broker := range r.config.Brokers {
  879. conn, err := r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
  880. if err != nil {
  881. continue
  882. }
  883. deadline, _ := ctx.Deadline()
  884. conn.SetDeadline(deadline)
  885. offset, err := conn.ReadOffset(t)
  886. conn.Close()
  887. if err != nil {
  888. return err
  889. }
  890. return r.SetOffset(offset)
  891. }
  892. return fmt.Errorf("error setting offset for timestamp %+v", t)
  893. }
  894. // Stats returns a snapshot of the reader stats since the last time the method
  895. // was called, or since the reader was created if it is called for the first
  896. // time.
  897. //
  898. // A typical use of this method is to spawn a goroutine that will periodically
  899. // call Stats on a kafka reader and report the metrics to a stats collection
  900. // system.
  901. func (r *Reader) Stats() ReaderStats {
  902. stats := ReaderStats{
  903. Dials: r.stats.dials.snapshot(),
  904. Fetches: r.stats.fetches.snapshot(),
  905. Messages: r.stats.messages.snapshot(),
  906. Bytes: r.stats.bytes.snapshot(),
  907. Rebalances: r.stats.rebalances.snapshot(),
  908. Timeouts: r.stats.timeouts.snapshot(),
  909. Errors: r.stats.errors.snapshot(),
  910. DialTime: r.stats.dialTime.snapshotDuration(),
  911. ReadTime: r.stats.readTime.snapshotDuration(),
  912. WaitTime: r.stats.waitTime.snapshotDuration(),
  913. FetchSize: r.stats.fetchSize.snapshot(),
  914. FetchBytes: r.stats.fetchBytes.snapshot(),
  915. Offset: r.stats.offset.snapshot(),
  916. Lag: r.stats.lag.snapshot(),
  917. MinBytes: int64(r.config.MinBytes),
  918. MaxBytes: int64(r.config.MaxBytes),
  919. MaxWait: r.config.MaxWait,
  920. QueueLength: int64(len(r.msgs)),
  921. QueueCapacity: int64(cap(r.msgs)),
  922. ClientID: r.config.Dialer.ClientID,
  923. Topic: r.config.Topic,
  924. Partition: r.stats.partition,
  925. }
  926. // TODO: remove when we get rid of the deprecated field.
  927. stats.DeprecatedFetchesWithTypo = stats.Fetches
  928. return stats
  929. }
  930. func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 {
  931. key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)}
  932. return map[topicPartition]int64{key: r.offset}
  933. }
  934. func (r *Reader) withLogger(do func(Logger)) {
  935. if r.config.Logger != nil {
  936. do(r.config.Logger)
  937. }
  938. }
  939. func (r *Reader) withErrorLogger(do func(Logger)) {
  940. if r.config.ErrorLogger != nil {
  941. do(r.config.ErrorLogger)
  942. } else {
  943. r.withLogger(do)
  944. }
  945. }
  946. func (r *Reader) activateReadLag() {
  947. if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
  948. // read lag will only be calculated when not using consumer groups
  949. // todo discuss how capturing read lag should interact with rebalancing
  950. if !r.useConsumerGroup() {
  951. go r.readLag(r.stctx)
  952. }
  953. }
  954. }
  955. func (r *Reader) readLag(ctx context.Context) {
  956. ticker := time.NewTicker(r.config.ReadLagInterval)
  957. defer ticker.Stop()
  958. for {
  959. timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
  960. lag, err := r.ReadLag(timeout)
  961. cancel()
  962. if err != nil {
  963. r.stats.errors.observe(1)
  964. r.withErrorLogger(func(log Logger) {
  965. log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
  966. })
  967. } else {
  968. r.stats.lag.observe(lag)
  969. }
  970. select {
  971. case <-ticker.C:
  972. case <-ctx.Done():
  973. return
  974. }
  975. }
  976. }
  977. func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
  978. if r.closed {
  979. // don't start child reader if parent Reader is closed
  980. return
  981. }
  982. ctx, cancel := context.WithCancel(context.Background())
  983. r.cancel() // always cancel the previous reader
  984. r.cancel = cancel
  985. r.version++
  986. r.join.Add(len(offsetsByPartition))
  987. for key, offset := range offsetsByPartition {
  988. go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
  989. defer join.Done()
  990. (&reader{
  991. dialer: r.config.Dialer,
  992. logger: r.config.Logger,
  993. errorLogger: r.config.ErrorLogger,
  994. brokers: r.config.Brokers,
  995. topic: key.topic,
  996. partition: int(key.partition),
  997. minBytes: r.config.MinBytes,
  998. maxBytes: r.config.MaxBytes,
  999. maxWait: r.config.MaxWait,
  1000. backoffDelayMin: r.config.ReadBackoffMin,
  1001. backoffDelayMax: r.config.ReadBackoffMax,
  1002. version: r.version,
  1003. msgs: r.msgs,
  1004. stats: r.stats,
  1005. isolationLevel: r.config.IsolationLevel,
  1006. maxAttempts: r.config.MaxAttempts,
  1007. }).run(ctx, offset)
  1008. }(ctx, key, offset, &r.join)
  1009. }
  1010. }
// A reader reads messages from kafka and produces them on its channels, it's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
type reader struct {
	dialer      *Dialer  // used to dial the partition leader
	logger      Logger   // optional; informational messages
	errorLogger Logger   // optional; falls back to logger when nil
	brokers     []string // broker addresses tried in order when dialing
	topic       string
	partition   int
	minBytes    int // passed to ReadBatchConfig.MinBytes on every fetch
	maxBytes    int // passed to ReadBatchConfig.MaxBytes on every fetch
	// maxWait bounds how long a fetch waits for a batch (read deadline).
	maxWait time.Duration
	// backoffDelayMin and backoffDelayMax are the bounds given to backoff
	// between retries.
	backoffDelayMin time.Duration
	backoffDelayMax time.Duration
	// version is stamped on every readerMessage this reader sends.
	version int64
	// msgs is the send-only channel on which messages and errors are
	// delivered.
	msgs           chan<- readerMessage
	stats          *readerStats
	isolationLevel IsolationLevel // passed to ReadBatchConfig.IsolationLevel
	// maxAttempts is the number of failed initialization attempts after
	// which errors are sent on msgs instead of only being logged.
	maxAttempts int
}
// readerMessage is the value a reader sends on its msgs channel: either a
// message together with the batch high water mark, or an error.
type readerMessage struct {
	version   int64   // version of the reader that produced the value
	message   Message // zero when the value carries an error
	watermark int64   // high water mark of the batch the message came from
	error     error   // set by sendError, nil for regular messages
}
  1038. func (r *reader) run(ctx context.Context, offset int64) {
  1039. // This is the reader's main loop, it only ends if the context is canceled
  1040. // and will keep attempting to reader messages otherwise.
  1041. //
  1042. // Retrying indefinitely has the nice side effect of preventing Read calls
  1043. // on the parent reader to block if connection to the kafka server fails,
  1044. // the reader keeps reporting errors on the error channel which will then
  1045. // be surfaced to the program.
  1046. // If the reader wasn't retrying then the program would block indefinitely
  1047. // on a Read call after reading the first error.
  1048. for attempt := 0; true; attempt++ {
  1049. if attempt != 0 {
  1050. if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
  1051. return
  1052. }
  1053. }
  1054. r.withLogger(func(log Logger) {
  1055. log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, offset)
  1056. })
  1057. conn, start, err := r.initialize(ctx, offset)
  1058. switch err {
  1059. case nil:
  1060. case OffsetOutOfRange:
  1061. // This would happen if the requested offset is passed the last
  1062. // offset on the partition leader. In that case we're just going
  1063. // to retry later hoping that enough data has been produced.
  1064. r.withErrorLogger(func(log Logger) {
  1065. log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, OffsetOutOfRange)
  1066. })
  1067. continue
  1068. default:
  1069. // Perform a configured number of attempts before
  1070. // reporting first errors, this helps mitigate
  1071. // situations where the kafka server is temporarily
  1072. // unavailable.
  1073. if attempt >= r.maxAttempts {
  1074. r.sendError(ctx, err)
  1075. } else {
  1076. r.stats.errors.observe(1)
  1077. r.withErrorLogger(func(log Logger) {
  1078. log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
  1079. })
  1080. }
  1081. continue
  1082. }
  1083. // Resetting the attempt counter ensures that if a failure occurs after
  1084. // a successful initialization we don't keep increasing the backoff
  1085. // timeout.
  1086. attempt = 0
  1087. // Now we're sure to have an absolute offset number, may anything happen
  1088. // to the connection we know we'll want to restart from this offset.
  1089. offset = start
  1090. errcount := 0
  1091. readLoop:
  1092. for {
  1093. if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
  1094. conn.Close()
  1095. return
  1096. }
  1097. switch offset, err = r.read(ctx, offset, conn); err {
  1098. case nil:
  1099. errcount = 0
  1100. case io.EOF:
  1101. // done with this batch of messages...carry on. note that this
  1102. // block relies on the batch repackaging real io.EOF errors as
  1103. // io.UnexpectedEOF. otherwise, we would end up swallowing real
  1104. // errors here.
  1105. errcount = 0
  1106. case UnknownTopicOrPartition:
  1107. r.withErrorLogger(func(log Logger) {
  1108. log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
  1109. })
  1110. conn.Close()
  1111. // The next call to .initialize will re-establish a connection to the proper
  1112. // topic/partition broker combo.
  1113. r.stats.rebalances.observe(1)
  1114. break readLoop
  1115. case NotLeaderForPartition:
  1116. r.withErrorLogger(func(log Logger) {
  1117. log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset)
  1118. })
  1119. conn.Close()
  1120. // The next call to .initialize will re-establish a connection to the proper
  1121. // partition leader.
  1122. r.stats.rebalances.observe(1)
  1123. break readLoop
  1124. case RequestTimedOut:
  1125. // Timeout on the kafka side, this can be safely retried.
  1126. errcount = 0
  1127. r.withLogger(func(log Logger) {
  1128. log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset)
  1129. })
  1130. r.stats.timeouts.observe(1)
  1131. continue
  1132. case OffsetOutOfRange:
  1133. first, last, err := r.readOffsets(conn)
  1134. if err != nil {
  1135. r.withErrorLogger(func(log Logger) {
  1136. log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
  1137. })
  1138. conn.Close()
  1139. break readLoop
  1140. }
  1141. switch {
  1142. case offset < first:
  1143. r.withErrorLogger(func(log Logger) {
  1144. log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, offset, first, first-offset)
  1145. })
  1146. offset, errcount = first, 0
  1147. continue // retry immediately so we don't keep falling behind due to the backoff
  1148. case offset < last:
  1149. errcount = 0
  1150. continue // more messages have already become available, retry immediately
  1151. default:
  1152. // We may be reading past the last offset, will retry later.
  1153. r.withErrorLogger(func(log Logger) {
  1154. log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, offset)
  1155. })
  1156. }
  1157. case context.Canceled:
  1158. // Another reader has taken over, we can safely quit.
  1159. conn.Close()
  1160. return
  1161. case errUnknownCodec:
  1162. // The compression codec is either unsupported or has not been
  1163. // imported. This is a fatal error b/c the reader cannot
  1164. // proceed.
  1165. r.sendError(ctx, err)
  1166. break readLoop
  1167. default:
  1168. if _, ok := err.(Error); ok {
  1169. r.sendError(ctx, err)
  1170. } else {
  1171. r.withErrorLogger(func(log Logger) {
  1172. log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err)
  1173. })
  1174. r.stats.errors.observe(1)
  1175. conn.Close()
  1176. break readLoop
  1177. }
  1178. }
  1179. errcount++
  1180. }
  1181. }
  1182. }
  1183. func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
  1184. for i := 0; i != len(r.brokers) && conn == nil; i++ {
  1185. var broker = r.brokers[i]
  1186. var first, last int64
  1187. t0 := time.Now()
  1188. conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
  1189. t1 := time.Now()
  1190. r.stats.dials.observe(1)
  1191. r.stats.dialTime.observeDuration(t1.Sub(t0))
  1192. if err != nil {
  1193. continue
  1194. }
  1195. if first, last, err = r.readOffsets(conn); err != nil {
  1196. conn.Close()
  1197. conn = nil
  1198. break
  1199. }
  1200. switch {
  1201. case offset == FirstOffset:
  1202. offset = first
  1203. case offset == LastOffset:
  1204. offset = last
  1205. case offset < first:
  1206. offset = first
  1207. }
  1208. r.withLogger(func(log Logger) {
  1209. log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, offset)
  1210. })
  1211. if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
  1212. conn.Close()
  1213. conn = nil
  1214. break
  1215. }
  1216. conn.SetDeadline(time.Time{})
  1217. }
  1218. return
  1219. }
  1220. func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
  1221. r.stats.fetches.observe(1)
  1222. r.stats.offset.observe(offset)
  1223. t0 := time.Now()
  1224. conn.SetReadDeadline(t0.Add(r.maxWait))
  1225. batch := conn.ReadBatchWith(ReadBatchConfig{
  1226. MinBytes: r.minBytes,
  1227. MaxBytes: r.maxBytes,
  1228. IsolationLevel: r.isolationLevel,
  1229. })
  1230. highWaterMark := batch.HighWaterMark()
  1231. t1 := time.Now()
  1232. r.stats.waitTime.observeDuration(t1.Sub(t0))
  1233. var msg Message
  1234. var err error
  1235. var size int64
  1236. var bytes int64
  1237. const safetyTimeout = 10 * time.Second
  1238. deadline := time.Now().Add(safetyTimeout)
  1239. conn.SetReadDeadline(deadline)
  1240. for {
  1241. if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
  1242. deadline = now.Add(safetyTimeout)
  1243. conn.SetReadDeadline(deadline)
  1244. }
  1245. if msg, err = batch.ReadMessage(); err != nil {
  1246. batch.Close()
  1247. break
  1248. }
  1249. n := int64(len(msg.Key) + len(msg.Value))
  1250. r.stats.messages.observe(1)
  1251. r.stats.bytes.observe(n)
  1252. if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
  1253. batch.Close()
  1254. break
  1255. }
  1256. offset = msg.Offset + 1
  1257. r.stats.offset.observe(offset)
  1258. r.stats.lag.observe(highWaterMark - offset)
  1259. size++
  1260. bytes += n
  1261. }
  1262. conn.SetReadDeadline(time.Time{})
  1263. t2 := time.Now()
  1264. r.stats.readTime.observeDuration(t2.Sub(t1))
  1265. r.stats.fetchSize.observe(size)
  1266. r.stats.fetchBytes.observe(bytes)
  1267. return offset, err
  1268. }
  1269. func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
  1270. conn.SetDeadline(time.Now().Add(10 * time.Second))
  1271. return conn.ReadOffsets()
  1272. }
  1273. func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
  1274. select {
  1275. case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
  1276. return nil
  1277. case <-ctx.Done():
  1278. return ctx.Err()
  1279. }
  1280. }
  1281. func (r *reader) sendError(ctx context.Context, err error) error {
  1282. select {
  1283. case r.msgs <- readerMessage{version: r.version, error: err}:
  1284. return nil
  1285. case <-ctx.Done():
  1286. return ctx.Err()
  1287. }
  1288. }
  1289. func (r *reader) withLogger(do func(Logger)) {
  1290. if r.logger != nil {
  1291. do(r.logger)
  1292. }
  1293. }
  1294. func (r *reader) withErrorLogger(do func(Logger)) {
  1295. if r.errorLogger != nil {
  1296. do(r.errorLogger)
  1297. } else {
  1298. r.withLogger(do)
  1299. }
  1300. }
  1301. // extractTopics returns the unique list of topics represented by the set of
  1302. // provided members
  1303. func extractTopics(members []GroupMember) []string {
  1304. var visited = map[string]struct{}{}
  1305. var topics []string
  1306. for _, member := range members {
  1307. for _, topic := range member.Topics {
  1308. if _, seen := visited[topic]; seen {
  1309. continue
  1310. }
  1311. topics = append(topics, topic)
  1312. visited[topic] = struct{}{}
  1313. }
  1314. }
  1315. sort.Strings(topics)
  1316. return topics
  1317. }