buffer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. package protocol
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. )
// Bytes is an interface implemented by types that represent immutable
// sequences of bytes.
//
// Bytes values are used to abstract the location where record keys and
// values are read from (e.g. in-memory buffers, network sockets, files).
//
// The Close method should be called to release resources held by the object
// when the program is done with it.
//
// Bytes values are generally not safe to use concurrently from multiple
// goroutines.
type Bytes interface {
	io.ReadCloser
	// Len returns the number of bytes remaining to be read from the payload.
	Len() int
}
  27. // NewBytes constructs a Bytes value from b.
  28. //
  29. // The returned value references b, it does not make a copy of the backing
  30. // array.
  31. //
  32. // If b is nil, nil is returned to represent a null BYTES value in the kafka
  33. // protocol.
  34. func NewBytes(b []byte) Bytes {
  35. if b == nil {
  36. return nil
  37. }
  38. r := new(bytesReader)
  39. r.Reset(b)
  40. return r
  41. }
  42. // ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the
  43. // length of b to minimize the memory footprint.
  44. //
  45. // The function returns a nil slice if b is nil.
  46. func ReadAll(b Bytes) ([]byte, error) {
  47. if b == nil {
  48. return nil, nil
  49. }
  50. s := make([]byte, b.Len())
  51. _, err := io.ReadFull(b, s)
  52. return s, err
  53. }
// bytesReader adapts bytes.Reader to the Bytes interface by adding a no-op
// Close method.
type bytesReader struct{ bytes.Reader }

func (*bytesReader) Close() error { return nil }
// refCount is an atomic reference counter used to manage the lifetime of
// shared objects (pages and page buffers).
type refCount uintptr

// ref atomically increments the counter by one.
func (rc *refCount) ref() { atomic.AddUintptr((*uintptr)(rc), 1) }

// unref atomically decrements the counter by one and invokes onZero when the
// count reaches zero. Adding ^uintptr(0) is the two's-complement way of
// subtracting 1 from an unsigned value.
func (rc *refCount) unref(onZero func()) {
	if atomic.AddUintptr((*uintptr)(rc), ^uintptr(0)) == 0 {
		onZero()
	}
}
const (
	// Size of the memory buffer for a single page. We use a fairly
	// large size here (64 KiB) because batches exchanged with kafka
	// tend to be multiple kilobytes in size, sometimes hundreds.
	// Using large pages amortizes the overhead of the page metadata
	// and algorithms to manage the pages.
	pageSize = 65536
)
// page is a fixed-size (64 KiB) memory buffer holding a chunk of a
// pageBuffer's data. Pages are reference counted and recycled through
// pagePool once their count drops to zero.
type page struct {
	refc   refCount        // number of live references to this page
	offset int64           // absolute offset of the page within the buffer
	length int             // number of bytes written to the page
	buffer *[pageSize]byte // backing storage
}
  77. func newPage(offset int64) *page {
  78. p, _ := pagePool.Get().(*page)
  79. if p != nil {
  80. p.offset = offset
  81. p.length = 0
  82. p.ref()
  83. } else {
  84. p = &page{
  85. refc: 1,
  86. offset: offset,
  87. buffer: &[pageSize]byte{},
  88. }
  89. }
  90. return p
  91. }
// ref increments the page's reference count.
func (p *page) ref() { p.refc.ref() }

// unref decrements the page's reference count, returning the page to the
// pool when no references remain.
func (p *page) unref() { p.refc.unref(func() { pagePool.Put(p) }) }
  94. func (p *page) slice(begin, end int64) []byte {
  95. i, j := begin-p.offset, end-p.offset
  96. if i < 0 {
  97. i = 0
  98. } else if i > pageSize {
  99. i = pageSize
  100. }
  101. if j < 0 {
  102. j = 0
  103. } else if j > pageSize {
  104. j = pageSize
  105. }
  106. if i < j {
  107. return p.buffer[i:j]
  108. }
  109. return nil
  110. }
// Cap returns the fixed capacity of the page.
func (p *page) Cap() int { return pageSize }

// Len returns the number of bytes currently stored in the page.
func (p *page) Len() int { return p.length }

// Size returns the length of the page as an int64.
func (p *page) Size() int64 { return int64(p.length) }

