123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package nodes
- import (
- "encoding/json"
- "fmt"
- MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
- "github.com/gogf/gf/util/guid"
- "html/template"
- "sparrow/pkg/protocol"
- "sparrow/pkg/ruleEngine"
- "strings"
- "time"
- )
const (
	// defaultTopicTemp is the topic template rendered for every published
	// message; its placeholders are the fields of MeteData.
	defaultTopicTemp = "/device_message/{{.VendorId}}/{{.DeviceId}}/{{.SubDeviceId}}"

	// defaultTimeout is the timeout, in seconds, applied when the node
	// configuration leaves Timeout at zero.
	defaultTimeout = 5
)
- // MQTTBrokerNode Publish incoming message payload to the topic of the configured MQTT broker with QoS AT_LEAST_ONCE.
- // In case of successful message publishing, original Message will be passed to the next nodes via Success chain,
- //otherwise Failure chain is used.
- type MQTTBrokerNode struct {
- mqttClient *MQTT.Client
- config *MQTTBrokerNodeCfg
- }
// MQTTBrokerNodeCfg is the JSON-decoded configuration of an MQTTBrokerNode.
type MQTTBrokerNodeCfg struct {
	// TopicPattern is the topic to publish to: either a fixed string or a
	// template resolved from message metadata, for example
	// /device_message/{{.VendorId}}/{{.DeviceId}}/{{.SubDeviceId}}.
	TopicPattern string `json:"topic_pattern"`

	// Host is the MQTT broker host.
	Host string `json:"host"`

	// Port is the MQTT broker port.
	Port int `json:"port"`

	// Timeout is the broker connect timeout in seconds.
	Timeout int `json:"timeout"`

	// ClientId is an optional client identifier for the broker connection.
	// When empty, a generated identifier is used instead.
	ClientId string `json:"client_id"`

	// ClearSession requests a non-persistent session with the broker.
	ClearSession bool `json:"clear_session"`

	// SSLEnable enables or disables secure communication.
	SSLEnable bool `json:"ssl_enable"`

	// Credentials selects the authentication mode: Anonymous, Basic or PEM.
	//
	// The PEM mode is meant to be configured with:
	//   - CA certificate file
	//   - Certificate file
	//   - Private key file
	//   - Private key password
	// No fields for these exist in this struct.
	Credentials string `json:"credentials"`

	// UserName and Password are the login used with Basic credentials.
	UserName string `json:"user_name"`
	Password string `json:"password"`
}
- func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error {
- if config != "" {
- c := new(MQTTBrokerNodeCfg)
- err := json.Unmarshal([]byte(config), c)
- if err != nil {
- return err
- }
- M.config = c
- }
- if M.config.Timeout == 0 {
- M.config.Timeout = defaultTimeout
- }
- var opt MQTT.ClientOptions
- opt.ClientID = "MQTT_NODE_" + guid.S()
- opt.CleanSession = M.config.ClearSession
- opt.KeepAlive = time.Second * 30
- opt.ConnectTimeout = time.Duration(M.config.Timeout) * time.Second
- switch M.config.Credentials {
- case "Basic":
- opt.Username = M.config.UserName
- opt.Password = M.config.Password
- default:
- }
- if M.config.ClientId != "" {
- opt.ClientID = M.config.ClientId
- }
- opt.AutoReconnect = true
- opt.MaxReconnectInterval = 10
- opt.AddBroker(fmt.Sprintf("tcp://%s:%d", M.config.Host, M.config.Port))
- c := MQTT.NewClient(&opt)
- M.mqttClient = c
- go func() {
- if token := c.Connect(); token.Wait() && token.Error() != nil {
- return
- }
- }()
- return nil
- }
// MeteData carries the values substituted into the topic template. OnMessage
// fills the fields from the message metadata keys "vendor_id", "device_id"
// and "sub_device_id" respectively.
type MeteData struct {
	VendorId, DeviceId, SubDeviceId string
}
- func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
- var meteData MeteData
- if v, ok := message.MetaData["vendor_id"]; ok {
- meteData.VendorId = v.(string)
- }
- if v, ok := message.MetaData["device_id"]; ok {
- meteData.DeviceId = v.(string)
- }
- if v, ok := message.MetaData["sub_device_id"]; ok {
- meteData.SubDeviceId = v.(string)
- }
- tpl, err := template.New("topic").Parse(defaultTopicTemp)
- if err != nil {
- return err
- }
- stringBuf := new(strings.Builder)
- if err = tpl.Execute(stringBuf, &meteData); err != nil {
- return nil
- }
- topic := stringBuf.String()
- token := M.mqttClient.Publish(topic, 0, false, message.Data)
- fmt.Printf("+++++++++++++++%v\r\n", M.mqttClient)
- if token.Error() != nil {
- ctx.TellNext(message, protocol.Failure)
- } else {
- ctx.TellNext(message, protocol.Success)
- }
- return nil
- }
|