// File: connection.go (7.7 KB)

  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "net"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/rpcs"
  8. "sparrow/pkg/server"
  9. "sync"
  10. "time"
  11. )
  12. // const def
  13. const (
  14. SendChanLen = 16
  15. defaultKeepAlive = 30
  16. )
  17. // ResponseType response type
  18. type ResponseType struct {
  19. SendTime uint8
  20. PublishType uint8
  21. DataType string
  22. }
  23. // Connection client connection
  24. type Connection struct {
  25. Mgr *Manager
  26. DeviceID string
  27. Conn net.Conn
  28. SendChan chan Message
  29. MessageID uint16
  30. MessageWaitChan map[uint16]chan error
  31. KeepAlive uint16
  32. LastHbTime int64
  33. Token []byte
  34. VendorId string
  35. closeChan chan struct{}
  36. DeviceCode string
  37. mLock sync.Mutex
  38. }
  39. // NewConnection create a connection
  40. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  41. sendchan := make(chan Message, SendChanLen)
  42. c := &Connection{
  43. Conn: conn,
  44. SendChan: sendchan,
  45. Mgr: mgr,
  46. KeepAlive: defaultKeepAlive,
  47. MessageWaitChan: make(map[uint16]chan error),
  48. closeChan: make(chan struct{}),
  49. }
  50. go c.SendMsgToClient()
  51. go c.RcvMsgFromClient()
  52. return c
  53. }
  54. // Submit submit a message to send chan
  55. func (c *Connection) Submit(msg Message) {
  56. if c.Conn != nil {
  57. c.SendChan <- msg
  58. }
  59. }
  60. // Publish will publish a message , and return a chan to wait for completion.
  61. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  62. message := msg.(*Publish)
  63. message.MessageID = c.MessageID
  64. c.MessageID++
  65. c.Submit(message)
  66. server.Log.Debugf("publishing message Id : %v, timeout %v", message.MessageID, timeout)
  67. ch := make(chan error)
  68. // we don't wait for confirm.
  69. if timeout == 0 {
  70. return nil
  71. }
  72. c.MessageWaitChan[message.MessageID] = ch
  73. // wait for timeout and
  74. go func() {
  75. timer := time.NewTimer(timeout)
  76. <-timer.C
  77. waitCh, exist := c.MessageWaitChan[message.MessageID]
  78. if exist {
  79. waitCh <- errors.New("timeout publishing message")
  80. delete(c.MessageWaitChan, message.MessageID)
  81. close(waitCh)
  82. }
  83. }()
  84. err := <-ch
  85. return err
  86. }
  87. func (c *Connection) confirmPublish(MessageID uint16) {
  88. server.Log.Debugf("[confirmPublish]收到消息Id: %d", MessageID)
  89. waitCh, exist := c.MessageWaitChan[MessageID]
  90. if exist {
  91. waitCh <- nil
  92. delete(c.MessageWaitChan, MessageID)
  93. close(waitCh)
  94. }
  95. }
  96. // ValidateToken validate token
  97. func (c *Connection) ValidateToken(token []byte) error {
  98. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  99. if err != nil {
  100. return err
  101. }
  102. c.Token = token
  103. return nil
  104. }
  105. // Close connection
  106. func (c *Connection) Close() {
  107. c.mLock.Lock()
  108. defer c.mLock.Unlock()
  109. DeviceID := c.DeviceID
  110. c.MessageID = 0
  111. server.Log.Infof("closing connection of device %v", DeviceID)
  112. if c.Conn != nil {
  113. _ = c.Conn.Close()
  114. c.Conn = nil
  115. _ = c.Mgr.Provider.OnDeviceOffline(c.DeviceCode, c.VendorId)
  116. }
  117. if c.SendChan != nil {
  118. close(c.SendChan)
  119. c.SendChan = nil
  120. close(c.closeChan)
  121. }
  122. }
  123. func (c *Connection) RcvMsgFromClient() {
  124. conn := c.Conn
  125. host := conn.RemoteAddr().String()
  126. server.Log.Infof("receive new connection from %s", host)
  127. for {
  128. msg, err := DecodeOneMessage(conn)
  129. if err != nil {
  130. server.Log.Errorf("read error: %s", err)
  131. c.Close()
  132. return
  133. }
  134. c.LastHbTime = time.Now().Unix()
  135. switch msg := msg.(type) {
  136. case *Connect:
  137. ret := RetCodeAccepted
  138. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  139. ret = RetCodeUnacceptableProtocolVersion
  140. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  141. ret = RetCodeUnacceptableProtocolVersion
  142. } else if msg.ProtocolVersion > 4 {
  143. ret = RetCodeUnacceptableProtocolVersion
  144. }
  145. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  146. server.Log.Errorf("invalid ClientID length: %d", len(msg.ClientID))
  147. ret = RetCodeIdentifierRejected
  148. c.Close()
  149. return
  150. }
  151. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  152. if err != nil {
  153. server.Log.Errorf("invalid Identify: %d", ret)
  154. c.Close()
  155. return
  156. }
  157. device := &models.Device{}
  158. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", DeviceID, device)
  159. if err != nil {
  160. server.Log.Errorf("device not found %d", DeviceID)
  161. c.Close()
  162. return
  163. }
  164. c.DeviceID = device.RecordId
  165. c.VendorId = device.VendorID
  166. c.DeviceCode = device.DeviceIdentifier
  167. token, err := hex.DecodeString(msg.Password)
  168. if err != nil {
  169. server.Log.Errorf("token format error : %v", err)
  170. ret = RetCodeNotAuthorized
  171. c.Close()
  172. return
  173. }
  174. err = c.ValidateToken(token)
  175. if err != nil {
  176. server.Log.Errorf("validate device token error. deviceid : %v, token : %s, error: %v", c.DeviceCode, hex.EncodeToString(token), err)
  177. ret = RetCodeNotAuthorized
  178. c.Close()
  179. return
  180. }
  181. if ret != RetCodeAccepted {
  182. server.Log.Errorf("invalid CON: %d", ret)
  183. c.Close()
  184. return
  185. }
  186. args := rpcs.ArgsGetOnline{
  187. Id: device.DeviceIdentifier,
  188. ClientIP: host,
  189. AccessRPCHost: server.GetRPCHost(),
  190. HeartbeatInterval: 300,
  191. }
  192. c.Mgr.AddConn(c.DeviceCode, c)
  193. connack := &ConnAck{
  194. ReturnCode: ret,
  195. }
  196. c.Submit(connack)
  197. c.KeepAlive = msg.KeepAliveTimer
  198. err = c.Mgr.Provider.OnDeviceOnline(args, c.VendorId)
  199. if err != nil {
  200. server.Log.Errorf("device online error : %v", err)
  201. c.Close()
  202. return
  203. }
  204. case *Publish:
  205. err = c.Mgr.PublishMessage2Server(c.DeviceCode, c.VendorId, msg)
  206. if err != nil {
  207. server.Log.Errorf("PublishMessage2Server error:%s", err.Error())
  208. }
  209. if msg.QosLevel.IsAtLeastOnce() {
  210. publishack := &PubAck{MessageID: msg.MessageID}
  211. c.Submit(publishack)
  212. } else if msg.QosLevel.IsExactlyOnce() {
  213. server.Log.Infof("publish Rec send now")
  214. publishRec := &PubRec{MessageID: msg.MessageID}
  215. c.Submit(publishRec)
  216. }
  217. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  218. if err != nil {
  219. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  220. c.Close()
  221. return
  222. }
  223. case *PubAck:
  224. c.confirmPublish(msg.MessageID)
  225. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  226. if err != nil {
  227. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  228. c.Close()
  229. return
  230. }
  231. case *PubRec:
  232. server.Log.Infof("%s, comes publish rec", host)
  233. publishRel := &PubRel{MessageID: msg.MessageID}
  234. c.Submit(publishRel)
  235. case *PubRel:
  236. server.Log.Infof("%s, comes publish rel", host)
  237. publishCom := &PubComp{MessageID: msg.MessageID}
  238. c.Submit(publishCom)
  239. case *PubComp:
  240. server.Log.Infof("%s, comes publish comp", host)
  241. c.confirmPublish(msg.MessageID)
  242. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  243. if err != nil {
  244. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  245. c.Close()
  246. return
  247. }
  248. case *PingReq:
  249. pingrsp := &PingResp{}
  250. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  251. if err != nil {
  252. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  253. c.Close()
  254. return
  255. }
  256. c.Submit(pingrsp)
  257. case *Subscribe:
  258. server.Log.Infof("%s, subscribe topic: %v", c.DeviceCode, msg.Topics)
  259. case *Unsubscribe:
  260. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  261. case *Disconnect:
  262. server.Log.Infof("%s, disconnect now, exit...", c.DeviceCode)
  263. c.Close()
  264. return
  265. default:
  266. server.Log.Errorf("unknown msg type %T", msg)
  267. c.Close()
  268. return
  269. }
  270. }
  271. }
  272. func (c *Connection) SendMsgToClient() {
  273. if c.Conn == nil {
  274. return
  275. }
  276. host := c.Conn.RemoteAddr()
  277. for {
  278. select {
  279. case <-c.closeChan:
  280. return
  281. case msg, ok := <-c.SendChan:
  282. if c.Conn == nil {
  283. return
  284. }
  285. if !ok {
  286. server.Log.Errorf("%s is end now", host)
  287. return
  288. }
  289. err := msg.Encode(c.Conn)
  290. if err != nil {
  291. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  292. continue
  293. }
  294. }
  295. }
  296. }