Forráskód Böngészése

设备connection中增加vendor id

lijian 4 éve
szülő
commit
3b57743cf0

+ 3 - 1
pkg/mqtt/connection.go

@@ -34,6 +34,7 @@ type Connection struct {
 	KeepAlive       uint16
 	LastHbTime      int64
 	Token           []byte
+	VendorId        string
 }
 
 // NewConnection create a connection
@@ -177,6 +178,7 @@ func (c *Connection) RcvMsgFromClient() {
 				return
 			}
 			c.DeviceID = device.RecordId
+			c.VendorId = device.VendorID
 			token, err := hex.DecodeString(msg.Password)
 			if err != nil {
 				server.Log.Warn("token format error : %v", err)
@@ -223,7 +225,7 @@ func (c *Connection) RcvMsgFromClient() {
 
 		case *Publish:
 			server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
-			_ = c.Mgr.PublishMessage2Server(c.DeviceID, msg)
+			_ = c.Mgr.PublishMessage2Server(c.DeviceID, c.VendorId, msg)
 			if msg.QosLevel.IsAtLeastOnce() {
 				server.Log.Infof("publish ack send now")
 				publishack := &PubAck{MessageID: msg.MessageID}

+ 2 - 4
pkg/mqtt/manager.go

@@ -71,12 +71,10 @@ func (m *Manager) PublishMessage2Device(DeviceID string, msg *Publish, timeout t
 	return con.Publish(msg, timeout)
 }
 
-func (m *Manager) PublishMessage2Server(DeviceID string, msg *Publish) error {
+func (m *Manager) PublishMessage2Server(DeviceID string, vendorId string, msg *Publish) error {
 	topic := msg.TopicName
-
 	payload := msg.Payload.(BytesPayload)
-
-	m.Provider.OnDeviceMessage(DeviceID, topic, payload)
+	m.Provider.OnDeviceMessage(DeviceID, vendorId, topic, payload)
 	return nil
 }
 

+ 1 - 1
pkg/mqtt/provider.go

@@ -9,5 +9,5 @@ type Provider interface {
 	OnDeviceOnline(args rpcs.ArgsGetOnline) error
 	OnDeviceOffline(deviceid string) error
 	OnDeviceHeartBeat(deviceid string) error
-	OnDeviceMessage(deviceid string, msgtype string, message []byte)
+	OnDeviceMessage(deviceid, vendorId string, msgtype string, message []byte)
 }

+ 1 - 0
pkg/rpcs/controller.go

@@ -9,6 +9,7 @@ type ArgsOnStatus struct {
 	DeviceId  string
 	Timestamp uint64
 	Subdata   []protocol.SubData
+	VendorId  string
 }
 type ReplyOnStatus ReplyEmptyResult
 

+ 2 - 2
services/controller/controller.go

@@ -84,14 +84,14 @@ func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus)
 		Data:     data,
 		Callback: nil,
 		MetaData: map[string]interface{}{
-			"tenant_id": "tenant_1",
+			"tenant_id": args.VendorId,
 			"device_id": args.DeviceId,
 		},
 		Originator: "device",
 	}
 	tpi := queue.ResolvePartition("RULE_ENGINE",
 		msg.GetQueueName(),
-		"tenant_1",
+		args.VendorId,
 		args.DeviceId)
 	g, err := queue.NewGobQueueMessage(msg)
 	if err != nil {

+ 2 - 1
services/mqttaccess/mqtt_provider.go

@@ -57,7 +57,7 @@ func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
 	}
 	return err
 }
-func (mp *MQTTProvider) OnDeviceMessage(deviceid string, msgtype string, message []byte) {
+func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype string, message []byte) {
 	server.Log.Infof("device {%v} message {%v} : %x", deviceid, msgtype, message)
 	switch msgtype {
 	case "s":
@@ -81,6 +81,7 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid string, msgtype string, message
 			DeviceId:  deviceid,
 			Timestamp: data.Head.Timestamp,
 			Subdata:   data.SubData,
+			VendorId:  vendorId,
 		}
 		err = server.RPCCallByName(nil, "controller", "Controller.OnStatus", args, &reply)
 		if err != nil {

BIN
tests/device/device