message.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "time"
  8. )
// Message is a data structure representing kafka messages.
type Message struct {
	// Topic indicates which topic this message was consumed from via Reader.
	//
	// When being used with Writer, this can be used to configure the topic if
	// not already specified on the writer itself.
	Topic string

	// Partition is read-only and MUST NOT be set when writing messages.
	Partition     int
	Offset        int64
	HighWaterMark int64

	// Key and Value are copied as-is into the wire-format message built by
	// the message method.
	Key   []byte
	Value []byte

	// Headers are not carried over by the message method, which only
	// populates the key, value, and timestamp of the wire-format message.
	Headers []Header

	// If not set at the creation, Time will be automatically set when
	// writing the message.
	Time time.Time
}
  27. func (msg Message) message(cw *crc32Writer) message {
  28. m := message{
  29. MagicByte: 1,
  30. Key: msg.Key,
  31. Value: msg.Value,
  32. Timestamp: timestamp(msg.Time),
  33. }
  34. if cw != nil {
  35. m.CRC = m.crc32(cw)
  36. }
  37. return m
  38. }
  39. const timestampSize = 8
  40. func (msg *Message) size() int32 {
  41. return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
  42. }
  43. type message struct {
  44. CRC int32
  45. MagicByte int8
  46. Attributes int8
  47. Timestamp int64
  48. Key []byte
  49. Value []byte
  50. }
  51. func (m message) crc32(cw *crc32Writer) int32 {
  52. cw.crc32 = 0
  53. cw.writeInt8(m.MagicByte)
  54. cw.writeInt8(m.Attributes)
  55. if m.MagicByte != 0 {
  56. cw.writeInt64(m.Timestamp)
  57. }
  58. cw.writeBytes(m.Key)
  59. cw.writeBytes(m.Value)
  60. return int32(cw.crc32)
  61. }
  62. func (m message) size() int32 {
  63. size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value)
  64. if m.MagicByte != 0 {
  65. size += timestampSize
  66. }
  67. return size
  68. }
  69. func (m message) writeTo(wb *writeBuffer) {
  70. wb.writeInt32(m.CRC)
  71. wb.writeInt8(m.MagicByte)
  72. wb.writeInt8(m.Attributes)
  73. if m.MagicByte != 0 {
  74. wb.writeInt64(m.Timestamp)
  75. }
  76. wb.writeBytes(m.Key)
  77. wb.writeBytes(m.Value)
  78. }
  79. type messageSetItem struct {
  80. Offset int64
  81. MessageSize int32
  82. Message message
  83. }
  84. func (m messageSetItem) size() int32 {
  85. return 8 + 4 + m.Message.size()
  86. }
  87. func (m messageSetItem) writeTo(wb *writeBuffer) {
  88. wb.writeInt64(m.Offset)
  89. wb.writeInt32(m.MessageSize)
  90. m.Message.writeTo(wb)
  91. }
  92. type messageSet []messageSetItem
  93. func (s messageSet) size() (size int32) {
  94. for _, m := range s {
  95. size += m.size()
  96. }
  97. return
  98. }
  99. func (s messageSet) writeTo(wb *writeBuffer) {
  100. for _, m := range s {
  101. m.writeTo(wb)
  102. }
  103. }
