|
@@ -22,6 +22,8 @@ type InfluxDBConfig struct {
|
|
|
DataFieldName string `json:"data_fields"` // 数据字段名称
|
|
|
}
|
|
|
|
|
|
+// InfluxDBNode influxDB写入节点
|
|
|
+// 消息中的MeteData字段会合并到Tags中
|
|
|
type InfluxDBNode struct {
|
|
|
config *InfluxDBConfig
|
|
|
writeApi api.WriteAPI
|
|
@@ -37,7 +39,6 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
}
|
|
|
i.config = c
|
|
|
}
|
|
|
- server.Log.Debugf("拿到配置;%v", i.config)
|
|
|
i.pool = grpool.New(10)
|
|
|
client := influxdb2.NewClient(i.config.URL, i.config.Token)
|
|
|
i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
|
|
@@ -45,14 +46,15 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
}
|
|
|
|
|
|
func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
|
|
|
+ for k, v := range message.MetaData {
|
|
|
+ i.config.Tags[k] = v.(string)
|
|
|
+ }
|
|
|
jsonData, err := gjson.DecodeToJson(message.Data)
|
|
|
- server.Log.Debugf("拿到数据:%s", message.Data)
|
|
|
if err != nil {
|
|
|
server.Log.Error(err)
|
|
|
return err
|
|
|
}
|
|
|
fields := jsonData.GetMap(i.config.DataFieldName)
|
|
|
- server.Log.Debugf("拿到数据:%", fields)
|
|
|
point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
|
|
|
err = i.pool.Add(func() {
|
|
|
i.writeApi.WritePoint(point)
|