|
@@ -37,6 +37,7 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
}
|
|
|
i.config = c
|
|
|
}
|
|
|
+ server.Log.Debugf("拿到配置;%v", i.config)
|
|
|
i.pool = grpool.New(10)
|
|
|
client := influxdb2.NewClient(i.config.URL, i.config.Token)
|
|
|
i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
|
|
@@ -45,11 +46,14 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
|
|
|
func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
|
|
|
jsonData, err := gjson.DecodeToJson(message.Data)
|
|
|
+ server.Log.Debugf("拿到数据:%s", message.Data)
|
|
|
if err != nil {
|
|
|
server.Log.Error(err)
|
|
|
return err
|
|
|
}
|
|
|
- point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, jsonData.GetMap(i.config.DataFieldName), time.Now())
|
|
|
+ fields := jsonData.GetMap(i.config.DataFieldName)
|
|
|
+ server.Log.Debugf("拿到数据:%", fields)
|
|
|
+ point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
|
|
|
err = i.pool.Add(func() {
|
|
|
i.writeApi.WritePoint(point)
|
|
|
})
|