writer.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020
  1. // Copyright 2011 The Snappy-Go Authors. All rights reserved.
  2. // Copyright (c) 2019+ Klaus Post. All rights reserved.
  3. // Use of this source code is governed by a BSD-style
  4. // license that can be found in the LICENSE file.
  5. package s2
  6. import (
  7. "crypto/rand"
  8. "encoding/binary"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "runtime"
  13. "sync"
  14. )
  // Compression levels selectable through the Writer options.
// Numbering starts at 1; encodeBlock treats any value it does not
// recognise (including levelUncompressed) as "store uncompressed".
const (
	levelUncompressed = 1 + iota // blocks are stored without compression
	levelFast                    // default encoder
	levelBetter                  // better ratio, slower
	levelBest                    // best ratio, slowest
)
  21. // NewWriter returns a new Writer that compresses to w, using the
  22. // framing format described at
  23. // https://github.com/google/snappy/blob/master/framing_format.txt
  24. //
  25. // Users must call Close to guarantee all data has been forwarded to
  26. // the underlying io.Writer and that resources are released.
  27. // They may also call Flush zero or more times before calling Close.
  28. func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
  29. w2 := Writer{
  30. blockSize: defaultBlockSize,
  31. concurrency: runtime.GOMAXPROCS(0),
  32. randSrc: rand.Reader,
  33. level: levelFast,
  34. }
  35. for _, opt := range opts {
  36. if err := opt(&w2); err != nil {
  37. w2.errState = err
  38. return &w2
  39. }
  40. }
  41. w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
  42. w2.paramsOK = true
  43. w2.ibuf = make([]byte, 0, w2.blockSize)
  44. w2.buffers.New = func() interface{} {
  45. return make([]byte, w2.obufLen)
  46. }
  47. w2.Reset(w)
  48. return &w2
  49. }
  50. // Writer is an io.Writer that can write Snappy-compressed bytes.
  51. type Writer struct {
  52. errMu sync.Mutex
  53. errState error
  54. // ibuf is a buffer for the incoming (uncompressed) bytes.
  55. ibuf []byte
  56. blockSize int
  57. obufLen int
  58. concurrency int
  59. written int64
  60. uncompWritten int64 // Bytes sent to compression
  61. output chan chan result
  62. buffers sync.Pool
  63. pad int
  64. writer io.Writer
  65. randSrc io.Reader
  66. writerWg sync.WaitGroup
  67. index Index
  68. customEnc func(dst, src []byte) int
  69. // wroteStreamHeader is whether we have written the stream header.
  70. wroteStreamHeader bool
  71. paramsOK bool
  72. snappy bool
  73. flushOnWrite bool
  74. appendIndex bool
  75. level uint8
  76. }
  // result is one unit of output handed to the writer goroutine.
