|
@@ -12,7 +12,7 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-const defaultTopicTemp = "/device_message/{{.VendorId}}/{{.DeviceId}}"
|
|
|
+const defaultTopicTemp = "/device_message/{{.VendorId}}/{{.DeviceId}}/{{.SubDeviceId}}"
|
|
|
const defaultTimeout = 5
|
|
|
|
|
|
// MQTTBrokerNode Publish incoming message payload to the topic of the configured MQTT broker with QoS AT_LEAST_ONCE.
|
|
@@ -95,8 +95,9 @@ func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
}
|
|
|
|
|
|
type MeteData struct {
|
|
|
- VendorId string
|
|
|
- DeviceId string
|
|
|
+ VendorId string
|
|
|
+ DeviceId string
|
|
|
+ SubDeviceId string
|
|
|
}
|
|
|
|
|
|
func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
|
|
@@ -107,6 +108,9 @@ func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Mes
|
|
|
if v, ok := message.MetaData["device_id"]; ok {
|
|
|
meteData.DeviceId = v.(string)
|
|
|
}
|
|
|
+ if v, ok := message.MetaData["sub_device_id"]; ok {
|
|
|
+ meteData.SubDeviceId = v.(string)
|
|
|
+ }
|
|
|
tpl, err := template.New("topic").Parse(defaultTopicTemp)
|
|
|
if err != nil {
|
|
|
return err
|