message_reader.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "log"
  8. )
  9. type readBytesFunc func(*bufio.Reader, int, int) (int, error)
  10. // messageSetReader processes the messages encoded into a fetch response.
  11. // The response may contain a mix of Record Batches (newer format) and Messages
  12. // (older format).
  13. type messageSetReader struct {
  14. *readerStack // used for decompressing compressed messages and record batches
  15. empty bool // if true, short circuits messageSetReader methods
  16. debug bool // enable debug log messages
  17. // How many bytes are expected to remain in the response.
  18. //
  19. // This is used to detect truncation of the response.
  20. lengthRemain int
  21. decompressed *bytes.Buffer
  22. }
  23. type readerStack struct {
  24. reader *bufio.Reader
  25. remain int
  26. base int64
  27. parent *readerStack
  28. count int // how many messages left in the current message set
  29. header messagesHeader // the current header for a subset of messages within the set.
  30. }
  31. // messagesHeader describes a set of records. there may be many messagesHeader's in a message set.
  32. type messagesHeader struct {
  33. firstOffset int64
  34. length int32
  35. crc int32
  36. magic int8
  37. // v1 composes attributes specific to v0 and v1 message headers
  38. v1 struct {
  39. attributes int8
  40. timestamp int64
  41. }
  42. // v2 composes attributes specific to v2 message headers
  43. v2 struct {
  44. leaderEpoch int32
  45. attributes int16
  46. lastOffsetDelta int32
  47. firstTimestamp int64
  48. lastTimestamp int64
  49. producerID int64
  50. producerEpoch int16
  51. baseSequence int32
  52. count int32
  53. }
  54. }
  55. func (h messagesHeader) compression() (codec CompressionCodec, err error) {
  56. const compressionCodecMask = 0x07
  57. var code int8
  58. switch h.magic {
  59. case 0, 1:
  60. code = h.v1.attributes & compressionCodecMask
  61. case 2:
  62. code = int8(h.v2.attributes & compressionCodecMask)
  63. default:
  64. err = h.badMagic()
  65. return
  66. }
  67. if code != 0 {
  68. codec, err = resolveCodec(code)
  69. }
  70. return
  71. }
  72. func (h messagesHeader) badMagic() error {
  73. return fmt.Errorf("unsupported magic byte %d in header", h.magic)
  74. }
  75. func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, error) {
  76. res := &messageSetReader{
  77. readerStack: &readerStack{
  78. reader: reader,
  79. remain: remain,
  80. },
  81. decompressed: acquireBuffer(),
  82. }
  83. err := res.readHeader()
  84. return res, err
  85. }
  86. func (r *messageSetReader) remaining() (remain int) {
  87. if r.empty {
  88. return 0
  89. }
  90. for s := r.readerStack; s != nil; s = s.parent {
  91. remain += s.remain
  92. }
  93. return
  94. }
  95. func (r *messageSetReader) discard() (err error) {
  96. switch {
  97. case r.empty:
  98. case r.readerStack == nil:
  99. default:
  100. // rewind up to the top-most reader b/c it's the only one that's doing
  101. // actual i/o. the rest are byte buffers that have been pushed on the stack
  102. // while reading compressed message sets.
  103. for r.parent != nil {
  104. r.readerStack = r.parent
  105. }
  106. err = r.discardN(r.remain)
  107. }
  108. return
  109. }
  110. func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readBytesFunc) (
  111. offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
  112. if r.empty {
  113. err = RequestTimedOut
  114. return
  115. }
  116. if err = r.readHeader(); err != nil {
  117. return
  118. }
  119. switch r.header.magic {
  120. case 0, 1:
  121. offset, timestamp, headers, err = r.readMessageV1(min, key, val)
  122. // Set an invalid value so that it can be ignored
  123. lastOffset = -1
  124. case 2:
  125. offset, lastOffset, timestamp, headers, err = r.readMessageV2(min, key, val)
  126. default:
  127. err = r.header.badMagic()
  128. }
  129. return
  130. }
  131. func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readBytesFunc) (
  132. offset int64, timestamp int64, headers []Header, err error) {
  133. for r.readerStack != nil {
  134. if r.remain == 0 {
  135. r.readerStack = r.parent
  136. continue
  137. }
  138. if err = r.readHeader(); err != nil {
  139. return
  140. }
  141. offset = r.header.firstOffset
  142. timestamp = r.header.v1.timestamp
  143. var codec CompressionCodec
  144. if codec, err = r.header.compression(); err != nil {
  145. return
  146. }
  147. if r.debug {
  148. r.log("Reading with codec=%T", codec)
  149. }
  150. if codec != nil {
  151. // discard next four bytes...will be -1 to indicate null key
  152. if err = r.discardN(4); err != nil {
  153. return
  154. }
  155. // read and decompress the contained message set.
  156. r.decompressed.Reset()
  157. if err = r.readBytesWith(func(br *bufio.Reader, sz int, n int) (remain int, err error) {
  158. // x4 as a guess that the average compression ratio is near 75%
  159. r.decompressed.Grow(4 * n)
  160. limitReader := io.LimitedReader{R: br, N: int64(n)}
  161. codecReader := codec.NewReader(&limitReader)
  162. _, err = r.decompressed.ReadFrom(codecReader)
  163. remain = sz - (n - int(limitReader.N))
  164. codecReader.Close()
  165. return
  166. }); err != nil {
  167. return
  168. }
  169. // the compressed message's offset will be equal to the offset of
  170. // the last message in the set. within the compressed set, the
  171. // offsets will be relative, so we have to scan through them to
  172. // get the base offset. for example, if there are four compressed
  173. // messages at offsets 10-13, then the container message will have
  174. // offset 13 and the contained messages will be 0,1,2,3. the base
  175. // offset for the container, then is 13-3=10.
  176. if offset, err = extractOffset(offset, r.decompressed.Bytes()); err != nil {
  177. return
  178. }
  179. // mark the outer message as being read
  180. r.markRead()
  181. // then push the decompressed bytes onto the stack.
  182. r.readerStack = &readerStack{
  183. // Allocate a buffer of size 0, which gets capped at 16 bytes
  184. // by the bufio package. We are already reading buffered data
  185. // here, no need to reserve another 4KB buffer.
  186. reader: bufio.NewReaderSize(r.decompressed, 0),
  187. remain: r.decompressed.Len(),
  188. base: offset,
  189. parent: r.readerStack,
  190. }
  191. continue
  192. }
  193. // adjust the offset in case we're reading compressed messages. the
  194. // base will be zero otherwise.
  195. offset += r.base
  196. // When the messages are compressed kafka may return messages at an
  197. // earlier offset than the one that was requested, it's the client's
  198. // responsibility to ignore those.
  199. //
  200. // At this point, the message header has been read, so discarding
  201. // the rest of the message means we have to discard the key, and then
  202. // the value. Each of those are preceded by a 4-byte length. Discarding
  203. // them is then reading that length variable and then discarding that
  204. // amount.
  205. if offset < min {
  206. // discard the key
  207. if err = r.discardBytes(); err != nil {
  208. return
  209. }
  210. // discard the value
  211. if err = r.discardBytes(); err != nil {
  212. return
  213. }
  214. // since we have fully consumed the message, mark as read
  215. r.markRead()
  216. continue
  217. }
  218. if err = r.readBytesWith(key); err != nil {
  219. return
  220. }
  221. if err = r.readBytesWith(val); err != nil {
  222. return
  223. }
  224. r.markRead()
  225. return
  226. }
  227. err = errShortRead
  228. return
  229. }
  230. func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
  231. offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
  232. if err = r.readHeader(); err != nil {
  233. return
  234. }
  235. if r.count == int(r.header.v2.count) { // first time reading this set, so check for compression headers.
  236. var codec CompressionCodec
  237. if codec, err = r.header.compression(); err != nil {
  238. return
  239. }
  240. if codec != nil {
  241. batchRemain := int(r.header.length - 49) // TODO: document this magic number
  242. if batchRemain > r.remain {
  243. err = errShortRead
  244. return
  245. }
  246. if batchRemain < 0 {
  247. err = fmt.Errorf("batch remain < 0 (%d)", batchRemain)
  248. return
  249. }
  250. r.decompressed.Reset()
  251. // x4 as a guess that the average compression ratio is near 75%
  252. r.decompressed.Grow(4 * batchRemain)
  253. limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
  254. codecReader := codec.NewReader(&limitReader)
  255. _, err = r.decompressed.ReadFrom(codecReader)
  256. codecReader.Close()
  257. if err != nil {
  258. return
  259. }
  260. r.remain -= batchRemain - int(limitReader.N)
  261. r.readerStack = &readerStack{
  262. reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer
  263. remain: r.decompressed.Len(),
  264. base: -1, // base is unused here
  265. parent: r.readerStack,
  266. header: r.header,
  267. count: r.count,
  268. }
  269. // all of the messages in this set are in the decompressed set just pushed onto the reader
  270. // stack. here we set the parent count to 0 so that when the child set is exhausted, the
  271. // reader will then try to read the header of the next message set
  272. r.readerStack.parent.count = 0
  273. }
  274. }
  275. remainBefore := r.remain
  276. var length int64
  277. if err = r.readVarInt(&length); err != nil {
  278. return
  279. }
  280. lengthOfLength := remainBefore - r.remain
  281. var attrs int8
  282. if err = r.readInt8(&attrs); err != nil {
  283. return
  284. }
  285. var timestampDelta int64
  286. if err = r.readVarInt(&timestampDelta); err != nil {
  287. return
  288. }
  289. timestamp = r.header.v2.firstTimestamp + timestampDelta
  290. var offsetDelta int64
  291. if err = r.readVarInt(&offsetDelta); err != nil {
  292. return
  293. }
  294. offset = r.header.firstOffset + offsetDelta
  295. if err = r.runFunc(key); err != nil {
  296. return
  297. }
  298. if err = r.runFunc(val); err != nil {
  299. return
  300. }
  301. var headerCount int64
  302. if err = r.readVarInt(&headerCount); err != nil {
  303. return
  304. }
  305. if headerCount > 0 {
  306. headers = make([]Header, headerCount)
  307. for i := range headers {
  308. if err = r.readMessageHeader(&headers[i]); err != nil {
  309. return
  310. }
  311. }
  312. }
  313. lastOffset = r.header.firstOffset + int64(r.header.v2.lastOffsetDelta)
  314. r.lengthRemain -= int(length) + lengthOfLength
  315. r.markRead()
  316. return
  317. }
  318. func (r *messageSetReader) discardBytes() (err error) {
  319. r.remain, err = discardBytes(r.reader, r.remain)
  320. return
  321. }
  322. func (r *messageSetReader) discardN(sz int) (err error) {
  323. r.remain, err = discardN(r.reader, r.remain, sz)
  324. return
  325. }
  326. func (r *messageSetReader) markRead() {
  327. if r.count == 0 {
  328. panic("markRead: negative count")
  329. }
  330. r.count--
  331. r.unwindStack()
  332. if r.debug {
  333. r.log("Mark read remain=%d", r.remain)
  334. }
  335. }
  336. func (r *messageSetReader) unwindStack() {
  337. for r.count == 0 {
  338. if r.remain == 0 {
  339. if r.parent != nil {
  340. if r.debug {
  341. r.log("Popped reader stack")
  342. }
  343. r.readerStack = r.parent
  344. continue
  345. }
  346. }
  347. break
  348. }
  349. }
  350. func (r *messageSetReader) readMessageHeader(header *Header) (err error) {
  351. var keyLen int64
  352. if err = r.readVarInt(&keyLen); err != nil {
  353. return
  354. }
  355. if header.Key, err = r.readNewString(int(keyLen)); err != nil {
  356. return
  357. }
  358. var valLen int64
  359. if err = r.readVarInt(&valLen); err != nil {
  360. return
  361. }
  362. if header.Value, err = r.readNewBytes(int(valLen)); err != nil {
  363. return
  364. }
  365. return nil
  366. }
  367. func (r *messageSetReader) runFunc(rbFunc readBytesFunc) (err error) {
  368. var length int64
  369. if err = r.readVarInt(&length); err != nil {
  370. return
  371. }
  372. if r.remain, err = rbFunc(r.reader, r.remain, int(length)); err != nil {
  373. return
  374. }
  375. return
  376. }
  377. func (r *messageSetReader) readHeader() (err error) {
  378. if r.count > 0 {
  379. // currently reading a set of messages, no need to read a header until they are exhausted.
  380. return
  381. }
  382. r.header = messagesHeader{}
  383. if err = r.readInt64(&r.header.firstOffset); err != nil {
  384. return
  385. }
  386. if err = r.readInt32(&r.header.length); err != nil {
  387. return
  388. }
  389. var crcOrLeaderEpoch int32
  390. if err = r.readInt32(&crcOrLeaderEpoch); err != nil {
  391. return
  392. }
  393. if err = r.readInt8(&r.header.magic); err != nil {
  394. return
  395. }
  396. switch r.header.magic {
  397. case 0:
  398. r.header.crc = crcOrLeaderEpoch
  399. if err = r.readInt8(&r.header.v1.attributes); err != nil {
  400. return
  401. }
  402. r.count = 1
  403. // Set arbitrary non-zero length so that we always assume the
  404. // message is truncated since bytes remain.
  405. r.lengthRemain = 1
  406. if r.debug {
  407. r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
  408. }
  409. case 1:
  410. r.header.crc = crcOrLeaderEpoch
  411. if err = r.readInt8(&r.header.v1.attributes); err != nil {
  412. return
  413. }
  414. if err = r.readInt64(&r.header.v1.timestamp); err != nil {
  415. return
  416. }
  417. r.count = 1
  418. // Set arbitrary non-zero length so that we always assume the
  419. // message is truncated since bytes remain.
  420. r.lengthRemain = 1
  421. if r.debug {
  422. r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
  423. }
  424. case 2:
  425. r.header.v2.leaderEpoch = crcOrLeaderEpoch
  426. if err = r.readInt32(&r.header.crc); err != nil {
  427. return
  428. }
  429. if err = r.readInt16(&r.header.v2.attributes); err != nil {
  430. return
  431. }
  432. if err = r.readInt32(&r.header.v2.lastOffsetDelta); err != nil {
  433. return
  434. }
  435. if err = r.readInt64(&r.header.v2.firstTimestamp); err != nil {
  436. return
  437. }
  438. if err = r.readInt64(&r.header.v2.lastTimestamp); err != nil {
  439. return
  440. }
  441. if err = r.readInt64(&r.header.v2.producerID); err != nil {
  442. return
  443. }
  444. if err = r.readInt16(&r.header.v2.producerEpoch); err != nil {
  445. return
  446. }
  447. if err = r.readInt32(&r.header.v2.baseSequence); err != nil {
  448. return
  449. }
  450. if err = r.readInt32(&r.header.v2.count); err != nil {
  451. return
  452. }
  453. r.count = int(r.header.v2.count)
  454. // Subtracts the header bytes from the length
  455. r.lengthRemain = int(r.header.length) - 49
  456. if r.debug {
  457. r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
  458. }
  459. default:
  460. err = r.header.badMagic()
  461. return
  462. }
  463. return
  464. }
  465. func (r *messageSetReader) readNewBytes(len int) (res []byte, err error) {
  466. res, r.remain, err = readNewBytes(r.reader, r.remain, len)
  467. return
  468. }
  469. func (r *messageSetReader) readNewString(len int) (res string, err error) {
  470. res, r.remain, err = readNewString(r.reader, r.remain, len)
  471. return
  472. }
  473. func (r *messageSetReader) readInt8(val *int8) (err error) {
  474. r.remain, err = readInt8(r.reader, r.remain, val)
  475. return
  476. }
  477. func (r *messageSetReader) readInt16(val *int16) (err error) {
  478. r.remain, err = readInt16(r.reader, r.remain, val)
  479. return
  480. }
  481. func (r *messageSetReader) readInt32(val *int32) (err error) {
  482. r.remain, err = readInt32(r.reader, r.remain, val)
  483. return
  484. }
  485. func (r *messageSetReader) readInt64(val *int64) (err error) {
  486. r.remain, err = readInt64(r.reader, r.remain, val)
  487. return
  488. }
  489. func (r *messageSetReader) readVarInt(val *int64) (err error) {
  490. r.remain, err = readVarInt(r.reader, r.remain, val)
  491. return
  492. }
  493. func (r *messageSetReader) readBytesWith(fn readBytesFunc) (err error) {
  494. r.remain, err = readBytesWith(r.reader, r.remain, fn)
  495. return
  496. }
  497. func (r *messageSetReader) log(msg string, args ...interface{}) {
  498. log.Printf("[DEBUG] "+msg, args...)
  499. }
  500. func extractOffset(base int64, msgSet []byte) (offset int64, err error) {
  501. r, remain := bufio.NewReader(bytes.NewReader(msgSet)), len(msgSet)
  502. for remain > 0 {
  503. if remain, err = readInt64(r, remain, &offset); err != nil {
  504. return
  505. }
  506. var sz int32
  507. if remain, err = readInt32(r, remain, &sz); err != nil {
  508. return
  509. }
  510. if remain, err = discardN(r, remain, int(sz)); err != nil {
  511. return
  512. }
  513. }
  514. offset = base - offset
  515. return
  516. }