// A result with an empty b is not written; it is only used for synchronization.
type result struct {
	// startOffset is the uncompressed stream offset of this output.
	startOffset int64
	// b holds the bytes to write.
	b []byte
}
  82. // err returns the previously set error.
  83. // If no error has been set it is set to err if not nil.
  84. func (w *Writer) err(err error) error {
  85. w.errMu.Lock()
  86. errSet := w.errState
  87. if errSet == nil && err != nil {
  88. w.errState = err
  89. errSet = err
  90. }
  91. w.errMu.Unlock()
  92. return errSet
  93. }
  94. // Reset discards the writer's state and switches the Snappy writer to write to w.
  95. // This permits reusing a Writer rather than allocating a new one.
  96. func (w *Writer) Reset(writer io.Writer) {
  97. if !w.paramsOK {
  98. return
  99. }
  100. // Close previous writer, if any.
  101. if w.output != nil {
  102. close(w.output)
  103. w.writerWg.Wait()
  104. w.output = nil
  105. }
  106. w.errState = nil
  107. w.ibuf = w.ibuf[:0]
  108. w.wroteStreamHeader = false
  109. w.written = 0
  110. w.writer = writer
  111. w.uncompWritten = 0
  112. w.index.reset(w.blockSize)
  113. // If we didn't get a writer, stop here.
  114. if writer == nil {
  115. return
  116. }
  117. // If no concurrency requested, don't spin up writer goroutine.
  118. if w.concurrency == 1 {
  119. return
  120. }
  121. toWrite := make(chan chan result, w.concurrency)
  122. w.output = toWrite
  123. w.writerWg.Add(1)
  124. // Start a writer goroutine that will write all output in order.
  125. go func() {
  126. defer w.writerWg.Done()
  127. // Get a queued write.
  128. for write := range toWrite {
  129. // Wait for the data to be available.
  130. input := <-write
  131. in := input.b
  132. if len(in) > 0 {
  133. if w.err(nil) == nil {
  134. // Don't expose data from previous buffers.
  135. toWrite := in[:len(in):len(in)]
  136. // Write to output.
  137. n, err := writer.Write(toWrite)
  138. if err == nil && n != len(toWrite) {
  139. err = io.ErrShortBuffer
  140. }
  141. _ = w.err(err)
  142. w.err(w.index.add(w.written, input.startOffset))
  143. w.written += int64(n)
  144. }
  145. }
  146. if cap(in) >= w.obufLen {
  147. w.buffers.Put(in)
  148. }
  149. // close the incoming write request.
  150. // This can be used for synchronizing flushes.
  151. close(write)
  152. }
  153. }()
  154. }
  155. // Write satisfies the io.Writer interface.
  156. func (w *Writer) Write(p []byte) (nRet int, errRet error) {
  157. if err := w.err(nil); err != nil {
  158. return 0, err
  159. }
  160. if w.flushOnWrite {
  161. return w.write(p)
  162. }
  163. // If we exceed the input buffer size, start writing
  164. for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
  165. var n int
  166. if len(w.ibuf) == 0 {
  167. // Large write, empty buffer.
  168. // Write directly from p to avoid copy.
  169. n, _ = w.write(p)
  170. } else {
  171. n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
  172. w.ibuf = w.ibuf[:len(w.ibuf)+n]
  173. w.write(w.ibuf)
  174. w.ibuf = w.ibuf[:0]
  175. }
  176. nRet += n
  177. p = p[n:]
  178. }
  179. if err := w.err(nil); err != nil {
  180. return nRet, err
  181. }
  182. // p should always be able to fit into w.ibuf now.
  183. n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
  184. w.ibuf = w.ibuf[:len(w.ibuf)+n]
  185. nRet += n
  186. return nRet, nil
  187. }
  188. // ReadFrom implements the io.ReaderFrom interface.
  189. // Using this is typically more efficient since it avoids a memory copy.
  190. // ReadFrom reads data from r until EOF or error.
  191. // The return value n is the number of bytes read.
  192. // Any error except io.EOF encountered during the read is also returned.
  193. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
  194. if err := w.err(nil); err != nil {
  195. return 0, err
  196. }
  197. if len(w.ibuf) > 0 {
  198. err := w.Flush()
  199. if err != nil {
  200. return 0, err
  201. }
  202. }
  203. if br, ok := r.(byter); ok {
  204. buf := br.Bytes()
  205. if err := w.EncodeBuffer(buf); err != nil {
  206. return 0, err
  207. }
  208. return int64(len(buf)), w.Flush()
  209. }
  210. for {
  211. inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
  212. n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
  213. if err != nil {
  214. if err == io.ErrUnexpectedEOF {
  215. err = io.EOF
  216. }
  217. if err != io.EOF {
  218. return n, w.err(err)
  219. }
  220. }
  221. if n2 == 0 {
  222. break
  223. }
  224. n += int64(n2)
  225. err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
  226. if w.err(err2) != nil {
  227. break
  228. }
  229. if err != nil {
  230. // We got EOF and wrote everything
  231. break
  232. }
  233. }
  234. return n, w.err(nil)
  235. }
  236. // AddSkippableBlock will add a skippable block to the stream.
  237. // The ID must be 0x80-0xfe (inclusive).
  238. // Length of the skippable block must be <= 16777215 bytes.
  239. func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
  240. if err := w.err(nil); err != nil {
  241. return err
  242. }
  243. if len(data) == 0 {
  244. return nil
  245. }
  246. if id < 0x80 || id > chunkTypePadding {
  247. return fmt.Errorf("invalid skippable block id %x", id)
  248. }
  249. if len(data) > maxChunkSize {
  250. return fmt.Errorf("skippable block excessed maximum size")
  251. }
  252. var header [4]byte
  253. chunkLen := 4 + len(data)
  254. header[0] = id
  255. header[1] = uint8(chunkLen >> 0)
  256. header[2] = uint8(chunkLen >> 8)
  257. header[3] = uint8(chunkLen >> 16)
  258. if w.concurrency == 1 {
  259. write := func(b []byte) error {
  260. n, err := w.writer.Write(b)
  261. if err = w.err(err); err != nil {
  262. return err
  263. }
  264. if n != len(data) {
  265. return w.err(io.ErrShortWrite)
  266. }
  267. w.written += int64(n)
  268. return w.err(nil)
  269. }
  270. if !w.wroteStreamHeader {
  271. w.wroteStreamHeader = true
  272. if w.snappy {
  273. if err := write([]byte(magicChunkSnappy)); err != nil {
  274. return err
  275. }
  276. } else {
  277. if err := write([]byte(magicChunk)); err != nil {
  278. return err
  279. }
  280. }
  281. }
  282. if err := write(header[:]); err != nil {
  283. return err
  284. }
  285. if err := write(data); err != nil {
  286. return err
  287. }
  288. }
  289. // Create output...
  290. if !w.wroteStreamHeader {
  291. w.wroteStreamHeader = true
  292. hWriter := make(chan result)
  293. w.output <- hWriter
  294. if w.snappy {
  295. hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
  296. } else {
  297. hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
  298. }
  299. }
  300. // Copy input.
  301. inbuf := w.buffers.Get().([]byte)[:4]
  302. copy(inbuf, header[:])
  303. inbuf = append(inbuf, data...)
  304. output := make(chan result, 1)
  305. // Queue output.
  306. w.output <- output
  307. output <- result{startOffset: w.uncompWritten, b: inbuf}
  308. return nil
  309. }
  // EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// If you cannot control that, use the regular Write function.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
