|
@@ -100,29 +100,30 @@ type MeteData struct {
|
|
|
}
|
|
|
|
|
|
func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
|
|
|
- if M.mqttClient.IsConnected() {
|
|
|
- var meteData MeteData
|
|
|
- if v, ok := message.MetaData["vendor_id"]; ok {
|
|
|
- meteData.VendorId = v.(string)
|
|
|
- }
|
|
|
- if v, ok := message.MetaData["device_id"]; ok {
|
|
|
- meteData.DeviceId = v.(string)
|
|
|
- }
|
|
|
- tpl, err := template.New("topic").Parse(defaultTopicTemp)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- stringBuf := new(strings.Builder)
|
|
|
- if err = tpl.Execute(stringBuf, &meteData); err != nil {
|
|
|
- return nil
|
|
|
- }
|
|
|
- topic := stringBuf.String()
|
|
|
- token := M.mqttClient.Publish(topic, 0, false, message.Data)
|
|
|
- if token.Error() != nil {
|
|
|
- ctx.TellNext(message, protocol.Failure)
|
|
|
- } else {
|
|
|
- ctx.TellNext(message, protocol.Success)
|
|
|
- }
|
|
|
+
|
|
|
+ var meteData MeteData
|
|
|
+ if v, ok := message.MetaData["vendor_id"]; ok {
|
|
|
+ meteData.VendorId = v.(string)
|
|
|
+ }
|
|
|
+ if v, ok := message.MetaData["device_id"]; ok {
|
|
|
+ meteData.DeviceId = v.(string)
|
|
|
+ }
|
|
|
+ tpl, err := template.New("topic").Parse(defaultTopicTemp)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
+ stringBuf := new(strings.Builder)
|
|
|
+ if err = tpl.Execute(stringBuf, &meteData); err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ topic := stringBuf.String()
|
|
|
+ fmt.Println(topic)
|
|
|
+ token := M.mqttClient.Publish(topic, 0, false, message.Data)
|
|
|
+ if token.Error() != nil {
|
|
|
+ ctx.TellNext(message, protocol.Failure)
|
|
|
+ } else {
|
|
|
+ ctx.TellNext(message, protocol.Success)
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|