influxdb_node.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package nodes
  import (
  	"encoding/json"
  	"errors"
  	"fmt"
  	"time"

  	"github.com/gogf/gf/encoding/gjson"
  	"github.com/gogf/gf/os/grpool"
  	"github.com/influxdata/influxdb-client-go/v2"
  	"github.com/influxdata/influxdb-client-go/v2/api"

  	"sparrow/pkg/protocol"
  	"sparrow/pkg/ruleEngine"
  	"sparrow/pkg/server"
  )
  // InfluxDBConfig is the configuration of an InfluxDB rule-engine node. It is
  // decoded from the JSON string handed to InfluxDBNode.Init.
  type InfluxDBConfig struct {
  	// URL is passed to influxdb2.NewClient as the server address.
  	URL string `json:"url"`

  	// Token is passed to influxdb2.NewClient as the authentication token.
  	Token string `json:"token"`

  	// Org is the organization given to the client's WriteAPI.
  	Org string `json:"org"`

  	// Bucket is the bucket given to the client's WriteAPI.
  	Bucket string `json:"bucket"`

  	// Measurement is the measurement name of every point that is written.
  	Measurement string `json:"measurement"`

  	// Tags is the tag set attached to every point that is written.
  	Tags map[string]string `json:"tags"`

  	// DataFieldName is the key looked up in the decoded message data; the map
  	// found under it supplies the point's fields. Note that the JSON key is
  	// "data_fields".
  	DataFieldName string `json:"data_fields"`
  }
  22. type InfluxDBNode struct {
  23. config *InfluxDBConfig
  24. writeApi api.WriteAPI
  25. pool *grpool.Pool
  26. }
  27. func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
  28. if config != "" {
  29. c := new(InfluxDBConfig)
  30. err := json.Unmarshal([]byte(config), c)
  31. if err != nil {
  32. return err
  33. }
  34. i.config = c
  35. }
  36. server.Log.Debugf("拿到配置;%v", i.config)
  37. i.pool = grpool.New(10)
  38. client := influxdb2.NewClient(i.config.URL, i.config.Token)
  39. i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
  40. return nil
  41. }
  42. func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
  43. jsonData, err := gjson.DecodeToJson(message.Data)
  44. server.Log.Debugf("拿到数据:%s", message.Data)
  45. if err != nil {
  46. server.Log.Error(err)
  47. return err
  48. }
  49. fields := jsonData.GetMap(i.config.DataFieldName)
  50. server.Log.Debugf("拿到数据:%", fields)
  51. point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, fields, time.Now())
  52. err = i.pool.Add(func() {
  53. i.writeApi.WritePoint(point)
  54. })
  55. if err != nil {
  56. server.Log.Error(err)
  57. return err
  58. }
  59. ctx.TellSuccess(message)
  60. return nil
  61. }