// Truncate shrinks the page to n bytes; it has no effect when n is greater
// than or equal to the current length.
func (p *page) Truncate(n int) {
	if n < p.length {
		p.length = n
	}
}
// ReadAt copies bytes from the page into b, starting at the absolute offset
// off. It panics if off does not fall within the page's address range, and
// returns 0, nil (not io.EOF) when off is at or past the written length —
// callers are expected to handle the end-of-data condition themselves.
func (p *page) ReadAt(b []byte, off int64) (int, error) {
	if off -= p.offset; off < 0 || off > pageSize {
		panic("offset out of range")
	}
	if off > int64(p.length) {
		return 0, nil
	}
	return copy(b, p.buffer[off:p.length]), nil
}
  128. func (p *page) ReadFrom(r io.Reader) (int64, error) {
  129. n, err := io.ReadFull(r, p.buffer[p.length:])
  130. if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
  131. err = nil
  132. }
  133. p.length += n
  134. return int64(n), err
  135. }
  136. func (p *page) WriteAt(b []byte, off int64) (int, error) {
  137. if off -= p.offset; off < 0 || off > pageSize {
  138. panic("offset out of range")
  139. }
  140. n := copy(p.buffer[off:], b)
  141. if end := int(off) + n; end > p.length {
  142. p.length = end
  143. }
  144. return n, nil
  145. }
// Write appends b at the page's current end of data.
func (p *page) Write(b []byte) (int, error) {
	return p.WriteAt(b, p.offset+int64(p.length))
}
// Compile-time checks that *page implements the io interfaces it is used as.
var (
	_ io.ReaderAt   = (*page)(nil)
	_ io.ReaderFrom = (*page)(nil)
	_ io.Writer     = (*page)(nil)
	_ io.WriterAt   = (*page)(nil)
)
// pageBuffer is a reference-counted, append-oriented buffer backed by a list
// of contiguous pages. It implements the io interfaces needed to accumulate
// and replay protocol payloads.
type pageBuffer struct {
	refc   refCount        // number of live references to the buffer
	pages  contiguousPages // pages backing the buffer, in offset order
	length int             // total number of bytes written
	cursor int             // current read position
}
  161. func newPageBuffer() *pageBuffer {
  162. b, _ := pageBufferPool.Get().(*pageBuffer)
  163. if b != nil {
  164. b.cursor = 0
  165. b.refc.ref()
  166. } else {
  167. b = &pageBuffer{
  168. refc: 1,
  169. pages: make(contiguousPages, 0, 16),
  170. }
  171. }
  172. return b
  173. }
// refTo initializes ref to reference the bytes of pb in the absolute range
// [begin:end), taking a reference on every page the range overlaps. The
// range length is stored as a uint32, so it must not exceed 4 GB.
func (pb *pageBuffer) refTo(ref *pageRef, begin, end int64) {
	length := end - begin
	if length > math.MaxUint32 {
		panic("reference to contiguous buffer pages exceeds the maximum size of 4 GB")
	}
	// Reuse ref.buffer as backing storage when the page list fits (one or
	// two pages is the common case), avoiding a heap allocation.
	ref.pages = append(ref.buffer[:0], pb.pages.slice(begin, end)...)
	ref.pages.ref()
	ref.offset = begin
	ref.length = uint32(length)
}
  184. func (pb *pageBuffer) ref(begin, end int64) *pageRef {
  185. ref := new(pageRef)
  186. pb.refTo(ref, begin, end)
  187. return ref
  188. }
// unref releases one reference to the buffer; when the last reference is
// dropped, the pages are released and the buffer is returned to the pool.
func (pb *pageBuffer) unref() {
	pb.refc.unref(func() {
		pb.pages.unref()
		pb.pages.clear()
		pb.pages = pb.pages[:0]
		pb.length = 0
		pageBufferPool.Put(pb)
	})
}
// newPage allocates a page positioned at the current end of the buffer.
func (pb *pageBuffer) newPage() *page {
	return newPage(int64(pb.length))
}

// Close satisfies io.Closer; it does not release the buffer (see unref).
func (pb *pageBuffer) Close() error {
	return nil
}

// Len returns the number of bytes remaining to be read from the buffer.
func (pb *pageBuffer) Len() int {
	return pb.length - pb.cursor
}

// Size returns the total number of bytes written to the buffer.
func (pb *pageBuffer) Size() int64 {
	return int64(pb.length)
}
  210. func (pb *pageBuffer) Discard(n int) (int, error) {
  211. remain := pb.length - pb.cursor
  212. if remain < n {
  213. n = remain
  214. }
  215. pb.cursor += n
  216. return n, nil
  217. }
