// connection.go
  1. package mqtt
  2. import (
  3. "errors"
  4. "net"
  5. "sparrow/pkg/models"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sync"
  9. "time"
  10. )
  // SendChanLen is the buffer capacity given to every Connection's SendChan
  // by NewConnection.
  const SendChanLen = 16

  // defaultKeepAlive is the KeepAlive value a Connection starts with before
  // the client's own CONNECT keep-alive timer replaces it.
  // NOTE(review): presumably expressed in seconds — confirm against the
  // code that consumes Connection.KeepAlive.
  const defaultKeepAlive = 30
  // ResponseType bundles two single-byte markers and a data type name that
  // together describe a response.
  // NOTE(review): nothing in this part of the file reads or writes this
  // type, so the exact meaning of each field must be confirmed at its users.
  type ResponseType struct {
  	SendTime, PublishType uint8
  	DataType              string
  }
  22. // Connection client connection
  23. type Connection struct {
  24. Mgr *Manager
  25. DeviceID string
  26. Conn net.Conn
  27. SendChan chan Message
  28. MessageID uint16
  29. MessageWaitChan map[uint16]chan error
  30. KeepAlive uint16
  31. LastHbTime int64
  32. Token []byte
  33. VendorId string
  34. closeChan chan struct{}
  35. DeviceCode string
  36. mLock sync.Mutex
  37. }
  38. // NewConnection create a connection
  39. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  40. sendchan := make(chan Message, SendChanLen)
  41. c := &Connection{
  42. Conn: conn,
  43. SendChan: sendchan,
  44. Mgr: mgr,
  45. KeepAlive: defaultKeepAlive,
  46. MessageWaitChan: make(map[uint16]chan error),
  47. closeChan: make(chan struct{}),
  48. }
  49. go c.SendMsgToClient()
  50. go c.RcvMsgFromClient()
  51. return c
  52. }
  53. // Submit submit a message to send chan
  54. func (c *Connection) Submit(msg Message) {
  55. if c.Conn != nil {
  56. c.SendChan <- msg
  57. }
  58. }
  59. // Publish will publish a message , and return a chan to wait for completion.
  60. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  61. message := msg.(*Publish)
  62. message.MessageID = c.MessageID
  63. c.MessageID++
  64. c.Submit(message)
  65. server.Log.Debugf("publishing message Id : %v, timeout %v", message.MessageID, timeout)
  66. ch := make(chan error)
  67. // we don't wait for confirm.
  68. if timeout == 0 {
  69. return nil
  70. }
  71. c.MessageWaitChan[message.MessageID] = ch
  72. // wait for timeout and
  73. go func() {
  74. timer := time.NewTimer(timeout)
  75. <-timer.C
  76. waitCh, exist := c.MessageWaitChan[message.MessageID]
  77. if exist {
  78. waitCh <- errors.New("timeout publishing message")
  79. delete(c.MessageWaitChan, message.MessageID)
  80. close(waitCh)
  81. }
  82. }()
  83. err := <-ch
  84. return err
  85. }
  86. func (c *Connection) confirmPublish(MessageID uint16) {
  87. server.Log.Debugf("[confirmPublish]收到消息Id: %d", MessageID)
  88. waitCh, exist := c.MessageWaitChan[MessageID]
  89. if exist {
  90. waitCh <- nil
  91. delete(c.MessageWaitChan, MessageID)
  92. close(waitCh)
  93. }
  94. }
  95. // ValidateToken validate token
  96. func (c *Connection) ValidateToken(token []byte) error {
  97. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  98. if err != nil {
  99. return err
  100. }
  101. c.Token = token
  102. return nil
  103. }
  104. // Close connection
  105. func (c *Connection) Close() {
  106. c.mLock.Lock()
  107. defer c.mLock.Unlock()
  108. DeviceID := c.DeviceID
  109. c.MessageID = 0
  110. server.Log.Infof("closing connection of device %v", DeviceID)
  111. if c.Conn != nil {
  112. _ = c.Conn.Close()
  113. c.Conn = nil
  114. _ = c.Mgr.Provider.OnDeviceOffline(c.DeviceCode, c.VendorId)
  115. }
  116. if c.SendChan != nil {
  117. close(c.SendChan)
  118. c.SendChan = nil
  119. close(c.closeChan)
  120. }
  121. }
  122. func (c *Connection) RcvMsgFromClient() {
  123. conn := c.Conn
  124. host := conn.RemoteAddr().String()
  125. server.Log.Infof("receive new connection from %s", host)
  126. for {
  127. msg, err := DecodeOneMessage(conn)
  128. if err != nil {
  129. server.Log.Errorf("read error: %s", err)
  130. c.Close()
  131. return
  132. }
  133. c.LastHbTime = time.Now().Unix()
  134. switch msg := msg.(type) {
  135. case *Connect:
  136. ret := RetCodeAccepted
  137. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  138. ret = RetCodeUnacceptableProtocolVersion
  139. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  140. ret = RetCodeUnacceptableProtocolVersion
  141. } else if msg.ProtocolVersion > 4 {
  142. ret = RetCodeUnacceptableProtocolVersion
  143. }
  144. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  145. server.Log.Errorf("invalid ClientID length: %d", len(msg.ClientID))
  146. ret = RetCodeIdentifierRejected
  147. c.Close()
  148. return
  149. }
  150. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  151. if err != nil {
  152. server.Log.Errorf("invalid Identify: %d", ret)
  153. c.Close()
  154. return
  155. }
  156. device := &models.Device{}
  157. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", DeviceID, device)
  158. if err != nil {
  159. server.Log.Errorf("device not found %d", DeviceID)
  160. c.Close()
  161. return
  162. }
  163. c.DeviceID = device.RecordId
  164. c.VendorId = device.VendorID
  165. c.DeviceCode = device.DeviceIdentifier
  166. //token, err := hex.DecodeString(msg.Password)
  167. //if err != nil {
  168. // server.Log.Errorf("token format error : %v", err)
  169. // ret = RetCodeNotAuthorized
  170. // c.Close()
  171. // return
  172. //}
  173. //err = c.ValidateToken(token)
  174. //if err != nil {
  175. // server.Log.Errorf("validate device token error. deviceid : %v, token : %s, error: %v", c.DeviceCode, hex.EncodeToString(token), err)
  176. // ret = RetCodeNotAuthorized
  177. // c.Close()
  178. // return
  179. //}
  180. if ret != RetCodeAccepted {
  181. server.Log.Errorf("invalid CON: %d", ret)
  182. c.Close()
  183. return
  184. }
  185. args := rpcs.ArgsGetOnline{
  186. Id: device.DeviceIdentifier,
  187. ClientIP: host,
  188. AccessRPCHost: server.GetRPCHost(),
  189. HeartbeatInterval: 300,
  190. }
  191. c.Mgr.AddConn(c.DeviceCode, c)
  192. connack := &ConnAck{
  193. ReturnCode: ret,
  194. }
  195. c.Submit(connack)
  196. c.KeepAlive = msg.KeepAliveTimer
  197. err = c.Mgr.Provider.OnDeviceOnline(args, c.VendorId)
  198. if err != nil {
  199. server.Log.Errorf("device online error : %v", err)
  200. c.Close()
  201. return
  202. }
  203. case *Publish:
  204. err = c.Mgr.PublishMessage2Server(c.DeviceCode, c.VendorId, msg)
  205. if err != nil {
  206. server.Log.Errorf("PublishMessage2Server error:%s", err.Error())
  207. }
  208. if msg.QosLevel.IsAtLeastOnce() {
  209. publishack := &PubAck{MessageID: msg.MessageID}
  210. c.Submit(publishack)
  211. } else if msg.QosLevel.IsExactlyOnce() {
  212. server.Log.Infof("publish Rec send now")
  213. publishRec := &PubRec{MessageID: msg.MessageID}
  214. c.Submit(publishRec)
  215. }
  216. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  217. if err != nil {
  218. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  219. c.Close()
  220. return
  221. }
  222. case *PubAck:
  223. c.confirmPublish(msg.MessageID)
  224. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  225. if err != nil {
  226. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  227. c.Close()
  228. return
  229. }
  230. case *PubRec:
  231. server.Log.Infof("%s, comes publish rec", host)
  232. publishRel := &PubRel{MessageID: msg.MessageID}
  233. c.Submit(publishRel)
  234. case *PubRel:
  235. server.Log.Infof("%s, comes publish rel", host)
  236. publishCom := &PubComp{MessageID: msg.MessageID}
  237. c.Submit(publishCom)
  238. case *PubComp:
  239. server.Log.Infof("%s, comes publish comp", host)
  240. c.confirmPublish(msg.MessageID)
  241. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  242. if err != nil {
  243. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  244. c.Close()
  245. return
  246. }
  247. case *PingReq:
  248. pingrsp := &PingResp{}
  249. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  250. if err != nil {
  251. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  252. c.Close()
  253. return
  254. }
  255. c.Submit(pingrsp)
  256. case *Subscribe:
  257. server.Log.Infof("%s, subscribe topic: %v", c.DeviceCode, msg.Topics)
  258. case *Unsubscribe:
  259. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  260. case *Disconnect:
  261. server.Log.Infof("%s, disconnect now, exit...", c.DeviceCode)
  262. c.Close()
  263. return
  264. default:
  265. server.Log.Errorf("unknown msg type %T", msg)
  266. c.Close()
  267. return
  268. }
  269. }
  270. }
  271. func (c *Connection) SendMsgToClient() {
  272. if c.Conn == nil {
  273. return
  274. }
  275. host := c.Conn.RemoteAddr()
  276. for {
  277. select {
  278. case <-c.closeChan:
  279. return
  280. case msg, ok := <-c.SendChan:
  281. if c.Conn == nil {
  282. return
  283. }
  284. if !ok {
  285. server.Log.Errorf("%s is end now", host)
  286. return
  287. }
  288. err := msg.Encode(c.Conn)
  289. if err != nil {
  290. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  291. continue
  292. }
  293. }
  294. }
  295. }