reader.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062
  1. // Copyright 2011 The Snappy-Go Authors. All rights reserved.
  2. // Copyright (c) 2019+ Klaus Post. All rights reserved.
  3. // Use of this source code is governed by a BSD-style
  4. // license that can be found in the LICENSE file.
  5. package s2
  6. import (
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "math"
  12. "runtime"
  13. "sync"
  14. )
  15. // ErrCantSeek is returned if the stream cannot be seeked.
  16. type ErrCantSeek struct {
  17. Reason string
  18. }
  19. // Error returns the error as string.
  20. func (e ErrCantSeek) Error() string {
  21. return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
  22. }
  23. // NewReader returns a new Reader that decompresses from r, using the framing
  24. // format described at
  25. // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
  26. func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
  27. nr := Reader{
  28. r: r,
  29. maxBlock: maxBlockSize,
  30. }
  31. for _, opt := range opts {
  32. if err := opt(&nr); err != nil {
  33. nr.err = err
  34. return &nr
  35. }
  36. }
  37. nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
  38. if nr.lazyBuf > 0 {
  39. nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
  40. } else {
  41. nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
  42. }
  43. nr.readHeader = nr.ignoreStreamID
  44. nr.paramsOK = true
  45. return &nr
  46. }
// ReaderOption is an option for creating a decoder.
// NewReader applies each option to the Reader under construction; the first
// option that returns a non-nil error stops processing and that error is
// stored on the Reader.
type ReaderOption func(*Reader) error
  49. // ReaderMaxBlockSize allows to control allocations if the stream
  50. // has been compressed with a smaller WriterBlockSize, or with the default 1MB.
  51. // Blocks must be this size or smaller to decompress,
  52. // otherwise the decoder will return ErrUnsupported.
  53. //
  54. // For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
  55. //
  56. // Default is the maximum limit of 4MB.
  57. func ReaderMaxBlockSize(blockSize int) ReaderOption {
  58. return func(r *Reader) error {
  59. if blockSize > maxBlockSize || blockSize <= 0 {
  60. return errors.New("s2: block size too large. Must be <= 4MB and > 0")
  61. }
  62. if r.lazyBuf == 0 && blockSize < defaultBlockSize {
  63. r.lazyBuf = blockSize
  64. }
  65. r.maxBlock = blockSize
  66. return nil
  67. }
  68. }
  69. // ReaderAllocBlock allows to control upfront stream allocations
  70. // and not allocate for frames bigger than this initially.
  71. // If frames bigger than this is seen a bigger buffer will be allocated.
  72. //
  73. // Default is 1MB, which is default output size.
  74. func ReaderAllocBlock(blockSize int) ReaderOption {
  75. return func(r *Reader) error {
  76. if blockSize > maxBlockSize || blockSize < 1024 {
  77. return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
  78. }
  79. r.lazyBuf = blockSize
  80. return nil
  81. }
  82. }
  83. // ReaderIgnoreStreamIdentifier will make the reader skip the expected
  84. // stream identifier at the beginning of the stream.
  85. // This can be used when serving a stream that has been forwarded to a specific point.
  86. func ReaderIgnoreStreamIdentifier() ReaderOption {
  87. return func(r *Reader) error {
  88. r.ignoreStreamID = true
  89. return nil
  90. }
  91. }
  92. // ReaderSkippableCB will register a callback for chuncks with the specified ID.
  93. // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
  94. // For each chunk with the ID, the callback is called with the content.
  95. // Any returned non-nil error will abort decompression.
  96. // Only one callback per ID is supported, latest sent will be used.
  97. func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
  98. return func(r *Reader) error {
  99. if id < 0x80 || id > 0xfd {
  100. return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
  101. }
  102. r.skippableCB[id] = fn
  103. return nil
  104. }
  105. }
  106. // ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
  107. func ReaderIgnoreCRC() ReaderOption {
  108. return func(r *Reader) error {
  109. r.ignoreCRC = true
  110. return nil
  111. }
  112. }
