Browse Source

更新节点配置

lijian 4 years ago
parent
commit
2be150fe7f

+ 3 - 3
pkg/mongo/recorder_test.go

@@ -20,12 +20,12 @@ func TestRecorder(t *testing.T) {
 		t.Error(err)
 	}
 
-	deviceid := uint64(12345)
+	deviceid := "123345"
 	timestamp := uint64(time.Now().Unix() * 1000)
 
 	subdata := protocol.SubData{
 		Head: protocol.SubDataHead{
-			SubDeviceid: "1",
+			SubDeviceid: 1,
 			PropertyNum: 2,
 			ParamsCount: 3,
 		},
@@ -39,7 +39,7 @@ func TestRecorder(t *testing.T) {
 	data := rpcs.ArgsOnStatus{
 		DeviceId:  deviceid,
 		Timestamp: timestamp,
-		Subdata:   subdatas,
+		SubData:   subdatas,
 	}
 
 	err = r.Insert(data)

+ 6 - 3
pkg/rpcs/common.go

@@ -6,8 +6,11 @@ type ArgsDeviceId struct {
 
 type ReplyEmptyResult struct{}
 
-
+// 定义 rpc服务的名字
 const (
 	RegistryServerName = "registry"
-	AccessServerName = "access"
-)
+	MQTTAccessName     = "MQTTAccess"
+	DeviceManagerName  = "deviceManager"
+	ControllerName     = "controller"
+	HttpAccessName     = "HTTPAccess"
+)

+ 1 - 1
pkg/rpcs/controller.go

@@ -8,7 +8,7 @@ import (
 type ArgsOnStatus struct {
 	DeviceId  string
 	Timestamp uint64
-	Subdata   []protocol.SubData
+	SubData   []protocol.SubData
 	VendorId  string
 }
 type ReplyOnStatus ReplyEmptyResult

+ 2 - 2
pkg/rule/rule_action.go

@@ -66,7 +66,7 @@ func performRuleAction(target string, action string) error {
 			Params:    command.Params,
 		}
 		cmdreply := rpcs.ReplySendCommand{}
-		err = server.RPCCallByName(nil, "controller", "Controller.SendCommand", cmdargs, &cmdreply)
+		err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.SendCommand", cmdargs, &cmdreply)
 		if err != nil {
 			server.Log.Errorf("send device command error: %v", err)
 			return err
@@ -82,7 +82,7 @@ func performRuleAction(target string, action string) error {
 			Status:   status,
 		}
 		statusreply := rpcs.ReplySetStatus{}
-		err = server.RPCCallByName(nil, "controller", "Controller.SetStatus", statusargs, &statusreply)
+		err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.SetStatus", statusargs, &statusreply)
 		if err != nil {
 			server.Log.Errorf("set devie status error: %v", err)
 			return err

+ 11 - 0
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -13,6 +13,17 @@ import (
 )
 
 // RestApiRequestNode 请求外部API节点
+// 支持的配置字符
+/*
+{
+    "url": "http://localhost/api/test",
+    "headers": {},
+    "retry": 3,
+    "method": "post",
+    "time_out": 5,
+    "retry_wait": 1
+}
+ */
 type RestApiRequestNode struct {
 	pool   *grpool.Pool
 	config *RestApiRequestNodeConfig

+ 3 - 3
pkg/ruleEngine/rule_chain_service.go

@@ -42,7 +42,7 @@ func (t *TestRuleChainService) GetRuleNodeRelations(tenantId, nodeId string) ([]
 
 func (t *TestRuleChainService) FindRuleChainById(tenantId, ruleChainId string) (*RuleChain, error) {
 	return &RuleChain{
-		TenantId:    "tenant_1",
+		TenantId:    "4jnh0r0hrl0c5a8mmecuoew200o32b8g",
 		Name:        "test rule chain 1",
 		FirstNodeId: "node1",
 		IsDebug:     true,
@@ -108,7 +108,7 @@ func (t *TestRuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) (
 			Type:        "RestApiRequestNode",
 			Name:        "rest api",
 			IsDebug:     true,
-			Config:      "",
+			Config:      "{\"url\":\"http://localhost/api/test\",\"headers\":{},\"retry\":3,\"method\":\"post\",\"time_out\":5,\"retry_wait\":1}",
 			RuleNodeId:  "node3",
 		},
 	}, nil
@@ -117,7 +117,7 @@ func (t *TestRuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) (
 func (t *TestRuleChainService) FindRuleChains(tenantId string) ([]*RuleChain, error) {
 	return []*RuleChain{
 		{
-			TenantId:    "tenant_1",
+			TenantId:    "4jnh0r0hrl0c5a8mmecuoew200o32b8g",
 			Name:        "test rule chain 1",
 			FirstNodeId: "node1",
 			IsDebug:     true,

+ 5 - 2
pkg/ruleEngine/tenant_service.go

@@ -17,12 +17,15 @@ type TestTenantService struct {
 func (t *TestTenantService) FindTenants() ([]*Tenant, error) {
 	return []*Tenant{
 		{
-			Id:   "tenant_1",
+			Id:   "4jnh0r0hrl0c5a8mmecuoew200o32b8g",
 			Name: "测试租户",
 		},
 	}, nil
 }
 
 func (t *TestTenantService) GetTenant(tId string) (*Tenant, error) {
-	panic("implement me")
+	return &Tenant{
+		Id:   "4jnh0r0hrl0c5a8mmecuoew200o32b8g",
+		Name: "测试租户",
+	}, nil
 }

+ 3 - 3
services/apiprovider/actions.go

@@ -127,7 +127,7 @@ func GetDeviceCurrentStatus(device *models.Device, config *productconfig.Product
 	defer span.Finish()
 	ext.SpanKindRPCClient.Set(span)
 
-	err := server.RPCCallByName(ctx, "controller", "Controller.GetStatus", statusargs, &statusreply)
+	err := server.RPCCallByName(ctx, rpcs.ControllerName, "Controller.GetStatus", statusargs, &statusreply)
 	if err != nil {
 		server.Log.Errorf("get devie status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
@@ -187,7 +187,7 @@ func SetDeviceStatus(device *models.Device, config *productconfig.ProductConfig,
 	defer span.Finish()
 	ext.SpanKindRPCClient.Set(span)
 
-	err = server.RPCCallByName(ctx, "controller", "Controller.SetStatus", statusargs, &statusreply)
+	err = server.RPCCallByName(ctx, rpcs.ControllerName, "Controller.SetStatus", statusargs, &statusreply)
 	if err != nil {
 		server.Log.Errorf("set devie status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
@@ -241,7 +241,7 @@ func SendCommandToDevice(device *models.Device, config *productconfig.ProductCon
 	defer span.Finish()
 	ext.SpanKindRPCClient.Set(span)
 
-	err = server.RPCCallByName(ctx, "controller", "Controller.SendCommand", cmdargs, &cmdreply)
+	err = server.RPCCallByName(ctx, rpcs.ControllerName, "Controller.SendCommand", cmdargs, &cmdreply)
 	if err != nil {
 		server.Log.Errorf("send devie command error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))

+ 1 - 1
services/apiprovider/middleware.go

@@ -104,7 +104,7 @@ func CheckDeviceOnline(context martini.Context, params martini.Params, req *http
 		Id: device.RecordId,
 	}
 	onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
-	err = server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
+	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
 	if err != nil {
 		server.Log.Errorf("get device online status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotOnline, errors.New("get device online status error "+err.Error())))

+ 4 - 4
services/coapaccess/coap_provider.go

@@ -18,7 +18,7 @@ func (mp *CoAPProvider) ValidateDeviceToken(deviceid string, token []byte) error
 		AccessToken: token,
 	}
 	reply := rpcs.ReplyValidateDeviceAccessToken{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.ValidateDeviceAccessToken", args, &reply)
 	if err != nil {
 		server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
 		return err
@@ -27,7 +27,7 @@ func (mp *CoAPProvider) ValidateDeviceToken(deviceid string, token []byte) error
 }
 func (mp *CoAPProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
 	reply := rpcs.ReplyGetOnline{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOnline", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
 	}
@@ -39,7 +39,7 @@ func (mp *CoAPProvider) OnDeviceOffline(deviceid string) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyGetOffline{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOffline", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
 	}
@@ -51,7 +51,7 @@ func (mp *CoAPProvider) OnDeviceHeartBeat(deviceid string) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyHeartBeat{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.HeartBeat", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.HeartBeat", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
 	}

+ 2 - 2
services/controller/controller.go

@@ -122,7 +122,7 @@ func (c *Controller) processStatusToQueue(args rpcs.ArgsOnStatus) (string, error
 		return result, err
 	}
 	ev := &protocol.Data{}
-	ev.SubData = args.Subdata
+	ev.SubData = args.SubData
 	m, err := pc.StatusToMap(ev.SubData)
 	if err != nil {
 		server.Log.Errorf("gen status json error : %v", err)
@@ -161,7 +161,7 @@ func getAccessRPCHost(deviceid string) (string, error) {
 		Id: deviceid,
 	}
 	reply := &rpcs.ReplyGetDeviceOnlineStatus{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", args, reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
 	if err != nil {
 		return "", err
 	}

+ 2 - 1
services/controller/main.go

@@ -1,12 +1,13 @@
 package main
 
 import (
+	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 )
 
 func main() {
 	// init server
-	err := server.Init("controller")
+	err := server.Init(rpcs.ControllerName)
 	if err != nil {
 		server.Log.Fatal(err)
 		return

+ 2 - 1
services/devicemanager/main.go

@@ -1,12 +1,13 @@
 package main
 
 import (
+	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 )
 
 func main() {
 	// init server
-	err := server.Init("devicemanager")
+	err := server.Init(rpcs.DeviceManagerName)
 	if err != nil {
 		server.Log.Fatal(err)
 		return

+ 3 - 2
services/httpaccess/main.go

@@ -1,14 +1,15 @@
 package main
 
 import (
-	"sparrow/pkg/server"
 	"github.com/go-martini/martini"
 	"github.com/martini-contrib/render"
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/server"
 )
 
 func main() {
 	// init server
-	err := server.Init("httpaccess")
+	err := server.Init(rpcs.HttpAccessName)
 	if err != nil {
 		server.Log.Fatal(err)
 		return

+ 1 - 1
services/knowoapi/controllers/device.go

@@ -76,7 +76,7 @@ func (a *DeviceController) GetDevicestatus() {
 	}
 
 	onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
 	if err != nil && err.Error() != "redigo: nil returned" {
 		server.Log.Errorf("get devie online status error: %v", err)
 		responseError(a.Ctx, ErrDatabase, err.Error())

+ 2 - 2
services/knowoapi/services/device.go

@@ -52,7 +52,7 @@ func (a deviceservice) GetDevices(vendorid string, proid, pi, ps int, deviceid s
 		}
 
 		onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
-		err = server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
+		err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
 		if err != nil && err.Error() != "redigo: nil returned" {
 			server.Log.Errorf("get devie online status error: %v", err)
 
@@ -102,7 +102,7 @@ func (a deviceservice) GetDevicesCountByVenderId(vendorid string) (map[string]in
 			Id: device.RecordId,
 		}
 		onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
-		err = server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
+		err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
 		if err != nil && err.Error() != "redigo: nil returned" {
 			server.Log.Errorf("get devie online status error: %v", err)
 			return nil, err

+ 2 - 1
services/mqttaccess/main.go

@@ -1,12 +1,13 @@
 package main
 
 import (
+	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 )
 
 func main() {
 	// init server
-	err := server.Init("mqttaccess")
+	err := server.Init(rpcs.MQTTAccessName)
 	if err != nil {
 		server.Log.Fatal(err)
 		return

+ 8 - 8
services/mqttaccess/mqtt_provider.go

@@ -18,7 +18,7 @@ func (mp *MQTTProvider) ValidateDeviceToken(deviceid string, token []byte) error
 		AccessToken: token,
 	}
 	reply := rpcs.ReplyValidateDeviceAccessToken{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.ValidateDeviceAccessToken", args, &reply)
 	if err != nil {
 		server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
 		return err
@@ -27,7 +27,7 @@ func (mp *MQTTProvider) ValidateDeviceToken(deviceid string, token []byte) error
 }
 func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
 	reply := rpcs.ReplyGetOnline{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOnline", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
 	}
@@ -39,7 +39,7 @@ func (mp *MQTTProvider) OnDeviceOffline(deviceid string) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyGetOffline{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOffline", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
 	}
@@ -51,7 +51,7 @@ func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyHeartBeat{}
-	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.HeartBeat", args, &reply)
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.HeartBeat", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
 	}
@@ -80,10 +80,10 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 		args := rpcs.ArgsOnStatus{
 			DeviceId:  deviceid,
 			Timestamp: data.Head.Timestamp,
-			Subdata:   data.SubData,
+			SubData:   data.SubData,
 			VendorId:  vendorId,
 		}
-		err = server.RPCCallByName(nil, "controller", "Controller.OnStatus", args, &reply)
+		err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
 		if err != nil {
 			server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
 			return
@@ -105,12 +105,12 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 			Priority:  event.Head.Priority,
 			Params:    event.Params,
 		}
-		err = server.RPCCallByName(nil, "controller", "Controller.OnEvent", args, &reply)
+		err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
 		if err != nil {
 			server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
 			return
 		}
 	default:
-		server.Log.Infof("unkown message type: %v", msgtype)
+		server.Log.Infof("unknown message type: %v", msgtype)
 	}
 }