|
@@ -13,12 +13,12 @@ import (
|
|
|
)
|
|
|
|
|
|
type InfluxDBConfig struct {
|
|
|
- URL string `json:"url"`
|
|
|
- Token string `json:"token"`
|
|
|
- Org string `json:"org"`
|
|
|
- Bucket string `json:"bucket"`
|
|
|
- Measurement string `json:"measurement"`
|
|
|
- Tags map[string]string `json:"tags"`
|
|
|
+ URL string `json:"url"` // 数据库地址
|
|
|
+ Token string `json:"token"` // 连接Token
|
|
|
+ Org string `json:"org"` // 组织名称
|
|
|
+ Bucket string `json:"bucket"` // Bucket
|
|
|
+ Measurement string `json:"measurement"` // 度量名称
|
|
|
+ Tags map[string]string `json:"tags"` // 索引Tag
|
|
|
DataFieldName string `json:"data_fields"` // 数据字段名称
|
|
|
}
|
|
|
|
|
@@ -55,13 +55,15 @@ func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Messa
|
|
|
return err
|
|
|
}
|
|
|
fields := jsonData.GetMap(i.config.DataFieldName)
|
|
|
- point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
|
|
|
- err = i.pool.Add(func() {
|
|
|
- i.writeApi.WritePoint(point)
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- server.Log.Error(err)
|
|
|
- return err
|
|
|
+ if fields != nil {
|
|
|
+ point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
|
|
|
+ err = i.pool.Add(func() {
|
|
|
+ i.writeApi.WritePoint(point)
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ server.Log.Error(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
ctx.TellSuccess(message)
|
|
|
return nil
|