//
// Data pending in the internal input buffer is flushed first so stream order
// is kept. In concurrent mode buf is split into blocks of at most blockSize.
// Each block is compressed on its own goroutine directly from buf (no copy is
// made), and its result channel is queued on w.output in order.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.flushOnWrite {
		_, err := w.write(buf)
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.Flush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(buf)
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
		}
	}

	for len(buf) > 0 {
		// Cut input.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		// It is sized for the raw block only (header + len(uncompressed)).
		// NOTE(review): this relies on the block encoders returning 0 instead of
		// overrunning when dst is smaller than MaxEncodedLen; confirm in encode*.go.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				// (this overwrites any partial encoder output in obuf)
				copy(obuf[obufHeaderLen:], uncompressed)
			}

			// Fill in the per-chunk header that comes before the body.
			// Bytes 0-3: chunk type and 24-bit little-endian length; bytes 4-7: CRC.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res
		}()
	}
	return nil
}
  399. func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
  400. if w.customEnc != nil {
  401. if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
  402. return ret
  403. }
  404. }
  405. if w.snappy {
  406. switch w.level {
  407. case levelFast:
  408. return encodeBlockSnappy(obuf, uncompressed)
  409. case levelBetter:
  410. return encodeBlockBetterSnappy(obuf, uncompressed)
  411. case levelBest:
  412. return encodeBlockBestSnappy(obuf, uncompressed)
  413. }
  414. return 0
  415. }
  416. switch w.level {
  417. case levelFast:
  418. return encodeBlock(obuf, uncompressed)
  419. case levelBetter:
  420. return encodeBlockBetter(obuf, uncompressed)
  421. case levelBest:
  422. return encodeBlockBest(obuf, uncompressed, nil)
  423. }
  424. return 0
  425. }
  // write compresses p as a sequence of blocks of at most blockSize bytes.
