package nodes import ( "encoding/json" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/os/grpool" "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "time" ) type InfluxDBConfig struct { URL string `json:"url"` // 数据库地址 Token string `json:"token"` // 连接Token Org string `json:"org"` // 组织名称 Bucket string `json:"bucket"` // Bucket Measurement string `json:"measurement"` // 度量名称 Tags map[string]string `json:"tags"` // 索引Tag DataFieldName string `json:"data_fields"` // 数据字段名称 } // InfluxDBNode influxDB写入节点 // 消息中的MeteData字段会合并到Tags中 type InfluxDBNode struct { config *InfluxDBConfig writeApi api.WriteAPI pool *grpool.Pool } func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error { if config != "" { c := new(InfluxDBConfig) err := json.Unmarshal([]byte(config), c) if err != nil { return err } i.config = c } i.pool = grpool.New(10) client := influxdb2.NewClient(i.config.URL, i.config.Token) i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket) return nil } func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error { for k, v := range message.MetaData { i.config.Tags[k] = v.(string) } jsonData, err := gjson.DecodeToJson(message.Data) if err != nil { server.Log.Error(err) return err } fields := jsonData.GetMap(i.config.DataFieldName) if fields != nil { point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now()) err = i.pool.Add(func() { i.writeApi.WritePoint(point) }) if err != nil { server.Log.Error(err) return err } } ctx.TellSuccess(message) return nil }