message.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package kafka
  2. import (
  3. "time"
  4. )
// Message is a data structure representing kafka messages.
type Message struct {
	// Topic indicates which topic this message was consumed from via Reader.
	//
	// When being used with Writer, this can be used to configure the topic if
	// not already specified on the writer itself.
	Topic string

	// Partition is read-only and MUST NOT be set when writing messages.
	Partition int
	// NOTE(review): Offset and HighWaterMark carry no documentation of their
	// own; presumably, like Partition, they are filled in when reading and are
	// not meant to be set when writing — confirm.
	Offset        int64
	HighWaterMark int64
	// Key and Value are copied as-is into the internal message built by the
	// message method.
	Key   []byte
	Value []byte
	// Headers contribute to headerSize and totalSize, but are not copied into
	// the internal message built by the message method.
	Headers []Header

	// WriterData holds arbitrary data you wish to attach to the message so
	// that it is available when the message is handled in the Writer's
	// `Completion` method. This lets the application perform any
	// post-processing it needs on each message.
	WriterData interface{}

	// If not set at creation, Time will be automatically set when
	// writing the message.
	Time time.Time
}
  27. func (msg Message) message(cw *crc32Writer) message {
  28. m := message{
  29. MagicByte: 1,
  30. Key: msg.Key,
  31. Value: msg.Value,
  32. Timestamp: timestamp(msg.Time),
  33. }
  34. if cw != nil {
  35. m.CRC = m.crc32(cw)
  36. }
  37. return m
  38. }
// timestampSize is the number of bytes a message's timestamp adds to its
// size; the timestamp is written as an int64.
const timestampSize = 8
  40. func (msg *Message) size() int32 {
  41. return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
  42. }
  43. func (msg *Message) headerSize() int {
  44. return varArrayLen(len(msg.Headers), func(i int) int {
  45. h := &msg.Headers[i]
  46. return varStringLen(h.Key) + varBytesLen(h.Value)
  47. })
  48. }
  49. func (msg *Message) totalSize() int32 {
  50. return int32(msg.headerSize()) + msg.size()
  51. }
// message is the internal form of a Message that is checksummed by crc32 and
// serialized by writeTo. Fields are declared in the order writeTo emits them.
type message struct {
	// CRC is the checksum produced by crc32 over the fields that follow it.
	CRC int32
	// MagicByte selects the layout: when it is 0 the Timestamp is left out of
	// the checksum, the size and the written output.
	MagicByte  int8
	Attributes int8
	// Timestamp is only used when MagicByte is non-zero.
	Timestamp int64
	Key       []byte
	Value     []byte
}
  60. func (m message) crc32(cw *crc32Writer) int32 {
  61. cw.crc32 = 0
  62. cw.writeInt8(m.MagicByte)
  63. cw.writeInt8(m.Attributes)
  64. if m.MagicByte != 0 {
  65. cw.writeInt64(m.Timestamp)
  66. }
  67. cw.writeBytes(m.Key)
  68. cw.writeBytes(m.Value)
  69. return int32(cw.crc32)
  70. }
  71. func (m message) size() int32 {
  72. size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value)
  73. if m.MagicByte != 0 {
  74. size += timestampSize
  75. }
  76. return size
  77. }
  78. func (m message) writeTo(wb *writeBuffer) {
  79. wb.writeInt32(m.CRC)
  80. wb.writeInt8(m.MagicByte)
  81. wb.writeInt8(m.Attributes)
  82. if m.MagicByte != 0 {
  83. wb.writeInt64(m.Timestamp)
  84. }
  85. wb.writeBytes(m.Key)
  86. wb.writeBytes(m.Value)
  87. }
// messageSetItem is one entry of a messageSet: a message preceded by the
// offset and size fields that writeTo emits ahead of it.
type messageSetItem struct {
	Offset int64
	// MessageSize is written verbatim by writeTo.
	// NOTE(review): presumably expected to equal Message.size(); nothing in
	// this type enforces that — confirm against callers.
	MessageSize int32
	Message     message
}
  93. func (m messageSetItem) size() int32 {
  94. return 8 + 4 + m.Message.size()
  95. }
  96. func (m messageSetItem) writeTo(wb *writeBuffer) {
  97. wb.writeInt64(m.Offset)
  98. wb.writeInt32(m.MessageSize)
  99. m.Message.writeTo(wb)
  100. }
// messageSet is a sequence of messageSetItem values that are sized and
// written one after another.
type messageSet []messageSetItem
  102. func (s messageSet) size() (size int32) {
  103. for _, m := range s {
  104. size += m.size()
  105. }
  106. return
  107. }
  108. func (s messageSet) writeTo(wb *writeBuffer) {
  109. for _, m := range s {
  110. m.writeTo(wb)
  111. }
  112. }