123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- package listoffsets
- import (
- "sort"
- "github.com/segmentio/kafka-go/protocol"
- )
- func init() {
- protocol.Register(&Request{}, &Response{})
- }
- type Request struct {
- ReplicaID int32 `kafka:"min=v1,max=v5"`
- IsolationLevel int8 `kafka:"min=v2,max=v5"`
- Topics []RequestTopic `kafka:"min=v1,max=v5"`
- }
- type RequestTopic struct {
- Topic string `kafka:"min=v1,max=v5"`
- Partitions []RequestPartition `kafka:"min=v1,max=v5"`
- }
- type RequestPartition struct {
- Partition int32 `kafka:"min=v1,max=v5"`
- CurrentLeaderEpoch int32 `kafka:"min=v4,max=v5"`
- Timestamp int64 `kafka:"min=v1,max=v5"`
- // v0 of the API predates kafka 0.10, and doesn't make much sense to
- // use so we chose not to support it. It had this extra field to limit
- // the number of offsets returned, which has been removed in v1.
- //
- // MaxNumOffsets int32 `kafka:"min=v0,max=v0"`
- }
- func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListOffsets }
- func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
- // Expects r to be a request that was returned by Map, will likely panic
- // or produce the wrong result if that's not the case.
- partition := r.Topics[0].Partitions[0].Partition
- topic := r.Topics[0].Topic
- for _, p := range cluster.Topics[topic].Partitions {
- if p.ID == partition {
- return cluster.Brokers[p.Leader], nil
- }
- }
- return protocol.Broker{ID: -1}, nil
- }
- func (r *Request) Split(cluster protocol.Cluster) ([]protocol.Message, protocol.Merger, error) {
- // Because kafka refuses to answer ListOffsets requests containing multiple
- // entries of unique topic/partition pairs, we submit multiple requests on
- // the wire and merge their results back.
- //
- // ListOffsets requests also need to be sent to partition leaders, to keep
- // the logic simple we simply split each offset request into a single
- // message. This may cause a bit more requests to be sent on the wire but
- // it keeps the code sane, we can still optimize the aggregation mechanism
- // later if it becomes a problem.
- //
- // Really the idea here is to shield applications from having to deal with
- // the limitation of the kafka server, so they can request any combinations
- // of topic/partition/offsets.
- requests := make([]Request, 0, 2*len(r.Topics))
- for _, t := range r.Topics {
- for _, p := range t.Partitions {
- requests = append(requests, Request{
- ReplicaID: r.ReplicaID,
- IsolationLevel: r.IsolationLevel,
- Topics: []RequestTopic{{
- Topic: t.Topic,
- Partitions: []RequestPartition{{
- Partition: p.Partition,
- CurrentLeaderEpoch: p.CurrentLeaderEpoch,
- Timestamp: p.Timestamp,
- }},
- }},
- })
- }
- }
- messages := make([]protocol.Message, len(requests))
- for i := range requests {
- messages[i] = &requests[i]
- }
- return messages, new(Response), nil
- }
- type Response struct {
- ThrottleTimeMs int32 `kafka:"min=v2,max=v5"`
- Topics []ResponseTopic `kafka:"min=v1,max=v5"`
- }
- type ResponseTopic struct {
- Topic string `kafka:"min=v1,max=v5"`
- Partitions []ResponsePartition `kafka:"min=v1,max=v5"`
- }
- type ResponsePartition struct {
- Partition int32 `kafka:"min=v1,max=v5"`
- ErrorCode int16 `kafka:"min=v1,max=v5"`
- Timestamp int64 `kafka:"min=v1,max=v5"`
- Offset int64 `kafka:"min=v1,max=v5"`
- LeaderEpoch int32 `kafka:"min=v4,max=v5"`
- }
- func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListOffsets }
- func (r *Response) Merge(requests []protocol.Message, results []interface{}) (protocol.Message, error) {
- type topicPartition struct {
- topic string
- partition int32
- }
- // Kafka doesn't always return the timestamp in the response, for example
- // when the request sends -2 (for the first offset) it always returns -1,
- // probably to indicate that the timestamp is unknown. This means that we
- // can't correlate the requests and responses based on their timestamps,
- // the primary key is the topic/partition pair.
- //
- // To make the API a bit friendly, we reconstructing an index of topic
- // partitions to the timestamps that were requested, and override the
- // timestamp value in the response.
- timestamps := make([]map[topicPartition]int64, len(requests))
- for i, m := range requests {
- req := m.(*Request)
- ts := make(map[topicPartition]int64, len(req.Topics))
- for _, t := range req.Topics {
- for _, p := range t.Partitions {
- ts[topicPartition{
- topic: t.Topic,
- partition: p.Partition,
- }] = p.Timestamp
- }
- }
- timestamps[i] = ts
- }
- topics := make(map[string][]ResponsePartition)
- errors := 0
- for i, res := range results {
- m, err := protocol.Result(res)
- if err != nil {
- for _, t := range requests[i].(*Request).Topics {
- partitions := topics[t.Topic]
- for _, p := range t.Partitions {
- partitions = append(partitions, ResponsePartition{
- Partition: p.Partition,
- ErrorCode: -1, // UNKNOWN, can we do better?
- Timestamp: -1,
- Offset: -1,
- LeaderEpoch: -1,
- })
- }
- topics[t.Topic] = partitions
- }
- errors++
- continue
- }
- response := m.(*Response)
- if r.ThrottleTimeMs < response.ThrottleTimeMs {
- r.ThrottleTimeMs = response.ThrottleTimeMs
- }
- for _, t := range response.Topics {
- for _, p := range t.Partitions {
- if timestamp, ok := timestamps[i][topicPartition{
- topic: t.Topic,
- partition: p.Partition,
- }]; ok {
- p.Timestamp = timestamp
- }
- topics[t.Topic] = append(topics[t.Topic], p)
- }
- }
- }
- if errors > 0 && errors == len(results) {
- _, err := protocol.Result(results[0])
- return nil, err
- }
- r.Topics = make([]ResponseTopic, 0, len(topics))
- for topicName, partitions := range topics {
- r.Topics = append(r.Topics, ResponseTopic{
- Topic: topicName,
- Partitions: partitions,
- })
- }
- sort.Slice(r.Topics, func(i, j int) bool {
- return r.Topics[i].Topic < r.Topics[j].Topic
- })
- for _, t := range r.Topics {
- sort.Slice(t.Partitions, func(i, j int) bool {
- p1 := &t.Partitions[i]
- p2 := &t.Partitions[j]
- if p1.Partition != p2.Partition {
- return p1.Partition < p2.Partition
- }
- return p1.Offset < p2.Offset
- })
- }
- return r, nil
- }
- var (
- _ protocol.BrokerMessage = (*Request)(nil)
- _ protocol.Splitter = (*Request)(nil)
- _ protocol.Merger = (*Response)(nil)
- )
|