lijian 4 лет назад
Родитель
Сommit
e98d0897f4
1 измененных файлов с 17 добавлено и 16 удалено
  1. 17 16
      pkg/mqtt/connection.go

+ 17 - 16
pkg/mqtt/connection.go

@@ -7,7 +7,6 @@ import (
 	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
-	"sync"
 	"time"
 )
 
@@ -36,7 +35,8 @@ type Connection struct {
 	LastHbTime      int64
 	Token           []byte
 	VendorId        string
-	lock sync.Mutex
+	closeChan chan struct{}
+
 }
 
 // NewConnection create a connection
@@ -48,6 +48,7 @@ func NewConnection(conn net.Conn, mgr *Manager) *Connection {
 		Mgr:             mgr,
 		KeepAlive:       defaultKeepAlive,
 		MessageWaitChan: make(map[uint16]chan error),
+		closeChan: make(chan struct{}),
 	}
 
 	go c.SendMsgToClient()
@@ -119,8 +120,6 @@ func (c *Connection) ValidateToken(token []byte) error {
 
 // Close close
 func (c *Connection) Close() {
-	c.lock.Lock()
-	defer c.lock.Unlock()
 	DeviceID := c.DeviceID
 	server.Log.Infof("closing connection of device %v", DeviceID)
 	if c.Conn != nil {
@@ -132,6 +131,7 @@ func (c *Connection) Close() {
 		close(c.SendChan)
 		c.SendChan = nil
 	}
+	close(c.closeChan)
 }
 
 func (c *Connection) RcvMsgFromClient() {
@@ -311,19 +311,20 @@ func (c *Connection) SendMsgToClient() {
 	}
 	host := c.Conn.RemoteAddr()
 	for {
-		if c.SendChan == nil {
-			return
-		}
-		msg, ok := <-c.SendChan
-		if !ok {
-			server.Log.Errorf("%s is end now", host)
+		select {
+		case <-c.closeChan:
 			return
-		}
-		server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
-		err := msg.Encode(c.Conn)
-		if err != nil {
-			server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
-			continue
+		case msg, ok :=<-c.SendChan:
+			if !ok {
+				server.Log.Errorf("%s is end now", host)
+				return
+			}
+			server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
+			err := msg.Encode(c.Conn)
+			if err != nil {
+				server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
+				continue
+			}
 		}
 	}
 }