listoffsets.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package listoffsets
  2. import (
  3. "sort"
  4. "github.com/segmentio/kafka-go/protocol"
  5. )
// init registers the ListOffsets request/response pair with the protocol
// package so they can be resolved by API key during encoding and decoding.
func init() {
	protocol.Register(&Request{}, &Response{})
}
// Request is the ListOffsets request type. The kafka struct tags drive the
// wire encoding: each field is only serialized for the protocol versions in
// its min/max range.
type Request struct {
	// Broker ID of the requester, or -1 when issued by a consumer.
	ReplicaID int32 `kafka:"min=v1,max=v5"`
	// Isolation level for transactional reads (v2+ only).
	IsolationLevel int8 `kafka:"min=v2,max=v5"`
	// Topics (and their partitions) whose offsets are being listed.
	Topics []RequestTopic `kafka:"min=v1,max=v5"`
}
// RequestTopic carries the per-topic portion of a ListOffsets request.
type RequestTopic struct {
	Topic      string             `kafka:"min=v1,max=v5"`
	Partitions []RequestPartition `kafka:"min=v1,max=v5"`
}
// RequestPartition identifies one partition and the timestamp used to look
// up an offset (special values such as -1/-2 select latest/earliest).
type RequestPartition struct {
	Partition int32 `kafka:"min=v1,max=v5"`
	// Leader epoch used by the broker for fencing (v4+ only).
	CurrentLeaderEpoch int32 `kafka:"min=v4,max=v5"`
	Timestamp          int64 `kafka:"min=v1,max=v5"`
	// v0 of the API predates kafka 0.10, and doesn't make much sense to
	// use so we chose not to support it. It had this extra field to limit
	// the number of offsets returned, which has been removed in v1.
	//
	// MaxNumOffsets int32 `kafka:"min=v0,max=v0"`
}
  28. func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListOffsets }
  29. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  30. // Expects r to be a request that was returned by Map, will likely panic
  31. // or produce the wrong result if that's not the case.
  32. partition := r.Topics[0].Partitions[0].Partition
  33. topic := r.Topics[0].Topic
  34. for _, p := range cluster.Topics[topic].Partitions {
  35. if p.ID == partition {
  36. return cluster.Brokers[p.Leader], nil
  37. }
  38. }
  39. return protocol.Broker{ID: -1}, nil
  40. }
  41. func (r *Request) Split(cluster protocol.Cluster) ([]protocol.Message, protocol.Merger, error) {
  42. // Because kafka refuses to answer ListOffsets requests containing multiple
  43. // entries of unique topic/partition pairs, we submit multiple requests on
  44. // the wire and merge their results back.
  45. //
  46. // ListOffsets requests also need to be sent to partition leaders, to keep
  47. // the logic simple we simply split each offset request into a single
  48. // message. This may cause a bit more requests to be sent on the wire but
  49. // it keeps the code sane, we can still optimize the aggregation mechanism
  50. // later if it becomes a problem.
  51. //
  52. // Really the idea here is to shield applications from having to deal with
  53. // the limitation of the kafka server, so they can request any combinations
  54. // of topic/partition/offsets.
  55. requests := make([]Request, 0, 2*len(r.Topics))
  56. for _, t := range r.Topics {
  57. for _, p := range t.Partitions {
  58. requests = append(requests, Request{
  59. ReplicaID: r.ReplicaID,
  60. IsolationLevel: r.IsolationLevel,
  61. Topics: []RequestTopic{{
  62. Topic: t.Topic,
  63. Partitions: []RequestPartition{{
  64. Partition: p.Partition,
  65. CurrentLeaderEpoch: p.CurrentLeaderEpoch,
  66. Timestamp: p.Timestamp,
  67. }},
  68. }},
  69. })
  70. }
  71. }
  72. messages := make([]protocol.Message, len(requests))
  73. for i := range requests {
  74. messages[i] = &requests[i]
  75. }
  76. return messages, new(Response), nil
  77. }
// Response is the ListOffsets response type.
type Response struct {
	// Time the request was throttled for quota violations (v2+ only).
	ThrottleTimeMs int32           `kafka:"min=v2,max=v5"`
	Topics         []ResponseTopic `kafka:"min=v1,max=v5"`
}
// ResponseTopic carries the per-topic portion of a ListOffsets response.
type ResponseTopic struct {
	Topic      string              `kafka:"min=v1,max=v5"`
	Partitions []ResponsePartition `kafka:"min=v1,max=v5"`
}
// ResponsePartition holds the offset lookup result for a single partition.
type ResponsePartition struct {
	Partition int32 `kafka:"min=v1,max=v5"`
	ErrorCode int16 `kafka:"min=v1,max=v5"`
	Timestamp int64 `kafka:"min=v1,max=v5"`
	Offset    int64 `kafka:"min=v1,max=v5"`
	// Leader epoch of the partition at the returned offset (v4+ only).
	LeaderEpoch int32 `kafka:"min=v4,max=v5"`
}
  93. func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListOffsets }
// Merge combines the results of the single-partition requests produced by
// Split back into one Response, keyed by topic/partition. Failed
// sub-requests become error partitions unless every sub-request failed, in
// which case the first error is returned directly.
func (r *Response) Merge(requests []protocol.Message, results []interface{}) (protocol.Message, error) {
	type topicPartition struct {
		topic     string
		partition int32
	}

	// Kafka doesn't always return the timestamp in the response, for example
	// when the request sends -2 (for the first offset) it always returns -1,
	// probably to indicate that the timestamp is unknown. This means that we
	// can't correlate the requests and responses based on their timestamps,
	// the primary key is the topic/partition pair.
	//
	// To make the API a bit friendlier, we reconstruct an index of topic
	// partitions to the timestamps that were requested, and override the
	// timestamp value in the response.
	timestamps := make([]map[topicPartition]int64, len(requests))

	for i, m := range requests {
		req := m.(*Request)
		ts := make(map[topicPartition]int64, len(req.Topics))

		for _, t := range req.Topics {
			for _, p := range t.Partitions {
				ts[topicPartition{
					topic:     t.Topic,
					partition: p.Partition,
				}] = p.Timestamp
			}
		}

		timestamps[i] = ts
	}

	topics := make(map[string][]ResponsePartition)
	errors := 0

	for i, res := range results {
		m, err := protocol.Result(res)
		if err != nil {
			// This sub-request failed outright; synthesize error partitions
			// for everything it asked for so the merged response still
			// covers every requested partition.
			for _, t := range requests[i].(*Request).Topics {
				partitions := topics[t.Topic]

				for _, p := range t.Partitions {
					partitions = append(partitions, ResponsePartition{
						Partition:   p.Partition,
						ErrorCode:   -1, // UNKNOWN, can we do better?
						Timestamp:   -1,
						Offset:      -1,
						LeaderEpoch: -1,
					})
				}

				topics[t.Topic] = partitions
			}
			errors++
			continue
		}

		response := m.(*Response)

		// Surface the largest throttle time seen across all sub-responses.
		if r.ThrottleTimeMs < response.ThrottleTimeMs {
			r.ThrottleTimeMs = response.ThrottleTimeMs
		}

		for _, t := range response.Topics {
			for _, p := range t.Partitions {
				// p is a copy, so overriding its timestamp before appending
				// does not mutate the sub-response.
				if timestamp, ok := timestamps[i][topicPartition{
					topic:     t.Topic,
					partition: p.Partition,
				}]; ok {
					p.Timestamp = timestamp
				}
				topics[t.Topic] = append(topics[t.Topic], p)
			}
		}
	}

	// When every sub-request failed there is nothing useful to merge;
	// propagate the first error instead of an all-error response.
	if errors > 0 && errors == len(results) {
		_, err := protocol.Result(results[0])
		return nil, err
	}

	r.Topics = make([]ResponseTopic, 0, len(topics))

	for topicName, partitions := range topics {
		r.Topics = append(r.Topics, ResponseTopic{
			Topic:      topicName,
			Partitions: partitions,
		})
	}

	// Map iteration order is random; sort topics and partitions so the
	// merged response is deterministic.
	sort.Slice(r.Topics, func(i, j int) bool {
		return r.Topics[i].Topic < r.Topics[j].Topic
	})

	for _, t := range r.Topics {
		// t is a copy but t.Partitions shares its backing array, so sorting
		// in place here affects r.Topics as intended.
		sort.Slice(t.Partitions, func(i, j int) bool {
			p1 := &t.Partitions[i]
			p2 := &t.Partitions[j]

			if p1.Partition != p2.Partition {
				return p1.Partition < p2.Partition
			}

			return p1.Offset < p2.Offset
		})
	}

	return r, nil
}
// Compile-time assertions that Request and Response satisfy the protocol
// interfaces used for routing, splitting and merging.
var (
	_ protocol.BrokerMessage = (*Request)(nil)
	_ protocol.Splitter      = (*Request)(nil)
	_ protocol.Merger        = (*Response)(nil)
)