compress.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package compress
  2. import (
  3. "encoding"
  4. "fmt"
  5. "io"
  6. "strconv"
  7. "strings"
  8. "github.com/segmentio/kafka-go/compress/gzip"
  9. "github.com/segmentio/kafka-go/compress/lz4"
  10. "github.com/segmentio/kafka-go/compress/snappy"
  11. "github.com/segmentio/kafka-go/compress/zstd"
  12. )
  // Compression identifies the compression applied to a record set.
  type Compression int8

  // Known Compression values, numbered consecutively starting at zero.
  const (
  	None   Compression = iota // 0
  	Gzip                      // 1
  	Snappy                    // 2
  	Lz4                       // 3
  	Zstd                      // 4
  )
  22. func (c Compression) Codec() Codec {
  23. if i := int(c); i >= 0 && i < len(Codecs) {
  24. return Codecs[i]
  25. }
  26. return nil
  27. }
  28. func (c Compression) String() string {
  29. if codec := c.Codec(); codec != nil {
  30. return codec.Name()
  31. }
  32. return "uncompressed"
  33. }
  34. func (c Compression) MarshalText() ([]byte, error) {
  35. return []byte(c.String()), nil
  36. }
  37. func (c *Compression) UnmarshalText(b []byte) error {
  38. switch string(b) {
  39. case "none", "uncompressed":
  40. *c = None
  41. return nil
  42. }
  43. for _, codec := range Codecs[None+1:] {
  44. if codec.Name() == string(b) {
  45. *c = Compression(codec.Code())
  46. return nil
  47. }
  48. }
  49. i, err := strconv.ParseInt(string(b), 10, 64)
  50. if err == nil && i >= 0 && i < int64(len(Codecs)) {
  51. *c = Compression(i)
  52. return nil
  53. }
  54. s := &strings.Builder{}
  55. s.WriteString("none, uncompressed")
  56. for i, codec := range Codecs[None+1:] {
  57. if i < (len(Codecs) - 1) {
  58. s.WriteString(", ")
  59. } else {
  60. s.WriteString(", or ")
  61. }
  62. s.WriteString(codec.Name())
  63. }
  64. return fmt.Errorf("compression format must be one of %s, not %q", s, b)
  65. }
  66. var (
  67. _ encoding.TextMarshaler = Compression(0)
  68. _ encoding.TextUnmarshaler = (*Compression)(nil)
  69. )
  // Codec is a compression codec used to encode and decode messages.
  // See: https://cwiki.apache.org/confluence/display/KAFKA/Compression
  //
  // Implementations must be safe for concurrent use by multiple goroutines.
  type Codec interface {
  	// Code returns the compression codec code.
  	Code() int8

  	// Name returns the human-readable name of the codec.
  	Name() string

  	// NewReader constructs a new reader which decompresses data from r.
  	NewReader(r io.Reader) io.ReadCloser

  	// NewWriter constructs a new writer which writes compressed data to w.
  	NewWriter(w io.Writer) io.WriteCloser
  }
  84. var (
  85. // The global gzip codec installed on the Codecs table.
  86. GzipCodec gzip.Codec
  87. // The global snappy codec installed on the Codecs table.
  88. SnappyCodec snappy.Codec
  89. // The global lz4 codec installed on the Codecs table.
  90. Lz4Codec lz4.Codec
  91. // The global zstd codec installed on the Codecs table.
  92. ZstdCodec zstd.Codec
  93. // The global table of compression codecs supported by the kafka protocol.
  94. Codecs = [...]Codec{
  95. None: nil,
  96. Gzip: &GzipCodec,
  97. Snappy: &SnappyCodec,
  98. Lz4: &Lz4Codec,
  99. Zstd: &ZstdCodec,
  100. }
  101. )