package nodes

import (
	"context"
	"encoding/json"

	"github.com/segmentio/kafka-go"

	"sparrow/pkg/protocol"
	"sparrow/pkg/ruleEngine"
)

// KafkaNode sends messages to Kafka brokers.
// Expects messages with any message type.
// Will send the record via a Kafka producer to the Kafka server.
type KafkaNode struct {
	config   *KafkaNodeConfiguration // parsed node settings
	producer *kafka.Conn             // leader connection established in Init
}

// Init parses the JSON configuration string and dials the leader of
// partition 0 for the configured topic, storing the connection as the
// node's producer. An empty config string yields an all-zero-value
// configuration.
//
// NOTE(review): with an empty config, BootstrapServer and TopicPattern
// are "" and DialLeader will fail — confirm whether an empty config is
// ever a valid input here.
// NOTE(review): DialLeader pins the connection to partition 0 only;
// multi-partition topics will always produce to partition 0. Confirm
// this is intended.
// NOTE(review): the producer connection is never closed anywhere in
// this view of the file — verify a shutdown path exists.
func (k *KafkaNode) Init(ctx ruleEngine.Context, config string) error {
	if config == "" {
		// Zero value of every field — equivalent to (and replaces) the
		// original field-by-field zero literal.
		k.config = new(KafkaNodeConfiguration)
	} else {
		c := new(KafkaNodeConfiguration)
		if err := json.Unmarshal([]byte(config), c); err != nil {
			return err
		}
		k.config = c
	}

	conn, err := kafka.DialLeader(context.Background(), "tcp",
		k.config.BootstrapServer, k.config.TopicPattern, 0)
	if err != nil {
		return err
	}
	k.producer = conn
	return nil
}

// OnMessage handles an incoming rule-engine message.
//
// Not implemented yet: the stub panic is kept deliberately so callers
// fail loudly rather than silently dropping messages.
func (k *KafkaNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
	panic("implement me")
}

// KafkaNodeConfiguration holds the configuration for a KafkaNode.
// (Original comment "配置信息" = "configuration information".)
type KafkaNodeConfiguration struct {
	TopicPattern        string            `json:"topic_pattern"`
	BootstrapServer     string            `json:"bootstrap_server"`
	RetryTimes          int               `json:"retry_times"`            // if fails retry times
	BatchSize           int64             `json:"batch_size"`             // produces batch size in bytes
	LocallyBufferTime   int64             `json:"locally_buffer_time"`    // time to buffer locally(ms)
	MaxSizeClientBuffer int64             `json:"max_size_client_buffer"` // client buffer max size in bytes
	MetaData            map[string]string `json:"meta_data"`
	AckNumber           int               `json:"ack_number"` // number of ack
}