package nodes import ( "encoding/json" "fmt" MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" "github.com/gogf/gf/util/guid" "html/template" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "strings" "time" ) const defaultTopicTemp = "/device_message/{{.VendorId}}/{{.DeviceId}}/{{.SubDeviceId}}" const defaultTimeout = 5 // MQTTBrokerNode Publish incoming message payload to the topic of the configured MQTT broker with QoS AT_LEAST_ONCE. // In case of successful message publishing, original Message will be passed to the next nodes via Success chain, //otherwise Failure chain is used. type MQTTBrokerNode struct { mqttClient *MQTT.Client config *MQTTBrokerNodeCfg } type MQTTBrokerNodeCfg struct { // can be a static string, or pattern that is resolved using Message Metadata properties. . // default topic device_message/{{.VendorId}}/{{.DeviceId}} TopicPattern string `json:"topic_pattern"` // MQTT broker host. Host string `json:"host"` // MQTT broker port. Port int `json:"port"` // timeout in seconds for connecting to MQTT broker. Timeout int `json:"timeout"` // optional client identifier used for connecting to MQTT broker. If not specified, default generated clientId will be used. ClientId string `json:"client_id"` // establishes a non persistent connection with the broker when enabled. ClearSession bool `json:"clear_session"` // enable/disable secure communication. SSLEnable bool `json:"ssl_enable"` // MQTT connection credentials. Can be either Anonymous, Basic or PEM. Credentials string `json:"credentials"` // Anonymous, Basic, PEM /* If PEM credentials type is selected, the following configuration should be provided: CA certificate file Certificate file Private key file Private key password */ UserName string `json:"user_name"` Password string `json:"password"` } func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error { if config != "" { c := new(MQTTBrokerNodeCfg) err := json.Unmarshal([]byte(config), c) if err != nil { return err } M.config = c } if M.config.Timeout == 0 { M.config.Timeout = defaultTimeout } var opt MQTT.ClientOptions opt.ClientID = "MQTT_NODE_" + guid.S() opt.CleanSession = M.config.ClearSession opt.KeepAlive = time.Second * 30 opt.ConnectTimeout = time.Duration(M.config.Timeout) * time.Second switch M.config.Credentials { case "Basic": opt.Username = M.config.UserName opt.Password = M.config.Password default: } if M.config.ClientId != "" { opt.ClientID = M.config.ClientId } opt.AutoReconnect = true opt.MaxReconnectInterval = 10 opt.AddBroker(fmt.Sprintf("tcp://%s:%d", M.config.Host, M.config.Port)) c := MQTT.NewClient(&opt) M.mqttClient = c go func() { if token := c.Connect(); token.Wait() && token.Error() != nil { fmt.Println(token.Error().Error()) return } }() return nil } type MeteData struct { VendorId string DeviceId string SubDeviceId string } func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error { var meteData MeteData if v, ok := message.MetaData["vendor_id"]; ok { meteData.VendorId = v.(string) } if v, ok := message.MetaData["device_id"]; ok { meteData.DeviceId = v.(string) } if v, ok := message.MetaData["sub_device_id"]; ok { meteData.SubDeviceId = v.(string) } tpl, err := template.New("topic").Parse(defaultTopicTemp) if err != nil { return err } stringBuf := new(strings.Builder) if err = tpl.Execute(stringBuf, &meteData); err != nil { return nil } topic := stringBuf.String() token := M.mqttClient.Publish(topic, 0, false, message.Data) if token.Error() != nil { ctx.TellNext(message, protocol.Failure) } else { ctx.TellNext(message, protocol.Success) } return nil }