// With concurrency 1 it delegates to writeSync. Otherwise each block is copied
// into a pooled buffer, so the caller may reuse p as soon as write returns.
// The block's result channel is queued on w.output to preserve order, and the
// block is compressed on a new goroutine. In that mode the returned count is
// len(p); compression and write failures surface later via the sticky error.
func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}

	// Spawn goroutine and write block to output channel.
	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			hWriter := make(chan result)
			w.output <- hWriter
			if w.snappy {
				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
			} else {
				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
			}
		}

		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		// Copy input.
		// If the block is incompressible, this is used for the result.
		// The data is placed after obufHeaderLen bytes so the chunk header can
		// be filled in front of it without another copy.
		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		copy(inbuf[obufHeaderLen:], uncompressed)
		uncompressed = inbuf[obufHeaderLen:]

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))

		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// Use input as output.
				// After the swap, inbuf names the unused compression buffer.
				obuf, inbuf = inbuf, obuf
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res

			// Put unused buffer back in pool.
			w.buffers.Put(inbuf)
		}()
		nRet += len(uncompressed)
	}
	return nRet, nil
}
  500. // writeFull is a special version of write that will always write the full buffer.
  501. // Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
  502. // The data will be written as a single block.
  503. // The caller is not allowed to use inbuf after this function has been called.
  504. func (w *Writer) writeFull(inbuf []byte) (errRet error) {
  505. if err := w.err(nil); err != nil {
  506. return err
  507. }
  508. if w.concurrency == 1 {
  509. _, err := w.writeSync(inbuf[obufHeaderLen:])
  510. return err
  511. }
  512. // Spawn goroutine and write block to output channel.
  513. if !w.wroteStreamHeader {
  514. w.wroteStreamHeader = true
  515. hWriter := make(chan result)
  516. w.output <- hWriter
  517. if w.snappy {
  518. hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
  519. } else {
  520. hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
  521. }
  522. }
  523. // Get an output buffer.
  524. obuf := w.buffers.Get().([]byte)[:w.obufLen]
  525. uncompressed := inbuf[obufHeaderLen:]
  526. output := make(chan result)
  527. // Queue output now, so we keep order.
  528. w.output <- output
  529. res := result{
  530. startOffset: w.uncompWritten,
  531. }
  532. w.uncompWritten += int64(len(uncompressed))
  533. go func() {
  534. checksum := crc(uncompressed)
  535. // Set to uncompressed.
  536. chunkType := uint8(chunkTypeUncompressedData)
  537. chunkLen := 4 + len(uncompressed)
  538. // Attempt compressing.
  539. n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
  540. n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
  541. // Check if we should use this, or store as uncompressed instead.
  542. if n2 > 0 {
  543. chunkType = uint8(chunkTypeCompressedData)
  544. chunkLen = 4 + n + n2
  545. obuf = obuf[:obufHeaderLen+n+n2]
  546. } else {
  547. // Use input as output.
  548. obuf, inbuf = inbuf, obuf
  549. }
  550. // Fill in the per-chunk header that comes before the body.
  551. obuf[0] = chunkType
  552. obuf[1] = uint8(chunkLen >> 0)
  553. obuf[2] = uint8(chunkLen >> 8)
  554. obuf[3] = uint8(chunkLen >> 16)
  555. obuf[4] = uint8(checksum >> 0)
  556. obuf[5] = uint8(checksum >> 8)
  557. obuf[6] = uint8(checksum >> 16)
  558. obuf[7] = uint8(checksum >> 24)
  559. // Queue final output.
  560. res.b = obuf
  561. output <- res
  562. // Put unused buffer back in pool.
  563. w.buffers.Put(inbuf)
  564. }()
  565. return nil
  566. }
  567. func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
  568. if err := w.err(nil); err != nil {
  569. return 0, err
  570. }
  571. if !w.wroteStreamHeader {
  572. w.wroteStreamHeader = true
  573. var n int
  574. var err error
  575. if w.snappy {
  576. n, err = w.writer.Write([]byte(magicChunkSnappy))
  577. } else {
  578. n, err = w.writer.Write([]byte(magicChunk))
  579. }
  580. if err != nil {
  581. return 0, w.err(err)
  582. }
  583. if n != len(magicChunk) {
  584. return 0, w.err(io.ErrShortWrite)
  585. }
  586. w.written += int64(n)
  587. }
  588. for len(p) > 0 {
  589. var uncompressed []byte
  590. if len(p) > w.blockSize {
  591. uncompressed, p = p[:w.blockSize], p[w.blockSize:]
  592. } else {
  593. uncompressed, p = p, nil
  594. }
  595. obuf := w.buffers.Get().([]byte)[:w.obufLen]
  596. checksum := crc(uncompressed)
  597. // Set to uncompressed.
  598. chunkType := uint8(chunkTypeUncompressedData)
  599. chunkLen := 4 + len(uncompressed)
  600. // Attempt compressing.
  601. n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
  602. n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
  603. if n2 > 0 {
  604. chunkType = uint8(chunkTypeCompressedData)
  605. chunkLen = 4 + n + n2
  606. obuf = obuf[:obufHeaderLen+n+n2]
  607. } else {
  608. obuf = obuf[:8]
  609. }
  610. // Fill in the per-chunk header that comes before the body.
  611. obuf[0] = chunkType
  612. obuf[1] = uint8(chunkLen >> 0)
  613. obuf[2] = uint8(chunkLen >> 8)
  614. obuf[3] = uint8(chunkLen >> 16)
  615. obuf[4] = uint8(checksum >> 0)
  616. obuf[5] = uint8(checksum >> 8)
  617. obuf[6] = uint8(checksum >> 16)
  618. obuf[7] = uint8(checksum >> 24)
  619. n, err := w.writer.Write(obuf)
  620. if err != nil {
  621. return 0, w.err(err)
  622. }
  623. if n != len(obuf) {
  624. return 0, w.err(io.ErrShortWrite)
  625. }
  626. w.err(w.index.add(w.written, w.uncompWritten))
  627. w.written += int64(n)
  628. w.uncompWritten += int64(len(uncompressed))
  629. if chunkType == chunkTypeUncompressedData {
  630. // Write uncompressed data.
  631. n, err := w.writer.Write(uncompressed)
  632. if err != nil {
  633. return 0, w.err(err)
  634. }
  635. if n != len(uncompressed) {
  636. return 0, w.err(io.ErrShortWrite)
  637. }
  638. w.written += int64(n)
  639. }
  640. w.buffers.Put(obuf)
  641. // Queue final output.
  642. nRet += len(uncompressed)
  643. }
  644. return nRet, nil
  645. }
  646. // Flush flushes the Writer to its underlying io.Writer.
  647. // This does not apply padding.
  648. func (w *Writer) Flush() error {
  649. if err := w.err(nil); err != nil {
  650. return err
  651. }
  652. // Queue any data still in input buffer.
  653. if len(w.ibuf) != 0 {
  654. if !w.wroteStreamHeader {
  655. _, err := w.writeSync(w.ibuf)
  656. w.ibuf = w.ibuf[:0]
  657. return w.err(err)
  658. } else {
  659. _, err := w.write(w.ibuf)
  660. w.ibuf = w.ibuf[:0]
  661. err = w.err(err)
  662. if err != nil {
  663. return err
  664. }
  665. }
  666. }
  667. if w.output == nil {
  668. return w.err(nil)
  669. }
  670. // Send empty buffer
  671. res := make(chan result)
  672. w.output <- res
  673. // Block until this has been picked up.
  674. res <- result{b: nil, startOffset: w.uncompWritten}
  675. // When it is closed, we have flushed.
  676. <-res
  677. return w.err(nil)
  678. }
  679. // Close calls Flush and then closes the Writer.
  680. // Calling Close multiple times is ok,
  681. // but calling CloseIndex after this will make it not return the index.
  682. func (w *Writer) Close() error {
  683. _, err := w.closeIndex(w.appendIndex)
  684. return err
  685. }
  686. // CloseIndex calls Close and returns an index on first call.
  687. // This is not required if you are only adding index to a stream.
  688. func (w *Writer) CloseIndex() ([]byte, error) {
  689. return w.closeIndex(true)
  690. }
  691. func (w *Writer) closeIndex(idx bool) ([]byte, error) {
  692. err := w.Flush()
  693. if w.output != nil {
  694. close(w.output)
  695. w.writerWg.Wait()
  696. w.output = nil
  697. }
  698. var index []byte
  699. if w.err(err) == nil && w.writer != nil {
  700. // Create index.
  701. if idx {
  702. compSize := int64(-1)
  703. if w.pad <= 1 {
  704. compSize = w.written
  705. }
  706. index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
  707. // Count as written for padding.
  708. if w.appendIndex {
  709. w.written += int64(len(index))
  710. }
  711. }
  712. if w.pad > 1 {
  713. tmp := w.ibuf[:0]
  714. if len(index) > 0 {
  715. // Allocate another buffer.
  716. tmp = w.buffers.Get().([]byte)[:0]
  717. defer w.buffers.Put(tmp)
  718. }
  719. add := calcSkippableFrame(w.written, int64(w.pad))
  720. frame, err := skippableFrame(tmp, add, w.randSrc)
  721. if err = w.err(err); err != nil {
  722. return nil, err
  723. }
  724. n, err2 := w.writer.Write(frame)
  725. if err2 == nil && n != len(frame) {
  726. err2 = io.ErrShortWrite
  727. }
  728. _ = w.err(err2)
  729. }
  730. if len(index) > 0 && w.appendIndex {
  731. n, err2 := w.writer.Write(index)
  732. if err2 == nil && n != len(index) {
  733. err2 = io.ErrShortWrite
  734. }
  735. _ = w.err(err2)
  736. }
  737. }
  738. err = w.err(errClosed)
  739. if err == errClosed {
  740. return index, nil
  741. }
  742. return nil, err
  743. }
  744. // calcSkippableFrame will return a total size to be added for written
  745. // to be divisible by multiple.
  746. // The value will always be > skippableFrameHeader.
  747. // The function will panic if written < 0 or wantMultiple <= 0.
  748. func calcSkippableFrame(written, wantMultiple int64) int {
  749. if wantMultiple <= 0 {
  750. panic("wantMultiple <= 0")
  751. }
  752. if written < 0 {
  753. panic("written < 0")
  754. }
  755. leftOver := written % wantMultiple
  756. if leftOver == 0 {
  757. return 0
  758. }
  759. toAdd := wantMultiple - leftOver
  760. for toAdd < skippableFrameHeader {
  761. toAdd += wantMultiple
  762. }
  763. return int(toAdd)
  764. }
  765. // skippableFrame will add a skippable frame with a total size of bytes.
  766. // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader
  767. func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
  768. if total == 0 {
  769. return dst, nil
  770. }
  771. if total < skippableFrameHeader {
  772. return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
  773. }
  774. if int64(total) >= maxBlockSize+skippableFrameHeader {
  775. return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
  776. }
  777. // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
  778. dst = append(dst, chunkTypePadding)
  779. f := uint32(total - skippableFrameHeader)
  780. // Add chunk length.
  781. dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
  782. // Add data
  783. start := len(dst)
  784. dst = append(dst, make([]byte, f)...)
  785. _, err := io.ReadFull(r, dst[start:])
  786. return dst, err
  787. }
  // errClosed is recorded as the sticky error once a Writer has been closed.