// Reader is an io.Reader that can read Snappy-compressed bytes.
// It is created with NewReader and can be reused with Reset.
type Reader struct {
	// r is the underlying source of the framed, compressed stream.
	r io.Reader
	// err is the sticky error; once set, Read and Skip return it.
	err error
	// decoded holds the output of the current block.
	decoded []byte
	// buf is scratch space for chunk headers and chunk payloads.
	buf []byte
	// skippableCB holds callbacks for skippable chunks.
	// The chunk-skipping code looks entries up at index id-0x80.
	skippableCB [0x80]func(r io.Reader) error
	blockStart  int64 // Uncompressed offset at start of current.
	// index is the seek index, if one has been loaded. Cleared by Reset.
	index *Index
	// decoded[i:j] contains decoded bytes that have not yet been passed on.
	i, j int
	// maximum block size allowed.
	maxBlock int
	// maximum expected buffer size.
	maxBufSize int
	// alloc a buffer this size if > 0.
	lazyBuf int
	// readHeader is true once a stream identifier has been accepted,
	// or from the start when ignoreStreamID is set.
	readHeader bool
	// paramsOK is set by NewReader after all options were applied
	// successfully. Reset is a no-op while it is false.
	paramsOK bool
	// snappyFrame is set when the stream identifier carried the Snappy magic;
	// Read then rejects blocks larger than maxSnappyBlockSize.
	snappyFrame bool
	// ignoreStreamID is set by ReaderIgnoreStreamIdentifier.
	ignoreStreamID bool
	// ignoreCRC is set by ReaderIgnoreCRC.
	ignoreCRC bool
}
  136. // GetBufferCapacity returns the capacity of the internal buffer.
  137. // This might be useful to know when reusing the same reader in combination
  138. // with the lazy buffer option.
  139. func (r *Reader) GetBufferCapacity() int {
  140. return cap(r.buf)
  141. }
  142. // ensureBufferSize will ensure that the buffer can take at least n bytes.
  143. // If false is returned the buffer exceeds maximum allowed size.
  144. func (r *Reader) ensureBufferSize(n int) bool {
  145. if n > r.maxBufSize {
  146. r.err = ErrCorrupt
  147. return false
  148. }
  149. if cap(r.buf) >= n {
  150. return true
  151. }
  152. // Realloc buffer.
  153. r.buf = make([]byte, n)
  154. return true
  155. }
  156. // Reset discards any buffered data, resets all state, and switches the Snappy
  157. // reader to read from r. This permits reusing a Reader rather than allocating
  158. // a new one.
  159. func (r *Reader) Reset(reader io.Reader) {
  160. if !r.paramsOK {
  161. return
  162. }
  163. r.index = nil
  164. r.r = reader
  165. r.err = nil
  166. r.i = 0
  167. r.j = 0
  168. r.blockStart = 0
  169. r.readHeader = r.ignoreStreamID
  170. }
  171. func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
  172. if _, r.err = io.ReadFull(r.r, p); r.err != nil {
  173. if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
  174. r.err = ErrCorrupt
  175. }
  176. return false
  177. }
  178. return true
  179. }
  180. // skippable will skip n bytes.
  181. // If the supplied reader supports seeking that is used.
  182. // tmp is used as a temporary buffer for reading.
  183. // The supplied slice does not need to be the size of the read.
  184. func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
  185. if id < 0x80 {
  186. r.err = fmt.Errorf("interbal error: skippable id < 0x80")
  187. return false
  188. }
  189. if fn := r.skippableCB[id-0x80]; fn != nil {
  190. rd := io.LimitReader(r.r, int64(n))
  191. r.err = fn(rd)
  192. if r.err != nil {
  193. return false
  194. }
  195. _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
  196. return r.err == nil
  197. }
  198. if rs, ok := r.r.(io.ReadSeeker); ok {
  199. _, err := rs.Seek(int64(n), io.SeekCurrent)
  200. if err == nil {
  201. return true
  202. }
  203. if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
  204. r.err = ErrCorrupt
  205. return false
  206. }
  207. }
  208. for n > 0 {
  209. if n < len(tmp) {
  210. tmp = tmp[:n]
  211. }
  212. if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
  213. if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
  214. r.err = ErrCorrupt
  215. }
  216. return false
  217. }
  218. n -= len(tmp)
  219. }
  220. return true
  221. }
  222. // Read satisfies the io.Reader interface.
  223. func (r *Reader) Read(p []byte) (int, error) {
  224. if r.err != nil {
  225. return 0, r.err
  226. }
  227. for {
  228. if r.i < r.j {
  229. n := copy(p, r.decoded[r.i:r.j])
  230. r.i += n
  231. return n, nil
  232. }
  233. if !r.readFull(r.buf[:4], true) {
  234. return 0, r.err
  235. }
  236. chunkType := r.buf[0]
  237. if !r.readHeader {
  238. if chunkType != chunkTypeStreamIdentifier {
  239. r.err = ErrCorrupt
  240. return 0, r.err
  241. }
  242. r.readHeader = true
  243. }
  244. chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
  245. // The chunk types are specified at
  246. // https://github.com/google/snappy/blob/master/framing_format.txt
  247. switch chunkType {
  248. case chunkTypeCompressedData:
  249. r.blockStart += int64(r.j)
  250. // Section 4.2. Compressed data (chunk type 0x00).
  251. if chunkLen < checksumSize {
  252. r.err = ErrCorrupt
  253. return 0, r.err
  254. }
  255. if !r.ensureBufferSize(chunkLen) {
  256. if r.err == nil {
  257. r.err = ErrUnsupported
  258. }
  259. return 0, r.err
  260. }
  261. buf := r.buf[:chunkLen]
  262. if !r.readFull(buf, false) {
  263. return 0, r.err
  264. }
  265. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  266. buf = buf[checksumSize:]
  267. n, err := DecodedLen(buf)
  268. if err != nil {
  269. r.err = err
  270. return 0, r.err
  271. }
  272. if r.snappyFrame && n > maxSnappyBlockSize {
  273. r.err = ErrCorrupt
  274. return 0, r.err
  275. }
  276. if n > len(r.decoded) {
  277. if n > r.maxBlock {
  278. r.err = ErrCorrupt
  279. return 0, r.err
  280. }
  281. r.decoded = make([]byte, n)
  282. }
  283. if _, err := Decode(r.decoded, buf); err != nil {
  284. r.err = err
  285. return 0, r.err
  286. }
  287. if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
  288. r.err = ErrCRC
  289. return 0, r.err
  290. }
  291. r.i, r.j = 0, n
  292. continue
  293. case chunkTypeUncompressedData:
  294. r.blockStart += int64(r.j)
  295. // Section 4.3. Uncompressed data (chunk type 0x01).
  296. if chunkLen < checksumSize {
  297. r.err = ErrCorrupt
  298. return 0, r.err
  299. }
  300. if !r.ensureBufferSize(chunkLen) {
  301. if r.err == nil {
  302. r.err = ErrUnsupported
  303. }
  304. return 0, r.err
  305. }
  306. buf := r.buf[:checksumSize]
  307. if !r.readFull(buf, false) {
  308. return 0, r.err
  309. }
  310. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  311. // Read directly into r.decoded instead of via r.buf.
  312. n := chunkLen - checksumSize
  313. if r.snappyFrame && n > maxSnappyBlockSize {
  314. r.err = ErrCorrupt
  315. return 0, r.err
  316. }
  317. if n > len(r.decoded) {
  318. if n > r.maxBlock {
  319. r.err = ErrCorrupt
  320. return 0, r.err
  321. }
  322. r.decoded = make([]byte, n)
  323. }
  324. if !r.readFull(r.decoded[:n], false) {
  325. return 0, r.err
  326. }
  327. if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
  328. r.err = ErrCRC
  329. return 0, r.err
  330. }
  331. r.i, r.j = 0, n
  332. continue
  333. case chunkTypeStreamIdentifier:
  334. // Section 4.1. Stream identifier (chunk type 0xff).
  335. if chunkLen != len(magicBody) {
  336. r.err = ErrCorrupt
  337. return 0, r.err
  338. }
  339. if !r.readFull(r.buf[:len(magicBody)], false) {
  340. return 0, r.err
  341. }
  342. if string(r.buf[:len(magicBody)]) != magicBody {
  343. if string(r.buf[:len(magicBody)]) != magicBodySnappy {
  344. r.err = ErrCorrupt
  345. return 0, r.err
  346. } else {
  347. r.snappyFrame = true
  348. }
  349. } else {
  350. r.snappyFrame = false
  351. }
  352. continue
  353. }
  354. if chunkType <= 0x7f {
  355. // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
  356. // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
  357. r.err = ErrUnsupported
  358. return 0, r.err
  359. }
  360. // Section 4.4 Padding (chunk type 0xfe).
  361. // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
  362. if chunkLen > maxChunkSize {
  363. // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
  364. r.err = ErrUnsupported
  365. return 0, r.err
  366. }
  367. // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
  368. if !r.skippable(r.buf, chunkLen, false, chunkType) {
  369. return 0, r.err
  370. }
  371. }
  372. }
  373. // DecodeConcurrent will decode the full stream to w.
  374. // This function should not be combined with reading, seeking or other operations.
  375. // Up to 'concurrent' goroutines will be used.
  376. // If <= 0, runtime.NumCPU will be used.
  377. // On success the number of bytes decompressed nil and is returned.
  378. // This is mainly intended for bigger streams.
  379. func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
  380. if r.i > 0 || r.j > 0 || r.blockStart > 0 {
  381. return 0, errors.New("DecodeConcurrent called after ")
  382. }
  383. if concurrent <= 0 {
  384. concurrent = runtime.NumCPU()
  385. }
  386. // Write to output
  387. var errMu sync.Mutex
  388. var aErr error
  389. setErr := func(e error) (ok bool) {
  390. errMu.Lock()
  391. defer errMu.Unlock()
  392. if e == nil {
  393. return aErr == nil
  394. }
  395. if aErr == nil {
  396. aErr = e
  397. }
  398. return false
  399. }
  400. hasErr := func() (ok bool) {
  401. errMu.Lock()
  402. v := aErr != nil
  403. errMu.Unlock()
  404. return v
  405. }
  406. var aWritten int64
  407. toRead := make(chan []byte, concurrent)
  408. writtenBlocks := make(chan []byte, concurrent)
  409. queue := make(chan chan []byte, concurrent)
  410. reUse := make(chan chan []byte, concurrent)
  411. for i := 0; i < concurrent; i++ {
  412. toRead <- make([]byte, 0, r.maxBufSize)
  413. writtenBlocks <- make([]byte, 0, r.maxBufSize)
  414. reUse <- make(chan []byte, 1)
  415. }
  416. // Writer
  417. var wg sync.WaitGroup
  418. wg.Add(1)
  419. go func() {
  420. defer wg.Done()
  421. for toWrite := range queue {
  422. entry := <-toWrite
  423. reUse <- toWrite
  424. if hasErr() {
  425. writtenBlocks <- entry
  426. continue
  427. }
  428. n, err := w.Write(entry)
  429. want := len(entry)
  430. writtenBlocks <- entry
  431. if err != nil {
  432. setErr(err)
  433. continue
  434. }
  435. if n != want {
  436. setErr(io.ErrShortWrite)
  437. continue
  438. }
  439. aWritten += int64(n)
  440. }
  441. }()
  442. // Reader
  443. defer func() {
  444. close(queue)
  445. if r.err != nil {
  446. err = r.err
  447. setErr(r.err)
  448. }
  449. wg.Wait()
  450. if err == nil {
  451. err = aErr
  452. }
  453. written = aWritten
  454. }()
  455. for !hasErr() {
  456. if !r.readFull(r.buf[:4], true) {
  457. if r.err == io.EOF {
  458. r.err = nil
  459. }
  460. return 0, r.err
  461. }
  462. chunkType := r.buf[0]
  463. if !r.readHeader {
  464. if chunkType != chunkTypeStreamIdentifier {
  465. r.err = ErrCorrupt
  466. return 0, r.err
  467. }
  468. r.readHeader = true
  469. }
  470. chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
  471. // The chunk types are specified at
  472. // https://github.com/google/snappy/blob/master/framing_format.txt
  473. switch chunkType {
  474. case chunkTypeCompressedData:
  475. r.blockStart += int64(r.j)
  476. // Section 4.2. Compressed data (chunk type 0x00).
  477. if chunkLen < checksumSize {
  478. r.err = ErrCorrupt
  479. return 0, r.err
  480. }
  481. if chunkLen > r.maxBufSize {
  482. r.err = ErrCorrupt
  483. return 0, r.err
  484. }
  485. orgBuf := <-toRead
  486. buf := orgBuf[:chunkLen]
  487. if !r.readFull(buf, false) {
  488. return 0, r.err
  489. }
  490. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  491. buf = buf[checksumSize:]
  492. n, err := DecodedLen(buf)
  493. if err != nil {
  494. r.err = err
  495. return 0, r.err
  496. }
  497. if r.snappyFrame && n > maxSnappyBlockSize {
  498. r.err = ErrCorrupt
  499. return 0, r.err
  500. }
  501. if n > r.maxBlock {
  502. r.err = ErrCorrupt
  503. return 0, r.err
  504. }
  505. wg.Add(1)
  506. decoded := <-writtenBlocks
  507. entry := <-reUse
  508. queue <- entry
  509. go func() {
  510. defer wg.Done()
  511. decoded = decoded[:n]
  512. _, err := Decode(decoded, buf)
  513. toRead <- orgBuf
  514. if err != nil {
  515. writtenBlocks <- decoded
  516. setErr(err)
  517. return
  518. }
  519. if !r.ignoreCRC && crc(decoded) != checksum {
  520. writtenBlocks <- decoded
  521. setErr(ErrCRC)
  522. return
  523. }
  524. entry <- decoded
  525. }()
  526. continue
  527. case chunkTypeUncompressedData:
  528. // Section 4.3. Uncompressed data (chunk type 0x01).
  529. if chunkLen < checksumSize {
  530. r.err = ErrCorrupt
  531. return 0, r.err
  532. }
  533. if chunkLen > r.maxBufSize {
  534. r.err = ErrCorrupt
  535. return 0, r.err
  536. }
  537. // Grab write buffer
  538. orgBuf := <-writtenBlocks
  539. buf := orgBuf[:checksumSize]
  540. if !r.readFull(buf, false) {
  541. return 0, r.err
  542. }
  543. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  544. // Read content.
  545. n := chunkLen - checksumSize
  546. if r.snappyFrame && n > maxSnappyBlockSize {
  547. r.err = ErrCorrupt
  548. return 0, r.err
  549. }
  550. if n > r.maxBlock {
  551. r.err = ErrCorrupt
  552. return 0, r.err
  553. }
  554. // Read uncompressed
  555. buf = orgBuf[:n]
  556. if !r.readFull(buf, false) {
  557. return 0, r.err
  558. }
  559. if !r.ignoreCRC && crc(buf) != checksum {
  560. r.err = ErrCRC
  561. return 0, r.err
  562. }
  563. entry := <-reUse
  564. queue <- entry
  565. entry <- buf
  566. continue
  567. case chunkTypeStreamIdentifier:
  568. // Section 4.1. Stream identifier (chunk type 0xff).
  569. if chunkLen != len(magicBody) {
  570. r.err = ErrCorrupt
  571. return 0, r.err
  572. }
  573. if !r.readFull(r.buf[:len(magicBody)], false) {
  574. return 0, r.err
  575. }
  576. if string(r.buf[:len(magicBody)]) != magicBody {
  577. if string(r.buf[:len(magicBody)]) != magicBodySnappy {
  578. r.err = ErrCorrupt
  579. return 0, r.err
  580. } else {
  581. r.snappyFrame = true
  582. }
  583. } else {
  584. r.snappyFrame = false
  585. }
  586. continue
  587. }
  588. if chunkType <= 0x7f {
  589. // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
  590. // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
  591. r.err = ErrUnsupported
  592. return 0, r.err
  593. }
  594. // Section 4.4 Padding (chunk type 0xfe).
  595. // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
  596. if chunkLen > maxChunkSize {
  597. // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
  598. r.err = ErrUnsupported
  599. return 0, r.err
  600. }
  601. // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
  602. if !r.skippable(r.buf, chunkLen, false, chunkType) {
  603. return 0, r.err
  604. }
  605. }
  606. return 0, r.err
  607. }
  608. // Skip will skip n bytes forward in the decompressed output.
  609. // For larger skips this consumes less CPU and is faster than reading output and discarding it.
  610. // CRC is not checked on skipped blocks.
  611. // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
  612. // If a decoding error is encountered subsequent calls to Read will also fail.
  613. func (r *Reader) Skip(n int64) error {
  614. if n < 0 {
  615. return errors.New("attempted negative skip")
  616. }
  617. if r.err != nil {
  618. return r.err
  619. }
  620. for n > 0 {
  621. if r.i < r.j {
  622. // Skip in buffer.
  623. // decoded[i:j] contains decoded bytes that have not yet been passed on.
  624. left := int64(r.j - r.i)
  625. if left >= n {
  626. tmp := int64(r.i) + n
  627. if tmp > math.MaxInt32 {
  628. return errors.New("s2: internal overflow in skip")
  629. }
  630. r.i = int(tmp)
  631. return nil
  632. }
  633. n -= int64(r.j - r.i)
  634. r.i = r.j
  635. }
  636. // Buffer empty; read blocks until we have content.
  637. if !r.readFull(r.buf[:4], true) {
  638. if r.err == io.EOF {
  639. r.err = io.ErrUnexpectedEOF
  640. }
  641. return r.err
  642. }
  643. chunkType := r.buf[0]
  644. if !r.readHeader {
  645. if chunkType != chunkTypeStreamIdentifier {
  646. r.err = ErrCorrupt
  647. return r.err
  648. }
  649. r.readHeader = true
  650. }
  651. chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
  652. // The chunk types are specified at
  653. // https://github.com/google/snappy/blob/master/framing_format.txt
  654. switch chunkType {
  655. case chunkTypeCompressedData:
  656. r.blockStart += int64(r.j)
  657. // Section 4.2. Compressed data (chunk type 0x00).
  658. if chunkLen < checksumSize {
  659. r.err = ErrCorrupt
  660. return r.err
  661. }
  662. if !r.ensureBufferSize(chunkLen) {
  663. if r.err == nil {
  664. r.err = ErrUnsupported
  665. }
  666. return r.err
  667. }
  668. buf := r.buf[:chunkLen]
  669. if !r.readFull(buf, false) {
  670. return r.err
  671. }
  672. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  673. buf = buf[checksumSize:]
  674. dLen, err := DecodedLen(buf)
  675. if err != nil {
  676. r.err = err
  677. return r.err
  678. }
  679. if dLen > r.maxBlock {
  680. r.err = ErrCorrupt
  681. return r.err
  682. }
  683. // Check if destination is within this block
  684. if int64(dLen) > n {
  685. if len(r.decoded) < dLen {
  686. r.decoded = make([]byte, dLen)
  687. }
  688. if _, err := Decode(r.decoded, buf); err != nil {
  689. r.err = err
  690. return r.err
  691. }
  692. if crc(r.decoded[:dLen]) != checksum {
  693. r.err = ErrCorrupt
  694. return r.err
  695. }
  696. } else {
  697. // Skip block completely
  698. n -= int64(dLen)
  699. r.blockStart += int64(dLen)
  700. dLen = 0
  701. }
  702. r.i, r.j = 0, dLen
  703. continue
  704. case chunkTypeUncompressedData:
  705. r.blockStart += int64(r.j)
  706. // Section 4.3. Uncompressed data (chunk type 0x01).
  707. if chunkLen < checksumSize {
  708. r.err = ErrCorrupt
  709. return r.err
  710. }
  711. if !r.ensureBufferSize(chunkLen) {
  712. if r.err != nil {
  713. r.err = ErrUnsupported
  714. }
  715. return r.err
  716. }
  717. buf := r.buf[:checksumSize]
  718. if !r.readFull(buf, false) {
  719. return r.err
  720. }
  721. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  722. // Read directly into r.decoded instead of via r.buf.
  723. n2 := chunkLen - checksumSize
  724. if n2 > len(r.decoded) {
  725. if n2 > r.maxBlock {
  726. r.err = ErrCorrupt
  727. return r.err
  728. }
  729. r.decoded = make([]byte, n2)
  730. }
  731. if !r.readFull(r.decoded[:n2], false) {
  732. return r.err
  733. }
  734. if int64(n2) < n {
  735. if crc(r.decoded[:n2]) != checksum {
  736. r.err = ErrCorrupt
  737. return r.err
  738. }
  739. }
  740. r.i, r.j = 0, n2
  741. continue
  742. case chunkTypeStreamIdentifier:
  743. // Section 4.1. Stream identifier (chunk type 0xff).
  744. if chunkLen != len(magicBody) {
  745. r.err = ErrCorrupt
  746. return r.err
  747. }
  748. if !r.readFull(r.buf[:len(magicBody)], false) {
  749. return r.err
  750. }
  751. if string(r.buf[:len(magicBody)]) != magicBody {
  752. if string(r.buf[:len(magicBody)]) != magicBodySnappy {
  753. r.err = ErrCorrupt
  754. return r.err
  755. }
  756. }
  757. continue
  758. }
  759. if chunkType <= 0x7f {
  760. // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
  761. r.err = ErrUnsupported
  762. return r.err
  763. }
  764. if chunkLen > maxChunkSize {
  765. r.err = ErrUnsupported
  766. return r.err
  767. }
  768. // Section 4.4 Padding (chunk type 0xfe).
  769. // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
  770. if !r.skippable(r.buf, chunkLen, false, chunkType) {
  771. return r.err
  772. }
  773. }
  774. return nil
  775. }
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker.
type ReadSeeker struct {
	// Reader is the wrapped reader; its state is shared, not copied.
	*Reader
	// readAtMu is held for the duration of each ReadAt call.
	readAtMu sync.Mutex
}
  782. // ReadSeeker will return an io.ReadSeeker and io.ReaderAt
  783. // compatible version of the reader.
  784. // If 'random' is specified the returned io.Seeker can be used for
  785. // random seeking, otherwise only forward seeking is supported.
  786. // Enabling random seeking requires the original input to support
  787. // the io.Seeker interface.
  788. // A custom index can be specified which will be used if supplied.
  789. // When using a custom index, it will not be read from the input stream.
  790. // The ReadAt position will affect regular reads and the current position of Seek.
  791. // So using Read after ReadAt will continue from where the ReadAt stopped.
  792. // No functions should be used concurrently.
  793. // The returned ReadSeeker contains a shallow reference to the existing Reader,
  794. // meaning changes performed to one is reflected in the other.
  795. func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
  796. // Read index if provided.
  797. if len(index) != 0 {
  798. if r.index == nil {
  799. r.index = &Index{}
  800. }
  801. if _, err := r.index.Load(index); err != nil {
  802. return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
  803. }
  804. }
  805. // Check if input is seekable
  806. rs, ok := r.r.(io.ReadSeeker)
  807. if !ok {
  808. if !random {
  809. return &ReadSeeker{Reader: r}, nil
  810. }
  811. return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
  812. }
  813. if r.index != nil {
  814. // Seekable and index, ok...
  815. return &ReadSeeker{Reader: r}, nil
  816. }
  817. // Load from stream.
  818. r.index = &Index{}
  819. // Read current position.
  820. pos, err := rs.Seek(0, io.SeekCurrent)
  821. if err != nil {
  822. return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
  823. }
  824. err = r.index.LoadStream(rs)
  825. if err != nil {
  826. if err == ErrUnsupported {
  827. // If we don't require random seeking, reset input and return.
  828. if !random {
  829. _, err = rs.Seek(pos, io.SeekStart)
  830. if err != nil {
  831. return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
  832. }
  833. r.index = nil
  834. return &ReadSeeker{Reader: r}, nil
  835. }
  836. return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
  837. }
  838. return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
  839. }
  840. // reset position.
  841. _, err = rs.Seek(pos, io.SeekStart)
  842. if err != nil {
  843. return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
  844. }
  845. return &ReadSeeker{Reader: r}, nil
  846. }
  847. // Seek allows seeking in compressed data.
  848. func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
  849. if r.err != nil {
  850. if !errors.Is(r.err, io.EOF) {
  851. return 0, r.err
  852. }
  853. // Reset on EOF
  854. r.err = nil
  855. }
  856. // Calculate absolute offset.
  857. absOffset := offset
  858. switch whence {
  859. case io.SeekStart:
  860. case io.SeekCurrent:
  861. absOffset = r.blockStart + int64(r.i) + offset
  862. case io.SeekEnd:
  863. if r.index == nil {
  864. return 0, ErrUnsupported
  865. }
  866. absOffset = r.index.TotalUncompressed + offset
  867. default:
  868. r.err = ErrUnsupported
  869. return 0, r.err
  870. }
  871. if absOffset < 0 {
  872. return 0, errors.New("seek before start of file")
  873. }
  874. if !r.readHeader {
  875. // Make sure we read the header.
  876. _, r.err = r.Read([]byte{})
  877. if r.err != nil {
  878. return 0, r.err
  879. }
  880. }
  881. // If we are inside current block no need to seek.
  882. // This includes no offset changes.
  883. if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
  884. r.i = int(absOffset - r.blockStart)
  885. return r.blockStart + int64(r.i), nil
  886. }
  887. rs, ok := r.r.(io.ReadSeeker)
  888. if r.index == nil || !ok {
  889. currOffset := r.blockStart + int64(r.i)
  890. if absOffset >= currOffset {
  891. err := r.Skip(absOffset - currOffset)
  892. return r.blockStart + int64(r.i), err
  893. }
  894. return 0, ErrUnsupported
  895. }
  896. // We can seek and we have an index.
  897. c, u, err := r.index.Find(absOffset)
  898. if err != nil {
  899. return r.blockStart + int64(r.i), err
  900. }
  901. // Seek to next block
  902. _, err = rs.Seek(c, io.SeekStart)
  903. if err != nil {
  904. return 0, err
  905. }
  906. r.i = r.j // Remove rest of current block.
  907. r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
  908. if u < absOffset {
  909. // Forward inside block
  910. return absOffset, r.Skip(absOffset - u)
  911. }
  912. if u > absOffset {
  913. return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
  914. }
  915. return absOffset, nil
  916. }
  917. // ReadAt reads len(p) bytes into p starting at offset off in the
  918. // underlying input source. It returns the number of bytes
  919. // read (0 <= n <= len(p)) and any error encountered.
  920. //
  921. // When ReadAt returns n < len(p), it returns a non-nil error
  922. // explaining why more bytes were not returned. In this respect,
  923. // ReadAt is stricter than Read.
  924. //
  925. // Even if ReadAt returns n < len(p), it may use all of p as scratch
  926. // space during the call. If some data is available but not len(p) bytes,
  927. // ReadAt blocks until either all the data is available or an error occurs.
  928. // In this respect ReadAt is different from Read.
  929. //
  930. // If the n = len(p) bytes returned by ReadAt are at the end of the
  931. // input source, ReadAt may return either err == EOF or err == nil.
  932. //
  933. // If ReadAt is reading from an input source with a seek offset,
  934. // ReadAt should not affect nor be affected by the underlying
  935. // seek offset.
  936. //
  937. // Clients of ReadAt can execute parallel ReadAt calls on the
  938. // same input source. This is however not recommended.
  939. func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
  940. r.readAtMu.Lock()
  941. defer r.readAtMu.Unlock()
  942. _, err := r.Seek(offset, io.SeekStart)
  943. if err != nil {
  944. return 0, err
  945. }
  946. n := 0
  947. for n < len(p) {
  948. n2, err := r.Read(p[n:])
  949. if err != nil {
  950. // This will include io.EOF
  951. return n + n2, err
  952. }
  953. n += n2
  954. }
  955. return n, nil
  956. }
  957. // ReadByte satisfies the io.ByteReader interface.
  958. func (r *Reader) ReadByte() (byte, error) {
  959. if r.err != nil {
  960. return 0, r.err
  961. }
  962. if r.i < r.j {
  963. c := r.decoded[r.i]
  964. r.i++
  965. return c, nil
  966. }
  967. var tmp [1]byte
  968. for i := 0; i < 10; i++ {
  969. n, err := r.Read(tmp[:])
  970. if err != nil {
  971. return 0, err
  972. }
  973. if n == 1 {
  974. return tmp[0], nil
  975. }
  976. }
  977. return 0, io.ErrNoProgress
  978. }
  979. // SkippableCB will register a callback for chunks with the specified ID.
  980. // ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
  981. // For each chunk with the ID, the callback is called with the content.
  982. // Any returned non-nil error will abort decompression.
  983. // Only one callback per ID is supported, latest sent will be used.
  984. // Sending a nil function will disable previous callbacks.
  985. func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
  986. if id < 0x80 || id > chunkTypePadding {
  987. return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
  988. }
  989. r.skippableCB[id] = fn
  990. return nil
  991. }