connection.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sparrow/pkg/models"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "sync"
  11. "time"
  12. )
  13. // const def
  14. const (
  15. SendChanLen = 16
  16. defaultKeepAlive = 30
  17. )
  18. // ResponseType response type
  19. type ResponseType struct {
  20. SendTime uint8
  21. PublishType uint8
  22. DataType string
  23. }
  24. // Connection client connection
  25. type Connection struct {
  26. Mgr *Manager
  27. DeviceID string
  28. Conn net.Conn
  29. SendChan chan Message
  30. MessageID uint16
  31. MessageWaitChan map[uint16]chan error
  32. KeepAlive uint16
  33. LastHbTime int64
  34. Token []byte
  35. VendorId string
  36. lock sync.Mutex
  37. }
  38. // NewConnection create a connection
  39. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  40. sendchan := make(chan Message, SendChanLen)
  41. c := &Connection{
  42. Conn: conn,
  43. SendChan: sendchan,
  44. Mgr: mgr,
  45. KeepAlive: defaultKeepAlive,
  46. MessageWaitChan: make(map[uint16]chan error),
  47. }
  48. go c.SendMsgToClient()
  49. go c.RcvMsgFromClient()
  50. return c
  51. }
  52. // Submit submit a message to send chan
  53. func (c *Connection) Submit(msg Message) {
  54. if c.Conn != nil {
  55. c.SendChan <- msg
  56. }
  57. }
  58. // Publish will publish a message , and return a chan to wait for completion.
  59. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  60. message := msg.(*Publish)
  61. message.MessageID = c.MessageID
  62. c.MessageID++
  63. c.Submit(message)
  64. server.Log.Debugf("publishing message : %v, timeout %v", msg, timeout)
  65. ch := make(chan error)
  66. // we don't wait for confirm.
  67. if timeout == 0 {
  68. return nil
  69. }
  70. c.MessageWaitChan[message.MessageID] = ch
  71. // wait for timeout and
  72. go func() {
  73. timer := time.NewTimer(timeout)
  74. <-timer.C
  75. waitCh, exist := c.MessageWaitChan[message.MessageID]
  76. if exist {
  77. waitCh <- errors.New("timeout pushlishing message")
  78. delete(c.MessageWaitChan, message.MessageID)
  79. close(waitCh)
  80. }
  81. }()
  82. err := <-ch
  83. return err
  84. }
  85. func (c *Connection) confirmPublish(MessageID uint16) {
  86. server.Log.Debugf("[confirmPublish]收到消息Id: %d", MessageID)
  87. waitCh, exist := c.MessageWaitChan[MessageID]
  88. if exist {
  89. waitCh <- nil
  90. delete(c.MessageWaitChan, MessageID)
  91. close(waitCh)
  92. }
  93. }
  94. // ValidateToken validate token
  95. func (c *Connection) ValidateToken(token []byte) error {
  96. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  97. if err != nil {
  98. return err
  99. }
  100. c.Token = token
  101. return nil
  102. }
  103. // Close close
  104. func (c *Connection) Close() {
  105. c.lock.Lock()
  106. defer c.lock.Unlock()
  107. DeviceID := c.DeviceID
  108. server.Log.Infof("closing connection of device %v", DeviceID)
  109. if c.Conn != nil {
  110. c.Conn.Close()
  111. c.Conn = nil
  112. c.Mgr.Provider.OnDeviceOffline(DeviceID)
  113. }
  114. if c.SendChan != nil {
  115. close(c.SendChan)
  116. c.SendChan = nil
  117. }
  118. }
  119. func (c *Connection) RcvMsgFromClient() {
  120. conn := c.Conn
  121. host := conn.RemoteAddr().String()
  122. server.Log.Infof("recieve new connection from %s", host)
  123. for {
  124. msg, err := DecodeOneMessage(conn)
  125. if err != nil {
  126. server.Log.Errorf("read error: %s", err)
  127. c.Close()
  128. return
  129. }
  130. c.LastHbTime = time.Now().Unix()
  131. switch msg := msg.(type) {
  132. case *Connect:
  133. ret := RetCodeAccepted
  134. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  135. ret = RetCodeUnacceptableProtocolVersion
  136. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  137. ret = RetCodeUnacceptableProtocolVersion
  138. } else if msg.ProtocolVersion > 4 {
  139. ret = RetCodeUnacceptableProtocolVersion
  140. }
  141. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  142. server.Log.Errorf("invalid ClientID length: %d", len(msg.ClientID))
  143. ret = RetCodeIdentifierRejected
  144. c.Close()
  145. return
  146. }
  147. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  148. if err != nil {
  149. server.Log.Errorf("invalid Identify: %d", ret)
  150. c.Close()
  151. return
  152. }
  153. device := &models.Device{}
  154. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", DeviceID, device)
  155. if err != nil {
  156. server.Log.Errorf("device not found %d", ret, DeviceID)
  157. c.Close()
  158. return
  159. }
  160. c.DeviceID = device.RecordId
  161. c.VendorId = device.VendorID
  162. token, err := hex.DecodeString(msg.Password)
  163. if err != nil {
  164. server.Log.Errorf("token format error : %v", err)
  165. ret = RetCodeNotAuthorized
  166. c.Close()
  167. return
  168. }
  169. err = c.ValidateToken(token)
  170. if err != nil {
  171. server.Log.Errorf("validate token error : %v", err)
  172. ret = RetCodeNotAuthorized
  173. c.Close()
  174. return
  175. }
  176. if ret != RetCodeAccepted {
  177. server.Log.Errorf("invalid CON: %d", ret)
  178. c.Close()
  179. return
  180. }
  181. args := rpcs.ArgsGetOnline{
  182. Id: device.RecordId,
  183. ClientIP: host,
  184. AccessRPCHost: server.GetRPCHost(),
  185. HeartbeatInterval: uint32(c.KeepAlive),
  186. }
  187. c.Mgr.AddConn(c.DeviceID, c)
  188. connack := &ConnAck{
  189. ReturnCode: ret,
  190. }
  191. c.Submit(connack)
  192. c.KeepAlive = msg.KeepAliveTimer
  193. err = c.Mgr.Provider.OnDeviceOnline(args)
  194. if err != nil {
  195. server.Log.Errorf("device online error : %v", err)
  196. c.Close()
  197. return
  198. }
  199. server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceID, host)
  200. case *Publish:
  201. server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
  202. _ = c.Mgr.PublishMessage2Server(c.DeviceID, c.VendorId, msg)
  203. if msg.QosLevel.IsAtLeastOnce() {
  204. server.Log.Infof("publish ack send now")
  205. publishack := &PubAck{MessageID: msg.MessageID}
  206. c.Submit(publishack)
  207. } else if msg.QosLevel.IsExactlyOnce() {
  208. server.Log.Infof("publish Rec send now")
  209. publishRec := &PubRec{MessageID: msg.MessageID}
  210. c.Submit(publishRec)
  211. }
  212. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  213. if err != nil {
  214. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  215. c.Close()
  216. return
  217. }
  218. case *PubAck:
  219. server.Log.Infof("%s, comes publish ack", host)
  220. c.confirmPublish(msg.MessageID)
  221. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  222. if err != nil {
  223. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  224. c.Close()
  225. return
  226. }
  227. case *PubRec:
  228. server.Log.Infof("%s, comes publish rec", host)
  229. publishRel := &PubRel{MessageID: msg.MessageID}
  230. c.Submit(publishRel)
  231. case *PubRel:
  232. server.Log.Infof("%s, comes publish rel", host)
  233. publishCom := &PubComp{MessageID: msg.MessageID}
  234. c.Submit(publishCom)
  235. case *PubComp:
  236. server.Log.Infof("%s, comes publish comp", host)
  237. c.confirmPublish(msg.MessageID)
  238. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  239. if err != nil {
  240. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  241. c.Close()
  242. return
  243. }
  244. case *PingReq:
  245. server.Log.Infof("%s, ping req comes", host)
  246. pingrsp := &PingResp{}
  247. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  248. if err != nil {
  249. server.Log.Errorf("%s, heartbeat set error %s, close now...", host, err)
  250. c.Close()
  251. return
  252. }
  253. c.Submit(pingrsp)
  254. case *Subscribe:
  255. server.Log.Infof("%s, subscribe topic: %v", host, msg.Topics)
  256. case *Unsubscribe:
  257. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  258. case *Disconnect:
  259. server.Log.Infof("%s, disconnect now, exit...", host)
  260. c.Close()
  261. return
  262. default:
  263. server.Log.Errorf("unknown msg type %T", msg)
  264. c.Close()
  265. return
  266. }
  267. }
  268. }
  269. func (c *Connection) SendMsgToClient() {
  270. host := c.Conn.RemoteAddr()
  271. for {
  272. msg, ok := <-c.SendChan
  273. if !ok {
  274. server.Log.Errorf("%s is end now", host)
  275. return
  276. }
  277. c.lock.Lock()
  278. server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
  279. if c.Conn == nil {
  280. fmt.Printf("客户端实例:%v, 消息内容:%v\r\n", c.Conn, msg)
  281. }
  282. err := msg.Encode(c.Conn)
  283. if err != nil {
  284. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  285. continue
  286. }
  287. c.lock.Unlock()
  288. }
  289. }