瀏覽代碼

test topic

lijian 1 年之前
父節點
當前提交
df1ca97eb7
共有 2 個文件被更改,包括 81 次插入2 次删除
  1. 72 0
      pkg/protocol/topic.go
  2. 9 2
      services/emqx-agent/agent.go

+ 72 - 0
pkg/protocol/topic.go

@@ -1,5 +1,10 @@
 package protocol
 
+import (
+	"errors"
+	"strings"
+)
+
 /*
 物理型topic:
 $thing/up/property/${productID}/${deviceName}	发布	属性上报
@@ -47,3 +52,70 @@ const (
 	TopicHeadExt     = "$ext"
 	Ext              = "ext"
 )
+
+type Direction int
+
+const (
+	Unknown Direction = iota //设备通信流向:未知
+	Up                       //设备通信流向:上行
+	Down                     //设备通信流向:下行
+)
+
+type TopicInfo struct {
+	ProductKey string
+	DeviceCode string
+	Direction  Direction
+	Types      []string
+	TopicHead  string
+}
+
+func GetTopicInfo(topic string) (topicInfo *TopicInfo, err error) {
+	keys := strings.Split(topic, "/")
+	return parseTopic(keys)
+}
+func parseTopic(topics []string) (topicInfo *TopicInfo, err error) {
+	if len(topics) < 2 {
+		return nil, errors.New("topic is err")
+	}
+	switch topics[0] {
+	case TopicHeadThing, TopicHeadOta, TopicHeadShadow, TopicHeadLog, TopicHeadConfig, TopicHeadGateway, TopicHeadExt:
+		return parseLast(topics)
+	default: //自定义消息
+		return parsePose(0, topics)
+	}
+}
+func parsePose(productPos int, topics []string) (topicInfo *TopicInfo, err error) {
+	return nil, errors.New("topic is err")
+	//先不考虑自定义消息
+	//if len(topics) < (productPos + 2) {
+	//	return nil, errors.Parameter.AddDetail("topic is err")
+	//}
+	//return &TopicInfo{
+	//	ProductID:  topics[productPos],
+	//	DeviceName: topics[productPos+1],
+	//	TopicHead:  topics[0],
+	//}, err
+}
+func parseLast(topics []string) (topicInfo *TopicInfo, err error) {
+	if len(topics) < 4 {
+		return nil, errors.New("topic is err")
+	}
+	return &TopicInfo{
+		ProductKey: topics[len(topics)-2],
+		DeviceCode: topics[len(topics)-1],
+		Direction:  getDirection(topics[1]),
+		Types:      topics[2 : len(topics)-2],
+		TopicHead:  topics[0],
+	}, err
+}
+
+func getDirection(dir string) Direction {
+	switch dir {
+	case "up":
+		return Up
+	case "down":
+		return Down
+	default:
+		return Unknown
+	}
+}

+ 9 - 2
services/emqx-agent/agent.go

@@ -13,7 +13,14 @@ type Agent struct {
 
 // Message 收到设备上报消息处理
 func (a *Agent) Message(topic string, payload []byte) error {
-	fmt.Printf("%s, %s\r\n", topic, payload)
+	topicInfo, err := protocol.GetTopicInfo(topic)
+	if err != nil {
+		return err
+	}
+	if topicInfo.Direction == protocol.Down {
+		return nil
+	}
+	fmt.Printf("%v", topicInfo)
 	return nil
 }
 
@@ -57,7 +64,7 @@ func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
 		return nil
 	}
 	args := rpcs.ArgsGetOffline{
-		Id:       status.DeviceId,
+		Id:       device.RecordId,
 		VendorId: device.VendorID,
 	}
 	reply := rpcs.ReplyGetOffline{}