liuxiulin 4 months ago
parent
commit
7c32c9f40c

+ 1 - 2
services/emqx-agent/client/mqtt_client.go

@@ -59,8 +59,7 @@ func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
 		SetUsername(conf.User).
 		SetPassword(conf.Password).
 		SetConnectRetry(true).
-		SetCleanSession(true).
-		SetKeepAlive(120)
+		SetCleanSession(true)
 	opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
 		server.Log.Infof("connect to %s", broker.String())
 		return tlsCfg

+ 1 - 1
services/emqx-agent/main.go

@@ -234,7 +234,7 @@ func main() {
 		User:     g.Cfg().GetString("mqtt.user"),
 		Password: g.Cfg().GetString("mqtt.password"),
 		Brokers:  g.Cfg().GetStrings("mqtt.brokers"),
-		ConnNum:  1,
+		ConnNum:  10,
 	})
 	if err != nil {
 		panic(err)

+ 6 - 7
services/emqx-agent/sub_dev.go

@@ -97,13 +97,12 @@ func (d *MqttClient) SubDevMsg(handle Handle) error {
 }
 
 func (d *MqttClient) subDevMsg(cli mqtt.Client, handle Handle) error {
-	//err := d.subscribeWithFunc(cli, TopicConnectStatus, d.subConnectStatus(handle))
-	//server.Log.Infof("subDevMsg")
-	//if err != nil {
-	//	server.Log.Infof("subDevMsg err:%v", err)
-	//	return err
-	//}
-	err := d.subscribeWithFunc(cli, TopicThing, func(ctx context.Context, topic string, payload []byte) error {
+	err := d.subscribeWithFunc(cli, TopicConnectStatus, d.subConnectStatus(handle))
+	if err != nil {
+		server.Log.Infof("subDevMsg err:%v", err)
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicThing, func(ctx context.Context, topic string, payload []byte) error {
 		return handle(ctx).Message(topic, payload)
 	})
 	if err != nil {