Browse Source

上报消息头增加productKey

liuxiulin 8 months ago
parent
commit
23c60786b8

+ 1 - 0
pkg/rpcs/controller.go

@@ -8,6 +8,7 @@ type ArgsOnStatus struct {
 	DeviceId    string
 	Timestamp   uint64
 	SubData     []byte
+	ProductKey  string
 	VendorId    string
 	Action      klink.PacketAction
 	SubDeviceId string

+ 1 - 0
services/controller/controller.go

@@ -167,6 +167,7 @@ func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus)
 			"vendor_id":     args.VendorId,
 			"device_id":     args.DeviceId,
 			"sub_device_id": args.SubDeviceId,
+			"product_key":   args.ProductKey,
 			"timestamp":     strconv.Itoa(int(args.Timestamp)),
 		},
 		Originator: "device",

+ 48 - 10
services/emqx-agent/agent.go

@@ -42,17 +42,17 @@ func (a *Access) Message(topic string, payload []byte) error {
 
 	switch topicInfo.Types[0] {
 	case "status":
-		return a.processStatus(topicInfo.DeviceCode, device.VendorID, jsonPayload)
+		return a.processStatus(topicInfo, device.VendorID, jsonPayload)
 	case "event":
-		return a.processEvent(topicInfo.DeviceCode, device.VendorID, jsonPayload)
+		return a.processEvent(topicInfo, device.VendorID, jsonPayload)
 	}
 	return nil
 }
 
-func (a *Access) processEvent(deviceId, vendorId string, message *gjson.Json) error {
+func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
 	reply := rpcs.ReplyOnEvent{}
 	args := rpcs.ArgsOnEvent{
-		DeviceId:    deviceId,
+		DeviceId:    topicInfo.DeviceCode,
 		TimeStamp:   message.GetUint64("timestamp"),
 		SubDeviceId: message.GetString("subDeviceId"),
 		SubData:     message.GetJson("data").MustToJson(),
@@ -66,20 +66,22 @@ func (a *Access) processEvent(deviceId, vendorId string, message *gjson.Json) er
 	return nil
 }
 
-func (a *Access) processStatus(deviceId, vendorId string, message *gjson.Json) error {
+func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
 	act := klink.PacketAction(message.GetString("action"))
 	if act != "" {
 		switch act {
 		case klink.DevSendAction:
-			processReportStatus(deviceId, vendorId, message)
+			processReportStatus(topicInfo.DeviceCode, vendorId, message)
 		case klink.DevLoginAction:
-			_ = processDevLogin(deviceId, message.GetString("subDeviceId"))
+			_ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
 		case klink.DevLogoutAction:
-			_ = processDevLogout(deviceId, message.GetString("subDeviceId"))
+			_ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
 		case klink.DevNetConfigAction:
-			_ = processDevNetConfig(deviceId, message.GetString("md5"))
+			_ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
 		case klink.ReportFirmwareAction:
-			_ = processDeviceReportUpgrade(deviceId, message.GetString("version"))
+			_ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
+		case klink.DevUpgradeAction:
+			_ = processDeviceUpgrade(topicInfo.DeviceCode, message)
 		}
 	}
 	return nil
@@ -155,6 +157,42 @@ func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
 	}
 }
 
+func processDeviceUpgrade(deviceId string, message *gjson.Json) error {
+	var reply rpcs.ReplyEmptyResult
+	data := gjson.New(message.GetJson("data").MustToJson())
+	switch data.GetString("cmd") {
+	case "download":
+		params := gjson.New(data.GetJson("params").MustToJson())
+
+		args := &rpcs.ChunkUpgrade{
+			DeviceId: deviceId,
+			FileId:   params.GetString("fileId"),
+			FileSize: params.GetInt64("fileSize"),
+			Size:     params.GetInt64("size"),
+			Offset:   params.GetInt("offset"),
+		}
+
+		err := server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.ChunkUpgrade", args, &reply)
+		if err != nil {
+			server.Log.Errorf("分片下载发送失败:%v", err)
+		}
+
+	case "downProgress":
+		params := gjson.New(data.GetJson("params").MustToJson())
+		var args rpcs.ArgsOtaProgress
+		args.DeviceId = deviceId
+		args.Progress = params.GetInt("progress")
+
+		err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
+		if err != nil {
+			server.Log.Errorf("OTA升级进度保存失败:%v", err)
+			return err
+		}
+	}
+
+	return nil
+}
+
 // Connected 设备接入时
 func (a *Access) Connected(status *protocol.DevConnectStatus) error {
 	server.Log.Infof("设备上线;%s", status.DeviceId)

+ 16 - 39
services/mqttaccess/mqtt_provider.go

@@ -110,8 +110,6 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 				_ = processDevNetConfig(deviceid, message.GetString("md5"))
 			case klink.ReportFirmwareAction:
 				_ = processDeviceReportUpgrade(deviceid, message.GetString("version"))
-			case klink.DevUpgradeAction:
-				_ = processDeviceUpgrade(deviceid, message)
 			}
 		}
 	case "e":
@@ -134,6 +132,20 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 }
 
 func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
+
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", deviceid, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", deviceid)
+		return
+	}
+	product := &models.Product{}
+	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
+	if err != nil {
+		server.Log.Errorf("find product error : %v", err)
+		return
+	}
+
 	reply := rpcs.ReplyOnStatus{}
 	args := rpcs.ArgsOnStatus{
 		DeviceId:    deviceid,
@@ -141,9 +153,10 @@ func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
 		SubData:     message.GetJson("data").MustToJson(),
 		VendorId:    vendorId,
 		SubDeviceId: message.GetString("subDeviceId"),
+		ProductKey:  product.ProductKey,
 		Action:      klink.PacketAction(message.GetString("action")),
 	}
-	err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
+	err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
 		return
@@ -204,39 +217,3 @@ func processDeviceReportUpgrade(deviceId, version string) error {
 	}
 	return nil
 }
-
-func processDeviceUpgrade(deviceId string, message *gjson.Json) error {
-	var reply rpcs.ReplyEmptyResult
-	data := gjson.New(message.GetJson("data").MustToJson())
-	switch data.GetString("cmd") {
-	case "download":
-		params := gjson.New(data.GetJson("params").MustToJson())
-
-		args := &rpcs.ChunkUpgrade{
-			DeviceId: deviceId,
-			FileId:   params.GetString("fileId"),
-			FileSize: params.GetInt64("fileSize"),
-			Size:     params.GetInt64("size"),
-			Offset:   params.GetInt("offset"),
-		}
-
-		err := server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.ChunkUpgrade", args, &reply)
-		if err != nil {
-			server.Log.Errorf("分片下载发送失败:%v", err)
-		}
-
-	case "downProgress":
-		params := gjson.New(data.GetJson("params").MustToJson())
-		var args rpcs.ArgsOtaProgress
-		args.DeviceId = deviceId
-		args.Progress = params.GetInt("progress")
-
-		err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetProgress", args, &reply)
-		if err != nil {
-			server.Log.Errorf("OTA升级进度获取失败:%v", err)
-			return err
-		}
-	}
-
-	return nil
-}