batch.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package kafka
  2. import (
  3. "bufio"
  4. "errors"
  5. "io"
  6. "sync"
  7. "time"
  8. )
// A Batch is an iterator over a sequence of messages fetched from a kafka
// server.
//
// Batches are created by calling (*Conn).ReadBatch. They hold an internal lock
// on the connection, which is released when the batch is closed. Failing to
// call a batch's Close method will likely result in a deadlock when trying to
// use the connection.
//
// Batches are safe to use concurrently from multiple goroutines.
type Batch struct {
	// mutex serializes Offset, Close, Read and ReadMessage on this batch.
	mutex sync.Mutex
	// conn is the connection the batch reads from; close sets it to nil.
	conn *Conn
	// lock is the connection lock held by the batch; close releases it and
	// sets it to nil.
	lock *sync.Mutex
	// msgs reads the messages of the fetched message set.
	msgs *messageSetReader
	// deadline is checked by checkTimeoutErr once the message set is
	// exhausted: past it, RequestTimedOut is reported instead of io.EOF.
	deadline time.Time
	// throttle is the server throttling duration reported by Throttle.
	throttle time.Duration
	// topic and partition are copied into every Message returned by
	// ReadMessage.
	topic     string
	partition int
	// offset is the offset of the next message to read.
	offset int64
	// highWaterMark is reported by HighWaterMark and copied into messages
	// returned by ReadMessage.
	highWaterMark int64
	// err is the sticky error of the batch: once set, readMessage returns it
	// without reading, and Close reports it (except io.EOF).
	err error
	// The last offset in the batch.
	//
	// We use lastOffset to skip offsets that have been compacted away.
	//
	// We store lastOffset because we get lastOffset when we read a new message
	// but only try to handle compaction when we receive an EOF. However, when
	// we get an EOF we do not get the lastOffset. So there is a mismatch
	// between when we receive it and need to use it.
	lastOffset int64
}
  40. // Throttle gives the throttling duration applied by the kafka server on the
  41. // connection.
  42. func (batch *Batch) Throttle() time.Duration {
  43. return batch.throttle
  44. }
  45. // Watermark returns the current highest watermark in a partition.
  46. func (batch *Batch) HighWaterMark() int64 {
  47. return batch.highWaterMark
  48. }
  49. // Partition returns the batch partition.
  50. func (batch *Batch) Partition() int {
  51. return batch.partition
  52. }
  53. // Offset returns the offset of the next message in the batch.
  54. func (batch *Batch) Offset() int64 {
  55. batch.mutex.Lock()
  56. offset := batch.offset
  57. batch.mutex.Unlock()
  58. return offset
  59. }
  60. // Close closes the batch, releasing the connection lock and returning an error
  61. // if reading the batch failed for any reason.
  62. func (batch *Batch) Close() error {
  63. batch.mutex.Lock()
  64. err := batch.close()
  65. batch.mutex.Unlock()
  66. return err
  67. }
  68. func (batch *Batch) close() (err error) {
  69. conn := batch.conn
  70. lock := batch.lock
  71. batch.conn = nil
  72. batch.lock = nil
  73. if batch.msgs != nil {
  74. batch.msgs.discard()
  75. }
  76. if batch.msgs != nil && batch.msgs.decompressed != nil {
  77. releaseBuffer(batch.msgs.decompressed)
  78. batch.msgs.decompressed = nil
  79. }
  80. if err = batch.err; errors.Is(batch.err, io.EOF) {
  81. err = nil
  82. }
  83. if conn != nil {
  84. conn.rdeadline.unsetConnReadDeadline()
  85. conn.mutex.Lock()
  86. conn.offset = batch.offset
  87. conn.mutex.Unlock()
  88. if err != nil {
  89. var kafkaError Error
  90. if !errors.As(err, &kafkaError) && !errors.Is(err, io.ErrShortBuffer) {
  91. conn.Close()
  92. }
  93. }
  94. }
  95. if lock != nil {
  96. lock.Unlock()
  97. }
  98. return
  99. }
  100. // Err returns a non-nil error if the batch is broken. This is the same error
  101. // that would be returned by Read, ReadMessage or Close (except in the case of
  102. // io.EOF which is never returned by Close).
  103. //
  104. // This method is useful when building retry mechanisms for (*Conn).ReadBatch,
  105. // the program can check whether the batch carried a error before attempting to
  106. // read the first message.
  107. //
  108. // Note that checking errors on a batch is optional, calling Read or ReadMessage
  109. // is always valid and can be used to either read a message or an error in cases
  110. // where that's convenient.
  111. func (batch *Batch) Err() error { return batch.err }
  112. // Read reads the value of the next message from the batch into b, returning the
  113. // number of bytes read, or an error if the next message couldn't be read.
  114. //
  115. // If an error is returned the batch cannot be used anymore and calling Read
  116. // again will keep returning that error. All errors except io.EOF (indicating
  117. // that the program consumed all messages from the batch) are also returned by
  118. // Close.
  119. //
  120. // The method fails with io.ErrShortBuffer if the buffer passed as argument is
  121. // too small to hold the message value.
  122. func (batch *Batch) Read(b []byte) (int, error) {
  123. n := 0
  124. batch.mutex.Lock()
  125. offset := batch.offset
  126. _, _, _, err := batch.readMessage(
  127. func(r *bufio.Reader, size int, nbytes int) (int, error) {
  128. if nbytes < 0 {
  129. return size, nil
  130. }
  131. return discardN(r, size, nbytes)
  132. },
  133. func(r *bufio.Reader, size int, nbytes int) (int, error) {
  134. if nbytes < 0 {
  135. return size, nil
  136. }
  137. // make sure there are enough bytes for the message value. return
  138. // errShortRead if the message is truncated.
  139. if nbytes > size {
  140. return size, errShortRead
  141. }
  142. n = nbytes // return value
  143. if nbytes > cap(b) {
  144. nbytes = cap(b)
  145. }
  146. if nbytes > len(b) {
  147. b = b[:nbytes]
  148. }
  149. nbytes, err := io.ReadFull(r, b[:nbytes])
  150. if err != nil {
  151. return size - nbytes, err
  152. }
  153. return discardN(r, size-nbytes, n-nbytes)
  154. },
  155. )
  156. if err == nil && n > len(b) {
  157. n, err = len(b), io.ErrShortBuffer
  158. batch.err = io.ErrShortBuffer
  159. batch.offset = offset // rollback
  160. }
  161. batch.mutex.Unlock()
  162. return n, err
  163. }
  164. // ReadMessage reads and return the next message from the batch.
  165. //
  166. // Because this method allocate memory buffers for the message key and value
  167. // it is less memory-efficient than Read, but has the advantage of never
  168. // failing with io.ErrShortBuffer.
  169. func (batch *Batch) ReadMessage() (Message, error) {
  170. msg := Message{}
  171. batch.mutex.Lock()
  172. var offset, timestamp int64
  173. var headers []Header
  174. var err error
  175. offset, timestamp, headers, err = batch.readMessage(
  176. func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
  177. msg.Key, remain, err = readNewBytes(r, size, nbytes)
  178. return
  179. },
  180. func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
  181. msg.Value, remain, err = readNewBytes(r, size, nbytes)
  182. return
  183. },
  184. )
  185. // A batch may start before the requested offset so skip messages
  186. // until the requested offset is reached.
  187. for batch.conn != nil && offset < batch.conn.offset {
  188. if err != nil {
  189. break
  190. }
  191. offset, timestamp, headers, err = batch.readMessage(
  192. func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
  193. msg.Key, remain, err = readNewBytes(r, size, nbytes)
  194. return
  195. },
  196. func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
  197. msg.Value, remain, err = readNewBytes(r, size, nbytes)
  198. return
  199. },
  200. )
  201. }
  202. batch.mutex.Unlock()
  203. msg.Topic = batch.topic
  204. msg.Partition = batch.partition
  205. msg.Offset = offset
  206. msg.HighWaterMark = batch.highWaterMark
  207. msg.Time = makeTime(timestamp)
  208. msg.Headers = headers
  209. return msg, err
  210. }
  211. func (batch *Batch) readMessage(
  212. key func(*bufio.Reader, int, int) (int, error),
  213. val func(*bufio.Reader, int, int) (int, error),
  214. ) (offset int64, timestamp int64, headers []Header, err error) {
  215. if err = batch.err; err != nil {
  216. return
  217. }
  218. var lastOffset int64
  219. offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
  220. switch {
  221. case err == nil:
  222. batch.offset = offset + 1
  223. batch.lastOffset = lastOffset
  224. case errors.Is(err, errShortRead):
  225. // As an "optimization" kafka truncates the returned response after
  226. // producing MaxBytes, which could then cause the code to return
  227. // errShortRead.
  228. err = batch.msgs.discard()
  229. switch {
  230. case err != nil:
  231. // Since io.EOF is used by the batch to indicate that there is are
  232. // no more messages to consume, it is crucial that any io.EOF errors
  233. // on the underlying connection are repackaged. Otherwise, the
  234. // caller can't tell the difference between a batch that was fully
  235. // consumed or a batch whose connection is in an error state.
  236. batch.err = dontExpectEOF(err)
  237. case batch.msgs.remaining() == 0:
  238. // Because we use the adjusted deadline we could end up returning
  239. // before the actual deadline occurred. This is necessary otherwise
  240. // timing out the connection for real could end up leaving it in an
  241. // unpredictable state, which would require closing it.
  242. // This design decision was made to maximize the chances of keeping
  243. // the connection open, the trade off being to lose precision on the
  244. // read deadline management.
  245. err = checkTimeoutErr(batch.deadline)
  246. batch.err = err
  247. // Checks the following:
  248. // - `batch.err` for a "success" from the previous timeout check
  249. // - `batch.msgs.lengthRemain` to ensure that this EOF is not due
  250. // to MaxBytes truncation
  251. // - `batch.lastOffset` to ensure that the message format contains
  252. // `lastOffset`
  253. if errors.Is(batch.err, io.EOF) && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
  254. // Log compaction can create batches that end with compacted
  255. // records so the normal strategy that increments the "next"
  256. // offset as records are read doesn't work as the compacted
  257. // records are "missing" and never get "read".
  258. //
  259. // In order to reliably reach the next non-compacted offset we
  260. // jump past the saved lastOffset.
  261. batch.offset = batch.lastOffset + 1
  262. }
  263. }
  264. default:
  265. // Since io.EOF is used by the batch to indicate that there is are
  266. // no more messages to consume, it is crucial that any io.EOF errors
  267. // on the underlying connection are repackaged. Otherwise, the
  268. // caller can't tell the difference between a batch that was fully
  269. // consumed or a batch whose connection is in an error state.
  270. batch.err = dontExpectEOF(err)
  271. }
  272. return
  273. }
  274. func checkTimeoutErr(deadline time.Time) (err error) {
  275. if !deadline.IsZero() && time.Now().After(deadline) {
  276. err = RequestTimedOut
  277. } else {
  278. err = io.EOF
  279. }
  280. return
  281. }