// Truncate discards all bytes past offset n, releasing every page that falls
// entirely beyond the new length and moving the cursor back if needed.
func (pb *pageBuffer) Truncate(n int) {
	if n < pb.length {
		pb.length = n
		if n < pb.cursor {
			pb.cursor = n
		}
		// Walk the pages in order, consuming n until we reach the first page
		// that extends past the new length.
		for i := range pb.pages {
			if p := pb.pages[i]; p.length <= n {
				n -= p.length
			} else {
				// This page straddles the new length: shrink it in place,
				// then drop every page after it.
				if n > 0 {
					pb.pages[i].Truncate(n)
					i++
				}
				pb.pages[i:].unref()
				pb.pages[i:].clear()
				pb.pages = pb.pages[:i]
				break
			}
		}
	}
}
  240. func (pb *pageBuffer) Seek(offset int64, whence int) (int64, error) {
  241. c, err := seek(int64(pb.cursor), int64(pb.length), offset, whence)
  242. if err != nil {
  243. return -1, err
  244. }
  245. pb.cursor = int(c)
  246. return c, nil
  247. }
  248. func (pb *pageBuffer) ReadByte() (byte, error) {
  249. b := [1]byte{}
  250. _, err := pb.Read(b[:])
  251. return b[0], err
  252. }
  253. func (pb *pageBuffer) Read(b []byte) (int, error) {
  254. if pb.cursor >= pb.length {
  255. return 0, io.EOF
  256. }
  257. n, err := pb.ReadAt(b, int64(pb.cursor))
  258. pb.cursor += n
  259. return n, err
  260. }
// ReadAt copies bytes starting at absolute offset off into b without moving
// the cursor.
func (pb *pageBuffer) ReadAt(b []byte, off int64) (int, error) {
	return pb.pages.ReadAt(b, off)
}
// ReadFrom appends bytes read from r to the buffer until r is drained,
// allocating new pages as the tail page fills up.
func (pb *pageBuffer) ReadFrom(r io.Reader) (int64, error) {
	if len(pb.pages) == 0 {
		pb.pages = append(pb.pages, pb.newPage())
	}
	rn := int64(0)
	for {
		tail := pb.pages[len(pb.pages)-1]
		free := tail.Cap() - tail.Len()
		if free == 0 {
			tail = pb.newPage()
			free = pageSize
			pb.pages = append(pb.pages, tail)
		}
		n, err := tail.ReadFrom(r)
		pb.length += int(n)
		rn += n
		// A short read means the reader was drained (or failed); a full read
		// means the page filled up, so loop to grow the buffer and continue.
		if n < int64(free) {
			return rn, err
		}
	}
}
// WriteString appends s to the buffer; it satisfies io.StringWriter.
func (pb *pageBuffer) WriteString(s string) (int, error) {
	return pb.Write([]byte(s))
}
  288. func (pb *pageBuffer) Write(b []byte) (int, error) {
  289. wn := len(b)
  290. if wn == 0 {
  291. return 0, nil
  292. }
  293. if len(pb.pages) == 0 {
  294. pb.pages = append(pb.pages, pb.newPage())
  295. }
  296. for len(b) != 0 {
  297. tail := pb.pages[len(pb.pages)-1]
  298. free := tail.Cap() - tail.Len()
  299. if len(b) <= free {
  300. tail.Write(b)
  301. pb.length += len(b)
  302. break
  303. }
  304. tail.Write(b[:free])
  305. b = b[free:]
  306. pb.length += free
  307. pb.pages = append(pb.pages, pb.newPage())
  308. }
  309. return wn, nil
  310. }
// WriteAt overwrites bytes at absolute offset off, extending the buffer with
// a regular Write when the data reaches past the current end.
func (pb *pageBuffer) WriteAt(b []byte, off int64) (int, error) {
	n, err := pb.pages.WriteAt(b, off)
	if err != nil {
		return n, err
	}
	if n < len(b) {
		// The write extended past the end of the buffer: append the rest.
		// pb.Write always succeeds, so its return values can be ignored.
		pb.Write(b[n:])
	}
	return len(b), nil
}
  321. func (pb *pageBuffer) WriteTo(w io.Writer) (int64, error) {
  322. var wn int
  323. var err error
  324. pb.pages.scan(int64(pb.cursor), int64(pb.length), func(b []byte) bool {
  325. var n int
  326. n, err = w.Write(b)
  327. wn += n
  328. return err == nil
  329. })
  330. pb.cursor += wn
  331. return int64(wn), err
  332. }
