read.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. package kafka
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "reflect"
  8. )
  // errShortRead is the sentinel error reported by the read helpers in this file
  // when a value needs more bytes than the remaining size budget allows, or when
  // the stream ends in the middle of a value.
  var errShortRead = errors.New("not enough bytes available to load the response")
  10. func peekRead(r *bufio.Reader, sz int, n int, f func([]byte)) (int, error) {
  11. if n > sz {
  12. return sz, errShortRead
  13. }
  14. b, err := r.Peek(n)
  15. if err != nil {
  16. return sz, err
  17. }
  18. f(b)
  19. return discardN(r, sz, n)
  20. }
  21. func readInt8(r *bufio.Reader, sz int, v *int8) (int, error) {
  22. return peekRead(r, sz, 1, func(b []byte) { *v = makeInt8(b) })
  23. }
  24. func readInt16(r *bufio.Reader, sz int, v *int16) (int, error) {
  25. return peekRead(r, sz, 2, func(b []byte) { *v = makeInt16(b) })
  26. }
  27. func readInt32(r *bufio.Reader, sz int, v *int32) (int, error) {
  28. return peekRead(r, sz, 4, func(b []byte) { *v = makeInt32(b) })
  29. }
  30. func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
  31. return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) })
  32. }
  33. func readVarInt(r *bufio.Reader, sz int, v *int64) (remain int, err error) {
  34. // Optimistically assume that most of the time, there will be data buffered
  35. // in the reader. If this is not the case, the buffer will be refilled after
  36. // consuming zero bytes from the input.
  37. input, _ := r.Peek(r.Buffered())
  38. x := uint64(0)
  39. s := uint(0)
  40. for {
  41. if len(input) > sz {
  42. input = input[:sz]
  43. }
  44. for i, b := range input {
  45. if b < 0x80 {
  46. x |= uint64(b) << s
  47. *v = int64(x>>1) ^ -(int64(x) & 1)
  48. n, err := r.Discard(i + 1)
  49. return sz - n, err
  50. }
  51. x |= uint64(b&0x7f) << s
  52. s += 7
  53. }
  54. // Make room in the input buffer to load more data from the underlying
  55. // stream. The x and s variables are left untouched, ensuring that the
  56. // varint decoding can continue on the next loop iteration.
  57. n, _ := r.Discard(len(input))
  58. sz -= n
  59. if sz == 0 {
  60. return 0, errShortRead
  61. }
  62. // Fill the buffer: ask for one more byte, but in practice the reader
  63. // will load way more from the underlying stream.
  64. if _, err := r.Peek(1); err != nil {
  65. if errors.Is(err, io.EOF) {
  66. err = errShortRead
  67. }
  68. return sz, err
  69. }
  70. // Grab as many bytes as possible from the buffer, then go on to the
  71. // next loop iteration which is going to consume it.
  72. input, _ = r.Peek(r.Buffered())
  73. }
  74. }
  75. func readBool(r *bufio.Reader, sz int, v *bool) (int, error) {
  76. return peekRead(r, sz, 1, func(b []byte) { *v = b[0] != 0 })
  77. }
  78. func readString(r *bufio.Reader, sz int, v *string) (int, error) {
  79. return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
  80. *v, remain, err = readNewString(r, sz, n)
  81. return
  82. })
  83. }
  84. func readStringWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
  85. var err error
  86. var len int16
  87. if sz, err = readInt16(r, sz, &len); err != nil {
  88. return sz, err
  89. }
  90. n := int(len)
  91. if n > sz {
  92. return sz, errShortRead
  93. }
  94. return cb(r, sz, n)
  95. }
  96. func readNewString(r *bufio.Reader, sz int, n int) (string, int, error) {
  97. b, sz, err := readNewBytes(r, sz, n)
  98. return string(b), sz, err
  99. }
  100. func readBytes(r *bufio.Reader, sz int, v *[]byte) (int, error) {
  101. return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
  102. *v, remain, err = readNewBytes(r, sz, n)
  103. return
  104. })
  105. }
  106. func readBytesWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
  107. var err error
  108. var n int
  109. if sz, err = readArrayLen(r, sz, &n); err != nil {
  110. return sz, err
  111. }
  112. if n > sz {
  113. return sz, errShortRead
  114. }
  115. return cb(r, sz, n)
  116. }
  117. func readNewBytes(r *bufio.Reader, sz int, n int) ([]byte, int, error) {
  118. var err error
  119. var b []byte
  120. var shortRead bool
  121. if n > 0 {
  122. if sz < n {
  123. n = sz
  124. shortRead = true
  125. }
  126. b = make([]byte, n)
  127. n, err = io.ReadFull(r, b)
  128. b = b[:n]
  129. sz -= n
  130. if err == nil && shortRead {
  131. err = errShortRead
  132. }
  133. }
  134. return b, sz, err
  135. }
  136. func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) {
  137. var err error
  138. var len int32
  139. if sz, err = readInt32(r, sz, &len); err != nil {
  140. return sz, err
  141. }
  142. *n = int(len)
  143. return sz, nil
  144. }
  145. func readArrayWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int) (int, error)) (int, error) {
  146. var err error
  147. var len int32
  148. if sz, err = readInt32(r, sz, &len); err != nil {
  149. return sz, err
  150. }
  151. for n := int(len); n > 0; n-- {
  152. if sz, err = cb(r, sz); err != nil {
  153. break
  154. }
  155. }
  156. return sz, err
  157. }
  158. func readStringArray(r *bufio.Reader, sz int, v *[]string) (remain int, err error) {
  159. var content []string
  160. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  161. var value string
  162. if fnRemain, fnErr = readString(r, size, &value); fnErr != nil {
  163. return
  164. }
  165. content = append(content, value)
  166. return
  167. }
  168. if remain, err = readArrayWith(r, sz, fn); err != nil {
  169. return
  170. }
  171. *v = content
  172. return
  173. }
  174. func readMapStringInt32(r *bufio.Reader, sz int, v *map[string][]int32) (remain int, err error) {
  175. var len int32
  176. if remain, err = readInt32(r, sz, &len); err != nil {
  177. return
  178. }
  179. content := make(map[string][]int32, len)
  180. for i := 0; i < int(len); i++ {
  181. var key string
  182. var values []int32
  183. if remain, err = readString(r, remain, &key); err != nil {
  184. return
  185. }
  186. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  187. var value int32
  188. if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
  189. return
  190. }
  191. values = append(values, value)
  192. return
  193. }
  194. if remain, err = readArrayWith(r, remain, fn); err != nil {
  195. return
  196. }
  197. content[key] = values
  198. }
  199. *v = content
  200. return
  201. }
  202. func read(r *bufio.Reader, sz int, a interface{}) (int, error) {
  203. switch v := a.(type) {
  204. case *int8:
  205. return readInt8(r, sz, v)
  206. case *int16:
  207. return readInt16(r, sz, v)
  208. case *int32:
  209. return readInt32(r, sz, v)
  210. case *int64:
  211. return readInt64(r, sz, v)
  212. case *bool:
  213. return readBool(r, sz, v)
  214. case *string:
  215. return readString(r, sz, v)
  216. case *[]byte:
  217. return readBytes(r, sz, v)
  218. }
  219. switch v := reflect.ValueOf(a).Elem(); v.Kind() {
  220. case reflect.Struct:
  221. return readStruct(r, sz, v)
  222. case reflect.Slice:
  223. return readSlice(r, sz, v)
  224. default:
  225. panic(fmt.Sprintf("unsupported type: %T", a))
  226. }
  227. }
  228. func readStruct(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
  229. var err error
  230. for i, n := 0, v.NumField(); i != n; i++ {
  231. if sz, err = read(r, sz, v.Field(i).Addr().Interface()); err != nil {
  232. return sz, err
  233. }
  234. }
  235. return sz, nil
  236. }
  237. func readSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
  238. var err error
  239. var len int32
  240. if sz, err = readInt32(r, sz, &len); err != nil {
  241. return sz, err
  242. }
  243. if n := int(len); n < 0 {
  244. v.Set(reflect.Zero(v.Type()))
  245. } else {
  246. v.Set(reflect.MakeSlice(v.Type(), n, n))
  247. for i := 0; i != n; i++ {
  248. if sz, err = read(r, sz, v.Index(i).Addr().Interface()); err != nil {
  249. return sz, err
  250. }
  251. }
  252. }
  253. return sz, nil
  254. }
  255. func readFetchResponseHeaderV2(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
  256. var n int32
  257. var p struct {
  258. Partition int32
  259. ErrorCode int16
  260. HighwaterMarkOffset int64
  261. MessageSetSize int32
  262. }
  263. if remain, err = readInt32(r, size, &throttle); err != nil {
  264. return
  265. }
  266. if remain, err = readInt32(r, remain, &n); err != nil {
  267. return
  268. }
  269. // This error should never trigger, unless there's a bug in the kafka client
  270. // or server.
  271. if n != 1 {
  272. err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
  273. return
  274. }
  275. // We ignore the topic name because we've requests messages for a single
  276. // topic, unless there's a bug in the kafka server we will have received
  277. // the name of the topic that we requested.
  278. if remain, err = discardString(r, remain); err != nil {
  279. return
  280. }
  281. if remain, err = readInt32(r, remain, &n); err != nil {
  282. return
  283. }
  284. // This error should never trigger, unless there's a bug in the kafka client
  285. // or server.
  286. if n != 1 {
  287. err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
  288. return
  289. }
  290. if remain, err = read(r, remain, &p); err != nil {
  291. return
  292. }
  293. if p.ErrorCode != 0 {
  294. err = Error(p.ErrorCode)
  295. return
  296. }
  297. // This error should never trigger, unless there's a bug in the kafka client
  298. // or server.
  299. if remain != int(p.MessageSetSize) {
  300. err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", p.MessageSetSize, remain)
  301. return
  302. }
  303. watermark = p.HighwaterMarkOffset
  304. return
  305. }
  306. func readFetchResponseHeaderV5(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
  307. var n int32
  308. type AbortedTransaction struct {
  309. ProducerId int64
  310. FirstOffset int64
  311. }
  312. var p struct {
  313. Partition int32
  314. ErrorCode int16
  315. HighwaterMarkOffset int64
  316. LastStableOffset int64
  317. LogStartOffset int64
  318. }
  319. var messageSetSize int32
  320. var abortedTransactions []AbortedTransaction
  321. if remain, err = readInt32(r, size, &throttle); err != nil {
  322. return
  323. }
  324. if remain, err = readInt32(r, remain, &n); err != nil {
  325. return
  326. }
  327. // This error should never trigger, unless there's a bug in the kafka client
  328. // or server.
  329. if n != 1 {
  330. err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
  331. return
  332. }
  333. // We ignore the topic name because we've requests messages for a single
  334. // topic, unless there's a bug in the kafka server we will have received
  335. // the name of the topic that we requested.
  336. if remain, err = discardString(r, remain); err != nil {
  337. return
  338. }
  339. if remain, err = readInt32(r, remain, &n); err != nil {
  340. return
  341. }
  342. // This error should never trigger, unless there's a bug in the kafka client
  343. // or server.
  344. if n != 1 {
  345. err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
  346. return
  347. }
  348. if remain, err = read(r, remain, &p); err != nil {
  349. return
  350. }
  351. var abortedTransactionLen int
  352. if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil {
  353. return
  354. }
  355. if abortedTransactionLen == -1 {
  356. abortedTransactions = nil
  357. } else {
  358. abortedTransactions = make([]AbortedTransaction, abortedTransactionLen)
  359. for i := 0; i < abortedTransactionLen; i++ {
  360. if remain, err = read(r, remain, &abortedTransactions[i]); err != nil {
  361. return
  362. }
  363. }
  364. }
  365. if p.ErrorCode != 0 {
  366. err = Error(p.ErrorCode)
  367. return
  368. }
  369. remain, err = readInt32(r, remain, &messageSetSize)
  370. if err != nil {
  371. return
  372. }
  373. // This error should never trigger, unless there's a bug in the kafka client
  374. // or server.
  375. if remain != int(messageSetSize) {
  376. err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain)
  377. return
  378. }
  379. watermark = p.HighwaterMarkOffset
  380. return
  381. }
  382. func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
  383. var n int32
  384. var errorCode int16
  385. type AbortedTransaction struct {
  386. ProducerId int64
  387. FirstOffset int64
  388. }
  389. var p struct {
  390. Partition int32
  391. ErrorCode int16
  392. HighwaterMarkOffset int64
  393. LastStableOffset int64
  394. LogStartOffset int64
  395. }
  396. var messageSetSize int32
  397. var abortedTransactions []AbortedTransaction
  398. if remain, err = readInt32(r, size, &throttle); err != nil {
  399. return
  400. }
  401. if remain, err = readInt16(r, remain, &errorCode); err != nil {
  402. return
  403. }
  404. if errorCode != 0 {
  405. err = Error(errorCode)
  406. return
  407. }
  408. if remain, err = discardInt32(r, remain); err != nil {
  409. return
  410. }
  411. if remain, err = readInt32(r, remain, &n); err != nil {
  412. return
  413. }
  414. // This error should never trigger, unless there's a bug in the kafka client
  415. // or server.
  416. if n != 1 {
  417. err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
  418. return
  419. }
  420. // We ignore the topic name because we've requests messages for a single
  421. // topic, unless there's a bug in the kafka server we will have received
  422. // the name of the topic that we requested.
  423. if remain, err = discardString(r, remain); err != nil {
  424. return
  425. }
  426. if remain, err = readInt32(r, remain, &n); err != nil {
  427. return
  428. }
  429. // This error should never trigger, unless there's a bug in the kafka client
  430. // or server.
  431. if n != 1 {
  432. err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
  433. return
  434. }
  435. if remain, err = read(r, remain, &p); err != nil {
  436. return
  437. }
  438. var abortedTransactionLen int
  439. if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil {
  440. return
  441. }
  442. if abortedTransactionLen == -1 {
  443. abortedTransactions = nil
  444. } else {
  445. abortedTransactions = make([]AbortedTransaction, abortedTransactionLen)
  446. for i := 0; i < abortedTransactionLen; i++ {
  447. if remain, err = read(r, remain, &abortedTransactions[i]); err != nil {
  448. return
  449. }
  450. }
  451. }
  452. if p.ErrorCode != 0 {
  453. err = Error(p.ErrorCode)
  454. return
  455. }
  456. remain, err = readInt32(r, remain, &messageSetSize)
  457. if err != nil {
  458. return
  459. }
  460. // This error should never trigger, unless there's a bug in the kafka client
  461. // or server.
  462. if remain != int(messageSetSize) {
  463. err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain)
  464. return
  465. }
  466. watermark = p.HighwaterMarkOffset
  467. return
  468. }