|
@@ -54,6 +54,7 @@ type MQTTBrokerNodeCfg struct {
|
|
|
}
|
|
|
|
|
|
func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
+ fmt.Printf("进入初始化:%s\r\n", config)
|
|
|
if config != "" {
|
|
|
c := new(MQTTBrokerNodeCfg)
|
|
|
err := json.Unmarshal([]byte(config), c)
|
|
@@ -86,6 +87,7 @@ func (M *MQTTBrokerNode) Init(ctx ruleEngine.Context, config string) error {
|
|
|
opt.AddBroker(fmt.Sprintf("tcp://%s:%d", M.config.Host, M.config.Port))
|
|
|
c := MQTT.NewClient(&opt)
|
|
|
M.mqttClient = c
|
|
|
+ fmt.Printf("M.mqttClient = c:%v\r\n", c)
|
|
|
go func() {
|
|
|
if token := c.Connect(); token.Wait() && token.Error() != nil {
|
|
|
return
|
|
@@ -120,7 +122,6 @@ func (M *MQTTBrokerNode) OnMessage(ctx ruleEngine.Context, message *protocol.Mes
|
|
|
return nil
|
|
|
}
|
|
|
topic := stringBuf.String()
|
|
|
- fmt.Printf("+++++++++++++++%v\r\n", M.mqttClient)
|
|
|
if M.mqttClient != nil {
|
|
|
token := M.mqttClient.Publish(topic, 0, false, message.Data)
|
|
|
if token.Error() != nil {
|