|
@@ -13,13 +13,13 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
type InfluxDBConfig struct {
|
|
type InfluxDBConfig struct {
|
|
- URL string `json:"url"` // 数据库地址
|
|
|
|
- Token string `json:"token"` // 连接Token
|
|
|
|
- Org string `json:"org"` // 组织名称
|
|
|
|
- Bucket string `json:"bucket"` // Bucket
|
|
|
|
- Measurement string `json:"measurement"` // 度量名称
|
|
|
|
- Tags map[string]string `json:"tags"` // 索引Tag
|
|
|
|
- DataFieldName string `json:"data_fields"` // 数据字段名称
|
|
|
|
|
|
+ URL string `json:"url"` // 数据库地址
|
|
|
|
+ Token string `json:"token"` // 连接Token
|
|
|
|
+ Org string `json:"org"` // 组织名称
|
|
|
|
+ Bucket string `json:"bucket"` // Bucket
|
|
|
|
+ Measurement string `json:"measurement"` // 度量名称
|
|
|
|
+ Tags []*KeyValue `json:"tags"` // 索引Tag
|
|
|
|
+ DataFieldName string `json:"data_fields"` // 数据字段名称
|
|
}
|
|
}
|
|
|
|
|
|
// InfluxDBNode influxDB写入节点
|
|
// InfluxDBNode influxDB写入节点
|
|
@@ -40,7 +40,7 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
|
|
i.config = c
|
|
i.config = c
|
|
}
|
|
}
|
|
if i.config.Tags == nil {
|
|
if i.config.Tags == nil {
|
|
- i.config.Tags = make(map[string]string)
|
|
|
|
|
|
+ i.config.Tags = make([]*KeyValue, 0)
|
|
}
|
|
}
|
|
i.pool = grpool.New(10)
|
|
i.pool = grpool.New(10)
|
|
client := influxdb2.NewClient(i.config.URL, i.config.Token)
|
|
client := influxdb2.NewClient(i.config.URL, i.config.Token)
|
|
@@ -49,8 +49,12 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
|
|
}
|
|
}
|
|
|
|
|
|
func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
|
|
func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
|
|
|
|
+ var tags map[string]string
|
|
for k, v := range message.MetaData {
|
|
for k, v := range message.MetaData {
|
|
- i.config.Tags[k] = v.(string)
|
|
|
|
|
|
+ tags[k] = v.(string)
|
|
|
|
+ }
|
|
|
|
+ for _, v := range i.config.Tags {
|
|
|
|
+ tags[v.Key] = v.Value
|
|
}
|
|
}
|
|
jsonData, err := gjson.DecodeToJson(message.Data)
|
|
jsonData, err := gjson.DecodeToJson(message.Data)
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -59,7 +63,7 @@ func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Messa
|
|
}
|
|
}
|
|
fields := jsonData.GetMap(i.config.DataFieldName)
|
|
fields := jsonData.GetMap(i.config.DataFieldName)
|
|
if fields != nil {
|
|
if fields != nil {
|
|
- point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
|
|
|
|
|
|
+ point := influxdb2.NewPoint(i.config.Measurement, tags, fields, time.Now())
|
|
err = i.pool.Add(func() {
|
|
err = i.pool.Add(func() {
|
|
i.writeApi.WritePoint(point)
|
|
i.writeApi.WritePoint(point)
|
|
})
|
|
})
|