package nodes import ( "encoding/json" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/os/grpool" "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "time" ) type InfluxDBConfig struct { URL string `json:"url"` Token string `json:"token"` Org string `json:"org"` Bucket string `json:"bucket"` Measurement string `json:"measurement"` Tags map[string]string `json:"tags"` DataFieldName string `json:"data_fields"` // 数据字段名称 } type InfluxDBNode struct { config *InfluxDBConfig writeApi api.WriteAPI pool *grpool.Pool } func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error { if config != "" { c := new(InfluxDBConfig) err := json.Unmarshal([]byte(config), c) if err != nil { return err } i.config = c } i.pool = grpool.New(10) client := influxdb2.NewClient(i.config.URL, i.config.Token) i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket) return nil } func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error { jsonData, err := gjson.DecodeToJson(message.Data) if err != nil { server.Log.Error(err) return err } point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, jsonData.GetMap(i.config.DataFieldName), time.Now()) err = i.pool.Add(func() { i.writeApi.WritePoint(point) }) if err != nil { server.Log.Error(err) return err } ctx.TellSuccess(message) return nil }