// messageSetReader dispatches message set reads to the implementation that
// matches the format version of the data.
type messageSetReader struct {
	// empty marks a reader with nothing to read: readMessage returns
	// RequestTimedOut, remaining returns 0, and discard is a no-op.
	empty bool
	// version selects the implementation: 1 uses v1, 2 uses v2. Any other
	// value makes the methods of a non-empty reader panic.
	version int
	v1      messageSetReaderV1
	v2      messageSetReaderV2
}
  110. func (r *messageSetReader) readMessage(min int64,
  111. key func(*bufio.Reader, int, int) (int, error),
  112. val func(*bufio.Reader, int, int) (int, error),
  113. ) (offset int64, timestamp int64, headers []Header, err error) {
  114. if r.empty {
  115. return 0, 0, nil, RequestTimedOut
  116. }
  117. switch r.version {
  118. case 1:
  119. return r.v1.readMessage(min, key, val)
  120. case 2:
  121. return r.v2.readMessage(min, key, val)
  122. default:
  123. panic("Invalid messageSetReader - unknown message reader version")
  124. }
  125. }
  126. func (r *messageSetReader) remaining() (remain int) {
  127. if r.empty {
  128. return 0
  129. }
  130. switch r.version {
  131. case 1:
  132. return r.v1.remaining()
  133. case 2:
  134. return r.v2.remaining()
  135. default:
  136. panic("Invalid messageSetReader - unknown message reader version")
  137. }
  138. }
  139. func (r *messageSetReader) discard() (err error) {
  140. if r.empty {
  141. return nil
  142. }
  143. switch r.version {
  144. case 1:
  145. return r.v1.discard()
  146. case 2:
  147. return r.v2.discard()
  148. default:
  149. panic("Invalid messageSetReader - unknown message reader version")
  150. }
  151. }
// messageSetReaderV1 reads message sets by walking a stack of readers; the
// embedded pointer is the current top of that stack and becomes nil once
// every frame has been popped.
type messageSetReaderV1 struct {
	*readerStack
}

// readerStack is one frame of a linked stack of readers. New frames are
// pushed when a compressed message set is decompressed, and popped when
// their content has been fully consumed.
type readerStack struct {
	// reader is the source of bytes for this frame.
	reader *bufio.Reader
	// remain is the number of bytes left to consume from reader.
	remain int
	// base is added to the offsets of messages read from this frame.
	base int64
	// parent is the frame to resume once this one is exhausted, nil for the
	// bottom of the stack.
	parent *readerStack
}
  161. func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, error) {
  162. headerLength := 8 + 4 + 4 + 1 // offset + messageSize + crc + magicByte
  163. if headerLength > remain {
  164. return nil, errShortRead
  165. }
  166. b, err := reader.Peek(headerLength)
  167. if err != nil {
  168. return nil, err
  169. }
  170. var version int8 = int8(b[headerLength-1])
  171. switch version {
  172. case 0, 1:
  173. return &messageSetReader{
  174. version: 1,
  175. v1: messageSetReaderV1{&readerStack{
  176. reader: reader,
  177. remain: remain,
  178. }}}, nil
  179. case 2:
  180. mr := &messageSetReader{
  181. version: 2,
  182. v2: messageSetReaderV2{
  183. readerStack: &readerStack{
  184. reader: reader,
  185. remain: remain,
  186. },
  187. messageCount: 0,
  188. }}
  189. return mr, nil
  190. default:
  191. return nil, fmt.Errorf("unsupported message version %d found in fetch response", version)
  192. }
  193. }
