rawproduce.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package rawproduce
  2. import (
  3. "fmt"
  4. "github.com/segmentio/kafka-go/protocol"
  5. "github.com/segmentio/kafka-go/protocol/produce"
  6. )
  7. func init() {
  8. // Register a type override so that raw produce requests will be encoded with the correct type.
  9. req := &Request{}
  10. protocol.RegisterOverride(req, &produce.Response{}, req.TypeKey())
  11. }
  12. type Request struct {
  13. TransactionalID string `kafka:"min=v3,max=v8,nullable"`
  14. Acks int16 `kafka:"min=v0,max=v8"`
  15. Timeout int32 `kafka:"min=v0,max=v8"`
  16. Topics []RequestTopic `kafka:"min=v0,max=v8"`
  17. }
  18. func (r *Request) ApiKey() protocol.ApiKey { return protocol.Produce }
  19. func (r *Request) TypeKey() protocol.OverrideTypeKey { return protocol.RawProduceOverride }
  20. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  21. broker := protocol.Broker{ID: -1}
  22. for i := range r.Topics {
  23. t := &r.Topics[i]
  24. topic, ok := cluster.Topics[t.Topic]
  25. if !ok {
  26. return broker, NewError(protocol.NewErrNoTopic(t.Topic))
  27. }
  28. for j := range t.Partitions {
  29. p := &t.Partitions[j]
  30. partition, ok := topic.Partitions[p.Partition]
  31. if !ok {
  32. return broker, NewError(protocol.NewErrNoPartition(t.Topic, p.Partition))
  33. }
  34. if b, ok := cluster.Brokers[partition.Leader]; !ok {
  35. return broker, NewError(protocol.NewErrNoLeader(t.Topic, p.Partition))
  36. } else if broker.ID < 0 {
  37. broker = b
  38. } else if b.ID != broker.ID {
  39. return broker, NewError(fmt.Errorf("mismatching leaders (%d!=%d)", b.ID, broker.ID))
  40. }
  41. }
  42. }
  43. return broker, nil
  44. }
  45. func (r *Request) HasResponse() bool {
  46. return r.Acks != 0
  47. }
  48. type RequestTopic struct {
  49. Topic string `kafka:"min=v0,max=v8"`
  50. Partitions []RequestPartition `kafka:"min=v0,max=v8"`
  51. }
  52. type RequestPartition struct {
  53. Partition int32 `kafka:"min=v0,max=v8"`
  54. RecordSet protocol.RawRecordSet `kafka:"min=v0,max=v8"`
  55. }
  56. var (
  57. _ protocol.BrokerMessage = (*Request)(nil)
  58. )
  59. type Error struct {
  60. Err error
  61. }
  62. func NewError(err error) *Error {
  63. return &Error{Err: err}
  64. }
  65. func (e *Error) Error() string {
  66. return fmt.Sprintf("fetch request error: %v", e.Err)
  67. }
  68. func (e *Error) Unwrap() error {
  69. return e.Err
  70. }