streamlexer.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package buffer
  2. import (
  3. "io"
  4. )
  5. type block struct {
  6. buf []byte
  7. next int // index in pool plus one
  8. active bool
  9. }
  10. type bufferPool struct {
  11. pool []block
  12. head int // index in pool plus one
  13. tail int // index in pool plus one
  14. pos int // byte pos in tail
  15. }
  16. func (z *bufferPool) swap(oldBuf []byte, size int) []byte {
  17. // find new buffer that can be reused
  18. swap := -1
  19. for i := 0; i < len(z.pool); i++ {
  20. if !z.pool[i].active && size <= cap(z.pool[i].buf) {
  21. swap = i
  22. break
  23. }
  24. }
  25. if swap == -1 { // no free buffer found for reuse
  26. if z.tail == 0 && z.pos >= len(oldBuf) && size <= cap(oldBuf) { // but we can reuse the current buffer!
  27. z.pos -= len(oldBuf)
  28. return oldBuf[:0]
  29. }
  30. // allocate new
  31. z.pool = append(z.pool, block{make([]byte, 0, size), 0, true})
  32. swap = len(z.pool) - 1
  33. }
  34. newBuf := z.pool[swap].buf
  35. // put current buffer into pool
  36. z.pool[swap] = block{oldBuf, 0, true}
  37. if z.head != 0 {
  38. z.pool[z.head-1].next = swap + 1
  39. }
  40. z.head = swap + 1
  41. if z.tail == 0 {
  42. z.tail = swap + 1
  43. }
  44. return newBuf[:0]
  45. }
  46. func (z *bufferPool) free(n int) {
  47. z.pos += n
  48. // move the tail over to next buffers
  49. for z.tail != 0 && z.pos >= len(z.pool[z.tail-1].buf) {
  50. z.pos -= len(z.pool[z.tail-1].buf)
  51. newTail := z.pool[z.tail-1].next
  52. z.pool[z.tail-1].active = false // after this, any thread may pick up the inactive buffer, so it can't be used anymore
  53. z.tail = newTail
  54. }
  55. if z.tail == 0 {
  56. z.head = 0
  57. }
  58. }
  59. // StreamLexer is a buffered reader that allows peeking forward and shifting, taking an io.Reader.
  60. // It keeps data in-memory until Free, taking a byte length, is called to move beyond the data.
  61. type StreamLexer struct {
  62. r io.Reader
  63. err error
  64. pool bufferPool
  65. buf []byte
  66. start int // index in buf
  67. pos int // index in buf
  68. prevStart int
  69. free int
  70. }
  71. // NewStreamLexer returns a new StreamLexer for a given io.Reader with a 4kB estimated buffer size.
  72. // If the io.Reader implements Bytes, that buffer is used instead.
  73. func NewStreamLexer(r io.Reader) *StreamLexer {
  74. return NewStreamLexerSize(r, defaultBufSize)
  75. }
  76. // NewStreamLexerSize returns a new StreamLexer for a given io.Reader and estimated required buffer size.
  77. // If the io.Reader implements Bytes, that buffer is used instead.
  78. func NewStreamLexerSize(r io.Reader, size int) *StreamLexer {
  79. // if reader has the bytes in memory already, use that instead
  80. if buffer, ok := r.(interface {
  81. Bytes() []byte
  82. }); ok {
  83. return &StreamLexer{
  84. err: io.EOF,
  85. buf: buffer.Bytes(),
  86. }
  87. }
  88. return &StreamLexer{
  89. r: r,
  90. buf: make([]byte, 0, size),
  91. }
  92. }
  93. func (z *StreamLexer) read(pos int) byte {
  94. if z.err != nil {
  95. return 0
  96. }
  97. // free unused bytes
  98. z.pool.free(z.free)
  99. z.free = 0
  100. // get new buffer
  101. c := cap(z.buf)
  102. p := pos - z.start + 1
  103. if 2*p > c { // if the token is larger than half the buffer, increase buffer size
  104. c = 2*c + p
  105. }
  106. d := len(z.buf) - z.start
  107. buf := z.pool.swap(z.buf[:z.start], c)
  108. copy(buf[:d], z.buf[z.start:]) // copy the left-overs (unfinished token) from the old buffer
  109. // read in new data for the rest of the buffer
  110. var n int
  111. for pos-z.start >= d && z.err == nil {
  112. n, z.err = z.r.Read(buf[d:cap(buf)])
  113. d += n
  114. }
  115. pos -= z.start
  116. z.pos -= z.start
  117. z.start, z.buf = 0, buf[:d]
  118. if pos >= d {
  119. return 0
  120. }
  121. return z.buf[pos]
  122. }
  123. // Err returns the error returned from io.Reader. It may still return valid bytes for a while though.
  124. func (z *StreamLexer) Err() error {
  125. if z.err == io.EOF && z.pos < len(z.buf) {
  126. return nil
  127. }
  128. return z.err
  129. }
  130. // Free frees up bytes of length n from previously shifted tokens.
  131. // Each call to Shift should at one point be followed by a call to Free with a length returned by ShiftLen.
  132. func (z *StreamLexer) Free(n int) {
  133. z.free += n
  134. }
  135. // Peek returns the ith byte relative to the end position and possibly does an allocation.
  136. // Peek returns zero when an error has occurred, Err returns the error.
  137. // TODO: inline function
  138. func (z *StreamLexer) Peek(pos int) byte {
  139. pos += z.pos
  140. if uint(pos) < uint(len(z.buf)) { // uint for BCE
  141. return z.buf[pos]
  142. }
  143. return z.read(pos)
  144. }
  145. // PeekRune returns the rune and rune length of the ith byte relative to the end position.
  146. func (z *StreamLexer) PeekRune(pos int) (rune, int) {
  147. // from unicode/utf8
  148. c := z.Peek(pos)
  149. if c < 0xC0 {
  150. return rune(c), 1
  151. } else if c < 0xE0 {
  152. return rune(c&0x1F)<<6 | rune(z.Peek(pos+1)&0x3F), 2
  153. } else if c < 0xF0 {
  154. return rune(c&0x0F)<<12 | rune(z.Peek(pos+1)&0x3F)<<6 | rune(z.Peek(pos+2)&0x3F), 3
  155. }
  156. return rune(c&0x07)<<18 | rune(z.Peek(pos+1)&0x3F)<<12 | rune(z.Peek(pos+2)&0x3F)<<6 | rune(z.Peek(pos+3)&0x3F), 4
  157. }
  158. // Move advances the position.
  159. func (z *StreamLexer) Move(n int) {
  160. z.pos += n
  161. }
  162. // Pos returns a mark to which can be rewinded.
  163. func (z *StreamLexer) Pos() int {
  164. return z.pos - z.start
  165. }
  166. // Rewind rewinds the position to the given position.
  167. func (z *StreamLexer) Rewind(pos int) {
  168. z.pos = z.start + pos
  169. }
  170. // Lexeme returns the bytes of the current selection.
  171. func (z *StreamLexer) Lexeme() []byte {
  172. return z.buf[z.start:z.pos]
  173. }
  174. // Skip collapses the position to the end of the selection.
  175. func (z *StreamLexer) Skip() {
  176. z.start = z.pos
  177. }
  178. // Shift returns the bytes of the current selection and collapses the position to the end of the selection.
  179. // It also returns the number of bytes we moved since the last call to Shift. This can be used in calls to Free.
  180. func (z *StreamLexer) Shift() []byte {
  181. if z.pos > len(z.buf) { // make sure we peeked at least as much as we shift
  182. z.read(z.pos - 1)
  183. }
  184. b := z.buf[z.start:z.pos]
  185. z.start = z.pos
  186. return b
  187. }
  188. // ShiftLen returns the number of bytes moved since the last call to ShiftLen. This can be used in calls to Free because it takes into account multiple Shifts or Skips.
  189. func (z *StreamLexer) ShiftLen() int {
  190. n := z.start - z.prevStart
  191. z.prevStart = z.start
  192. return n
  193. }