// readMessage reads the next message whose offset is at least min.
//
// The key and val callbacks are handed to readBytesWith to consume the key
// and value of the message that is returned. Messages with an offset lower
// than min are skipped, and compressed messages are transparently expanded
// by pushing a reader over their decompressed content onto the stack.
//
// The returned headers are always nil: this method never assigns them.
// errShortRead is returned once every frame of the reader stack has been
// exhausted without finding a message.
func (r *messageSetReaderV1) readMessage(min int64,
	key func(*bufio.Reader, int, int) (int, error),
	val func(*bufio.Reader, int, int) (int, error),
) (offset int64, timestamp int64, headers []Header, err error) {
	for r.readerStack != nil {
		// the current frame is exhausted, resume reading from its parent.
		if r.remain == 0 {
			r.readerStack = r.parent
			continue
		}

		var attributes int8
		if offset, attributes, timestamp, r.remain, err = readMessageHeader(r.reader, r.remain); err != nil {
			return
		}

		// if the message is compressed, decompress it and push a new reader
		// onto the stack.
		code := attributes & compressionCodecMask
		if code != 0 {
			var codec CompressionCodec
			if codec, err = resolveCodec(code); err != nil {
				return
			}

			// discard next four bytes...will be -1 to indicate null key
			if r.remain, err = discardN(r.reader, r.remain, 4); err != nil {
				return
			}

			// read and decompress the contained message set.
			var decompressed bytes.Buffer

			// NOTE: inside the callback, r is the *bufio.Reader parameter and
			// shadows the method receiver.
			if r.remain, err = readBytesWith(r.reader, r.remain, func(r *bufio.Reader, sz, n int) (remain int, err error) {
				// x4 as a guess that the average compression ratio is near 75%
				// NOTE(review): bytes.Buffer.Grow panics on a negative count;
				// confirm readBytesWith never passes a negative n.
				decompressed.Grow(4 * n)

				// limit the decompressor to the n bytes of the compressed
				// value so it cannot read into the following message.
				l := io.LimitedReader{R: r, N: int64(n)}
				d := codec.NewReader(&l)

				_, err = decompressed.ReadFrom(d)
				// n - l.N is the number of bytes actually consumed from r.
				remain = sz - (n - int(l.N))
				d.Close()
				return
			}); err != nil {
				return
			}

			// the compressed message's offset will be equal to the offset of
			// the last message in the set. within the compressed set, the
			// offsets will be relative, so we have to scan through them to
			// get the base offset. for example, if there are four compressed
			// messages at offsets 10-13, then the container message will have
			// offset 13 and the contained messages will be 0,1,2,3. the base
			// offset for the container, then is 13-3=10.
			if offset, err = extractOffset(offset, decompressed.Bytes()); err != nil {
				return
			}

			r.readerStack = &readerStack{
				// Allocate a buffer of size 0, which gets capped at 16 bytes
				// by the bufio package. We are already reading buffered data
				// here, no need to reserve another 4KB buffer.
				reader: bufio.NewReaderSize(&decompressed, 0),
				remain: decompressed.Len(),
				base:   offset,
				parent: r.readerStack,
			}
			continue
		}

		// adjust the offset in case we're reading compressed messages. the
		// base will be zero otherwise.
		offset += r.base

		// When the messages are compressed kafka may return messages at an
		// earlier offset than the one that was requested, it's the client's
		// responsibility to ignore those.
		if offset < min {
			// skip the key, then the value, of the ignored message.
			if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
				return
			}
			if r.remain, err = discardBytes(r.reader, r.remain); err != nil {
				return
			}
			continue
		}

		if r.remain, err = readBytesWith(r.reader, r.remain, key); err != nil {
			return
		}
		r.remain, err = readBytesWith(r.reader, r.remain, val)
		return
	}

	// the whole stack was consumed without finding a message to return.
	err = errShortRead
	return
}
  278. func (r *messageSetReaderV1) remaining() (remain int) {
  279. for s := r.readerStack; s != nil; s = s.parent {
  280. remain += s.remain
  281. }
  282. return
  283. }
  284. func (r *messageSetReaderV1) discard() (err error) {
  285. if r.readerStack == nil {
  286. return
  287. }
  288. // rewind up to the top-most reader b/c it's the only one that's doing
  289. // actual i/o. the rest are byte buffers that have been pushed on the stack
  290. // while reading compressed message sets.
  291. for r.parent != nil {
  292. r.readerStack = r.parent
  293. }
  294. r.remain, err = discardN(r.reader, r.remain, r.remain)
  295. return
  296. }
  297. func extractOffset(base int64, msgSet []byte) (offset int64, err error) {
  298. r, remain := bufio.NewReader(bytes.NewReader(msgSet)), len(msgSet)
  299. for remain > 0 {
  300. if remain, err = readInt64(r, remain, &offset); err != nil {
  301. return
  302. }
  303. var sz int32
  304. if remain, err = readInt32(r, remain, &sz); err != nil {
  305. return
  306. }
  307. if remain, err = discardN(r, remain, int(sz)); err != nil {
  308. return
  309. }
  310. }
  311. offset = base - offset
  312. return
  313. }
  314. type messageSetHeaderV2 struct {
  315. firstOffset int64
  316. length int32
  317. partitionLeaderEpoch int32
  318. magic int8
  319. crc int32
  320. batchAttributes int16
  321. lastOffsetDelta int32
  322. firstTimestamp int64
  323. maxTimestamp int64
  324. producerId int64
  325. producerEpoch int16
  326. firstSequence int32
  327. }
  328. type timestampType int8
  329. const (
  330. createTime timestampType = 0
  331. logAppendTime timestampType = 1
  332. )
  333. type transactionType int8
  334. const (
  335. nonTransactional transactionType = 0
  336. transactional transactionType = 1
  337. )
  338. type controlType int8
  339. const (
  340. nonControlMessage controlType = 0
  341. controlMessage controlType = 1
  342. )
  343. func (h *messageSetHeaderV2) compression() int8 {
  344. return int8(h.batchAttributes & 7)
  345. }
  346. func (h *messageSetHeaderV2) timestampType() timestampType {
  347. return timestampType((h.batchAttributes & (1 << 3)) >> 3)
  348. }
  349. func (h *messageSetHeaderV2) transactionType() transactionType {
  350. return transactionType((h.batchAttributes & (1 << 4)) >> 4)
  351. }
  352. func (h *messageSetHeaderV2) controlType() controlType {
  353. return controlType((h.batchAttributes & (1 << 5)) >> 5)
  354. }
