mqtt_broker_node.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package nodes
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
  7. "github.com/gogf/gf/util/guid"
  8. "html/template"
  9. "sparrow/pkg/logger"
  10. "sparrow/pkg/protocol"
  11. "sparrow/pkg/ruleEngine"
  12. "strings"
  13. "time"
  14. )
  15. const defaultTopicTemp = "/device_message/{{.VendorId}}/{{.DeviceId}}/{{.SubDeviceId}}"
  16. const defaultTimeout = 5
  17. // MQTTBrokerNode Publish incoming message payload to the topic of the configured MQTT broker with QoS AT_LEAST_ONCE.
  18. // In case of successful message publishing, original Message will be passed to the next nodes via Success chain,
  19. //otherwise Failure chain is used.
  20. type MQTTBrokerNode struct {
  21. mqttClient *MQTT.Client
  22. config *MQTTBrokerNodeCfg
  23. }
  24. type MQTTBrokerNodeCfg struct {
  25. // can be a static string, or pattern that is resolved using Message Metadata properties. .
  26. // default topic device_message/{{.VendorId}}/{{.DeviceId}}
  27. TopicPattern string `json:"topic_pattern"`
  28. // MQTT broker host.
  29. Host string `json:"host"`
  30. // MQTT broker port.
  31. Port int `json:"port"`
  32. // timeout in seconds for connecting to MQTT broker.
  33. Timeout int `json:"timeout"`
  34. // optional client identifier used for connecting to MQTT broker. If not specified, default generated clientId will be used.
  35. ClientId string `json:"client_id"`
  36. // establishes a non persistent connection with the broker when enabled.
  37. ClearSession bool `json:"clear_session"`
  38. // enable/disable secure communication.
  39. SSLEnable bool `json:"ssl_enable"`
  40. // MQTT connection credentials. Can be either Anonymous, Basic or PEM.
  41. Credentials string `json:"credentials"` // Anonymous, Basic, PEM
  42. /*
  43. If PEM credentials type is selected, the following configuration should be provided:
  44. CA certificate file
  45. Certificate file
  46. Private key file
  47. Private key password
  48. */
  49. UserName string `json:"user_name"`
  50. Password string `json:"password"`
  51. }
  52. func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error {
  53. if config != "" {
  54. c := new(MQTTBrokerNodeCfg)
  55. err := json.Unmarshal([]byte(config), c)
  56. if err != nil {
  57. logger.Errorf(context.Background(), "初始化失败:%s", err.Error())
  58. return err
  59. }
  60. M.config = c
  61. }
  62. if M.config.Timeout == 0 {
  63. M.config.Timeout = defaultTimeout
  64. }
  65. var opt MQTT.ClientOptions
  66. opt.ClientID = "MQTT_NODE_" + guid.S()
  67. opt.CleanSession = M.config.ClearSession
  68. opt.KeepAlive = time.Second * 30
  69. opt.ConnectTimeout = time.Duration(M.config.Timeout) * time.Second
  70. switch M.config.Credentials {
  71. case "Basic":
  72. opt.Username = M.config.UserName
  73. opt.Password = M.config.Password
  74. default:
  75. }
  76. if M.config.ClientId != "" {
  77. opt.ClientID = M.config.ClientId
  78. }
  79. opt.AutoReconnect = true
  80. opt.MaxReconnectInterval = 10
  81. opt.AddBroker(fmt.Sprintf("tcp://%s:%d", M.config.Host, M.config.Port))
  82. c := MQTT.NewClient(&opt)
  83. M.mqttClient = c
  84. fmt.Printf("M.mqttClient = c:%v\r\n", c)
  85. go func() {
  86. if token := c.Connect(); token.Wait() && token.Error() != nil {
  87. return
  88. }
  89. }()
  90. return nil
  91. }
  92. type MeteData struct {
  93. VendorId string
  94. DeviceId string
  95. SubDeviceId string
  96. }
  97. func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
  98. var meteData MeteData
  99. if v, ok := message.MetaData["vendor_id"]; ok {
  100. meteData.VendorId = v.(string)
  101. }
  102. if v, ok := message.MetaData["device_id"]; ok {
  103. meteData.DeviceId = v.(string)
  104. }
  105. if v, ok := message.MetaData["sub_device_id"]; ok {
  106. meteData.SubDeviceId = v.(string)
  107. }
  108. tpl, err := template.New("topic").Parse(defaultTopicTemp)
  109. if err != nil {
  110. return err
  111. }
  112. stringBuf := new(strings.Builder)
  113. if err = tpl.Execute(stringBuf, &meteData); err != nil {
  114. return nil
  115. }
  116. topic := stringBuf.String()
  117. if M.mqttClient != nil {
  118. token := M.mqttClient.Publish(topic, 0, false, message.Data)
  119. if token.Error() != nil {
  120. ctx.TellNext(message, protocol.Failure)
  121. } else {
  122. ctx.TellNext(message, protocol.Success)
  123. }
  124. }
  125. return nil
  126. }