Browse Source

增加多个数据字段名称

lijian 2 years ago
parent
commit
a7730733f9
2 changed files with 23 additions and 18 deletions
  1. 8 8
      pkg/queue/queue_consumer.go
  2. 15 10
      pkg/ruleEngine/nodes/influxdb_node.go

+ 8 - 8
pkg/queue/queue_consumer.go

@@ -2,19 +2,19 @@ package queue
 
 import "time"
 
-// queue consumer interface
+// QueueConsumer queue consumer interface
 // for a message queue consumer implements
 type QueueConsumer interface {
-	// get current topic
+	// GetTopic get current topic
 	GetTopic() string
-	// subscribe the topic
+	// Subscribe ,subscribe the topic
 	Subscribe() error
-	// subscribe with partitions, partitions is topics
+	// SubscribeWithPartitions subscribe with partitions, partitions is topics
 	SubscribeWithPartitions(partitions []*TopicPartitionInfo) error
-	// unsubscribe
+	// UnSubscribe unsubscribe
 	UnSubscribe()
-	// pop message from queue with time duration
-	Pop(time time.Duration) (<- chan QueueMessage, error)
-	// commit a message to queue if it is necessary
+	// Pop ,pop message from queue with time duration
+	Pop(time time.Duration) (<-chan QueueMessage, error)
+	// Commit ,commit a message to queue if it is necessary
 	Commit() error
 }

+ 15 - 10
pkg/ruleEngine/nodes/influxdb_node.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"github.com/gogf/gf/encoding/gjson"
 	"github.com/gogf/gf/os/grpool"
+	"github.com/gogf/gf/text/gstr"
 	"github.com/influxdata/influxdb-client-go/v2"
 	"github.com/influxdata/influxdb-client-go/v2/api"
 	"sparrow/pkg/protocol"
@@ -19,7 +20,7 @@ type InfluxDBConfig struct {
 	Bucket        string      `json:"bucket"`      // Bucket
 	Measurement   string      `json:"measurement"` // 度量名称
 	Tags          []*KeyValue `json:"tags"`        // 索引Tag
-	DataFieldName string      `json:"data_fields"` // 数据字段名称
+	DataFieldName string      `json:"data_fields"` // 数据字段名称,支持,分隔
 }
 
 // InfluxDBNode influxDB写入节点
@@ -61,17 +62,21 @@ func (i *InfluxDBNode) OnMessage(ctx ruleEngine.Context, message *protocol.Messa
 		server.Log.Error(err)
 		return err
 	}
-	fields := jsonData.GetMap(i.config.DataFieldName)
-	if fields != nil {
-		point := influxdb2.NewPoint(i.config.Measurement, tags, fields, time.Now())
-		err = i.pool.Add(func() {
-			i.writeApi.WritePoint(point)
-		})
-		if err != nil {
-			server.Log.Error(err)
-			return err
+	fields := gstr.Split(i.config.DataFieldName, ",")
+	for _, f := range fields {
+		m := jsonData.GetMap(f)
+		if fields != nil {
+			point := influxdb2.NewPoint(i.config.Measurement, tags, m, time.Now())
+			err = i.pool.Add(func() {
+				i.writeApi.WritePoint(point)
+			})
+			if err != nil {
+				server.Log.Error(err)
+				return err
+			}
 		}
 	}
+
 	ctx.TellSuccess(message)
 	return nil
 }