123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package compress
- import (
- "encoding"
- "fmt"
- "io"
- "strconv"
- "strings"
- "github.com/segmentio/kafka-go/compress/gzip"
- "github.com/segmentio/kafka-go/compress/lz4"
- "github.com/segmentio/kafka-go/compress/snappy"
- "github.com/segmentio/kafka-go/compress/zstd"
- )
- // Compression represents the compression applied to a record set.
- type Compression int8
- const (
- None Compression = 0
- Gzip Compression = 1
- Snappy Compression = 2
- Lz4 Compression = 3
- Zstd Compression = 4
- )
- func (c Compression) Codec() Codec {
- if i := int(c); i >= 0 && i < len(Codecs) {
- return Codecs[i]
- }
- return nil
- }
- func (c Compression) String() string {
- if codec := c.Codec(); codec != nil {
- return codec.Name()
- }
- return "uncompressed"
- }
- func (c Compression) MarshalText() ([]byte, error) {
- return []byte(c.String()), nil
- }
- func (c *Compression) UnmarshalText(b []byte) error {
- switch string(b) {
- case "none", "uncompressed":
- *c = None
- return nil
- }
- for _, codec := range Codecs[None+1:] {
- if codec.Name() == string(b) {
- *c = Compression(codec.Code())
- return nil
- }
- }
- i, err := strconv.ParseInt(string(b), 10, 64)
- if err == nil && i >= 0 && i < int64(len(Codecs)) {
- *c = Compression(i)
- return nil
- }
- s := &strings.Builder{}
- s.WriteString("none, uncompressed")
- for i, codec := range Codecs[None+1:] {
- if i < (len(Codecs) - 1) {
- s.WriteString(", ")
- } else {
- s.WriteString(", or ")
- }
- s.WriteString(codec.Name())
- }
- return fmt.Errorf("compression format must be one of %s, not %q", s, b)
- }
- var (
- _ encoding.TextMarshaler = Compression(0)
- _ encoding.TextUnmarshaler = (*Compression)(nil)
- )
- // Codec represents a compression codec to encode and decode the messages.
- // See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
- //
- // A Codec must be safe for concurrent access by multiple go routines.
- type Codec interface {
- // Code returns the compression codec code
- Code() int8
- // Human-readable name for the codec.
- Name() string
- // Constructs a new reader which decompresses data from r.
- NewReader(r io.Reader) io.ReadCloser
- // Constructs a new writer which writes compressed data to w.
- NewWriter(w io.Writer) io.WriteCloser
- }
- var (
- // The global gzip codec installed on the Codecs table.
- GzipCodec gzip.Codec
- // The global snappy codec installed on the Codecs table.
- SnappyCodec snappy.Codec
- // The global lz4 codec installed on the Codecs table.
- Lz4Codec lz4.Codec
- // The global zstd codec installed on the Codecs table.
- ZstdCodec zstd.Codec
- // The global table of compression codecs supported by the kafka protocol.
- Codecs = [...]Codec{
- None: nil,
- Gzip: &GzipCodec,
- Snappy: &SnappyCodec,
- Lz4: &Lz4Codec,
- Zstd: &ZstdCodec,
- }
- )
|