12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- package nodes
- import (
- "encoding/json"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/os/grpool"
- "github.com/influxdata/influxdb-client-go/v2"
- "github.com/influxdata/influxdb-client-go/v2/api"
- "sparrow/pkg/protocol"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/server"
- "time"
- )
- type InfluxDBConfig struct {
- URL string `json:"url"` // 数据库地址
- Token string `json:"token"` // 连接Token
- Org string `json:"org"` // 组织名称
- Bucket string `json:"bucket"` // Bucket
- Measurement string `json:"measurement"` // 度量名称
- Tags map[string]string `json:"tags"` // 索引Tag
- DataFieldName string `json:"data_fields"` // 数据字段名称
- }
- // InfluxDBNode influxDB写入节点
- // 消息中的MeteData字段会合并到Tags中
- type InfluxDBNode struct {
- config *InfluxDBConfig
- writeApi api.WriteAPI
- pool *grpool.Pool
- }
- func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
- if config != "" {
- c := new(InfluxDBConfig)
- err := json.Unmarshal([]byte(config), c)
- if err != nil {
- return err
- }
- i.config = c
- }
- i.pool = grpool.New(10)
- client := influxdb2.NewClient(i.config.URL, i.config.Token)
- i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
- return nil
- }
- func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
- for k, v := range message.MetaData {
- i.config.Tags[k] = v.(string)
- }
- jsonData, err := gjson.DecodeToJson(message.Data)
- if err != nil {
- server.Log.Error(err)
- return err
- }
- fields := jsonData.GetMap(i.config.DataFieldName)
- if fields != nil {
- point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
- err = i.pool.Add(func() {
- i.writeApi.WritePoint(point)
- })
- if err != nil {
- server.Log.Error(err)
- return err
- }
- }
- ctx.TellSuccess(message)
- return nil
- }
|