Просмотр исходного кода

fix concurrent map read and map write

lijian 1 месяц назад
Родитель
Сommit
407311093e
2 измененных файлов с 40 добавлено и 33 удалено
  1. 3 3
      pkg/ruleEngine/context.go
  2. 37 30
      pkg/ruleEngine/nodes/script_filter_node.go

+ 3 - 3
pkg/ruleEngine/context.go

@@ -14,11 +14,11 @@ type Context interface {
 	TellNext(msg *protocol.Message, relationType protocol.RelationType)
 	// 向当前节点发消息,duration 为延迟时间
 	TellSelf(msg *protocol.Message, duration time.Duration)
-	// 发送错误消息消息
+	// TellError 发送错误消息消息
 	TellError(msg *protocol.Message, err error)
-	// message ack
+	// Ack message ack
 	Ack(msg *protocol.Message)
-	// transform a message
+	// TransformMessage transform a message
 	TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
 
 	GetRuleNodeCtx() *RuleNodeCtx

+ 37 - 30
pkg/ruleEngine/nodes/script_filter_node.go

@@ -2,13 +2,12 @@ package nodes
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
+	"github.com/gogf/gf/v2/container/gqueue"
 	"github.com/robertkrimen/otto"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/ruleEngine"
 	"sparrow/pkg/server"
-	"sync"
 )
 
 const jsWrapperPrifixTemplate = `function %s(msgStr, metaDataStr, msgType) { 
@@ -32,7 +31,7 @@ const ruleNodeFuncName = "ruleNodeFunc"
 type FilterJavascriptNode struct {
 	vm     *otto.Otto
 	config *FilterJavascriptNodeConfig
-	l      sync.Mutex
+	queue  *gqueue.Queue
 }
 
 func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error {
@@ -45,6 +44,8 @@ func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error
 		j.config = c
 	}
 	j.vm = otto.New()
+	j.queue = gqueue.New(10)
+	go j.handleMessage(ctx)
 	_, err := j.vm.Run(generateRuleNodeScript("filter", j.config.FuncBody))
 	if err != nil {
 		return err
@@ -52,35 +53,41 @@ func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error
 	return nil
 }
 
-func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
-	body := message.Data
-	metaData, err := json.Marshal(message.MetaData)
-	if err != nil {
-		server.Log.Errorf("metadata marshal error:%s", err.Error())
-		next := j.processError(ctx, message, err)
-		ctx.TellError(next, err)
-		return errors.New("metadata marshal error " + err.Error())
-	}
-	j.l.Lock()
-	defer j.l.Unlock()
-	res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
-	if err != nil {
-		//next := j.processError(ctx, message, err)
-		//ctx.TellError(next, err)
-		server.Log.Errorf("vm call filter error:%s", err.Error())
-		fmt.Printf("________++++==%s", metaData)
-		return err
-	}
-	v := false
-	if result, err := res.ToBoolean(); err == nil {
-		v = result
-	}
+func (j *FilterJavascriptNode) handleMessage(ctx ruleEngine.Context) {
 
-	if v {
-		ctx.TellNext(message, protocol.True)
-	} else {
-		ctx.TellNext(message, protocol.False)
+	for {
+		if v := j.queue.Pop(); v != nil {
+			message := v.(*protocol.Message)
+			body := message.Data
+			metaData, err := json.Marshal(message.MetaData)
+			if err != nil {
+				server.Log.Errorf("metadata marshal error:%s", err.Error())
+				next := j.processError(ctx, message, err)
+				ctx.TellError(next, err)
+			}
+			res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
+			if err != nil {
+				next := j.processError(ctx, message, err)
+				ctx.TellError(next, err)
+				server.Log.Errorf("vm call filter error:%s", err.Error())
+				fmt.Printf("________++++==%s", metaData)
+			}
+			v := false
+			if result, err := res.ToBoolean(); err == nil {
+				v = result
+			}
+
+			if v {
+				ctx.TellNext(message, protocol.True)
+			} else {
+				ctx.TellNext(message, protocol.False)
+			}
+		}
 	}
+}
+
+func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	j.queue.Push(message)
 	return nil
 }