reader.go 5.6 KB


  1. package lz4
  2. import (
  3. "io"
  4. "github.com/pierrec/lz4/v4/internal/lz4block"
  5. "github.com/pierrec/lz4/v4/internal/lz4errors"
  6. "github.com/pierrec/lz4/v4/internal/lz4stream"
  7. )
// readerStates is the state table handed to the Reader's state machine in
// newReader. It is indexed by a state and yields the state associated with it
// - presumably the one entered on the next successful transition (TODO
// confirm against the _State implementation).
var readerStates = []aState{
	noState:     newState,
	errorState:  newState,
	newState:    readState,
	readState:   closedState,
	closedState: newState,
}
// NewReader returns a new LZ4 frame decoder reading compressed data from r.
// The Reader is built by newReader with the legacy flag set to false.
func NewReader(r io.Reader) *Reader {
	return newReader(r, false)
}
// newReader allocates a Reader with a fresh frame, initializes its state
// machine with readerStates, applies the package default options and finally
// resets it so that it reads from r.
//
// NOTE(review): the legacy parameter is not used anywhere in this function.
func newReader(r io.Reader, legacy bool) *Reader {
	zr := &Reader{frame: lz4stream.NewFrame()}
	zr.state.init(readerStates)
	// The error from applying the package defaults is deliberately discarded.
	_ = zr.Apply(DefaultConcurrency, defaultOnBlockDone)
	zr.Reset(r)
	return zr
}
// Reader allows reading an LZ4 stream.
type Reader struct {
	state   _State           // state machine; also holds the last error (state.err)
	src     io.Reader        // source reader
	num     int              // concurrency level; 1 means blocks are uncompressed by read
	frame   *lz4stream.Frame // frame being read
	data    []byte           // uncompressed block buffer served by Read; obtained in init
	reads   chan []byte      // channel returned by frame.InitR, received from when num != 1
	idx     int              // offset in data of the next byte to copy out; 0 means fetch a new block
	handler func(int)        // called by Read and WriteTo with the number of uncompressed bytes produced
	cum     uint32           // running total of uncompressed bytes, passed to block.Read
	dict    []byte           // previously uncompressed data passed to Uncompress for dependent blocks
}
// private is a no-op marker method.
// NOTE(review): presumably it lets Reader satisfy an interface with an
// unexported method used by the options - confirm against that interface.
func (*Reader) private() {}
  40. func (r *Reader) Apply(options ...Option) (err error) {
  41. defer r.state.check(&err)
  42. switch r.state.state {
  43. case newState:
  44. case errorState:
  45. return r.state.err
  46. default:
  47. return lz4errors.ErrOptionClosedOrError
  48. }
  49. for _, o := range options {
  50. if err = o(r); err != nil {
  51. return
  52. }
  53. }
  54. return
  55. }
  56. // Size returns the size of the underlying uncompressed data, if set in the stream.
  57. func (r *Reader) Size() int {
  58. switch r.state.state {
  59. case readState, closedState:
  60. if r.frame.Descriptor.Flags.Size() {
  61. return int(r.frame.Descriptor.ContentSize)
  62. }
  63. }
  64. return 0
  65. }
// isNotConcurrent reports whether the concurrency level is exactly 1, in
// which case Read and WriteTo uncompress blocks themselves through read
// instead of receiving them from the reads channel.
func (r *Reader) isNotConcurrent() bool {
	return r.num == 1
}
  69. func (r *Reader) init() error {
  70. err := r.frame.ParseHeaders(r.src)
  71. if err != nil {
  72. return err
  73. }
  74. if !r.frame.Descriptor.Flags.BlockIndependence() {
  75. // We can't decompress dependent blocks concurrently.
  76. // Instead of throwing an error to the user, silently drop concurrency
  77. r.num = 1
  78. }
  79. data, err := r.frame.InitR(r.src, r.num)
  80. if err != nil {
  81. return err
  82. }
  83. r.reads = data
  84. r.idx = 0
  85. size := r.frame.Descriptor.Flags.BlockSizeIndex()
  86. r.data = size.Get()
  87. r.cum = 0
  88. return nil
  89. }
