123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package kafka
- import (
- "time"
- )
- // Message is a data structure representing kafka messages.
- type Message struct {
- // Topic indicates which topic this message was consumed from via Reader.
- //
- // When being used with Writer, this can be used to configure the topic if
- // not already specified on the writer itself.
- Topic string
- // Partition is read-only and MUST NOT be set when writing messages
- Partition int
- Offset int64
- HighWaterMark int64
- Key []byte
- Value []byte
- Headers []Header
- // This field is used to hold arbitrary data you wish to include, so it
- // will be available when handle it on the Writer's `Completion` method,
- // this support the application can do any post operation on each message.
- WriterData interface{}
- // If not set at the creation, Time will be automatically set when
- // writing the message.
- Time time.Time
- }
- func (msg Message) message(cw *crc32Writer) message {
- m := message{
- MagicByte: 1,
- Key: msg.Key,
- Value: msg.Value,
- Timestamp: timestamp(msg.Time),
- }
- if cw != nil {
- m.CRC = m.crc32(cw)
- }
- return m
- }
- const timestampSize = 8
- func (msg *Message) size() int32 {
- return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
- }
- func (msg *Message) headerSize() int {
- return varArrayLen(len(msg.Headers), func(i int) int {
- h := &msg.Headers[i]
- return varStringLen(h.Key) + varBytesLen(h.Value)
- })
- }
- func (msg *Message) totalSize() int32 {
- return int32(msg.headerSize()) + msg.size()
- }
// message is the wire-level form of a message. Its fields are declared in
// the order writeTo emits them.
type message struct {
	// CRC is the checksum of the fields below, as computed by crc32.
	CRC int32
	// MagicByte selects the layout: the timestamp is only written, counted
	// and checksummed when it is non-zero.
	MagicByte  int8
	Attributes int8
	// Timestamp is ignored when MagicByte is zero.
	Timestamp int64
	// Key and Value are the message payload.
	Key, Value []byte
}
- func (m message) crc32(cw *crc32Writer) int32 {
- cw.crc32 = 0
- cw.writeInt8(m.MagicByte)
- cw.writeInt8(m.Attributes)
- if m.MagicByte != 0 {
- cw.writeInt64(m.Timestamp)
- }
- cw.writeBytes(m.Key)
- cw.writeBytes(m.Value)
- return int32(cw.crc32)
- }
- func (m message) size() int32 {
- size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value)
- if m.MagicByte != 0 {
- size += timestampSize
- }
- return size
- }
- func (m message) writeTo(wb *writeBuffer) {
- wb.writeInt32(m.CRC)
- wb.writeInt8(m.MagicByte)
- wb.writeInt8(m.Attributes)
- if m.MagicByte != 0 {
- wb.writeInt64(m.Timestamp)
- }
- wb.writeBytes(m.Key)
- wb.writeBytes(m.Value)
- }
- type messageSetItem struct {
- Offset int64
- MessageSize int32
- Message message
- }
- func (m messageSetItem) size() int32 {
- return 8 + 4 + m.Message.size()
- }
- func (m messageSetItem) writeTo(wb *writeBuffer) {
- wb.writeInt64(m.Offset)
- wb.writeInt32(m.MessageSize)
- m.Message.writeTo(wb)
- }
- type messageSet []messageSetItem
- func (s messageSet) size() (size int32) {
- for _, m := range s {
- size += m.size()
- }
- return
- }
- func (s messageSet) writeTo(wb *writeBuffer) {
- for _, m := range s {
- m.writeTo(wb)
- }
- }
|