influxdb_node.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package nodes
import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/os/grpool"
	"github.com/gogf/gf/text/gstr"
	"github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"

	"sparrow/pkg/protocol"
	"sparrow/pkg/ruleEngine"
	"sparrow/pkg/server"
)
  14. type InfluxDBConfig struct {
  15. URL string `json:"url"` // 数据库地址
  16. Token string `json:"token"` // 连接Token
  17. Org string `json:"org"` // 组织名称
  18. Bucket string `json:"bucket"` // Bucket
  19. Measurement string `json:"measurement"` // 度量名称
  20. Tags []*KeyValue `json:"tags"` // 索引Tag
  21. DataFieldName string `json:"data_fields"` // 数据字段名称,支持,分隔
  22. }
  23. // InfluxDBNode influxDB写入节点
  24. // 消息中的MeteData字段会合并到Tags中
  25. type InfluxDBNode struct {
  26. config *InfluxDBConfig
  27. writeApi api.WriteAPI
  28. pool *grpool.Pool
  29. }
  30. func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
  31. if config != "" {
  32. c := new(InfluxDBConfig)
  33. err := json.Unmarshal([]byte(config), c)
  34. if err != nil {
  35. return err
  36. }
  37. i.config = c
  38. }
  39. if i.config.Tags == nil {
  40. i.config.Tags = make([]*KeyValue, 0)
  41. }
  42. i.pool = grpool.New(10)
  43. client := influxdb2.NewClient(i.config.URL, i.config.Token)
  44. i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
  45. return nil
  46. }
  47. func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
  48. tags := make(map[string]string)
  49. for k, v := range message.MetaData {
  50. tags[k] = v.(string)
  51. }
  52. for _, v := range i.config.Tags {
  53. tags[v.Key] = v.Value
  54. }
  55. jsonData, err := gjson.DecodeToJson(message.Data)
  56. if err != nil {
  57. server.Log.Error(err)
  58. return err
  59. }
  60. fields := gstr.Split(i.config.DataFieldName, ",")
  61. for _, f := range fields {
  62. m := jsonData.GetMap(f)
  63. if fields != nil {
  64. point := influxdb2.NewPoint(i.config.Measurement, tags, m, time.Now())
  65. err = i.pool.Add(func() {
  66. i.writeApi.WritePoint(point)
  67. })
  68. if err != nil {
  69. server.Log.Error(err)
  70. return err
  71. }
  72. }
  73. }
  74. ctx.TellSuccess(message)
  75. return nil
  76. }