lijian il y a 1 an
Parent
commit
ab3ee010ef
3 fichiers modifiés avec 134 ajouts et 5 suppressions
  1. 2 2
      pkg/protocol/topic.go
  2. 131 2
      services/emqx-agent/agent.go
  3. 1 1
      services/mqttaccess/mqtt_provider.go

+ 2 - 2
pkg/protocol/topic.go

@@ -7,8 +7,8 @@ import (
 
 /*
 物理型topic:
-$thing/up/property/${productID}/${deviceName}	发布	属性上报
-$thing/down/property/${productID}/${deviceName}	订阅	属性下发与属性上报响应
+$thing/up/status/${productID}/${deviceName}	发布	属性上报
+$thing/down/status/${productID}/${deviceName}	订阅	属性下发与属性上报响应
 $thing/up/event/${productID}/${deviceName}	发布	事件上报
 $thing/down/event/${productID}/${deviceName}	订阅	事件上报响应
 $thing/up/action/${productID}/${deviceName}	发布	设备响应行为执行结果

+ 131 - 2
services/emqx-agent/agent.go

@@ -1,7 +1,9 @@
 package main
 
 import (
-	"fmt"
+	"context"
+	"github.com/gogf/gf/encoding/gjson"
+	"sparrow/pkg/klink"
 	"sparrow/pkg/models"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/rpcs"
@@ -13,6 +15,7 @@ type Agent struct {
 
 // Message 收到设备上报消息处理
 func (a *Agent) Message(topic string, payload []byte) error {
+
 	topicInfo, err := protocol.GetTopicInfo(topic)
 	if err != nil {
 		return err
@@ -20,9 +23,135 @@ func (a *Agent) Message(topic string, payload []byte) error {
 	if topicInfo.Direction == protocol.Down {
 		return nil
 	}
-	fmt.Printf("%v", topicInfo)
+	device := &models.Device{}
+	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
+		return nil
+	}
+	server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
+	if len(topicInfo.Types) == 0 {
+		return nil
+	}
+	jsonPayload, err := gjson.DecodeToJson(payload)
+	if err != nil {
+		return nil
+	}
+
+	switch topicInfo.Types[0] {
+	case "status":
+		return a.processStatus(topicInfo.DeviceCode, device.VendorID, jsonPayload)
+	case "event":
+		return a.processEvent(topicInfo.DeviceCode, device.VendorID, jsonPayload)
+	}
+	return nil
+}
+
+func (a *Agent) processEvent(deviceId, vendorId string, message *gjson.Json) error {
+	reply := rpcs.ReplyOnEvent{}
+	args := rpcs.ArgsOnEvent{
+		DeviceId:    deviceId,
+		TimeStamp:   message.GetUint64("timestamp"),
+		SubDeviceId: message.GetString("subDeviceId"),
+		SubData:     message.GetJson("data").MustToJson(),
+		VendorId:    vendorId,
+	}
+	err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
+		return err
+	}
+	return nil
+}
+
+func (a *Agent) processStatus(deviceId, vendorId string, message *gjson.Json) error {
+	act := klink.PacketAction(message.GetString("action"))
+	if act != "" {
+		switch act {
+		case klink.DevSendAction:
+			processReportStatus(deviceId, vendorId, message)
+		case klink.DevLoginAction:
+			_ = processDevLogin(deviceId, message.GetString("subDeviceId"))
+		case klink.DevLogoutAction:
+			_ = processDevLogout(deviceId, message.GetString("subDeviceId"))
+		case klink.DevNetConfigAction:
+			_ = processDevNetConfig(deviceId, message.GetString("md5"))
+		case klink.ReportFirmwareAction:
+			_ = processDeviceReportUpgrade(deviceId, message.GetString("version"))
+		}
+	}
+	return nil
+}
+func processDevLogin(deviceCode, subDeviceId string) error {
+	var args rpcs.SubDeviceArgs
+	args.DeviceCode = deviceCode
+	args.Status = 1
+	args.SubDeviceId = subDeviceId
+	var reply *models.SubDevice
+	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
+	if err != nil {
+		server.Log.Errorf("子设备上线出错:%s", err.Error())
+	}
+	return nil
+}
+
+func processDevLogout(deviceCode, subDeviceId string) error {
+	var args rpcs.SubDeviceArgs
+	args.DeviceCode = deviceCode
+	args.Status = 0
+	args.SubDeviceId = subDeviceId
+	var reply *models.SubDevice
+	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
+	if err != nil {
+		server.Log.Errorf("子设备下线出错:%s", err.Error())
+	}
+	return nil
+}
+
+// 处理设备配网信息
+func processDevNetConfig(deviceCode, md5 string) error {
+	args := &models.DeviceNetConfig{
+		DeviceIdentifier: deviceCode,
+		MD5:              md5,
+		Status:           1,
+	}
+	reply := rpcs.ReplyCheckDeviceNetConfig{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
+	if err != nil {
+		server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
+	}
+	return nil
+}
+
+// 设备上报固件信息处理
+func processDeviceReportUpgrade(deviceId, version string) error {
+	args := &rpcs.ArgsUpdateDeviceVersion{
+		DeviceId: deviceId,
+		Version:  version,
+	}
+	var reply rpcs.ReplyEmptyResult
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
+	if err != nil {
+		server.Log.Errorf("更新设备版本号失败:%v", err)
+	}
 	return nil
 }
+func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
+	reply := rpcs.ReplyOnStatus{}
+	args := rpcs.ArgsOnStatus{
+		DeviceId:    deviceid,
+		Timestamp:   message.GetUint64("timestamp"),
+		SubData:     message.GetJson("data").MustToJson(),
+		VendorId:    vendorId,
+		SubDeviceId: message.GetString("subDeviceId"),
+		Action:      klink.PacketAction(message.GetString("action")),
+	}
+	err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
+		return
+	}
+}
 
 // Connected 设备接入时
 func (a *Agent) Connected(status *protocol.DevConnectStatus) error {

+ 1 - 1
services/mqttaccess/mqtt_provider.go

@@ -94,7 +94,7 @@ func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
 }
 func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype string, message *gjson.Json) {
 	deviceMessageCount.WithLabelValues(deviceid).Inc()
-	server.Log.Infof("device {%v} message {%v} : %s", deviceid, msgtype, message.MustToJsonString())
+	server.Log.Debugf("device {%v} message {%v} : %s", deviceid, msgtype, message.MustToJsonString())
 	switch msgtype {
 	case "s":
 		act := klink.PacketAction(message.GetString("action"))