snappy.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package snappy
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/klauspost/compress/s2"
  6. "github.com/klauspost/compress/snappy"
  7. )
  8. // Framing is an enumeration type used to enable or disable xerial framing of
  9. // snappy messages.
  10. type Framing int
  11. const (
  12. Framed Framing = iota
  13. Unframed
  14. )
  15. // Compression level.
  16. type Compression int
  17. const (
  18. DefaultCompression Compression = iota
  19. FasterCompression
  20. BetterCompression
  21. BestCompression
  22. )
  23. var (
  24. readerPool sync.Pool
  25. writerPool sync.Pool
  26. )
  27. // Codec is the implementation of a compress.Codec which supports creating
  28. // readers and writers for kafka messages compressed with snappy.
  29. type Codec struct {
  30. // An optional framing to apply to snappy compression.
  31. //
  32. // Default to Framed.
  33. Framing Framing
  34. // Compression level.
  35. Compression Compression
  36. }
  37. // Code implements the compress.Codec interface.
  38. func (c *Codec) Code() int8 { return 2 }
  39. // Name implements the compress.Codec interface.
  40. func (c *Codec) Name() string { return "snappy" }
  41. // NewReader implements the compress.Codec interface.
  42. func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
  43. x, _ := readerPool.Get().(*xerialReader)
  44. if x != nil {
  45. x.Reset(r)
  46. } else {
  47. x = &xerialReader{
  48. reader: r,
  49. decode: snappy.Decode,
  50. }
  51. }
  52. return &reader{xerialReader: x}
  53. }
  54. // NewWriter implements the compress.Codec interface.
  55. func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
  56. x, _ := writerPool.Get().(*xerialWriter)
  57. if x != nil {
  58. x.Reset(w)
  59. } else {
  60. x = &xerialWriter{writer: w}
  61. }
  62. x.framed = c.Framing == Framed
  63. switch c.Compression {
  64. case FasterCompression:
  65. x.encode = s2.EncodeSnappy
  66. case BetterCompression:
  67. x.encode = s2.EncodeSnappyBetter
  68. case BestCompression:
  69. x.encode = s2.EncodeSnappyBest
  70. default:
  71. x.encode = snappy.Encode // aka. s2.EncodeSnappyBetter
  72. }
  73. return &writer{xerialWriter: x}
  74. }
  75. type reader struct{ *xerialReader }
  76. func (r *reader) Close() (err error) {
  77. if x := r.xerialReader; x != nil {
  78. r.xerialReader = nil
  79. x.Reset(nil)
  80. readerPool.Put(x)
  81. }
  82. return
  83. }
  84. type writer struct{ *xerialWriter }
  85. func (w *writer) Close() (err error) {
  86. if x := w.xerialWriter; x != nil {
  87. w.xerialWriter = nil
  88. err = x.Flush()
  89. x.Reset(nil)
  90. writerPool.Put(x)
  91. }
  92. return
  93. }