connection.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "net"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/rpcs"
  8. "sparrow/pkg/server"
  9. "time"
  10. )
  11. // const def
  12. const (
  13. SendChanLen = 16
  14. defaultKeepAlive = 30
  15. )
  16. // ResponseType response type
  17. type ResponseType struct {
  18. SendTime uint8
  19. PublishType uint8
  20. DataType string
  21. }
  22. // Connection client connection
  23. type Connection struct {
  24. Mgr *Manager
  25. DeviceID string
  26. Conn net.Conn
  27. SendChan chan Message
  28. MessageID uint16
  29. MessageWaitChan map[uint16]chan error
  30. KeepAlive uint16
  31. LastHbTime int64
  32. Token []byte
  33. VendorId string
  34. closeChan chan struct{}
  35. }
  36. // NewConnection create a connection
  37. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  38. sendchan := make(chan Message, SendChanLen)
  39. c := &Connection{
  40. Conn: conn,
  41. SendChan: sendchan,
  42. Mgr: mgr,
  43. KeepAlive: defaultKeepAlive,
  44. MessageWaitChan: make(map[uint16]chan error),
  45. closeChan: make(chan struct{}),
  46. }
  47. go c.SendMsgToClient()
  48. go c.RcvMsgFromClient()
  49. return c
  50. }
  51. // Submit submit a message to send chan
  52. func (c *Connection) Submit(msg Message) {
  53. if c.Conn != nil {
  54. c.SendChan <- msg
  55. }
  56. }
  57. // Publish will publish a message , and return a chan to wait for completion.
  58. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  59. message := msg.(*Publish)
  60. message.MessageID = c.MessageID
  61. c.MessageID++
  62. c.Submit(message)
  63. server.Log.Debugf("publishing message : %v, timeout %v", msg, timeout)
  64. ch := make(chan error)
  65. // we don't wait for confirm.
  66. if timeout == 0 {
  67. return nil
  68. }
  69. c.MessageWaitChan[message.MessageID] = ch
  70. // wait for timeout and
  71. go func() {
  72. timer := time.NewTimer(timeout)
  73. <-timer.C
  74. waitCh, exist := c.MessageWaitChan[message.MessageID]
  75. if exist {
  76. waitCh <- errors.New("timeout pushlishing message")
  77. delete(c.MessageWaitChan, message.MessageID)
  78. close(waitCh)
  79. }
  80. }()
  81. err := <-ch
  82. return err
  83. }
  84. func (c *Connection) confirmPublish(MessageID uint16) {
  85. server.Log.Debugf("[confirmPublish]收到消息Id: %d", MessageID)
  86. waitCh, exist := c.MessageWaitChan[MessageID]
  87. if exist {
  88. waitCh <- nil
  89. delete(c.MessageWaitChan, MessageID)
  90. close(waitCh)
  91. }
  92. }
  93. // ValidateToken validate token
  94. func (c *Connection) ValidateToken(token []byte) error {
  95. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  96. if err != nil {
  97. return err
  98. }
  99. c.Token = token
  100. return nil
  101. }
  102. // Close close
  103. func (c *Connection) Close() {
  104. DeviceID := c.DeviceID
  105. server.Log.Infof("closing connection of device %v", DeviceID)
  106. if c.Conn != nil {
  107. c.Conn.Close()
  108. c.Conn = nil
  109. c.Mgr.Provider.OnDeviceOffline(DeviceID)
  110. }
  111. if c.SendChan != nil {
  112. close(c.SendChan)
  113. c.SendChan = nil
  114. close(c.closeChan)
  115. }
  116. }
  117. func (c *Connection) RcvMsgFromClient() {
  118. conn := c.Conn
  119. host := conn.RemoteAddr().String()
  120. server.Log.Infof("recieve new connection from %s", host)
  121. for {
  122. msg, err := DecodeOneMessage(conn)
  123. if err != nil {
  124. server.Log.Errorf("read error: %s", err)
  125. c.Close()
  126. return
  127. }
  128. c.LastHbTime = time.Now().Unix()
  129. switch msg := msg.(type) {
  130. case *Connect:
  131. ret := RetCodeAccepted
  132. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  133. ret = RetCodeUnacceptableProtocolVersion
  134. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  135. ret = RetCodeUnacceptableProtocolVersion
  136. } else if msg.ProtocolVersion > 4 {
  137. ret = RetCodeUnacceptableProtocolVersion
  138. }
  139. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  140. server.Log.Errorf("invalid ClientID length: %d", len(msg.ClientID))
  141. ret = RetCodeIdentifierRejected
  142. c.Close()
  143. return
  144. }
  145. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  146. if err != nil {
  147. server.Log.Errorf("invalid Identify: %d", ret)
  148. c.Close()
  149. return
  150. }
  151. device := &models.Device{}
  152. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", DeviceID, device)
  153. if err != nil {
  154. server.Log.Errorf("device not found %d", ret, DeviceID)
  155. c.Close()
  156. return
  157. }
  158. c.DeviceID = device.RecordId
  159. c.VendorId = device.VendorID
  160. token, err := hex.DecodeString(msg.Password)
  161. if err != nil {
  162. server.Log.Errorf("token format error : %v", err)
  163. ret = RetCodeNotAuthorized
  164. c.Close()
  165. return
  166. }
  167. err = c.ValidateToken(token)
  168. if err != nil {
  169. server.Log.Errorf("validate token error : %v", err)
  170. ret = RetCodeNotAuthorized
  171. c.Close()
  172. return
  173. }
  174. if ret != RetCodeAccepted {
  175. server.Log.Errorf("invalid CON: %d", ret)
  176. c.Close()
  177. return
  178. }
  179. args := rpcs.ArgsGetOnline{
  180. Id: device.RecordId,
  181. ClientIP: host,
  182. AccessRPCHost: server.GetRPCHost(),
  183. HeartbeatInterval: uint32(c.KeepAlive),
  184. }
  185. c.Mgr.AddConn(c.DeviceID, c)
  186. connack := &ConnAck{
  187. ReturnCode: ret,
  188. }
  189. c.Submit(connack)
  190. c.KeepAlive = msg.KeepAliveTimer
  191. err = c.Mgr.Provider.OnDeviceOnline(args)
  192. if err != nil {
  193. server.Log.Errorf("device online error : %v", err)
  194. c.Close()
  195. return
  196. }
  197. server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceID, host)
  198. case *Publish:
  199. server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
  200. _ = c.Mgr.PublishMessage2Server(c.DeviceID, c.VendorId, msg)
  201. if msg.QosLevel.IsAtLeastOnce() {
  202. server.Log.Infof("publish ack send now")
  203. publishack := &PubAck{MessageID: msg.MessageID}
  204. c.Submit(publishack)
  205. } else if msg.QosLevel.IsExactlyOnce() {
  206. server.Log.Infof("publish Rec send now")
  207. publishRec := &PubRec{MessageID: msg.MessageID}
  208. c.Submit(publishRec)
  209. }
  210. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  211. if err != nil {
  212. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  213. c.Close()
  214. return
  215. }
  216. case *PubAck:
  217. server.Log.Infof("%s, comes publish ack", host)
  218. c.confirmPublish(msg.MessageID)
  219. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  220. if err != nil {
  221. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  222. c.Close()
  223. return
  224. }
  225. case *PubRec:
  226. server.Log.Infof("%s, comes publish rec", host)
  227. publishRel := &PubRel{MessageID: msg.MessageID}
  228. c.Submit(publishRel)
  229. case *PubRel:
  230. server.Log.Infof("%s, comes publish rel", host)
  231. publishCom := &PubComp{MessageID: msg.MessageID}
  232. c.Submit(publishCom)
  233. case *PubComp:
  234. server.Log.Infof("%s, comes publish comp", host)
  235. c.confirmPublish(msg.MessageID)
  236. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  237. if err != nil {
  238. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  239. c.Close()
  240. return
  241. }
  242. case *PingReq:
  243. server.Log.Infof("%s, ping req comes", host)
  244. pingrsp := &PingResp{}
  245. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  246. if err != nil {
  247. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  248. c.Close()
  249. return
  250. }
  251. c.Submit(pingrsp)
  252. case *Subscribe:
  253. server.Log.Infof("%s, subscribe topic: %v", host, msg.Topics)
  254. case *Unsubscribe:
  255. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  256. case *Disconnect:
  257. server.Log.Infof("%s, disconnect now, exit...", host)
  258. c.Close()
  259. return
  260. default:
  261. server.Log.Errorf("unknown msg type %T", msg)
  262. c.Close()
  263. return
  264. }
  265. }
  266. }
  267. func (c *Connection) SendMsgToClient() {
  268. if c.Conn == nil {
  269. return
  270. }
  271. host := c.Conn.RemoteAddr()
  272. for {
  273. select {
  274. case <-c.closeChan:
  275. return
  276. case msg, ok :=<-c.SendChan:
  277. if !ok {
  278. server.Log.Errorf("%s is end now", host)
  279. return
  280. }
  281. server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
  282. err := msg.Encode(c.Conn)
  283. if err != nil {
  284. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  285. continue
  286. }
  287. }
  288. }
  289. }