|
@@ -8,6 +8,7 @@ import (
|
|
|
"sparrow/pkg/protocol"
|
|
|
"sparrow/pkg/ruleEngine"
|
|
|
"sparrow/pkg/server"
|
|
|
+ "sync"
|
|
|
)
|
|
|
|
|
|
const jsWrapperPrifixTemplate = `function %s(msgStr, metaDataStr, msgType) {
|
|
@@ -31,6 +32,7 @@ const ruleNodeFuncName = "ruleNodeFunc"
|
|
|
type FilterJavascriptNode struct {
|
|
|
vm *otto.Otto
|
|
|
config *FilterJavascriptNodeConfig
|
|
|
+ l sync.Mutex
|
|
|
}
|
|
|
|
|
|
func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error {
|
|
@@ -59,6 +61,8 @@ func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protoc
|
|
|
ctx.TellError(next, err)
|
|
|
return errors.New("metadata marshal error " + err.Error())
|
|
|
}
|
|
|
+ j.l.Lock()
|
|
|
+ defer j.l.Unlock()
|
|
|
res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
|
|
|
if err != nil {
|
|
|
//next := j.processError(ctx, message, err)
|