connection.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sparrow/pkg/models"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "sync"
  11. "time"
  12. )
  13. // const def
  14. const (
  15. SendChanLen = 16
  16. defaultKeepAlive = 30
  17. )
  18. // ResponseType response type
  19. type ResponseType struct {
  20. SendTime uint8
  21. PublishType uint8
  22. DataType string
  23. }
  24. // Connection client connection
  25. type Connection struct {
  26. Mgr *Manager
  27. DeviceID string
  28. Conn net.Conn
  29. SendChan chan Message
  30. MessageID uint16
  31. MessageWaitChan map[uint16]chan error
  32. KeepAlive uint16
  33. LastHbTime int64
  34. Token []byte
  35. VendorId string
  36. closeChan chan struct{}
  37. DeviceCode string
  38. mLock sync.Mutex
  39. }
  40. // NewConnection create a connection
  41. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  42. sendchan := make(chan Message, SendChanLen)
  43. c := &Connection{
  44. Conn: conn,
  45. SendChan: sendchan,
  46. Mgr: mgr,
  47. KeepAlive: defaultKeepAlive,
  48. MessageWaitChan: make(map[uint16]chan error),
  49. closeChan: make(chan struct{}),
  50. }
  51. go c.SendMsgToClient()
  52. go c.RcvMsgFromClient()
  53. return c
  54. }
  55. // Submit submit a message to send chan
  56. func (c *Connection) Submit(msg Message) {
  57. if c.Conn != nil {
  58. c.SendChan <- msg
  59. }
  60. }
  61. // Publish will publish a message , and return a chan to wait for completion.
  62. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  63. message := msg.(*Publish)
  64. message.MessageID = c.MessageID
  65. c.MessageID++
  66. c.Submit(message)
  67. server.Log.Debugf("publishing message Id : %v, timeout %v", message.MessageID, timeout)
  68. ch := make(chan error)
  69. // we don't wait for confirm.
  70. if timeout == 0 {
  71. return nil
  72. }
  73. c.MessageWaitChan[message.MessageID] = ch
  74. // wait for timeout and
  75. go func() {
  76. timer := time.NewTimer(timeout)
  77. <-timer.C
  78. waitCh, exist := c.MessageWaitChan[message.MessageID]
  79. if exist {
  80. waitCh <- errors.New("timeout publishing message")
  81. delete(c.MessageWaitChan, message.MessageID)
  82. close(waitCh)
  83. }
  84. }()
  85. err := <-ch
  86. return err
  87. }
  88. func (c *Connection) confirmPublish(MessageID uint16) {
  89. server.Log.Debugf("[confirmPublish]收到消息Id: %d", MessageID)
  90. waitCh, exist := c.MessageWaitChan[MessageID]
  91. if exist {
  92. waitCh <- nil
  93. delete(c.MessageWaitChan, MessageID)
  94. close(waitCh)
  95. }
  96. }
  97. // ValidateToken validate token
  98. func (c *Connection) ValidateToken(token []byte) error {
  99. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  100. if err != nil {
  101. return err
  102. }
  103. c.Token = token
  104. return nil
  105. }
  106. // Close connection
  107. func (c *Connection) Close() {
  108. c.mLock.Lock()
  109. defer c.mLock.Unlock()
  110. DeviceID := c.DeviceID
  111. c.MessageID = 0
  112. server.Log.Infof("closing connection of device %v", DeviceID)
  113. if c.Conn != nil {
  114. _ = c.Conn.Close()
  115. c.Conn = nil
  116. _ = c.Mgr.Provider.OnDeviceOffline(c.DeviceCode, c.VendorId)
  117. }
  118. if c.SendChan != nil {
  119. close(c.SendChan)
  120. c.SendChan = nil
  121. close(c.closeChan)
  122. }
  123. }
  124. func (c *Connection) RcvMsgFromClient() {
  125. conn := c.Conn
  126. host := conn.RemoteAddr().String()
  127. server.Log.Infof("receive new connection from %s", host)
  128. for {
  129. msg, err := DecodeOneMessage(conn)
  130. if err != nil {
  131. server.Log.Errorf("read error: %s", err)
  132. c.Close()
  133. return
  134. }
  135. c.LastHbTime = time.Now().Unix()
  136. switch msg := msg.(type) {
  137. case *Connect:
  138. ret := RetCodeAccepted
  139. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  140. ret = RetCodeUnacceptableProtocolVersion
  141. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  142. ret = RetCodeUnacceptableProtocolVersion
  143. } else if msg.ProtocolVersion > 4 {
  144. ret = RetCodeUnacceptableProtocolVersion
  145. }
  146. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  147. server.Log.Errorf("invalid ClientID length: %d", len(msg.ClientID))
  148. ret = RetCodeIdentifierRejected
  149. c.Close()
  150. return
  151. }
  152. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  153. if err != nil {
  154. server.Log.Errorf("invalid Identify: %d", ret)
  155. c.Close()
  156. return
  157. }
  158. device := &models.Device{}
  159. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", DeviceID, device)
  160. if err != nil {
  161. server.Log.Errorf("device not found %d", DeviceID)
  162. c.Close()
  163. return
  164. }
  165. c.DeviceID = device.RecordId
  166. c.VendorId = device.VendorID
  167. c.DeviceCode = device.DeviceIdentifier
  168. token, err := hex.DecodeString(msg.Password)
  169. if err != nil {
  170. server.Log.Errorf("token format error : %v", err)
  171. ret = RetCodeNotAuthorized
  172. c.Close()
  173. return
  174. }
  175. err = c.ValidateToken(token)
  176. if err != nil {
  177. server.Log.Errorf("validate device token error. deviceid : %v, token : %s, error: %v", c.DeviceCode, hex.EncodeToString(token), err)
  178. ret = RetCodeNotAuthorized
  179. c.Close()
  180. return
  181. }
  182. if ret != RetCodeAccepted {
  183. server.Log.Errorf("invalid CON: %d", ret)
  184. c.Close()
  185. return
  186. }
  187. args := rpcs.ArgsGetOnline{
  188. Id: device.DeviceIdentifier,
  189. ClientIP: host,
  190. AccessRPCHost: server.GetRPCHost(),
  191. //HeartbeatInterval: uint32(c.KeepAlive),
  192. HeartbeatInterval: 300,
  193. }
  194. c.Mgr.AddConn(c.DeviceCode, c)
  195. connack := &ConnAck{
  196. ReturnCode: ret,
  197. }
  198. c.Submit(connack)
  199. c.KeepAlive = msg.KeepAliveTimer
  200. err = c.Mgr.Provider.OnDeviceOnline(args, c.VendorId)
  201. if err != nil {
  202. server.Log.Errorf("device online error : %v", err)
  203. c.Close()
  204. return
  205. }
  206. case *Publish:
  207. err = c.Mgr.PublishMessage2Server(c.DeviceCode, c.VendorId, msg)
  208. if err != nil {
  209. server.Log.Errorf("PublishMessage2Server error:%s", err.Error())
  210. }
  211. if msg.QosLevel.IsAtLeastOnce() {
  212. publishack := &PubAck{MessageID: msg.MessageID}
  213. c.Submit(publishack)
  214. } else if msg.QosLevel.IsExactlyOnce() {
  215. server.Log.Infof("publish Rec send now")
  216. publishRec := &PubRec{MessageID: msg.MessageID}
  217. c.Submit(publishRec)
  218. }
  219. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  220. if err != nil {
  221. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  222. c.Close()
  223. return
  224. }
  225. case *PubAck:
  226. c.confirmPublish(msg.MessageID)
  227. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  228. if err != nil {
  229. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  230. c.Close()
  231. return
  232. }
  233. case *PubRec:
  234. server.Log.Infof("%s, comes publish rec", host)
  235. publishRel := &PubRel{MessageID: msg.MessageID}
  236. c.Submit(publishRel)
  237. case *PubRel:
  238. server.Log.Infof("%s, comes publish rel", host)
  239. publishCom := &PubComp{MessageID: msg.MessageID}
  240. c.Submit(publishCom)
  241. case *PubComp:
  242. server.Log.Infof("%s, comes publish comp", host)
  243. c.confirmPublish(msg.MessageID)
  244. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  245. if err != nil {
  246. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  247. c.Close()
  248. return
  249. }
  250. case *PingReq:
  251. if c.DeviceCode == "08D1F9E77840" {
  252. println(fmt.Sprintf("收到ping req:%s", c.DeviceCode))
  253. }
  254. pingrsp := &PingResp{}
  255. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceCode)
  256. if err != nil {
  257. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  258. c.Close()
  259. return
  260. }
  261. c.Submit(pingrsp)
  262. case *Subscribe:
  263. server.Log.Infof("%s, subscribe topic: %v", c.DeviceCode, msg.Topics)
  264. case *Unsubscribe:
  265. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  266. case *Disconnect:
  267. server.Log.Infof("%s, disconnect now, exit...", c.DeviceCode)
  268. c.Close()
  269. return
  270. default:
  271. server.Log.Errorf("unknown msg type %T", msg)
  272. c.Close()
  273. return
  274. }
  275. }
  276. }
  277. func (c *Connection) SendMsgToClient() {
  278. if c.Conn == nil {
  279. return
  280. }
  281. host := c.Conn.RemoteAddr()
  282. for {
  283. select {
  284. case <-c.closeChan:
  285. return
  286. case msg, ok := <-c.SendChan:
  287. if c.Conn == nil {
  288. return
  289. }
  290. if !ok {
  291. server.Log.Errorf("%s is end now", host)
  292. return
  293. }
  294. err := msg.Encode(c.Conn)
  295. if err != nil {
  296. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  297. continue
  298. }
  299. }
  300. }
  301. }