influxdb_node.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package nodes
  2. import (
  3. "encoding/json"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "github.com/gogf/gf/os/grpool"
  6. "github.com/influxdata/influxdb-client-go/v2"
  7. "github.com/influxdata/influxdb-client-go/v2/api"
  8. "sparrow/pkg/protocol"
  9. "sparrow/pkg/ruleEngine"
  10. "sparrow/pkg/server"
  11. "time"
  12. )
  13. type InfluxDBConfig struct {
  14. URL string `json:"url"` // 数据库地址
  15. Token string `json:"token"` // 连接Token
  16. Org string `json:"org"` // 组织名称
  17. Bucket string `json:"bucket"` // Bucket
  18. Measurement string `json:"measurement"` // 度量名称
  19. Tags []*KeyValue `json:"tags"` // 索引Tag
  20. DataFieldName string `json:"data_fields"` // 数据字段名称
  21. }
  22. // InfluxDBNode influxDB写入节点
  23. // 消息中的MeteData字段会合并到Tags中
  24. type InfluxDBNode struct {
  25. config *InfluxDBConfig
  26. writeApi api.WriteAPI
  27. pool *grpool.Pool
  28. }
  29. func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
  30. if config != "" {
  31. c := new(InfluxDBConfig)
  32. err := json.Unmarshal([]byte(config), c)
  33. if err != nil {
  34. return err
  35. }
  36. i.config = c
  37. }
  38. if i.config.Tags == nil {
  39. i.config.Tags = make([]*KeyValue, 0)
  40. }
  41. i.pool = grpool.New(10)
  42. client := influxdb2.NewClient(i.config.URL, i.config.Token)
  43. i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
  44. return nil
  45. }
  46. func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
  47. var tags map[string]string
  48. for k, v := range message.MetaData {
  49. tags[k] = v.(string)
  50. }
  51. for _, v := range i.config.Tags {
  52. tags[v.Key] = v.Value
  53. }
  54. jsonData, err := gjson.DecodeToJson(message.Data)
  55. if err != nil {
  56. server.Log.Error(err)
  57. return err
  58. }
  59. fields := jsonData.GetMap(i.config.DataFieldName)
  60. if fields != nil {
  61. point := influxdb2.NewPoint(i.config.Measurement, tags, fields, time.Now())
  62. err = i.pool.Add(func() {
  63. i.writeApi.WritePoint(point)
  64. })
  65. if err != nil {
  66. server.Log.Error(err)
  67. return err
  68. }
  69. }
  70. ctx.TellSuccess(message)
  71. return nil
  72. }