Browse Source

支持filter script node

lijian 3 years ago
parent
commit
fa0a5f1f89

+ 1 - 1
pkg/productconfig/productconfig.go

@@ -138,7 +138,7 @@ func (config *ProductConfig) StatusToMap(status []protocol.SubData) (map[string]
 			}
 		}
 		result[label] = values
-		result["device_id"] = sub.Head.SubDeviceid
+		result["sub_device_id"] = sub.Head.SubDeviceid
 	}
 
 	return result, nil

+ 1 - 0
pkg/ruleEngine/nodes/reg_types.go

@@ -16,6 +16,7 @@ func init() {
 	registerType((*MsgTypeFilterNode)(nil))
 	registerType((*MsgTypeSwitchNode)(nil))
 	registerType((*RestApiRequestNode)(nil))
+	registerType((*FilterJavascriptNode)(nil))
 }
 
 func registerType(elem interface{}) {

+ 11 - 7
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -72,12 +72,12 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 	for k, v := range r.config.Headers {
 		headers[k] = v
 	}
-	w := r.bufPool.Get().(*bytes.Buffer)
-	if err := json.NewEncoder(w).Encode(body); err != nil {
-		return err
-	}
-	w.Reset()
-	r.bufPool.Put(w)
+	//w := r.bufPool.Get().(*bytes.Buffer)
+	//if err := json.NewEncoder(w).Encode(body); err != nil {
+	//	return err
+	//}
+	//w.Reset()
+	//r.bufPool.Put(w)
 	req, err := utils.NewRequest(r.config.Method, r.config.Url, []byte(body))
 	if err != nil {
 		server.Log.Error(err)
@@ -95,7 +95,11 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 			ctx.TellError(next, err)
 			return
 		}
-		defer res.Close()
+		defer func() {
+			if res != nil {
+				res.Close()
+			}
+		}()
 		if res != nil && res.Response() != nil {
 			if res.Response().StatusCode == http.StatusOK {
 				msg := r.processResponse(ctx, message, res)

+ 103 - 0
pkg/ruleEngine/nodes/script_filter_node.go

@@ -0,0 +1,103 @@
+package nodes
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/robertkrimen/otto"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/ruleEngine"
+	"sparrow/pkg/server"
+)
+
+const jsWrapperPrifixTemplate = `function %s(msgStr, metaDataStr, msgType) { 
+									var msg = JSON.parse(msgStr);
+									var metaData = JSON.parse(metaDataStr);
+									return JSON.stringify(%s(msg, metaData, msgType));
+									function %s(%s, %s, %s) {`
+const jsWrapperSuffix = `}}`
+const ruleNodeFuncName = "ruleNodeFunc"
+// FilterJavascriptNode js代码逻辑节点,提供执行Js代码的执行能力,依赖执行结果输出节点的最终判定,脚本返回true或false
+// 默认Filter函数接收三个参数,Filter(msg, metadata, msgType)
+type FilterJavascriptNode struct {
+	vm *otto.Otto
+	config *FilterJavascriptNodeConfig
+}
+
+func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error {
+	if config != "" {
+		c := new(FilterJavascriptNodeConfig)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		j.config = c
+	}
+	j.vm = otto.New()
+	_, err := j.vm.Run(generateRuleNodeScript("filter", j.config.FuncBody))
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	body := message.Data
+	fmt.Println(body)
+	metaData, err := json.Marshal(message.MetaData)
+	if err != nil {
+		server.Log.Errorf("metadata marshal error:%s", err.Error())
+		next := j.processError(ctx, message, err)
+		ctx.TellError(next, err)
+		return errors.New("metadata marshal error " + err.Error())
+	}
+	fmt.Println(string(metaData))
+	res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
+	if err != nil {
+		next := j.processError(ctx, message, err)
+		ctx.TellError(next, err)
+		server.Log.Errorf("vm call filter error:%s", err.Error())
+		return err
+	}
+	result := res.IsBoolean()
+	if result {
+		ctx.TellNext(message, protocol.True)
+	} else {
+		ctx.TellNext(message, protocol.False)
+	}
+	return nil
+}
+
+func (j *FilterJavascriptNode) processError(ctx ruleEngine.Context, msg *protocol.Message, err error) *protocol.Message {
+	var metaData map[string]interface{}
+	if msg.MetaData != nil {
+		metaData = msg.MetaData
+	}
+	metaData = make(map[string]interface{})
+	metaData["error"] = err.Error()
+
+	return ctx.TransformMessage(msg, msg.Type, msg.Originator, metaData, msg.Data)
+}
+// FilterJavascriptNodeConfig Js 节点配置
+type FilterJavascriptNodeConfig struct {
+	FuncBody string `json:"func_body"`// 函数体代码
+}
+
+func generateRuleNodeScript(funcName string, scriptBody string, args ...string) string {
+	var msgArg, metaDataArg, msgTypArg string
+	if len(args)  == 3 {
+		msgArg = args[0]
+		metaDataArg = args[1]
+		msgTypArg = args[2]
+	} else {
+		msgArg = "msg"
+		msgTypArg = "msgType"
+		metaDataArg = "metadata"
+	}
+	jsContent := fmt.Sprintf(jsWrapperPrifixTemplate, funcName,
+		ruleNodeFuncName, ruleNodeFuncName,msgArg, metaDataArg, msgTypArg)
+	return fmt.Sprintf("%s%s%s", jsContent, scriptBody, jsWrapperSuffix)
+}
+
+
+

+ 23 - 0
pkg/ruleEngine/nodes/script_filter_node_test.go

@@ -0,0 +1,23 @@
+package nodes
+
+import (
+	"github.com/robertkrimen/otto"
+	"testing"
+)
+
+func TestGenScript(t *testing.T) {
+	result :=generateRuleNodeScript("filter", "return msg.id > 20")
+	t.Log(result)
+	vm :=otto.New()
+	_, err :=vm.Run(result)
+	if err != nil {
+		t.Error(err)
+	}
+	call, err := vm.Call("filter", nil, "{\"id\":1}", "{\"id\":1}", "")
+	if err != nil {
+		t.Error(err)
+	}
+	t.Logf("%+v", call)
+
+
+}