kafka_node.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package nodes
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/segmentio/kafka-go"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/ruleEngine"
  8. )
  9. // KafkaNode Kafka Node sends messages to Kafka brokers.
  10. // Expects messages with any message type.
  11. // Will send record via Kafka producer to Kafka server
  12. type KafkaNode struct {
  13. config *KafkaNodeConfiguration
  14. producer *kafka.Conn
  15. }
  16. func (k *KafkaNode) Init(ctx ruleEngine.Context, config string) error {
  17. if config == "" {
  18. k.config = &KafkaNodeConfiguration{
  19. TopicPattern: "",
  20. BootstrapServer: "",
  21. RetryTimes: 0,
  22. BatchSize: 0,
  23. LocallyBufferTime: 0,
  24. MaxSizeClientBuffer: 0,
  25. MetaData: nil,
  26. AckNumber: 0,
  27. }
  28. } else {
  29. c := new(KafkaNodeConfiguration)
  30. err := json.Unmarshal([]byte(config), c)
  31. if err != nil {
  32. return err
  33. }
  34. k.config = c
  35. }
  36. conn, err := kafka.DialLeader(context.Background(), "tcp", k.config.BootstrapServer, k.config.TopicPattern, 0)
  37. if err != nil {
  38. return err
  39. }
  40. k.producer = conn
  41. return nil
  42. }
  43. func (k *KafkaNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
  44. panic("implement me")
  45. }
  46. // KafkaNodeConfiguration 配置信息
  47. type KafkaNodeConfiguration struct {
  48. TopicPattern string `json:"topic_pattern"`
  49. BootstrapServer string `json:"bootstrap_server"`
  50. RetryTimes int `json:"retry_times"` // if fails retry times
  51. BatchSize int64 `json:"batch_size"` // produces batch size in bytes
  52. LocallyBufferTime int64 `json:"locally_buffer_time"` // time to buffer locally(ms)
  53. MaxSizeClientBuffer int64 `json:"max_size_client_buffer"` // client buffer max size in bytes
  54. MetaData map[string]string `json:"meta_data"`
  55. AckNumber int `json:"ack_number"` // number of ack
  56. }