encode.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "math"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. )
  13. type encoder struct {
  14. writer io.Writer
  15. err error
  16. table *crc32.Table
  17. crc32 uint32
  18. buffer [32]byte
  19. }
  20. type encoderChecksum struct {
  21. reader io.Reader
  22. encoder *encoder
  23. }
  24. func (e *encoderChecksum) Read(b []byte) (int, error) {
  25. n, err := e.reader.Read(b)
  26. if n > 0 {
  27. e.encoder.update(b[:n])
  28. }
  29. return n, err
  30. }
  31. func (e *encoder) Reset(w io.Writer) {
  32. e.writer = w
  33. e.err = nil
  34. e.table = nil
  35. e.crc32 = 0
  36. e.buffer = [32]byte{}
  37. }
  38. func (e *encoder) ReadFrom(r io.Reader) (int64, error) {
  39. if e.table != nil {
  40. r = &encoderChecksum{
  41. reader: r,
  42. encoder: e,
  43. }
  44. }
  45. return io.Copy(e.writer, r)
  46. }
  47. func (e *encoder) Write(b []byte) (int, error) {
  48. if e.err != nil {
  49. return 0, e.err
  50. }
  51. n, err := e.writer.Write(b)
  52. if n > 0 {
  53. e.update(b[:n])
  54. }
  55. if err != nil {
  56. e.err = err
  57. }
  58. return n, err
  59. }
  60. func (e *encoder) WriteByte(b byte) error {
  61. e.buffer[0] = b
  62. _, err := e.Write(e.buffer[:1])
  63. return err
  64. }
  65. func (e *encoder) WriteString(s string) (int, error) {
  66. // This implementation is an optimization to avoid the heap allocation that
  67. // would occur when converting the string to a []byte to call crc32.Update.
  68. //
  69. // Strings are rarely long in the kafka protocol, so the use of a 32 byte
  70. // buffer is a good comprise between keeping the encoder value small and
  71. // limiting the number of calls to Write.
  72. //
  73. // We introduced this optimization because memory profiles on the benchmarks
  74. // showed that most heap allocations were caused by this code path.
  75. n := 0
  76. for len(s) != 0 {
  77. c := copy(e.buffer[:], s)
  78. w, err := e.Write(e.buffer[:c])
  79. n += w
  80. if err != nil {
  81. return n, err
  82. }
  83. s = s[c:]
  84. }
  85. return n, nil
  86. }
  87. func (e *encoder) setCRC(table *crc32.Table) {
  88. e.table, e.crc32 = table, 0
  89. }
  90. func (e *encoder) update(b []byte) {
  91. if e.table != nil {
  92. e.crc32 = crc32.Update(e.crc32, e.table, b)
  93. }
  94. }
  95. func (e *encoder) encodeBool(v value) {
  96. b := int8(0)
  97. if v.bool() {
  98. b = 1
  99. }
  100. e.writeInt8(b)
  101. }
  102. func (e *encoder) encodeInt8(v value) {
  103. e.writeInt8(v.int8())
  104. }
  105. func (e *encoder) encodeInt16(v value) {
  106. e.writeInt16(v.int16())
  107. }
  108. func (e *encoder) encodeInt32(v value) {
  109. e.writeInt32(v.int32())
  110. }
  111. func (e *encoder) encodeInt64(v value) {
  112. e.writeInt64(v.int64())
  113. }
  114. func (e *encoder) encodeFloat64(v value) {
  115. e.writeFloat64(v.float64())
  116. }
  117. func (e *encoder) encodeString(v value) {
  118. e.writeString(v.string())
  119. }
  120. func (e *encoder) encodeCompactString(v value) {
  121. e.writeCompactString(v.string())
  122. }
  123. func (e *encoder) encodeNullString(v value) {
  124. e.writeNullString(v.string())
  125. }
  126. func (e *encoder) encodeCompactNullString(v value) {
  127. e.writeCompactNullString(v.string())
  128. }
  129. func (e *encoder) encodeBytes(v value) {
  130. e.writeBytes(v.bytes())
  131. }
  132. func (e *encoder) encodeCompactBytes(v value) {
  133. e.writeCompactBytes(v.bytes())
  134. }
  135. func (e *encoder) encodeNullBytes(v value) {
  136. e.writeNullBytes(v.bytes())
  137. }
  138. func (e *encoder) encodeCompactNullBytes(v value) {
  139. e.writeCompactNullBytes(v.bytes())
  140. }
  141. func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  142. a := v.array(elemType)
  143. n := a.length()
  144. e.writeInt32(int32(n))
  145. for i := 0; i < n; i++ {
  146. encodeElem(e, a.index(i))
  147. }
  148. }
  149. func (e *encoder) encodeCompactArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  150. a := v.array(elemType)
  151. n := a.length()
  152. e.writeUnsignedVarInt(uint64(n + 1))
  153. for i := 0; i < n; i++ {
  154. encodeElem(e, a.index(i))
  155. }
  156. }
  157. func (e *encoder) encodeNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  158. a := v.array(elemType)
  159. if a.isNil() {
  160. e.writeInt32(-1)
  161. return
  162. }
  163. n := a.length()
  164. e.writeInt32(int32(n))
  165. for i := 0; i < n; i++ {
  166. encodeElem(e, a.index(i))
  167. }
  168. }
  169. func (e *encoder) encodeCompactNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
  170. a := v.array(elemType)
  171. if a.isNil() {
  172. e.writeUnsignedVarInt(0)
  173. return
  174. }
  175. n := a.length()
  176. e.writeUnsignedVarInt(uint64(n + 1))
  177. for i := 0; i < n; i++ {
  178. encodeElem(e, a.index(i))
  179. }
  180. }
  181. func (e *encoder) writeInt8(i int8) {
  182. writeInt8(e.buffer[:1], i)
  183. e.Write(e.buffer[:1])
  184. }
  185. func (e *encoder) writeInt16(i int16) {
  186. writeInt16(e.buffer[:2], i)
  187. e.Write(e.buffer[:2])
  188. }
  189. func (e *encoder) writeInt32(i int32) {
  190. writeInt32(e.buffer[:4], i)
  191. e.Write(e.buffer[:4])
  192. }
  193. func (e *encoder) writeInt64(i int64) {
  194. writeInt64(e.buffer[:8], i)
  195. e.Write(e.buffer[:8])
  196. }
  197. func (e *encoder) writeFloat64(f float64) {
  198. writeFloat64(e.buffer[:8], f)
  199. e.Write(e.buffer[:8])
  200. }
  201. func (e *encoder) writeString(s string) {
  202. e.writeInt16(int16(len(s)))
  203. e.WriteString(s)
  204. }
  205. func (e *encoder) writeVarString(s string) {
  206. e.writeVarInt(int64(len(s)))
  207. e.WriteString(s)
  208. }
  209. func (e *encoder) writeCompactString(s string) {
  210. e.writeUnsignedVarInt(uint64(len(s)) + 1)
  211. e.WriteString(s)
  212. }
  213. func (e *encoder) writeNullString(s string) {
  214. if s == "" {
  215. e.writeInt16(-1)
  216. } else {
  217. e.writeInt16(int16(len(s)))
  218. e.WriteString(s)
  219. }
  220. }
  221. func (e *encoder) writeCompactNullString(s string) {
  222. if s == "" {
  223. e.writeUnsignedVarInt(0)
  224. } else {
  225. e.writeUnsignedVarInt(uint64(len(s)) + 1)
  226. e.WriteString(s)
  227. }
  228. }
  229. func (e *encoder) writeBytes(b []byte) {
  230. e.writeInt32(int32(len(b)))
  231. e.Write(b)
  232. }
  233. func (e *encoder) writeCompactBytes(b []byte) {
  234. e.writeUnsignedVarInt(uint64(len(b)) + 1)
  235. e.Write(b)
  236. }
  237. func (e *encoder) writeNullBytes(b []byte) {
  238. if b == nil {
  239. e.writeInt32(-1)
  240. } else {
  241. e.writeInt32(int32(len(b)))
  242. e.Write(b)
  243. }
  244. }
  245. func (e *encoder) writeVarNullBytes(b []byte) {
  246. if b == nil {
  247. e.writeVarInt(-1)
  248. } else {
  249. e.writeVarInt(int64(len(b)))
  250. e.Write(b)
  251. }
  252. }
  253. func (e *encoder) writeCompactNullBytes(b []byte) {
  254. if b == nil {
  255. e.writeUnsignedVarInt(0)
  256. } else {
  257. e.writeUnsignedVarInt(uint64(len(b)) + 1)
  258. e.Write(b)
  259. }
  260. }
  261. func (e *encoder) writeNullBytesFrom(b Bytes) error {
  262. if b == nil {
  263. e.writeInt32(-1)
  264. return nil
  265. } else {
  266. size := int64(b.Len())
  267. e.writeInt32(int32(size))
  268. n, err := io.Copy(e, b)
  269. if err == nil && n != size {
  270. err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
  271. }
  272. return err
  273. }
  274. }
  275. func (e *encoder) writeVarNullBytesFrom(b Bytes) error {
  276. if b == nil {
  277. e.writeVarInt(-1)
  278. return nil
  279. } else {
  280. size := int64(b.Len())
  281. e.writeVarInt(size)
  282. n, err := io.Copy(e, b)
  283. if err == nil && n != size {
  284. err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
  285. }
  286. return err
  287. }
  288. }
  289. func (e *encoder) writeVarInt(i int64) {
  290. e.writeUnsignedVarInt(uint64((i << 1) ^ (i >> 63)))
  291. }
  292. func (e *encoder) writeUnsignedVarInt(i uint64) {
  293. b := e.buffer[:]
  294. n := 0
  295. for i >= 0x80 && n < len(b) {
  296. b[n] = byte(i) | 0x80
  297. i >>= 7
  298. n++
  299. }
  300. if n < len(b) {
  301. b[n] = byte(i)
  302. n++
  303. }
  304. e.Write(b[:n])
  305. }
  306. type encodeFunc func(*encoder, value)
  307. var (
  308. _ io.ReaderFrom = (*encoder)(nil)
  309. _ io.Writer = (*encoder)(nil)
  310. _ io.ByteWriter = (*encoder)(nil)
  311. _ io.StringWriter = (*encoder)(nil)
  312. writerTo = reflect.TypeOf((*io.WriterTo)(nil)).Elem()
  313. )
  314. func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc {
  315. if reflect.PtrTo(typ).Implements(writerTo) {
  316. return writerEncodeFuncOf(typ)
  317. }
  318. switch typ.Kind() {
  319. case reflect.Bool:
  320. return (*encoder).encodeBool
  321. case reflect.Int8:
  322. return (*encoder).encodeInt8
  323. case reflect.Int16:
  324. return (*encoder).encodeInt16
  325. case reflect.Int32:
  326. return (*encoder).encodeInt32
  327. case reflect.Int64:
  328. return (*encoder).encodeInt64
  329. case reflect.Float64:
  330. return (*encoder).encodeFloat64
  331. case reflect.String:
  332. return stringEncodeFuncOf(flexible, tag)
  333. case reflect.Struct:
  334. return structEncodeFuncOf(typ, version, flexible)
  335. case reflect.Slice:
  336. if typ.Elem().Kind() == reflect.Uint8 { // []byte
  337. return bytesEncodeFuncOf(flexible, tag)
  338. }
  339. return arrayEncodeFuncOf(typ, version, flexible, tag)
  340. default:
  341. panic("unsupported type: " + typ.String())
  342. }
  343. }
  344. func stringEncodeFuncOf(flexible bool, tag structTag) encodeFunc {
  345. switch {
  346. case flexible && tag.Nullable:
  347. // In flexible messages, all strings are compact
  348. return (*encoder).encodeCompactNullString
  349. case flexible:
  350. // In flexible messages, all strings are compact
  351. return (*encoder).encodeCompactString
  352. case tag.Nullable:
  353. return (*encoder).encodeNullString
  354. default:
  355. return (*encoder).encodeString
  356. }
  357. }
  358. func bytesEncodeFuncOf(flexible bool, tag structTag) encodeFunc {
  359. switch {
  360. case flexible && tag.Nullable:
  361. // In flexible messages, all arrays are compact
  362. return (*encoder).encodeCompactNullBytes
  363. case flexible:
  364. // In flexible messages, all arrays are compact
  365. return (*encoder).encodeCompactBytes
  366. case tag.Nullable:
  367. return (*encoder).encodeNullBytes
  368. default:
  369. return (*encoder).encodeBytes
  370. }
  371. }
  372. func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFunc {
  373. type field struct {
  374. encode encodeFunc
  375. index index
  376. tagID int
  377. }
  378. var fields []field
  379. var taggedFields []field
  380. forEachStructField(typ, func(typ reflect.Type, index index, tag string) {
  381. if typ.Size() != 0 { // skip struct{}
  382. forEachStructTag(tag, func(tag structTag) bool {
  383. if tag.MinVersion <= version && version <= tag.MaxVersion {
  384. f := field{
  385. encode: encodeFuncOf(typ, version, flexible, tag),
  386. index: index,
  387. tagID: tag.TagID,
  388. }
  389. if tag.TagID < -1 {
  390. // Normal required field
  391. fields = append(fields, f)
  392. } else {
  393. // Optional tagged field (flexible messages only)
  394. taggedFields = append(taggedFields, f)
  395. }
  396. return false
  397. }
  398. return true
  399. })
  400. }
  401. })
  402. return func(e *encoder, v value) {
  403. for i := range fields {
  404. f := &fields[i]
  405. f.encode(e, v.fieldByIndex(f.index))
  406. }
  407. if flexible {
  408. // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
  409. // for details of tag buffers in "flexible" messages.
  410. e.writeUnsignedVarInt(uint64(len(taggedFields)))
  411. for i := range taggedFields {
  412. f := &taggedFields[i]
  413. e.writeUnsignedVarInt(uint64(f.tagID))
  414. buf := &bytes.Buffer{}
  415. se := &encoder{writer: buf}
  416. f.encode(se, v.fieldByIndex(f.index))
  417. e.writeUnsignedVarInt(uint64(buf.Len()))
  418. e.Write(buf.Bytes())
  419. }
  420. }
  421. }
  422. }
  423. func arrayEncodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc {
  424. elemType := typ.Elem()
  425. elemFunc := encodeFuncOf(elemType, version, flexible, tag)
  426. switch {
  427. case flexible && tag.Nullable:
  428. // In flexible messages, all arrays are compact
  429. return func(e *encoder, v value) { e.encodeCompactNullArray(v, elemType, elemFunc) }
  430. case flexible:
  431. // In flexible messages, all arrays are compact
  432. return func(e *encoder, v value) { e.encodeCompactArray(v, elemType, elemFunc) }
  433. case tag.Nullable:
  434. return func(e *encoder, v value) { e.encodeNullArray(v, elemType, elemFunc) }
  435. default:
  436. return func(e *encoder, v value) { e.encodeArray(v, elemType, elemFunc) }
  437. }
  438. }
  439. func writerEncodeFuncOf(typ reflect.Type) encodeFunc {
  440. typ = reflect.PtrTo(typ)
  441. return func(e *encoder, v value) {
  442. // Optimization to write directly into the buffer when the encoder
  443. // does no need to compute a crc32 checksum.
  444. w := io.Writer(e)
  445. if e.table == nil {
  446. w = e.writer
  447. }
  448. _, err := v.iface(typ).(io.WriterTo).WriteTo(w)
  449. if err != nil {
  450. e.err = err
  451. }
  452. }
  453. }
  // writeInt8 stores the two's-complement byte of i into b[0].
  func writeInt8(b []byte, i int8) {
  	b[0] = uint8(i)
  }

  // writeInt16 stores i into b[0:2] in big-endian byte order.
  func writeInt16(b []byte, i int16) {
  	u := uint16(i)
  	binary.BigEndian.PutUint16(b, u)
  }

  // writeInt32 stores i into b[0:4] in big-endian byte order.
  func writeInt32(b []byte, i int32) {
  	u := uint32(i)
  	binary.BigEndian.PutUint32(b, u)
  }

  // writeInt64 stores i into b[0:8] in big-endian byte order.
  func writeInt64(b []byte, i int64) {
  	u := uint64(i)
  	binary.BigEndian.PutUint64(b, u)
  }

  // writeFloat64 stores the IEEE 754 bit pattern of f into b[0:8] in
  // big-endian byte order.
  func writeFloat64(b []byte, f float64) {
  	writeInt64(b, int64(math.Float64bits(f)))
  }
  469. func Marshal(version int16, value interface{}) ([]byte, error) {
  470. typ := typeOf(value)
  471. cache, _ := marshalers.Load().(map[versionedType]encodeFunc)
  472. key := versionedType{typ: typ, version: version}
  473. encode := cache[key]
  474. if encode == nil {
  475. encode = encodeFuncOf(reflect.TypeOf(value), version, false, structTag{
  476. MinVersion: -1,
  477. MaxVersion: -1,
  478. TagID: -2,
  479. Compact: true,
  480. Nullable: true,
  481. })
  482. newCache := make(map[versionedType]encodeFunc, len(cache)+1)
  483. newCache[key] = encode
  484. for typ, fun := range cache {
  485. newCache[typ] = fun
  486. }
  487. marshalers.Store(newCache)
  488. }
  489. e, _ := encoders.Get().(*encoder)
  490. if e == nil {
  491. e = &encoder{writer: new(bytes.Buffer)}
  492. }
  493. b, _ := e.writer.(*bytes.Buffer)
  494. defer func() {
  495. b.Reset()
  496. e.Reset(b)
  497. encoders.Put(e)
  498. }()
  499. encode(e, nonAddressableValueOf(value))
  500. if e.err != nil {
  501. return nil, e.err
  502. }
  503. buf := b.Bytes()
  504. out := make([]byte, len(buf))
  505. copy(out, buf)
  506. return out, nil
  507. }
  508. type versionedType struct {
  509. typ _type
  510. version int16
  511. }
  512. var (
  513. encoders sync.Pool // *encoder
  514. marshalers atomic.Value // map[versionedType]encodeFunc
  515. )