// Compile-time interface checks for *pageBuffer, plus the pools used to
// recycle pages and page buffers across uses.
var (
	_ io.ReaderAt     = (*pageBuffer)(nil)
	_ io.ReaderFrom   = (*pageBuffer)(nil)
	_ io.StringWriter = (*pageBuffer)(nil)
	_ io.Writer       = (*pageBuffer)(nil)
	_ io.WriterAt     = (*pageBuffer)(nil)
	_ io.WriterTo     = (*pageBuffer)(nil)

	pagePool       sync.Pool
	pageBufferPool sync.Pool
)
// contiguousPages is an ordered list of pages covering a contiguous range of
// absolute offsets.
type contiguousPages []*page

// ref increments the reference count of every page in the list.
func (pages contiguousPages) ref() {
	for _, p := range pages {
		p.ref()
	}
}

// unref decrements the reference count of every page in the list.
func (pages contiguousPages) unref() {
	for _, p := range pages {
		p.unref()
	}
}

// clear nils out the page pointers so released pages are not retained.
func (pages contiguousPages) clear() {
	for i := range pages {
		pages[i] = nil
	}
}
  359. func (pages contiguousPages) ReadAt(b []byte, off int64) (int, error) {
  360. rn := 0
  361. for _, p := range pages.slice(off, off+int64(len(b))) {
  362. n, _ := p.ReadAt(b, off)
  363. b = b[n:]
  364. rn += n
  365. off += int64(n)
  366. }
  367. return rn, nil
  368. }
  369. func (pages contiguousPages) WriteAt(b []byte, off int64) (int, error) {
  370. wn := 0
  371. for _, p := range pages.slice(off, off+int64(len(b))) {
  372. n, _ := p.WriteAt(b, off)
  373. b = b[n:]
  374. wn += n
  375. off += int64(n)
  376. }
  377. return wn, nil
  378. }
// slice returns the sub-list of pages overlapping the absolute range
// [begin:end).
func (pages contiguousPages) slice(begin, end int64) contiguousPages {
	i := pages.indexOf(begin)
	j := pages.indexOf(end)
	if j < len(pages) {
		// Include the page containing end, since end is exclusive and may
		// fall in the middle of a page.
		j++
	}
	return pages[i:j]
}
// indexOf returns the index of the page covering the absolute offset.
// It assumes page offsets increase in pageSize increments starting at
// pages[0].offset (pages are created at the buffer's length, which only
// grows a page at a time once the tail fills).
func (pages contiguousPages) indexOf(offset int64) int {
	if len(pages) == 0 {
		return 0
	}
	return int((offset - pages[0].offset) / pageSize)
}
// scan invokes f with the byte slice of each page overlapping the absolute
// range [begin:end), stopping early when f returns false.
//
// NOTE(review): p.slice returns nil when a page's intersection with the
// range is empty (e.g. begin == end), so callbacks must tolerate an empty
// slice argument.
func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) {
	for _, p := range pages.slice(begin, end) {
		if !f(p.slice(begin, end)) {
			break
		}
	}
}
// Compile-time interface checks for contiguousPages.
var (
	_ io.ReaderAt = contiguousPages{}
	_ io.WriterAt = contiguousPages{}
)
// pageRef is a read-only view over a byte range of a pageBuffer. It owns one
// reference on each page it spans and implements the Bytes interface.
type pageRef struct {
	buffer [2]*page        // inline storage for pages, avoids an allocation
	pages  contiguousPages // pages spanned by the reference
	offset int64           // absolute offset of the first referenced byte
	cursor int64           // read position, relative to offset
	length uint32          // number of bytes referenced
	once   uint32          // guards unref so pages are released only once
}
// unref releases the pages held by the reference. The atomic once guard
// makes repeated calls safe; only the first call has any effect.
func (ref *pageRef) unref() {
	if atomic.CompareAndSwapUint32(&ref.once, 0, 1) {
		ref.pages.unref()
		ref.pages.clear()
		ref.pages = nil
		ref.offset = 0
		ref.cursor = 0
		ref.length = 0
	}
}
// Len returns the number of bytes remaining to be read.
func (ref *pageRef) Len() int { return int(ref.Size() - ref.cursor) }

