// Copyright (c) 2022+ Klaus Post. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package s2

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"sort"
)

const (
	S2IndexHeader   = "s2idx\x00"
	S2IndexTrailer  = "\x00xdi2s"
	maxIndexEntries = 1 << 16
)

// Index represents an S2/Snappy index.
type Index struct {
	TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown.
	TotalCompressed   int64 // Total Compressed size if known. Will be -1 if unknown.
	info              []struct {
		compressedOffset   int64
		uncompressedOffset int64
	}
	estBlockUncomp int64
}

func (i *Index) reset(maxBlock int) {
	i.estBlockUncomp = int64(maxBlock)
	i.TotalCompressed = -1
	i.TotalUncompressed = -1
	if len(i.info) > 0 {
		i.info = i.info[:0]
	}
}

// allocInfos will allocate an empty slice of infos.
func (i *Index) allocInfos(n int) {
	if n > maxIndexEntries {
		panic("n > maxIndexEntries")
	}
	i.info = make([]struct {
		compressedOffset   int64
		uncompressedOffset int64
	}, 0, n)
}

// add an uncompressed and compressed offset pair.
// Entries must be added in order.
func (i *Index) add(compressedOffset, uncompressedOffset int64) error {
	if i == nil {
		return nil
	}
	lastIdx := len(i.info) - 1
	if lastIdx >= 0 {
		latest := i.info[lastIdx]
		if latest.uncompressedOffset == uncompressedOffset {
			// Uncompressed didn't change, don't add entry,
			// but update start index.
			latest.compressedOffset = compressedOffset
			i.info[lastIdx] = latest
			return nil
		}
		if latest.uncompressedOffset > uncompressedOffset {
			return fmt.Errorf("internal error: earlier uncompressed offset received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
		}
		if latest.compressedOffset > compressedOffset {
			return fmt.Errorf("internal error: earlier compressed offset received (%d > %d)", latest.compressedOffset, compressedOffset)
		}
	}
	i.info = append(i.info, struct {
		compressedOffset   int64
		uncompressedOffset int64
	}{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset})
	return nil
}

// Find the offset at or before the wanted (uncompressed) offset.
// If offset is 0 or positive it is the offset from the beginning of the file.
// If the uncompressed size is known, the offset must be within the file.
// If an offset outside the file is requested, io.ErrUnexpectedEOF is returned.
// If the offset is negative, it is interpreted as the distance from the end of the file,
// where -1 represents the last byte.
// If an offset from the end of the file is requested, but the size is unknown,
// ErrUnsupported will be returned.
func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) {
	if i.TotalUncompressed < 0 {
		return 0, 0, ErrCorrupt
	}
	if offset < 0 {
		offset = i.TotalUncompressed + offset
		if offset < 0 {
			return 0, 0, io.ErrUnexpectedEOF
		}
	}
	if offset > i.TotalUncompressed {
		return 0, 0, io.ErrUnexpectedEOF
	}
	if len(i.info) > 200 {
		n := sort.Search(len(i.info), func(n int) bool {
			return i.info[n].uncompressedOffset > offset
		})
		if n == 0 {
			n = 1
		}
		return i.info[n-1].compressedOffset, i.info[n-1].uncompressedOffset, nil
	}
	for _, info := range i.info {
		if info.uncompressedOffset > offset {
			break
		}
		compressedOff = info.compressedOffset
		uncompressedOff = info.uncompressedOffset
	}
	return compressedOff, uncompressedOff, nil
}
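
// Illustrative usage sketch (not part of the original file; variable names are
// placeholders): after loading an index, Find maps an uncompressed offset to
// the nearest earlier seek point in the compressed stream.
//
//	var idx Index
//	if _, err := idx.Load(indexBytes); err != nil {
//		// handle missing or corrupt index
//	}
//	compOff, uncompOff, err := idx.Find(wantUncompressedOffset)
//	if err == nil {
//		// Seek the compressed stream to compOff, then decode and discard
//		// (wantUncompressedOffset - uncompOff) bytes to reach the target.
//	}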

// reduce to stay below maxIndexEntries
func (i *Index) reduce() {
	if len(i.info) < maxIndexEntries && i.estBlockUncomp >= 1<<20 {
		return
	}

	// Algorithm, keep 1, remove removeN entries...
	removeN := (len(i.info) + 1) / maxIndexEntries
	src := i.info
	j := 0

	// Each block should be at least 1MB, but don't reduce below 1000 entries.
	for i.estBlockUncomp*(int64(removeN)+1) < 1<<20 && len(i.info)/(removeN+1) > 1000 {
		removeN++
	}
	for idx := 0; idx < len(src); idx++ {
		i.info[j] = src[idx]
		j++
		idx += removeN
	}
	i.info = i.info[:j]
	// Update maxblock estimate.
	i.estBlockUncomp += i.estBlockUncomp * int64(removeN)
}
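
// appendTo serializes the index and appends it to b as a skippable chunk.
//
// Layout sketch (derived from the serialization code below; all integers are
// signed varints unless a fixed size is noted):
//
//	[1]    ChunkTypeIndex
//	[3]    chunk length, little endian (filled in at the end)
//	[6]    S2IndexHeader ("s2idx\x00")
//	varint total uncompressed size
//	varint total compressed size
//	varint estimated uncompressed block size
//	varint number of entries
//	[1]    hasUncompressed flag
//	varint uncompressed offset deltas, one per entry (only if hasUncompressed == 1)
//	varint compressed offset deltas, one per entry (prediction-corrected)
//	[4]    total index size, little endian
//	[6]    S2IndexTrailer ("\x00xdi2s")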
func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte {
	i.reduce()
	var tmp [binary.MaxVarintLen64]byte

	initSize := len(b)
	// We make the start a skippable header+size.
	b = append(b, ChunkTypeIndex, 0, 0, 0)
	b = append(b, []byte(S2IndexHeader)...)
	// Total Uncompressed size
	n := binary.PutVarint(tmp[:], uncompTotal)
	b = append(b, tmp[:n]...)
	// Total Compressed size
	n = binary.PutVarint(tmp[:], compTotal)
	b = append(b, tmp[:n]...)
	// Put EstBlockUncomp size
	n = binary.PutVarint(tmp[:], i.estBlockUncomp)
	b = append(b, tmp[:n]...)
	// Put length
	n = binary.PutVarint(tmp[:], int64(len(i.info)))
	b = append(b, tmp[:n]...)

	// Check if we should add uncompressed offsets
	var hasUncompressed byte
	for idx, info := range i.info {
		if idx == 0 {
			if info.uncompressedOffset != 0 {
				hasUncompressed = 1
				break
			}
			continue
		}
		if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp {
			hasUncompressed = 1
			break
		}
	}
	b = append(b, hasUncompressed)

	// Add each entry
	if hasUncompressed == 1 {
		for idx, info := range i.info {
			uOff := info.uncompressedOffset
			if idx > 0 {
				prev := i.info[idx-1]
				uOff -= prev.uncompressedOffset + i.estBlockUncomp
			}
			n = binary.PutVarint(tmp[:], uOff)
			b = append(b, tmp[:n]...)
		}
	}

	// Initial compressed size estimate.
	cPredict := i.estBlockUncomp / 2

	for idx, info := range i.info {
		cOff := info.compressedOffset
		if idx > 0 {
			prev := i.info[idx-1]
			cOff -= prev.compressedOffset + cPredict
			// Update compressed size prediction, with half the error.
			cPredict += cOff / 2
		}
		n = binary.PutVarint(tmp[:], cOff)
		b = append(b, tmp[:n]...)
	}

	// Add Total Size.
	// Stored as fixed size for easier reading.
	binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer)))
	b = append(b, tmp[:4]...)
	// Trailer
	b = append(b, []byte(S2IndexTrailer)...)

	// Update size
	chunkLen := len(b) - initSize - skippableFrameHeader
	b[initSize+1] = uint8(chunkLen >> 0)
	b[initSize+2] = uint8(chunkLen >> 8)
	b[initSize+3] = uint8(chunkLen >> 16)
	return b
}

// Load a binary index.
// A zero value Index can be used or a previous one can be reused.
func (i *Index) Load(b []byte) ([]byte, error) {
	if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) {
		return b, io.ErrUnexpectedEOF
	}
	if b[0] != ChunkTypeIndex {
		return b, ErrCorrupt
	}
	chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
	b = b[4:]

	// Validate we have enough...
	if len(b) < chunkLen {
		return b, io.ErrUnexpectedEOF
	}
	if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
		return b, ErrUnsupported
	}
	b = b[len(S2IndexHeader):]

	// Total Uncompressed
	if v, n := binary.Varint(b); n <= 0 || v < 0 {
		return b, ErrCorrupt
	} else {
		i.TotalUncompressed = v
		b = b[n:]
	}

	// Total Compressed
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		i.TotalCompressed = v
		b = b[n:]
	}

	// Read EstBlockUncomp
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		if v < 0 {
			return b, ErrCorrupt
		}
		i.estBlockUncomp = v
		b = b[n:]
	}

	var entries int
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		if v < 0 || v > maxIndexEntries {
			return b, ErrCorrupt
		}
		entries = int(v)
		b = b[n:]
	}

	if cap(i.info) < entries {
		i.allocInfos(entries)
	}
	i.info = i.info[:entries]

	if len(b) < 1 {
		return b, io.ErrUnexpectedEOF
	}
	hasUncompressed := b[0]
	b = b[1:]
	if hasUncompressed&1 != hasUncompressed {
		return b, ErrCorrupt
	}

	// Add each uncompressed entry
	for idx := range i.info {
		var uOff int64
		if hasUncompressed != 0 {
			// Load delta
			if v, n := binary.Varint(b); n <= 0 {
				return b, ErrCorrupt
			} else {
				uOff = v
				b = b[n:]
			}
		}

		if idx > 0 {
			prev := i.info[idx-1].uncompressedOffset
			uOff += prev + i.estBlockUncomp
			if uOff <= prev {
				return b, ErrCorrupt
			}
		}
		if uOff < 0 {
			return b, ErrCorrupt
		}
		i.info[idx].uncompressedOffset = uOff
	}

	// Initial compressed size estimate.
	cPredict := i.estBlockUncomp / 2

	// Add each compressed entry
	for idx := range i.info {
		var cOff int64
		if v, n := binary.Varint(b); n <= 0 {
			return b, ErrCorrupt
		} else {
			cOff = v
			b = b[n:]
		}

		if idx > 0 {
			// Update compressed size prediction, with half the error.
			cPredictNew := cPredict + cOff/2

			prev := i.info[idx-1].compressedOffset
			cOff += prev + cPredict
			if cOff <= prev {
				return b, ErrCorrupt
			}
			cPredict = cPredictNew
		}
		if cOff < 0 {
			return b, ErrCorrupt
		}
		i.info[idx].compressedOffset = cOff
	}
	if len(b) < 4+len(S2IndexTrailer) {
		return b, io.ErrUnexpectedEOF
	}
	// Skip size...
	b = b[4:]

	// Check trailer...
	if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
		return b, ErrCorrupt
	}
	return b[len(S2IndexTrailer):], nil
}
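
// Illustrative sketch (not part of the original file; variable names are
// placeholders): Load consumes the index chunk from the front of the slice
// and returns any bytes that follow it.
//
//	var idx Index
//	rest, err := idx.Load(buf) // buf starts with a serialized index
//	if err == nil {
//		_ = rest // bytes following the index, if any
//	}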

// LoadStream will load an index from the end of the supplied stream.
// ErrUnsupported will be returned if the signature cannot be found.
// ErrCorrupt will be returned if unexpected values are found.
// io.ErrUnexpectedEOF is returned if there are too few bytes.
// IO errors are returned as-is.
func (i *Index) LoadStream(rs io.ReadSeeker) error {
	// Go to end.
	_, err := rs.Seek(-10, io.SeekEnd)
	if err != nil {
		return err
	}
	var tmp [10]byte
	_, err = io.ReadFull(rs, tmp[:])
	if err != nil {
		return err
	}
	// Check trailer...
	if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
		return ErrUnsupported
	}
	sz := binary.LittleEndian.Uint32(tmp[:4])
	if sz > maxChunkSize+skippableFrameHeader {
		return ErrCorrupt
	}
	_, err = rs.Seek(-int64(sz), io.SeekEnd)
	if err != nil {
		return err
	}

	// Read index.
	buf := make([]byte, sz)
	_, err = io.ReadFull(rs, buf)
	if err != nil {
		return err
	}
	_, err = i.Load(buf)
	return err
}
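
// Illustrative usage sketch (not part of the original file; the file name is a
// placeholder): load the index appended to the end of a seekable stream.
//
//	f, err := os.Open("stream.s2")
//	if err != nil {
//		// handle error
//	}
//	defer f.Close()
//	var idx Index
//	if err := idx.LoadStream(f); err != nil {
//		// ErrUnsupported: no index trailer was found at the end of the stream.
//	}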

// IndexStream will return an index for a stream.
// The stream structure will be checked, but
// data within blocks is not verified.
// The returned index can either be appended to the end of the stream
// or stored separately.
func IndexStream(r io.Reader) ([]byte, error) {
	var i Index
	var buf [maxChunkSize]byte
	var readHeader bool
	for {
		_, err := io.ReadFull(r, buf[:4])
		if err != nil {
			if err == io.EOF {
				return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil
			}
			return nil, err
		}
		// Start of this chunk.
		startChunk := i.TotalCompressed
		i.TotalCompressed += 4

		chunkType := buf[0]
		if !readHeader {
			if chunkType != chunkTypeStreamIdentifier {
				return nil, ErrCorrupt
			}
			readHeader = true
		}
		chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16
		if chunkLen < checksumSize {
			return nil, ErrCorrupt
		}

		i.TotalCompressed += int64(chunkLen)
		_, err = io.ReadFull(r, buf[:chunkLen])
		if err != nil {
			return nil, io.ErrUnexpectedEOF
		}

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// Section 4.2. Compressed data (chunk type 0x00).
			// Skip checksum.
			dLen, err := DecodedLen(buf[checksumSize:])
			if err != nil {
				return nil, err
			}
			if dLen > maxBlockSize {
				return nil, ErrCorrupt
			}
			if i.estBlockUncomp == 0 {
				// Use first block for estimate...
				i.estBlockUncomp = int64(dLen)
			}
			err = i.add(startChunk, i.TotalUncompressed)
			if err != nil {
				return nil, err
			}
			i.TotalUncompressed += int64(dLen)
			continue
		case chunkTypeUncompressedData:
			n2 := chunkLen - checksumSize
			if n2 > maxBlockSize {
				return nil, ErrCorrupt
			}
			if i.estBlockUncomp == 0 {
				// Use first block for estimate...
				i.estBlockUncomp = int64(n2)
			}
			err = i.add(startChunk, i.TotalUncompressed)
			if err != nil {
				return nil, err
			}
			i.TotalUncompressed += int64(n2)
			continue
		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				return nil, ErrCorrupt
			}

			if string(buf[:len(magicBody)]) != magicBody {
				if string(buf[:len(magicBody)]) != magicBodySnappy {
					return nil, ErrCorrupt
				}
			}

			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			return nil, ErrUnsupported
		}
		if chunkLen > maxChunkSize {
			return nil, ErrUnsupported
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
	}
}
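
// Illustrative usage sketch (not part of the original file; error handling is
// abbreviated and the file name is a placeholder): build an index for an
// already-compressed stream so it can be appended or stored separately.
//
//	f, _ := os.Open("stream.s2")
//	idxBytes, err := IndexStream(f)
//	if err == nil {
//		// Append idxBytes to the end of the compressed file, or store it
//		// separately and load it later with (*Index).Load.
//	}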

// JSON returns the index as JSON text.
func (i *Index) JSON() []byte {
	type offset struct {
		CompressedOffset   int64 `json:"compressed"`
		UncompressedOffset int64 `json:"uncompressed"`
	}
	x := struct {
		TotalUncompressed int64    `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown.
		TotalCompressed   int64    `json:"total_compressed"`   // Total Compressed size if known. Will be -1 if unknown.
		Offsets           []offset `json:"offsets"`
		EstBlockUncomp    int64    `json:"est_block_uncompressed"`
	}{
		TotalUncompressed: i.TotalUncompressed,
		TotalCompressed:   i.TotalCompressed,
		EstBlockUncomp:    i.estBlockUncomp,
	}
	for _, v := range i.info {
		x.Offsets = append(x.Offsets, offset{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset})
	}
	b, _ := json.MarshalIndent(x, "", " ")
	return b
}
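
// The JSON output has this shape (sketch derived from the struct tags above;
// the values shown are examples only):
//
//	{
//		"total_uncompressed": 2097152,
//		"total_compressed": 1048576,
//		"offsets": [
//			{"compressed": 0, "uncompressed": 0},
//			{"compressed": 524288, "uncompressed": 1048576}
//		],
//		"est_block_uncompressed": 1048576
//	}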

// RemoveIndexHeaders will trim all headers and trailers from a given index.
// This is expected to save 20 bytes.
// These can be restored using RestoreIndexHeaders.
// This removes a layer of security, but is the most compact representation.
// Returns nil if the headers contain errors.
// The returned slice references the provided slice.
func RemoveIndexHeaders(b []byte) []byte {
	const save = 4 + len(S2IndexHeader) + len(S2IndexTrailer) + 4
	if len(b) <= save {
		return nil
	}
	if b[0] != ChunkTypeIndex {
		return nil
	}
	chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
	b = b[4:]

	// Validate we have enough...
	if len(b) < chunkLen {
		return nil
	}
	b = b[:chunkLen]

	if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
		return nil
	}
	b = b[len(S2IndexHeader):]
	if !bytes.HasSuffix(b, []byte(S2IndexTrailer)) {
		return nil
	}
	b = bytes.TrimSuffix(b, []byte(S2IndexTrailer))

	if len(b) < 4 {
		return nil
	}
	return b[:len(b)-4]
}

// RestoreIndexHeaders will restore index headers removed by RemoveIndexHeaders.
// No error checking is performed on the input.
// If a 0 length slice is sent, it is returned without modification.
func RestoreIndexHeaders(in []byte) []byte {
	if len(in) == 0 {
		return in
	}
	b := make([]byte, 0, 4+len(S2IndexHeader)+len(in)+len(S2IndexTrailer)+4)
	b = append(b, ChunkTypeIndex, 0, 0, 0)
	b = append(b, []byte(S2IndexHeader)...)
	b = append(b, in...)

	var tmp [4]byte
	binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)+4+len(S2IndexTrailer)))
	b = append(b, tmp[:4]...)
	// Trailer
	b = append(b, []byte(S2IndexTrailer)...)

	chunkLen := len(b) - skippableFrameHeader
	b[1] = uint8(chunkLen >> 0)
	b[2] = uint8(chunkLen >> 8)
	b[3] = uint8(chunkLen >> 16)
	return b
}
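
// Illustrative round-trip sketch (not part of the original file): strip the
// framing for compact storage and restore it before loading.
//
//	compact := RemoveIndexHeaders(idxBytes) // nil if idxBytes is malformed
//	restored := RestoreIndexHeaders(compact)
//	var idx Index
//	_, err := idx.Load(restored)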