connection.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "net"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/rpcs"
  8. "sparrow/pkg/server"
  9. "time"
  10. )
  11. // const def
  12. const (
  13. SendChanLen = 16
  14. defaultKeepAlive = 30
  15. )
  16. // ResponseType response type
  17. type ResponseType struct {
  18. SendTime uint8
  19. PublishType uint8
  20. DataType string
  21. }
  22. // Connection client connection
  23. type Connection struct {
  24. Mgr *Manager
  25. DeviceID string
  26. Conn net.Conn
  27. SendChan chan Message
  28. MessageID uint16
  29. MessageWaitChan map[uint16]chan error
  30. KeepAlive uint16
  31. LastHbTime int64
  32. Token []byte
  33. VendorId string
  34. }
  35. // NewConnection create a connection
  36. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  37. sendchan := make(chan Message, SendChanLen)
  38. c := &Connection{
  39. Conn: conn,
  40. SendChan: sendchan,
  41. Mgr: mgr,
  42. KeepAlive: defaultKeepAlive,
  43. MessageWaitChan: make(map[uint16]chan error),
  44. }
  45. go c.SendMsgToClient()
  46. go c.RcvMsgFromClient()
  47. return c
  48. }
  49. // Submit submit a message to send chan
  50. func (c *Connection) Submit(msg Message) {
  51. if c.Conn != nil {
  52. c.SendChan <- msg
  53. }
  54. }
  55. // Publish will publish a message , and return a chan to wait for completion.
  56. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  57. message := msg.(*Publish)
  58. message.MessageID = c.MessageID
  59. c.MessageID++
  60. c.Submit(message)
  61. server.Log.Debugf("publishing message : %v, timeout %v", msg, timeout)
  62. ch := make(chan error)
  63. // we don't wait for confirm.
  64. if timeout == 0 {
  65. return nil
  66. }
  67. c.MessageWaitChan[message.MessageID] = ch
  68. // wait for timeout and
  69. go func() {
  70. timer := time.NewTimer(timeout)
  71. <-timer.C
  72. waitCh, exist := c.MessageWaitChan[message.MessageID]
  73. if exist {
  74. waitCh <- errors.New("timeout pushlishing message")
  75. delete(c.MessageWaitChan, message.MessageID)
  76. close(waitCh)
  77. }
  78. }()
  79. err := <-ch
  80. return err
  81. }
  82. func (c *Connection) confirmPublish(MessageID uint16) {
  83. server.Log.Debugf("[confirmPublish]收到消息Id: %d", MessageID)
  84. waitCh, exist := c.MessageWaitChan[MessageID]
  85. if exist {
  86. waitCh <- nil
  87. delete(c.MessageWaitChan, MessageID)
  88. close(waitCh)
  89. }
  90. }
  91. // ValidateToken validate token
  92. func (c *Connection) ValidateToken(token []byte) error {
  93. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  94. if err != nil {
  95. return err
  96. }
  97. c.Token = token
  98. return nil
  99. }
  100. // Close close
  101. func (c *Connection) Close() {
  102. DeviceID := c.DeviceID
  103. server.Log.Infof("closing connection of device %v", DeviceID)
  104. if c.Conn != nil {
  105. c.Conn.Close()
  106. c.Conn = nil
  107. c.Mgr.Provider.OnDeviceOffline(DeviceID)
  108. }
  109. if c.SendChan != nil {
  110. close(c.SendChan)
  111. c.SendChan = nil
  112. }
  113. }
  114. func (c *Connection) RcvMsgFromClient() {
  115. conn := c.Conn
  116. host := conn.RemoteAddr().String()
  117. server.Log.Infof("recieve new connection from %s", host)
  118. for {
  119. msg, err := DecodeOneMessage(conn)
  120. if err != nil {
  121. server.Log.Errorf("read error: %s", err)
  122. c.Close()
  123. return
  124. }
  125. server.Log.Infof("%s, come msg===\n%v\n=====", host, msg)
  126. c.LastHbTime = time.Now().Unix()
  127. switch msg := msg.(type) {
  128. case *Connect:
  129. ret := RetCodeAccepted
  130. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  131. ret = RetCodeUnacceptableProtocolVersion
  132. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  133. ret = RetCodeUnacceptableProtocolVersion
  134. } else if msg.ProtocolVersion > 4 {
  135. ret = RetCodeUnacceptableProtocolVersion
  136. }
  137. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  138. server.Log.Warn("invalid ClientID length: %d", len(msg.ClientID))
  139. ret = RetCodeIdentifierRejected
  140. c.Close()
  141. return
  142. }
  143. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  144. if err != nil {
  145. server.Log.Warn("invalid Identify: %d", ret)
  146. c.Close()
  147. return
  148. }
  149. device := &models.Device{}
  150. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", DeviceID, device)
  151. if err != nil {
  152. server.Log.Warn("device not found %d", ret, DeviceID)
  153. c.Close()
  154. return
  155. }
  156. c.DeviceID = device.RecordId
  157. c.VendorId = device.VendorID
  158. token, err := hex.DecodeString(msg.Password)
  159. if err != nil {
  160. server.Log.Warn("token format error : %v", err)
  161. ret = RetCodeNotAuthorized
  162. c.Close()
  163. return
  164. }
  165. err = c.ValidateToken(token)
  166. if err != nil {
  167. server.Log.Warn("validate token error : %v", err)
  168. ret = RetCodeNotAuthorized
  169. c.Close()
  170. return
  171. }
  172. if ret != RetCodeAccepted {
  173. server.Log.Warn("invalid CON: %d", ret)
  174. c.Close()
  175. return
  176. }
  177. args := rpcs.ArgsGetOnline{
  178. Id: device.RecordId,
  179. ClientIP: host,
  180. AccessRPCHost: server.GetRPCHost(),
  181. HeartbeatInterval: uint32(c.KeepAlive),
  182. }
  183. c.Mgr.AddConn(c.DeviceID, c)
  184. connack := &ConnAck{
  185. ReturnCode: ret,
  186. }
  187. c.Submit(connack)
  188. c.KeepAlive = msg.KeepAliveTimer
  189. err = c.Mgr.Provider.OnDeviceOnline(args)
  190. if err != nil {
  191. server.Log.Warn("device online error : %v", err)
  192. c.Close()
  193. return
  194. }
  195. server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceID, host)
  196. case *Publish:
  197. server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
  198. _ = c.Mgr.PublishMessage2Server(c.DeviceID, c.VendorId, msg)
  199. if msg.QosLevel.IsAtLeastOnce() {
  200. server.Log.Infof("publish ack send now")
  201. publishack := &PubAck{MessageID: msg.MessageID}
  202. c.Submit(publishack)
  203. } else if msg.QosLevel.IsExactlyOnce() {
  204. server.Log.Infof("publish Rec send now")
  205. publishRec := &PubRec{MessageID: msg.MessageID}
  206. c.Submit(publishRec)
  207. }
  208. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  209. if err != nil {
  210. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  211. c.Close()
  212. return
  213. }
  214. case *PubAck:
  215. server.Log.Infof("%s, comes publish ack", host)
  216. c.confirmPublish(msg.MessageID)
  217. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  218. if err != nil {
  219. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  220. c.Close()
  221. return
  222. }
  223. case *PubRec:
  224. server.Log.Infof("%s, comes publish rec", host)
  225. publishRel := &PubRel{MessageID: msg.MessageID}
  226. c.Submit(publishRel)
  227. case *PubRel:
  228. server.Log.Infof("%s, comes publish rel", host)
  229. publishCom := &PubComp{MessageID: msg.MessageID}
  230. c.Submit(publishCom)
  231. case *PubComp:
  232. server.Log.Infof("%s, comes publish comp", host)
  233. c.confirmPublish(msg.MessageID)
  234. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  235. if err != nil {
  236. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  237. c.Close()
  238. return
  239. }
  240. case *PingReq:
  241. server.Log.Infof("%s, ping req comes", host)
  242. pingrsp := &PingResp{}
  243. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  244. if err != nil {
  245. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  246. c.Close()
  247. return
  248. }
  249. c.Submit(pingrsp)
  250. case *Subscribe:
  251. server.Log.Infof("%s, subscribe topic: %v", host, msg.Topics)
  252. case *Unsubscribe:
  253. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  254. case *Disconnect:
  255. server.Log.Infof("%s, disconnect now, exit...", host)
  256. c.Close()
  257. return
  258. default:
  259. server.Log.Errorf("unknown msg type %T", msg)
  260. c.Close()
  261. return
  262. }
  263. }
  264. }
  265. func (c *Connection) SendMsgToClient() {
  266. host := c.Conn.RemoteAddr()
  267. for {
  268. msg, ok := <-c.SendChan
  269. if !ok {
  270. server.Log.Errorf("%s is end now", host)
  271. return
  272. }
  273. server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
  274. err := msg.Encode(c.Conn)
  275. if err != nil {
  276. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  277. continue
  278. }
  279. }
  280. }