// Size returns the total number of bytes referenced.
func (ref *pageRef) Size() int64 { return int64(ref.length) }

// Close releases the reference; it satisfies io.Closer.
func (ref *pageRef) Close() error { ref.unref(); return nil }

// String returns a debug representation of the reference.
func (ref *pageRef) String() string {
	return fmt.Sprintf("[offset=%d cursor=%d length=%d]", ref.offset, ref.cursor, ref.length)
}
  428. func (ref *pageRef) Seek(offset int64, whence int) (int64, error) {
  429. c, err := seek(ref.cursor, int64(ref.length), offset, whence)
  430. if err != nil {
  431. return -1, err
  432. }
  433. ref.cursor = c
  434. return c, nil
  435. }
  436. func (ref *pageRef) ReadByte() (byte, error) {
  437. var c byte
  438. var ok bool
  439. ref.scan(ref.cursor, func(b []byte) bool {
  440. c, ok = b[0], true
  441. return false
  442. })
  443. if ok {
  444. ref.cursor++
  445. } else {
  446. return 0, io.EOF
  447. }
  448. return c, nil
  449. }
  450. func (ref *pageRef) Read(b []byte) (int, error) {
  451. if ref.cursor >= int64(ref.length) {
  452. return 0, io.EOF
  453. }
  454. n, err := ref.ReadAt(b, ref.cursor)
  455. ref.cursor += int64(n)
  456. return n, err
  457. }
// ReadAt copies bytes into b starting at offset off, expressed relative to
// the beginning of the reference. It does not move the cursor.
func (ref *pageRef) ReadAt(b []byte, off int64) (int, error) {
	limit := ref.offset + int64(ref.length)
	off += ref.offset
	if off >= limit {
		return 0, io.EOF
	}
	if off+int64(len(b)) > limit {
		// Clamp the read so it does not extend past the referenced range.
		b = b[:limit-off]
	}
	if len(b) == 0 {
		return 0, nil
	}
	n, err := ref.pages.ReadAt(b, off)
	if n == 0 && err == nil {
		err = io.EOF
	}
	return n, err
}
  476. func (ref *pageRef) WriteTo(w io.Writer) (wn int64, err error) {
  477. ref.scan(ref.cursor, func(b []byte) bool {
  478. var n int
  479. n, err = w.Write(b)
  480. wn += int64(n)
  481. return err == nil
  482. })
  483. ref.cursor += wn
  484. return
  485. }
// scan invokes f over the referenced bytes starting at the relative offset
// off, translating it to absolute page offsets first.
func (ref *pageRef) scan(off int64, f func([]byte) bool) {
	begin := ref.offset + off
	end := ref.offset + int64(ref.length)
	ref.pages.scan(begin, end, f)
}
// Compile-time interface checks for *pageRef.
var (
	_ io.Closer   = (*pageRef)(nil)
	_ io.Seeker   = (*pageRef)(nil)
	_ io.Reader   = (*pageRef)(nil)
	_ io.ReaderAt = (*pageRef)(nil)
	_ io.WriterTo = (*pageRef)(nil)
)
// pageRefAllocator hands out pageRef values from a pre-allocated slab to
// amortize allocation cost. It is not safe for concurrent use.
type pageRefAllocator struct {
	refs []pageRef // current slab of refs
	head int       // index of the next unused ref in the slab
	size int       // number of refs allocated per slab
}
  503. func (a *pageRefAllocator) newPageRef() *pageRef {
  504. if a.head == len(a.refs) {
  505. a.refs = make([]pageRef, a.size)
  506. a.head = 0
  507. }
  508. ref := &a.refs[a.head]
  509. a.head++
  510. return ref
  511. }
  512. func seek(cursor, limit, offset int64, whence int) (int64, error) {
  513. switch whence {
  514. case io.SeekStart:
  515. // absolute offset
  516. case io.SeekCurrent:
  517. offset = cursor + offset
  518. case io.SeekEnd:
  519. offset = limit - offset
  520. default:
  521. return -1, fmt.Errorf("seek: invalid whence value: %d", whence)
  522. }
  523. if offset < 0 {
  524. offset = 0
  525. }
  526. if offset > limit {
  527. offset = limit
  528. }
  529. return offset, nil
  530. }
  531. func closeBytes(b Bytes) {
  532. if b != nil {
  533. b.Close()
  534. }
  535. }