Sfoglia il codice sorgente

子设备上下线

lijian 2 anni fa
parent
commit
88e667bc85

+ 3 - 0
pkg/ruleEngine/nodes/influxdb_node.go

@@ -39,6 +39,9 @@ func (i *InfluxDBNode) Init(ctx ruleEngine.Context, config string) error {
 		}
 		i.config = c
 	}
+	if i.config.Tags == nil {
+		i.config.Tags = make(map[string]string)
+	}
 	i.pool = grpool.New(10)
 	client := influxdb2.NewClient(i.config.URL, i.config.Token)
 	i.writeApi = client.WriteAPI(i.config.Org, i.config.Bucket)

+ 7 - 8
pkg/ruleEngine/nodes/script_filter_node.go

@@ -13,14 +13,15 @@ import (
 const jsWrapperPrifixTemplate = `function %s(msgStr, metaDataStr, msgType) { 
 									var msg = JSON.parse(msgStr);
 									var metaData = JSON.parse(metaDataStr);
-									return JSON.stringify(%s(msg, metaData, msgType));
+									return %s(msg, metaData, msgType);
 									function %s(%s, %s, %s) {`
 const jsWrapperSuffix = `}}`
 const ruleNodeFuncName = "ruleNodeFunc"
+
 // FilterJavascriptNode js代码逻辑节点,提供执行Js代码的执行能力,依赖执行结果输出节点的最终判定,脚本返回true或false
 // 默认Filter函数接收三个参数,Filter(msg, metadata, msgType)
 type FilterJavascriptNode struct {
-	vm *otto.Otto
+	vm     *otto.Otto
 	config *FilterJavascriptNodeConfig
 }
 
@@ -80,14 +81,15 @@ func (j *FilterJavascriptNode) processError(ctx ruleEngine.Context, msg *protoco
 
 	return ctx.TransformMessage(msg, msg.Type, msg.Originator, metaData, msg.Data)
 }
+
 // FilterJavascriptNodeConfig Js 节点配置
 type FilterJavascriptNodeConfig struct {
-	FuncBody string `json:"func_body"`// 函数体代码
+	FuncBody string `json:"func_body"` // 函数体代码
 }
 
 func generateRuleNodeScript(funcName string, scriptBody string, args ...string) string {
 	var msgArg, metaDataArg, msgTypArg string
-	if len(args)  == 3 {
+	if len(args) == 3 {
 		msgArg = args[0]
 		metaDataArg = args[1]
 		msgTypArg = args[2]
@@ -97,9 +99,6 @@ func generateRuleNodeScript(funcName string, scriptBody string, args ...string)
 		metaDataArg = "metadata"
 	}
 	jsContent := fmt.Sprintf(jsWrapperPrifixTemplate, funcName,
-		ruleNodeFuncName, ruleNodeFuncName,msgArg, metaDataArg, msgTypArg)
+		ruleNodeFuncName, ruleNodeFuncName, msgArg, metaDataArg, msgTypArg)
 	return fmt.Sprintf("%s%s%s", jsContent, scriptBody, jsWrapperSuffix)
 }
-
-
-

+ 5 - 6
pkg/ruleEngine/nodes/script_filter_node_test.go

@@ -6,22 +6,21 @@ import (
 )
 
 func TestGenScript(t *testing.T) {
-	result :=generateRuleNodeScript("filter", "return msg.heart_beat !== undefined")
+	result := generateRuleNodeScript("filter", "return metadata.id===2")
 	t.Log(result)
-	vm :=otto.New()
-	_, err :=vm.Run(result)
+	vm := otto.New()
+	_, err := vm.Run(result)
 	if err != nil {
 		t.Error(err)
 	}
-	call, err := vm.Call("filter", nil, "{\"device_id\":2,\"heart_beat\":{\"device_id\":20274757167}}", "{\"id\":1}", "")
+	call, err := vm.Call("filter", nil, `{"device_id":1,"heart_beat":{"device_id":20274757167}}`, `{"id":2}`, "")
 	if err != nil {
 		t.Error(err)
 	}
-	v, err :=call.ToBoolean()
+	v, err := call.ToBoolean()
 	if err != nil {
 		t.Error(err)
 	}
 	t.Logf("%+v", v)
 
-
 }

BIN
services/.DS_Store


+ 0 - 1
services/knowoapi/services/rule_chain.go

@@ -67,7 +67,6 @@ func (a ruleChainService) Create(ruleChain *models.RuleChain) error {
 }
 
 func (a ruleChainService) create(params *models.RuleChainParams) (firstNodeId string, err error) {
-	fmt.Println(params.Cell.MustToJsonString())
 	nodeMap := make(map[string]models.RuleNode)
 	var inputNodeId string
 	for _, v := range params.Cells {

+ 10 - 24
services/mqttaccess/mqtt_provider.go

@@ -89,10 +89,10 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 			switch act {
 			case klink.DevSendAction:
 				processReportStatus(deviceid, vendorId, message)
-			//case klink.DevLoginAction:
-			//	_ = processDevLogin(deviceid)
-			//case klink.DevLogoutAction:
-			//	_ = processDevLogout(deviceid)
+			case klink.DevLoginAction:
+				_ = processDevLogin(deviceid, message.GetString("subDeviceId"))
+			case klink.DevLogoutAction:
+				_ = processDevLogout(deviceid, message.GetString("subDeviceId"))
 			case klink.DevNetConfigAction:
 				_ = processDevNetConfig(deviceid, message.GetString("md5"))
 			}
@@ -133,28 +133,14 @@ func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
 	}
 }
 
-func processDevLogin(subDeviceId string) error {
-	reply := rpcs.ReplyGetOnline{}
-	var args rpcs.ArgsGetOnline
-	args.Id = subDeviceId
-	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
-	if err != nil {
-		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
-	}
-	return err
+func processDevLogin(deviceCode, subDeviceId string) error {
+	server.Log.Debugf("子设备上线:%s,%s", deviceCode, subDeviceId)
+	return nil
 }
 
-func processDevLogout(subDeviceId string) error {
-	args := rpcs.ArgsGetOffline{
-		Id: subDeviceId,
-	}
-	reply := rpcs.ReplyGetOffline{}
-	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
-	if err != nil {
-		server.Log.Errorf("device offline error. deviceid: %v, error: %v", subDeviceId, err)
-	}
-
-	return err
+func processDevLogout(deviceCode, subDeviceId string) error {
+	server.Log.Debugf("子设备下线:%s,%s", deviceCode, subDeviceId)
+	return nil
 }
 
 // 处理设备配网信息