package protocol

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"time"

	"github.com/segmentio/kafka-go/compress"
)

// Attributes is a bitset representing special attributes set on records.
type Attributes int16

const (
	Gzip          Attributes = Attributes(compress.Gzip)   // 1
	Snappy        Attributes = Attributes(compress.Snappy) // 2
	Lz4           Attributes = Attributes(compress.Lz4)    // 3
	Zstd          Attributes = Attributes(compress.Zstd)   // 4
	Transactional Attributes = 1 << 4
	Control       Attributes = 1 << 5
)

func (a Attributes) Compression() compress.Compression {
	return compress.Compression(a & 7)
}

func (a Attributes) Transactional() bool {
	return (a & Transactional) != 0
}

func (a Attributes) Control() bool {
	return (a & Control) != 0
}

func (a Attributes) String() string {
	s := a.Compression().String()
	if a.Transactional() {
		s += "+transactional"
	}
	if a.Control() {
		s += "+control"
	}
	return s
}

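// attributesExample is a hedged illustrative sketch, not part of the
// original file: it shows how a compression codec and the flag bits combine
// into a single Attributes value with a bitwise OR, and how the accessor
// methods decompose that value again.
func attributesExample() {
	attrs := Zstd | Transactional

	fmt.Println(attrs.Compression())   // the zstd codec (low 3 bits, value 4)
	fmt.Println(attrs.Transactional()) // true (bit 4 is set)
	fmt.Println(attrs.Control())       // false (bit 5 is clear)
}
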
// Header represents a single entry in a list of record headers.
type Header struct {
	Key   string
	Value []byte
}

// Record represents a single kafka record.
//
// Record values are not safe to use concurrently from multiple goroutines.
type Record struct {
	// The offset at which the record exists in a topic partition. This value
	// is ignored in produce requests.
	Offset int64

	// The time of the record. This value may be omitted in produce requests
	// to let kafka set the time when it saves the record.
	Time time.Time

	// A byte sequence containing the key of this record. The sequence may be
	// nil to indicate that the record has no key. If the record is part of a
	// RecordSet, the content of the key must remain valid at least until the
	// record set is closed (or until the key is closed).
	Key Bytes

	// A byte sequence containing the value of this record. The sequence may
	// be nil to indicate that the record has no value. If the record is part
	// of a RecordSet, the content of the value must remain valid at least
	// until the record set is closed (or until the value is closed).
	Value Bytes

	// The list of headers associated with this record. The slice may be
	// reused across calls; the program should use it as an immutable value.
	Headers []Header
}

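// recordExample is a hedged sketch, not part of the original file, showing
// how a Record is typically populated for a produce request. It assumes the
// package-level NewBytes helper (defined elsewhere in this package), which
// wraps a []byte as a Bytes value.
func recordExample() Record {
	return Record{
		// Offset is ignored in produce requests, so it can stay zero.
		Time:  time.Now(),
		Key:   NewBytes([]byte("user-42")),
		Value: NewBytes([]byte(`{"action":"login"}`)),
		Headers: []Header{
			{Key: "trace-id", Value: []byte("abc123")},
		},
	}
}
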
// RecordSet represents a sequence of records in Produce requests and Fetch
// responses. All v0, v1, and v2 formats are supported.
type RecordSet struct {
	// The message version that this record set will be represented as; valid
	// values are 1 or 2.
	//
	// When reading, this is the value of the highest version used in the
	// batches that compose the record set.
	//
	// When writing, this value dictates the format that the records will be
	// encoded in.
	Version int8

	// Attributes set on the record set.
	//
	// When reading, the attributes are the combination of all attributes in
	// the batches that compose the record set.
	//
	// When writing, the attributes apply to the whole sequence of records in
	// the set.
	Attributes Attributes

	// A reader exposing the sequence of records.
	//
	// When reading a RecordSet from an io.Reader, the Records field will be a
	// *RecordStream. If the program needs to access the details of each batch
	// that composes the stream, it may use type assertions to access the
	// underlying types of each batch.
	Records RecordReader
}

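// recordSetBatches is a hedged sketch, not part of the original file,
// demonstrating the type assertion mentioned above: after ReadFrom, the
// Records field holds a *RecordStream whose Records slice exposes the
// individual batches that were decoded.
func recordSetBatches(rs *RecordSet) []RecordReader {
	if stream, ok := rs.Records.(*RecordStream); ok {
		return stream.Records
	}
	return nil
}
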
// bufferedReader is an interface implemented by types like bufio.Reader, which
// we use to optimize prefix reads by accessing the internal buffer directly
// through calls to Peek.
type bufferedReader interface {
	Discard(int) (int, error)
	Peek(int) ([]byte, error)
}

// bytesBuffer is an interface implemented by types like bytes.Buffer, which we
// use to optimize prefix reads by accessing the internal buffer directly
// through calls to Bytes.
type bytesBuffer interface {
	Bytes() []byte
}

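// Compile-time check (an illustrative addition, not in the original file)
// documenting that *bytes.Buffer provides the Bytes method used by the fast
// path below. *bufio.Reader satisfies bufferedReader the same way through
// its Peek and Discard methods, but asserting that here would require
// importing bufio.
var _ bytesBuffer = (*bytes.Buffer)(nil)
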
// magicByteOffset is the position of the magic byte in all versions of record
// sets in the kafka protocol.
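//
// The offset is the same in every format: v0 and v1 messages are prefixed by
// an 8-byte offset, a 4-byte message size, and a 4-byte CRC, while v2 record
// batches are prefixed by an 8-byte base offset, a 4-byte batch length, and a
// 4-byte partition leader epoch; in both layouts the magic byte follows at
// offset 16.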
const magicByteOffset = 16

// ReadFrom reads the representation of a record set from r into rs, returning
// the number of bytes consumed from r, and a non-nil error if the record set
// could not be read.
func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) {
	d, _ := r.(*decoder)
	if d == nil {
		d = &decoder{
			reader: r,
			remain: 4,
		}
	}

	*rs = RecordSet{}
	limit := d.remain
	size := d.readInt32()

	if d.err != nil {
		return int64(limit - d.remain), d.err
	}

	if size <= 0 {
		return 4, nil
	}

	stream := &RecordStream{
		Records: make([]RecordReader, 0, 4),
	}

	var err error
	d.remain = int(size)

	for d.remain > 0 && err == nil {
		var version byte

		if d.remain < (magicByteOffset + 1) {
			if len(stream.Records) != 0 {
				break
			}
			return 4, fmt.Errorf("impossible record set shorter than %d bytes", magicByteOffset+1)
		}

		switch r := d.reader.(type) {
		case bufferedReader:
			b, err := r.Peek(magicByteOffset + 1)
			if err != nil {
				n, _ := r.Discard(len(b))
				return 4 + int64(n), dontExpectEOF(err)
			}
			version = b[magicByteOffset]
		case bytesBuffer:
			version = r.Bytes()[magicByteOffset]
		default:
			b := make([]byte, magicByteOffset+1)
			if n, err := io.ReadFull(d.reader, b); err != nil {
				return 4 + int64(n), dontExpectEOF(err)
			}
			version = b[magicByteOffset]
			// Reconstruct the prefix that we had to read to determine the
			// version of the record set from the magic byte.
			//
			// Technically this may recursively stack readers when consuming all
			// items of the batch, which could hurt performance. In practice this
			// path should not be taken though, since the decoder would read from
			// a *bufio.Reader which implements the bufferedReader interface.
			d.reader = io.MultiReader(bytes.NewReader(b), d.reader)
		}

		var tmp RecordSet
		switch version {
		case 0, 1:
			err = tmp.readFromVersion1(d)
		case 2:
			err = tmp.readFromVersion2(d)
		default:
			err = fmt.Errorf("unsupported message version %d for message of size %d", version, size)
		}

		if tmp.Version > rs.Version {
			rs.Version = tmp.Version
		}

		rs.Attributes |= tmp.Attributes

		if tmp.Records != nil {
			stream.Records = append(stream.Records, tmp.Records)
		}
	}

	if len(stream.Records) != 0 {
		rs.Records = stream
		// Ignore errors if we've successfully read records, so the
		// program can keep making progress.
		err = nil
	}

	d.discardAll()
	rn := 4 + (int(size) - d.remain)
	d.remain = limit - rn
	return int64(rn), err
}

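// readRecordSetExample is a hedged usage sketch, not part of the original
// file: it decodes a size-prefixed record set from an in-memory buffer. The
// wire argument is assumed to hold the bytes exactly as they appear in a
// Fetch response; a *bytes.Buffer takes the bytesBuffer fast path above, so
// no extra copy is made to locate the magic byte.
func readRecordSetExample(wire []byte) (*RecordSet, error) {
	rs := new(RecordSet)
	if _, err := rs.ReadFrom(bytes.NewBuffer(wire)); err != nil {
		return nil, err
	}
	return rs, nil
}
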
// WriteTo writes the representation of rs into w. The value of rs.Version
// dictates which format the record set will be represented as.
//
// The error will be ErrNoRecord if rs contains no records.
//
// Note: since this package is only compatible with kafka 0.10 and above, the
// method never produces messages in version 0. If rs.Version is zero, the
// method defaults to producing messages in version 1.
func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) {
	if rs.Records == nil {
		return 0, ErrNoRecord
	}

	// This optimization avoids rendering the record set in an intermediary
	// buffer when the writer is already a pageBuffer, which is a common case
	// due to the way WriteRequest and WriteResponse are implemented.
	buffer, _ := w.(*pageBuffer)
	bufferOffset := int64(0)

	if buffer != nil {
		bufferOffset = buffer.Size()
	} else {
		buffer = newPageBuffer()
		defer buffer.unref()
	}

	size := packUint32(0)
	buffer.Write(size[:]) // size placeholder

	var err error
	switch rs.Version {
	case 0, 1:
		err = rs.writeToVersion1(buffer, bufferOffset+4)
	case 2:
		err = rs.writeToVersion2(buffer, bufferOffset+4)
	default:
		err = fmt.Errorf("unsupported record set version %d", rs.Version)
	}
	if err != nil {
		return 0, err
	}

	n := buffer.Size() - bufferOffset
	if n == 0 {
		size = packUint32(^uint32(0))
	} else {
		size = packUint32(uint32(n) - 4)
	}

	buffer.WriteAt(size[:], bufferOffset)

	// This condition indicates that the output writer received by `WriteTo` was
	// not a *pageBuffer, in which case we need to flush the buffered records
	// data into it.
	if buffer != w {
		return buffer.WriteTo(w)
	}

	return n, nil
}

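// writeRecordSetExample is a hedged round-trip sketch, not part of the
// original file: it encodes a record set into a byte slice using the v2
// batch format. It assumes the package-level NewRecordReader helper
// (defined elsewhere in this package), which builds a RecordReader from a
// slice of records.
func writeRecordSetExample(records ...Record) ([]byte, error) {
	rs := &RecordSet{
		Version: 2,
		Records: NewRecordReader(records...),
	}
	buf := new(bytes.Buffer)
	if _, err := rs.WriteTo(buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
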
// RawRecordSet represents a record set for a RawProduce request. The record
// set is represented as a raw sequence of pre-encoded record set bytes.
type RawRecordSet struct {
	// Reader exposes the raw sequence of record set bytes.
	Reader io.Reader
}

// ReadFrom reads the representation of a record set from r into rrs. It
// reuses the existing RecordSet.ReadFrom implementation to first read/decode
// data into a RecordSet, then writes/encodes the RecordSet to a buffer
// referenced by the RawRecordSet.
//
// Note: reusing the RecordSet.ReadFrom implementation makes this suboptimal
// from a performance standpoint as it requires an extra copy of the record
// bytes. Holding off on optimizing, as this code path is only invoked in
// tests.
func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) {
	rs := &RecordSet{}
	n, err := rs.ReadFrom(r)
	if err != nil {
		return 0, err
	}

	buf := &bytes.Buffer{}
	rs.WriteTo(buf)

	*rrs = RawRecordSet{
		Reader: buf,
	}

	return n, nil
}

// WriteTo writes the RawRecordSet to an io.Writer. Since this is a raw record
// set representation, all that is done here is copying bytes from the
// underlying reader to the specified writer.
func (rrs *RawRecordSet) WriteTo(w io.Writer) (int64, error) {
	if rrs.Reader == nil {
		return 0, ErrNoRecord
	}

	return io.Copy(w, rrs.Reader)
}

func makeTime(t int64) time.Time {
	return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
}

func timestamp(t time.Time) int64 {
	if t.IsZero() {
		return 0
	}
	return t.UnixNano() / int64(time.Millisecond)
}

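// timestampRoundTrip is a hedged illustrative sketch, not part of the
// original file: the wire format carries millisecond timestamps, so a round
// trip through timestamp and makeTime truncates any sub-millisecond
// precision of the input. Note that a zero time.Time maps to 0 and comes
// back as the Unix epoch, not as a zero time.
func timestampRoundTrip(t time.Time) time.Time {
	return makeTime(timestamp(t))
}
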
func packUint32(u uint32) (b [4]byte) {
	binary.BigEndian.PutUint32(b[:], u)
	return
}

func packUint64(u uint64) (b [8]byte) {
	binary.BigEndian.PutUint64(b[:], u)
	return
}