// messageSetReaderV2 reads version 2 message sets; the embedded pointer is
// the reader frame that bytes are currently consumed from.
type messageSetReaderV2 struct {
	*readerStack
	// messageCount is the number of messages left to read before the next
	// header; it is set by readHeader and decremented by readMessage.
	messageCount int
	// header is the most recent header decoded by readHeader.
	header messageSetHeaderV2
}
  360. func (r *messageSetReaderV2) readHeader() (err error) {
  361. h := &r.header
  362. if r.remain, err = readInt64(r.reader, r.remain, &h.firstOffset); err != nil {
  363. return
  364. }
  365. if r.remain, err = readInt32(r.reader, r.remain, &h.length); err != nil {
  366. return
  367. }
  368. if r.remain, err = readInt32(r.reader, r.remain, &h.partitionLeaderEpoch); err != nil {
  369. return
  370. }
  371. if r.remain, err = readInt8(r.reader, r.remain, &h.magic); err != nil {
  372. return
  373. }
  374. if r.remain, err = readInt32(r.reader, r.remain, &h.crc); err != nil {
  375. return
  376. }
  377. if r.remain, err = readInt16(r.reader, r.remain, &h.batchAttributes); err != nil {
  378. return
  379. }
  380. if r.remain, err = readInt32(r.reader, r.remain, &h.lastOffsetDelta); err != nil {
  381. return
  382. }
  383. if r.remain, err = readInt64(r.reader, r.remain, &h.firstTimestamp); err != nil {
  384. return
  385. }
  386. if r.remain, err = readInt64(r.reader, r.remain, &h.maxTimestamp); err != nil {
  387. return
  388. }
  389. if r.remain, err = readInt64(r.reader, r.remain, &h.producerId); err != nil {
  390. return
  391. }
  392. if r.remain, err = readInt16(r.reader, r.remain, &h.producerEpoch); err != nil {
  393. return
  394. }
  395. if r.remain, err = readInt32(r.reader, r.remain, &h.firstSequence); err != nil {
  396. return
  397. }
  398. var messageCount int32
  399. if r.remain, err = readInt32(r.reader, r.remain, &messageCount); err != nil {
  400. return
  401. }
  402. r.messageCount = int(messageCount)
  403. return nil
  404. }
  405. func (r *messageSetReaderV2) readMessage(min int64,
  406. key func(*bufio.Reader, int, int) (int, error),
  407. val func(*bufio.Reader, int, int) (int, error),
  408. ) (offset int64, timestamp int64, headers []Header, err error) {
  409. if r.messageCount == 0 {
  410. if r.remain == 0 {
  411. if r.parent != nil {
  412. r.readerStack = r.parent
  413. }
  414. }
  415. if err = r.readHeader(); err != nil {
  416. return
  417. }
  418. if code := r.header.compression(); code != 0 {
  419. var codec CompressionCodec
  420. if codec, err = resolveCodec(code); err != nil {
  421. return
  422. }
  423. var batchRemain = int(r.header.length - 49)
  424. if batchRemain > r.remain {
  425. err = errShortRead
  426. return
  427. }
  428. var decompressed bytes.Buffer
  429. decompressed.Grow(4 * batchRemain)
  430. l := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
  431. d := codec.NewReader(&l)
  432. _, err = decompressed.ReadFrom(d)
  433. r.remain = r.remain - (batchRemain - int(l.N))
  434. d.Close()
  435. if err != nil {
  436. return
  437. }
  438. r.readerStack = &readerStack{
  439. reader: bufio.NewReaderSize(&decompressed, 0),
  440. remain: decompressed.Len(),
  441. base: -1, // base is unused here
  442. parent: r.readerStack,
  443. }
  444. }
  445. }
  446. var length int64
  447. if r.remain, err = readVarInt(r.reader, r.remain, &length); err != nil {
  448. return
  449. }
  450. var attrs int8
  451. if r.remain, err = readInt8(r.reader, r.remain, &attrs); err != nil {
  452. return
  453. }
  454. var timestampDelta int64
  455. if r.remain, err = readVarInt(r.reader, r.remain, &timestampDelta); err != nil {
  456. return
  457. }
  458. var offsetDelta int64
  459. if r.remain, err = readVarInt(r.reader, r.remain, &offsetDelta); err != nil {
  460. return
  461. }
  462. var keyLen int64
  463. if r.remain, err = readVarInt(r.reader, r.remain, &keyLen); err != nil {
  464. return
  465. }
  466. if r.remain, err = key(r.reader, r.remain, int(keyLen)); err != nil {
  467. return
  468. }
  469. var valueLen int64
  470. if r.remain, err = readVarInt(r.reader, r.remain, &valueLen); err != nil {
  471. return
  472. }
  473. if r.remain, err = val(r.reader, r.remain, int(valueLen)); err != nil {
  474. return
  475. }
  476. var headerCount int64
  477. if r.remain, err = readVarInt(r.reader, r.remain, &headerCount); err != nil {
  478. return
  479. }
  480. headers = make([]Header, headerCount)
  481. for i := 0; i < int(headerCount); i++ {
  482. if err = r.readMessageHeader(&headers[i]); err != nil {
  483. return
  484. }
  485. }
  486. r.messageCount--
  487. return r.header.firstOffset + offsetDelta, r.header.firstTimestamp + timestampDelta, headers, nil
  488. }
  489. func (r *messageSetReaderV2) readMessageHeader(header *Header) (err error) {
  490. var keyLen int64
  491. if r.remain, err = readVarInt(r.reader, r.remain, &keyLen); err != nil {
  492. return
  493. }
  494. if header.Key, r.remain, err = readNewString(r.reader, r.remain, int(keyLen)); err != nil {
  495. return
  496. }
  497. var valLen int64
  498. if r.remain, err = readVarInt(r.reader, r.remain, &valLen); err != nil {
  499. return
  500. }
  501. if header.Value, r.remain, err = readNewBytes(r.reader, r.remain, int(valLen)); err != nil {
  502. return
  503. }
  504. return nil
  505. }
  506. func (r *messageSetReaderV2) remaining() (remain int) {
  507. return r.remain
  508. }
  509. func (r *messageSetReaderV2) discard() (err error) {
  510. r.remain, err = discardN(r.reader, r.remain, r.remain)
  511. return
  512. }