Browse Source

增加mqtt broker节点

lijian 2 years ago
parent
commit
721f8c3895

+ 1 - 0
pkg/mysql/migrate.go

@@ -47,6 +47,7 @@ func MigrateDatabase(dbhost, dbport, dbname, dbuser, dbpass string) error {
 		&models.RuleNode{},
 		&models.RuleChain{},
 		&models.DeviceNetConfig{},
+		&models.SubDevice{},
 	).Error
 	if err != nil {
 		fmt.Printf("%s", err.Error())

+ 124 - 0
pkg/ruleEngine/nodes/mqtt_broker_node.go

@@ -0,0 +1,124 @@
+package nodes
+
+import (
+	"encoding/json"
+	"fmt"
+	MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
+	"github.com/gogf/gf/util/guid"
+	"html/template"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/ruleEngine"
+	"strings"
+	"time"
+)
+
+const defaultTopicTemp = "device_message/{{.VendorId}}/{{.DeviceId}}"
+const defaultTimeout = 5
+
+// MQTTBrokerNode Publish incoming message payload to the topic of the configured MQTT broker with QoS AT_LEAST_ONCE.
+// In case of successful message publishing, original Message will be passed to the next nodes via Success chain,
+//otherwise Failure chain is used.
+type MQTTBrokerNode struct {
+	mqttClient *MQTT.Client
+	config     *MQTTBrokerNodeCfg
+}
+
+type MQTTBrokerNodeCfg struct {
+	// can be a static string, or pattern that is resolved using Message Metadata properties. .
+	// default topic device_message/{{.VendorId}}/{{.DeviceId}}
+	TopicPattern string `json:"topic_pattern"`
+	// MQTT broker host.
+	Host string `json:"host"`
+	// MQTT broker port.
+	Port int `json:"port"`
+	//  timeout in seconds for connecting to MQTT broker.
+	Timeout int `json:"timeout"`
+	//  optional client identifier used for connecting to MQTT broker. If not specified, default generated clientId will be used.
+	ClientId string `json:"client_id"`
+	// establishes a non persistent connection with the broker when enabled.
+	ClearSession bool `json:"clear_session"`
+	// enable/disable secure communication.
+	SSLEnable bool `json:"ssl_enable"`
+	// MQTT connection credentials. Can be either Anonymous, Basic or PEM.
+	Credentials string `json:"credentials"` // Anonymous, Basic, PEM
+	/*
+		If PEM credentials type is selected, the following configuration should be provided:
+
+		CA certificate file
+		Certificate file
+		Private key file
+		Private key password
+	*/
+	UserName string `json:"user_name"`
+	Password string `json:"password"`
+}
+
+func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error {
+	if config != "" {
+		c := new(MQTTBrokerNodeCfg)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		M.config = c
+	}
+	if M.config.Timeout == 0 {
+		M.config.Timeout = defaultTimeout
+	}
+
+	var opt MQTT.ClientOptions
+	opt.ClientID = "MQTT_NODE_" + guid.S()
+	opt.CleanSession = M.config.ClearSession
+	opt.KeepAlive = time.Second * 30
+	opt.ConnectTimeout = time.Duration(M.config.Timeout) * time.Second
+	switch M.config.Credentials {
+	case "Basic":
+		opt.Username = M.config.UserName
+		opt.Password = M.config.Password
+	default:
+
+	}
+	if M.config.ClientId != "" {
+		opt.ClientID = M.config.ClientId
+	}
+	opt.AutoReconnect = true
+	opt.MaxReconnectInterval = 10
+	opt.AddBroker(fmt.Sprintf("tcp://%s:%d", M.config.Host, M.config.Port))
+	c := MQTT.NewClient(&opt)
+	M.mqttClient = c
+	go func() {
+		if token := c.Connect(); token.Wait() && token.Error() != nil {
+			return
+		}
+	}()
+	return nil
+}
+
+type MeteData struct {
+	VendorId string
+	DeviceId string
+}
+
+func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	if M.mqttClient.IsConnected() {
+		var meteData MeteData
+		meteData.VendorId = message.MetaData["vendor_id"].(string)
+		meteData.VendorId = message.MetaData["device_id"].(string)
+		tpl, err := template.New("topic").Parse(defaultTopicTemp)
+		if err != nil {
+			return err
+		}
+		stringBuf := new(strings.Builder)
+		if err = tpl.Execute(stringBuf, &meteData); err != nil {
+			return nil
+		}
+		topic := stringBuf.String()
+		token := M.mqttClient.Publish(topic, 0, false, message.Data)
+		if token.Error() != nil {
+			ctx.TellNext(message, protocol.Failure)
+		} else {
+			ctx.TellNext(message, protocol.Success)
+		}
+	}
+	return nil
+}

+ 18 - 0
pkg/ruleEngine/nodes/reg_types_test.go

@@ -3,7 +3,9 @@ package nodes
 import (
 	"encoding/json"
 	"sparrow/pkg/ruleEngine"
+	"strings"
 	"testing"
+	"text/template"
 )
 
 func TestCreateNodeByType(t *testing.T) {
@@ -19,3 +21,19 @@ func TestCreateNodeByType(t *testing.T) {
 
 	t.Log(node)
 }
+
+func TestMQTTBrokerNode_OnMessage(t *testing.T) {
+	var meteData MeteData
+	tpl, err := template.New("topic").Parse(defaultTopicTemp)
+	if err != nil {
+		t.Error(err)
+	}
+	meteData.VendorId = "lijian"
+	meteData.DeviceId = "deviceId"
+
+	stringBuf := &strings.Builder{}
+	if err = tpl.Execute(stringBuf, &meteData); err != nil {
+		t.Error(err)
+	}
+	t.Log(stringBuf.String())
+}

+ 5 - 5
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -63,11 +63,11 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 	for k, v := range message.MetaData {
 		headers[k] = v.(string)
 	}
-	//if r.config.Headers != nil {
-	//	for _, v := range r.config.Headers {
-	//		headers[v.Key] = v.Value
-	//	}
-	//}
+	if r.config.Headers != nil {
+		for _, v := range r.config.Headers {
+			headers[v.Key] = v.Value
+		}
+	}
 
 	req, err := utils.NewRequest(r.config.Method, r.config.Url, []byte(body))
 	if err != nil {

+ 1 - 1
services/controller/controller.go

@@ -156,7 +156,7 @@ func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus)
 		Data:     data,
 		Callback: nil,
 		MetaData: map[string]interface{}{
-			"tenant_id":     args.VendorId,
+			"vendor_id":     args.VendorId,
 			"device_id":     args.DeviceId,
 			"sub_device_id": args.SubDeviceId,
 		},

BIN
tests/device/device