lijian 4 hete
szülő
commit
f3b5fda01c
1 módosított fájl, 30 hozzáadás és 37 törlés
  1. 30 37
      pkg/ruleEngine/nodes/script_filter_node.go

+ 30 - 37
pkg/ruleEngine/nodes/script_filter_node.go

@@ -2,12 +2,13 @@ package nodes
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
-	"github.com/gogf/gf/v2/container/gqueue"
 	"github.com/robertkrimen/otto"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/ruleEngine"
 	"sparrow/pkg/server"
+	"sync"
 )
 
 const jsWrapperPrifixTemplate = `function %s(msgStr, metaDataStr, msgType) { 
@@ -31,7 +32,7 @@ const ruleNodeFuncName = "ruleNodeFunc"
 type FilterJavascriptNode struct {
 	vm     *otto.Otto
 	config *FilterJavascriptNodeConfig
-	queue  *gqueue.Queue
+	l      sync.Mutex
 }
 
 func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error {
@@ -44,8 +45,6 @@ func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error
 		j.config = c
 	}
 	j.vm = otto.New()
-	j.queue = gqueue.New(10)
-	go j.handleMessage(ctx)
 	_, err := j.vm.Run(generateRuleNodeScript("filter", j.config.FuncBody))
 	if err != nil {
 		return err
@@ -53,41 +52,35 @@ func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error
 	return nil
 }
 
-func (j *FilterJavascriptNode) handleMessage(ctx ruleEngine.Context) {
-
-	for {
-		if v := j.queue.Pop(); v != nil {
-			message := v.(*protocol.Message)
-			body := message.Data
-			metaData, err := json.Marshal(message.MetaData)
-			if err != nil {
-				server.Log.Errorf("metadata marshal error:%s", err.Error())
-				next := j.processError(ctx, message, err)
-				ctx.TellError(next, err)
-			}
-			res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
-			if err != nil {
-				next := j.processError(ctx, message, err)
-				ctx.TellError(next, err)
-				server.Log.Errorf("vm call filter error:%s", err.Error())
-				fmt.Printf("________++++==%s", metaData)
-			}
-			v := false
-			if result, err := res.ToBoolean(); err == nil {
-				v = result
-			}
-
-			if v {
-				ctx.TellNext(message, protocol.True)
-			} else {
-				ctx.TellNext(message, protocol.False)
-			}
-		}
+func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	body := message.Data
+	metaData, err := json.Marshal(message.MetaData)
+	if err != nil {
+		server.Log.Errorf("metadata marshal error:%s", err.Error())
+		next := j.processError(ctx, message, err)
+		ctx.TellError(next, err)
+		return errors.New("metadata marshal error " + err.Error())
+	}
+	j.l.Lock()
+	defer j.l.Unlock()
+	res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
+	if err != nil {
+		//next := j.processError(ctx, message, err)
+		//ctx.TellError(next, err)
+		server.Log.Errorf("vm call filter error:%s", err.Error())
+		fmt.Printf("________++++==%s", metaData)
+		return err
+	}
+	v := false
+	if result, err := res.ToBoolean(); err == nil {
+		v = result
 	}
-}
 
-func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
-	j.queue.Push(message)
+	if v {
+		ctx.TellNext(message, protocol.True)
+	} else {
+		ctx.TellNext(message, protocol.False)
+	}
 	return nil
 }