lijian 4 vuotta sitten
vanhempi
commit
a3cde1b9e2
1 muutettua tiedostoa jossa 11 lisäystä ja 0 poistoa
  1. 11 0
      pkg/mqtt/connection.go

+ 11 - 0
pkg/mqtt/connection.go

@@ -3,10 +3,12 @@ package mqtt
 import (
 	"encoding/hex"
 	"errors"
+	"fmt"
 	"net"
 	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
+	"sync"
 	"time"
 )
 
@@ -35,6 +37,7 @@ type Connection struct {
 	LastHbTime      int64
 	Token           []byte
 	VendorId        string
+	lock sync.Mutex
 }
 
 // NewConnection create a connection
@@ -117,6 +120,8 @@ func (c *Connection) ValidateToken(token []byte) error {
 
 // Close close
 func (c *Connection) Close() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	DeviceID := c.DeviceID
 	server.Log.Infof("closing connection of device %v", DeviceID)
 	if c.Conn != nil {
@@ -309,12 +314,18 @@ func (c *Connection) SendMsgToClient() {
 			server.Log.Errorf("%s is end now", host)
 			return
 		}
+		c.lock.Lock()
 
 		server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
+		fmt.Printf("客户端实例:%v, 消息内容:%v", c.Conn, msg)
+		if c.Conn == nil {
+			fmt.Println("实例为空")
+		}
 		err := msg.Encode(c.Conn)
 		if err != nil {
 			server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
 			continue
 		}
+		c.lock.Unlock()
 	}
 }