var (
	errClosed = errors.New("s2: Writer is closed")
)
  // WriterOption is an option for creating an encoder.
// NewWriter applies options in order. The first option returning a non-nil
// error stops option processing, and that error is stored on the returned Writer.
type WriterOption func(*Writer) error
  791. // WriterConcurrency will set the concurrency,
  792. // meaning the maximum number of decoders to run concurrently.
  793. // The value supplied must be at least 1.
  794. // By default this will be set to GOMAXPROCS.
  795. func WriterConcurrency(n int) WriterOption {
  796. return func(w *Writer) error {
  797. if n <= 0 {
  798. return errors.New("concurrency must be at least 1")
  799. }
  800. w.concurrency = n
  801. return nil
  802. }
  803. }
  804. // WriterAddIndex will append an index to the end of a stream
  805. // when it is closed.
  806. func WriterAddIndex() WriterOption {
  807. return func(w *Writer) error {
  808. w.appendIndex = true
  809. return nil
  810. }
  811. }
  812. // WriterBetterCompression will enable better compression.
  813. // EncodeBetter compresses better than Encode but typically with a
  814. // 10-40% speed decrease on both compression and decompression.
  815. func WriterBetterCompression() WriterOption {
  816. return func(w *Writer) error {
  817. w.level = levelBetter
  818. return nil
  819. }
  820. }
  821. // WriterBestCompression will enable better compression.
  822. // EncodeBetter compresses better than Encode but typically with a
  823. // big speed decrease on compression.
  824. func WriterBestCompression() WriterOption {
  825. return func(w *Writer) error {
  826. w.level = levelBest
  827. return nil
  828. }
  829. }
  830. // WriterUncompressed will bypass compression.
  831. // The stream will be written as uncompressed blocks only.
  832. // If concurrency is > 1 CRC and output will still be done async.
  833. func WriterUncompressed() WriterOption {
  834. return func(w *Writer) error {
  835. w.level = levelUncompressed
  836. return nil
  837. }
  838. }
  839. // WriterBlockSize allows to override the default block size.
  840. // Blocks will be this size or smaller.
  841. // Minimum size is 4KB and and maximum size is 4MB.
  842. //
  843. // Bigger blocks may give bigger throughput on systems with many cores,
  844. // and will increase compression slightly, but it will limit the possible
  845. // concurrency for smaller payloads for both encoding and decoding.
  846. // Default block size is 1MB.
  847. //
  848. // When writing Snappy compatible output using WriterSnappyCompat,
  849. // the maximum block size is 64KB.
  850. func WriterBlockSize(n int) WriterOption {
  851. return func(w *Writer) error {
  852. if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
  853. return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output")
  854. }
  855. if n > maxBlockSize || n < minBlockSize {
  856. return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
  857. }
  858. w.blockSize = n
  859. return nil
  860. }
  861. }
  862. // WriterPadding will add padding to all output so the size will be a multiple of n.
  863. // This can be used to obfuscate the exact output size or make blocks of a certain size.
  864. // The contents will be a skippable frame, so it will be invisible by the decoder.
  865. // n must be > 0 and <= 4MB.
  866. // The padded area will be filled with data from crypto/rand.Reader.
  867. // The padding will be applied whenever Close is called on the writer.
  868. func WriterPadding(n int) WriterOption {
  869. return func(w *Writer) error {
  870. if n <= 0 {
  871. return fmt.Errorf("s2: padding must be at least 1")
  872. }
  873. // No need to waste our time.
  874. if n == 1 {
  875. w.pad = 0
  876. }
  877. if n > maxBlockSize {
  878. return fmt.Errorf("s2: padding must less than 4MB")
  879. }
  880. w.pad = n
  881. return nil
  882. }
  883. }
  884. // WriterPaddingSrc will get random data for padding from the supplied source.
  885. // By default crypto/rand is used.
  886. func WriterPaddingSrc(reader io.Reader) WriterOption {
  887. return func(w *Writer) error {
  888. w.randSrc = reader
  889. return nil
  890. }
  891. }
  892. // WriterSnappyCompat will write snappy compatible output.
  893. // The output can be decompressed using either snappy or s2.
  894. // If block size is more than 64KB it is set to that.
  895. func WriterSnappyCompat() WriterOption {
  896. return func(w *Writer) error {
  897. w.snappy = true
  898. if w.blockSize > 64<<10 {
  899. // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
  900. // And allows us to skip some size checks.
  901. w.blockSize = (64 << 10) - 8
  902. }
  903. return nil
  904. }
  905. }
  906. // WriterFlushOnWrite will compress blocks on each call to the Write function.
  907. //
  908. // This is quite inefficient as blocks size will depend on the write size.
  909. //
  910. // Use WriterConcurrency(1) to also make sure that output is flushed.
  911. // When Write calls return, otherwise they will be written when compression is done.
  912. func WriterFlushOnWrite() WriterOption {
  913. return func(w *Writer) error {
  914. w.flushOnWrite = true
  915. return nil
  916. }
  917. }
  918. // WriterCustomEncoder allows to override the encoder for blocks on the stream.
  919. // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
  920. // Block size (initial varint) should not be added by the encoder.
  921. // Returning value 0 indicates the block could not be compressed.
  922. // Returning a negative value indicates that compression should be attempted.
  923. // The function should expect to be called concurrently.
  924. func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
  925. return func(w *Writer) error {
  926. w.customEnc = fn
  927. return nil
  928. }
  929. }