浏览代码

增加influxdb节点

lijian 2 年之前
父节点
当前提交
befaa11bbb
共有 3 个文件被更改,包括 66 次插入3 次删除
  1. 62 0
      pkg/ruleEngine/nodes/influxdb_node.go
  2. 1 0
      pkg/ruleEngine/nodes/reg_types.go
  3. 3 3
      services/controller/controller.go

+ 62 - 0
pkg/ruleEngine/nodes/influxdb_node.go

@@ -0,0 +1,62 @@
+package nodes
+
+import (
+	"encoding/json"
+	"github.com/gogf/gf/encoding/gjson"
+	"github.com/gogf/gf/os/grpool"
+	"github.com/influxdata/influxdb-client-go/api"
+	"github.com/influxdata/influxdb-client-go/v2"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/ruleEngine"
+	"sparrow/pkg/server"
+	"time"
+)
+
+type InfluxDBConfig struct {
+	URL           string            `json:"url"`
+	Token         string            `json:"token"`
+	Org           string            `json:"org"`
+	Bucket        string            `json:"bucket"`
+	Measurement   string            `json:"measurement"`
+	Tags          map[string]string `json:"tags"`
+	DataFieldName string            `json:"data_fields"` // 数据字段名称
+}
+
+type InfluxDBNode struct {
+	config   *InfluxDBConfig
+	writeApi api.WriteAPI
+	pool     *grpool.Pool
+}
+
+func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
+	if config != "" {
+		c := new(InfluxDBConfig)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		i.config = c
+	}
+	i.pool = grpool.New(10)
+	client := influxdb2.NewClient(i.config.URL, i.config.Token)
+	i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)
+	return nil
+}
+
+func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	jsonData, err := gjson.DecodeToJson(message.Data)
+	if err != nil {
+		server.Log.Error(err)
+		return err
+	}
+	point := influxdb2.NewPoint(i.config.Measurement, i.config.Tags, jsonData.GetMap(i.config.DataFieldName), time.Now())
+	err = i.pool.Add(func() {
+		i.writeApi.WritePoint(point)
+	})
+	if err != nil {
+		server.Log.Error(err)
+		return err
+	}
+	ctx.TellSuccess(message)
+	return nil
+}

+ 1 - 0
pkg/ruleEngine/nodes/reg_types.go

@@ -17,6 +17,7 @@ func init() {
 	registerType((*MsgTypeSwitchNode)(nil))
 	registerType((*RestApiRequestNode)(nil))
 	registerType((*FilterJavascriptNode)(nil))
+	registerType((*InfluxDBNode)(nil))
 }
 
 func registerType(elem interface{}) {

+ 3 - 3
services/controller/controller.go

@@ -235,7 +235,7 @@ func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) er
 	return c.producer.Send(tpi, g, nil)
 }
 
-// OnCreateRuleChain 规则链生命周期-创建
+// CreateRuleChain 规则链生命周期-创建
 func (c *Controller) CreateRuleChain(args rpcs.ArgsRuleChainAct, reply *rpcs.ReplyEmptyResult) error {
 	if c.actorContext != nil {
 		msg := &ruleEngine.ComponentLifecycleMsg{
@@ -350,10 +350,10 @@ func (c *Controller) launchConsumer() {
 			if err := ruleEngineMsg.Decode(msg.GetData()); err != nil {
 				fmt.Println("解析消息失败")
 			}
-			tanantId := msg.GetHeaders().Get("tenant_id")
+			tenantId := msg.GetHeaders().Get("tenant_id")
 			if c.actorContext != nil {
 				c.actorContext.Tell(&ruleEngine.QueueToRuleEngineMsg{
-					TenantId: string(tanantId),
+					TenantId: string(tenantId),
 					Message:  ruleEngineMsg,
 				})
 			}