// Read uncompresses data from the underlying source into buf and returns the
// number of bytes stored in it. Its signature matches io.Reader.
//
// On the first call (new state) the frame headers are parsed through init.
// Once the Reader is closed or in error, Read returns 0 and the error held by
// the state. When io.EOF is reported for the frame, the frame is closed with
// CloseR and the block buffer is handed back to lz4block; bytes already
// copied during that call are still reported in n.
func (r *Reader) Read(buf []byte) (n int, err error) {
	// NOTE(review): check presumably records err into the state on return - confirm in _State.
	defer r.state.check(&err)
	switch r.state.state {
	case readState:
	case closedState, errorState:
		return 0, r.state.err
	case newState:
		// First initialization.
		if err = r.init(); r.state.next(err) {
			return
		}
	default:
		return 0, r.state.fail()
	}
	for len(buf) > 0 {
		var bn int
		if r.idx == 0 {
			// Nothing is pending in r.data: fetch the next block.
			if r.isNotConcurrent() {
				// bn > 0: the block went straight into buf; bn == 0: it sits in r.data.
				bn, err = r.read(buf)
			} else {
				// Recycle the block just consumed before taking the next one from the channel.
				lz4block.Put(r.data)
				r.data = <-r.reads
				if len(r.data) == 0 {
					// No uncompressed data: something went wrong or we are done.
					err = r.frame.Blocks.ErrorR()
				}
			}
			switch err {
			case nil:
			case io.EOF:
				// An error from CloseR takes precedence over io.EOF.
				if er := r.frame.CloseR(r.src); er != nil {
					err = er
				}
				lz4block.Put(r.data)
				r.data = nil
				return
			default:
				return
			}
		}
		if bn == 0 {
			// Fill buf with buffered data.
			bn = copy(buf, r.data[r.idx:])
			r.idx += bn
			if r.idx == len(r.data) {
				// All data read, get ready for the next Read.
				r.idx = 0
			}
		}
		buf = buf[bn:]
		n += bn
		// Report the bytes produced by this iteration.
		r.handler(bn)
	}
	return
}
  145. // read uncompresses the next block as follow:
  146. // - if buf has enough room, the block is uncompressed into it directly
  147. // and the lenght of used space is returned
  148. // - else, the uncompress data is stored in r.data and 0 is returned
  149. func (r *Reader) read(buf []byte) (int, error) {
  150. block := r.frame.Blocks.Block
  151. _, err := block.Read(r.frame, r.src, r.cum)
  152. if err != nil {
  153. return 0, err
  154. }
  155. var direct bool
  156. dst := r.data[:cap(r.data)]
  157. if len(buf) >= len(dst) {
  158. // Uncompress directly into buf.
  159. direct = true
  160. dst = buf
  161. }
  162. dst, err = block.Uncompress(r.frame, dst, r.dict, true)
  163. if err != nil {
  164. return 0, err
  165. }
  166. if !r.frame.Descriptor.Flags.BlockIndependence() {
  167. if len(r.dict)+len(dst) > 128*1024 {
  168. preserveSize := 64*1024 - len(dst)
  169. if preserveSize < 0 {
  170. preserveSize = 0
  171. }
  172. r.dict = r.dict[len(r.dict)-preserveSize:]
  173. }
  174. r.dict = append(r.dict, dst...)
  175. }
  176. r.cum += uint32(len(dst))
  177. if direct {
  178. return len(dst), nil
  179. }
  180. r.data = dst
  181. return 0, nil
  182. }
  183. // Reset clears the state of the Reader r such that it is equivalent to its
  184. // initial state from NewReader, but instead writing to writer.
  185. // No access to reader is performed.
  186. //
  187. // w.Close must be called before Reset.
  188. func (r *Reader) Reset(reader io.Reader) {
  189. if r.data != nil {
  190. lz4block.Put(r.data)
  191. r.data = nil
  192. }
  193. r.frame.Reset(r.num)
  194. r.state.reset()
  195. r.src = reader
  196. r.reads = nil
  197. }
  198. // WriteTo efficiently uncompresses the data from the Reader underlying source to w.
  199. func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
  200. switch r.state.state {
  201. case closedState, errorState:
  202. return 0, r.state.err
  203. case newState:
  204. if err = r.init(); r.state.next(err) {
  205. return
  206. }
  207. default:
  208. return 0, r.state.fail()
  209. }
  210. defer r.state.nextd(&err)
  211. var data []byte
  212. if r.isNotConcurrent() {
  213. size := r.frame.Descriptor.Flags.BlockSizeIndex()
  214. data = size.Get()
  215. defer lz4block.Put(data)
  216. }
  217. for {
  218. var bn int
  219. var dst []byte
  220. if r.isNotConcurrent() {
  221. bn, err = r.read(data)
  222. dst = data[:bn]
  223. } else {
  224. lz4block.Put(dst)
  225. dst = <-r.reads
  226. bn = len(dst)
  227. if bn == 0 {
  228. // No uncompressed data: something went wrong or we are done.
  229. err = r.frame.Blocks.ErrorR()
  230. }
  231. }
  232. switch err {
  233. case nil:
  234. case io.EOF:
  235. err = r.frame.CloseR(r.src)
  236. return
  237. default:
  238. return
  239. }
  240. r.handler(bn)
  241. bn, err = w.Write(dst)
  242. n += int64(bn)
  243. if err != nil {
  244. return
  245. }
  246. }
  247. }