12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package nodes
- import (
- "context"
- "encoding/json"
- "github.com/segmentio/kafka-go"
- "sparrow/pkg/protocol"
- "sparrow/pkg/ruleEngine"
- )
- // KafkaNode Kafka Node sends messages to Kafka brokers.
- // Expects messages with any message type.
- // Will send record via Kafka producer to Kafka server
- type KafkaNode struct {
- config *KafkaNodeConfiguration
- producer *kafka.Conn
- }
- func (k *KafkaNode) Init(ctx ruleEngine.Context, config string) error {
- if config == "" {
- k.config = &KafkaNodeConfiguration{
- TopicPattern: "",
- BootstrapServer: "",
- RetryTimes: 0,
- BatchSize: 0,
- LocallyBufferTime: 0,
- MaxSizeClientBuffer: 0,
- MetaData: nil,
- AckNumber: 0,
- }
- } else {
- c := new(KafkaNodeConfiguration)
- err := json.Unmarshal([]byte(config), c)
- if err != nil {
- return err
- }
- k.config = c
- }
- conn, err := kafka.DialLeader(context.Background(), "tcp", k.config.BootstrapServer, k.config.TopicPattern, 0)
- if err != nil {
- return err
- }
- k.producer = conn
- return nil
- }
- func (k *KafkaNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
- panic("implement me")
- }
- // KafkaNodeConfiguration 配置信息
- type KafkaNodeConfiguration struct {
- TopicPattern string `json:"topic_pattern"`
- BootstrapServer string `json:"bootstrap_server"`
- RetryTimes int `json:"retry_times"` // if fails retry times
- BatchSize int64 `json:"batch_size"` // produces batch size in bytes
- LocallyBufferTime int64 `json:"locally_buffer_time"` // time to buffer locally(ms)
- MaxSizeClientBuffer int64 `json:"max_size_client_buffer"` // client buffer max size in bytes
- MetaData map[string]string `json:"meta_data"`
- AckNumber int `json:"ack_number"` // number of ack
- }
|