fetch.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol"
  9. fetchAPI "github.com/segmentio/kafka-go/protocol/fetch"
  10. )
  11. // FetchRequest represents a request sent to a kafka broker to retrieve records
  12. // from a topic partition.
  13. type FetchRequest struct {
  14. // Address of the kafka broker to send the request to.
  15. Addr net.Addr
  16. // Topic, partition, and offset to retrieve records from. The offset may be
  17. // one of the special FirstOffset or LastOffset constants, in which case the
  18. // request will automatically discover the first or last offset of the
  19. // partition and submit the request for these.
  20. Topic string
  21. Partition int
  22. Offset int64
  23. // Size and time limits of the response returned by the broker.
  24. MinBytes int64
  25. MaxBytes int64
  26. MaxWait time.Duration
  27. // The isolation level for the request.
  28. //
  29. // Defaults to ReadUncommitted.
  30. //
  31. // This field requires the kafka broker to support the Fetch API in version
  32. // 4 or above (otherwise the value is ignored).
  33. IsolationLevel IsolationLevel
  34. }
  35. // FetchResponse represents a response from a kafka broker to a fetch request.
  36. type FetchResponse struct {
  37. // The amount of time that the broker throttled the request.
  38. Throttle time.Duration
  39. // The topic and partition that the response came for (will match the values
  40. // in the request).
  41. Topic string
  42. Partition int
  43. // Informations about the topic partition layout returned from the broker.
  44. //
  45. // LastStableOffset requires the kafka broker to support the Fetch API in
  46. // version 4 or above (otherwise the value is zero).
  47. //
  48. /// LogStartOffset requires the kafka broker to support the Fetch API in
  49. // version 5 or above (otherwise the value is zero).
  50. HighWatermark int64
  51. LastStableOffset int64
  52. LogStartOffset int64
  53. // An error that may have occurred while attempting to fetch the records.
  54. //
  55. // The error contains both the kafka error code, and an error message
  56. // returned by the kafka broker. Programs may use the standard errors.Is
  57. // function to test the error against kafka error codes.
  58. Error error
  59. // The set of records returned in the response.
  60. //
  61. // The program is expected to call the RecordSet's Close method when it
  62. // finished reading the records.
  63. //
  64. // Note that kafka may return record batches that start at an offset before
  65. // the one that was requested. It is the program's responsibility to skip
  66. // the offsets that it is not interested in.
  67. Records RecordReader
  68. }
  69. // Fetch sends a fetch request to a kafka broker and returns the response.
  70. //
  71. // If the broker returned an invalid response with no topics, an error wrapping
  72. // protocol.ErrNoTopic is returned.
  73. //
  74. // If the broker returned an invalid response with no partitions, an error
  75. // wrapping ErrNoPartitions is returned.
  76. func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) {
  77. timeout := c.timeout(ctx, math.MaxInt64)
  78. maxWait := req.maxWait()
  79. if maxWait < timeout {
  80. timeout = maxWait
  81. }
  82. offset := req.Offset
  83. switch offset {
  84. case FirstOffset, LastOffset:
  85. topic, partition := req.Topic, req.Partition
  86. r, err := c.ListOffsets(ctx, &ListOffsetsRequest{
  87. Addr: req.Addr,
  88. Topics: map[string][]OffsetRequest{
  89. topic: []OffsetRequest{{
  90. Partition: partition,
  91. Timestamp: offset,
  92. }},
  93. },
  94. })
  95. if err != nil {
  96. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
  97. }
  98. for _, p := range r.Topics[topic] {
  99. if p.Partition == partition {
  100. if p.Error != nil {
  101. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", p.Error)
  102. }
  103. switch offset {
  104. case FirstOffset:
  105. offset = p.FirstOffset
  106. case LastOffset:
  107. offset = p.LastOffset
  108. }
  109. break
  110. }
  111. }
  112. }
  113. m, err := c.roundTrip(ctx, req.Addr, &fetchAPI.Request{
  114. ReplicaID: -1,
  115. MaxWaitTime: milliseconds(timeout),
  116. MinBytes: int32(req.MinBytes),
  117. MaxBytes: int32(req.MaxBytes),
  118. IsolationLevel: int8(req.IsolationLevel),
  119. SessionID: -1,
  120. SessionEpoch: -1,
  121. Topics: []fetchAPI.RequestTopic{{
  122. Topic: req.Topic,
  123. Partitions: []fetchAPI.RequestPartition{{
  124. Partition: int32(req.Partition),
  125. CurrentLeaderEpoch: -1,
  126. FetchOffset: offset,
  127. LogStartOffset: -1,
  128. PartitionMaxBytes: int32(req.MaxBytes),
  129. }},
  130. }},
  131. })
  132. if err != nil {
  133. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
  134. }
  135. res := m.(*fetchAPI.Response)
  136. if len(res.Topics) == 0 {
  137. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", protocol.ErrNoTopic)
  138. }
  139. topic := &res.Topics[0]
  140. if len(topic.Partitions) == 0 {
  141. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", protocol.ErrNoPartition)
  142. }
  143. partition := &topic.Partitions[0]
  144. ret := &FetchResponse{
  145. Throttle: makeDuration(res.ThrottleTimeMs),
  146. Topic: topic.Topic,
  147. Partition: int(partition.Partition),
  148. Error: makeError(res.ErrorCode, ""),
  149. HighWatermark: partition.HighWatermark,
  150. LastStableOffset: partition.LastStableOffset,
  151. LogStartOffset: partition.LogStartOffset,
  152. Records: partition.RecordSet.Records,
  153. }
  154. if partition.ErrorCode != 0 {
  155. ret.Error = makeError(partition.ErrorCode, "")
  156. }
  157. if ret.Records == nil {
  158. ret.Records = NewRecordReader()
  159. }
  160. return ret, nil
  161. }
  162. func (req *FetchRequest) maxWait() time.Duration {
  163. if req.MaxWait > 0 {
  164. return req.MaxWait
  165. }
  166. return defaultMaxWait
  167. }
  168. type fetchRequestV2 struct {
  169. ReplicaID int32
  170. MaxWaitTime int32
  171. MinBytes int32
  172. Topics []fetchRequestTopicV2
  173. }
  174. func (r fetchRequestV2) size() int32 {
  175. return 4 + 4 + 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  176. }
  177. func (r fetchRequestV2) writeTo(wb *writeBuffer) {
  178. wb.writeInt32(r.ReplicaID)
  179. wb.writeInt32(r.MaxWaitTime)
  180. wb.writeInt32(r.MinBytes)
  181. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  182. }
  183. type fetchRequestTopicV2 struct {
  184. TopicName string
  185. Partitions []fetchRequestPartitionV2
  186. }
  187. func (t fetchRequestTopicV2) size() int32 {
  188. return sizeofString(t.TopicName) +
  189. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  190. }
  191. func (t fetchRequestTopicV2) writeTo(wb *writeBuffer) {
  192. wb.writeString(t.TopicName)
  193. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  194. }
  195. type fetchRequestPartitionV2 struct {
  196. Partition int32
  197. FetchOffset int64
  198. MaxBytes int32
  199. }
  200. func (p fetchRequestPartitionV2) size() int32 {
  201. return 4 + 8 + 4
  202. }
  203. func (p fetchRequestPartitionV2) writeTo(wb *writeBuffer) {
  204. wb.writeInt32(p.Partition)
  205. wb.writeInt64(p.FetchOffset)
  206. wb.writeInt32(p.MaxBytes)
  207. }
  208. type fetchResponseV2 struct {
  209. ThrottleTime int32
  210. Topics []fetchResponseTopicV2
  211. }
  212. func (r fetchResponseV2) size() int32 {
  213. return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  214. }
  215. func (r fetchResponseV2) writeTo(wb *writeBuffer) {
  216. wb.writeInt32(r.ThrottleTime)
  217. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  218. }
  219. type fetchResponseTopicV2 struct {
  220. TopicName string
  221. Partitions []fetchResponsePartitionV2
  222. }
  223. func (t fetchResponseTopicV2) size() int32 {
  224. return sizeofString(t.TopicName) +
  225. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  226. }
  227. func (t fetchResponseTopicV2) writeTo(wb *writeBuffer) {
  228. wb.writeString(t.TopicName)
  229. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  230. }
  231. type fetchResponsePartitionV2 struct {
  232. Partition int32
  233. ErrorCode int16
  234. HighwaterMarkOffset int64
  235. MessageSetSize int32
  236. MessageSet messageSet
  237. }
  238. func (p fetchResponsePartitionV2) size() int32 {
  239. return 4 + 2 + 8 + 4 + p.MessageSet.size()
  240. }
  241. func (p fetchResponsePartitionV2) writeTo(wb *writeBuffer) {
  242. wb.writeInt32(p.Partition)
  243. wb.writeInt16(p.ErrorCode)
  244. wb.writeInt64(p.HighwaterMarkOffset)
  245. wb.writeInt32(p.MessageSetSize)
  246. p.MessageSet.writeTo(wb)
  247. }