connection.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "net"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "time"
  9. )
  10. // const def
  11. const (
  12. SendChanLen = 16
  13. defaultKeepAlive = 30
  14. )
  15. // ResponseType response type
  16. type ResponseType struct {
  17. SendTime uint8
  18. PublishType uint8
  19. DataType string
  20. }
  21. // Connection client connection
  22. type Connection struct {
  23. Mgr *Manager
  24. DeviceID uint64
  25. Conn net.Conn
  26. SendChan chan Message
  27. MessageID uint16
  28. MessageWaitChan map[uint16]chan error
  29. KeepAlive uint16
  30. LastHbTime int64
  31. Token []byte
  32. }
  33. // NewConnection create a connection
  34. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  35. sendchan := make(chan Message, SendChanLen)
  36. c := &Connection{
  37. Conn: conn,
  38. SendChan: sendchan,
  39. Mgr: mgr,
  40. KeepAlive: defaultKeepAlive,
  41. MessageWaitChan: make(map[uint16]chan error),
  42. }
  43. go c.SendMsgToClient()
  44. go c.RcvMsgFromClient()
  45. return c
  46. }
  47. // Submit submit a message to send chan
  48. func (c *Connection) Submit(msg Message) {
  49. if c.Conn != nil {
  50. c.SendChan <- msg
  51. }
  52. }
  53. // Publish will publish a message , and return a chan to wait for completion.
  54. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  55. server.Log.Debugf("publishing message : %v, timeout %v", msg, timeout)
  56. message := msg.(*Publish)
  57. message.MessageID = c.MessageID
  58. c.MessageID++
  59. c.Submit(message)
  60. ch := make(chan error)
  61. // we don't wait for confirm.
  62. if timeout == 0 {
  63. return nil
  64. }
  65. c.MessageWaitChan[message.MessageID] = ch
  66. // wait for timeout and
  67. go func() {
  68. timer := time.NewTimer(timeout)
  69. <-timer.C
  70. waitCh, exist := c.MessageWaitChan[message.MessageID]
  71. if exist {
  72. waitCh <- errors.New("timeout pushlishing message")
  73. delete(c.MessageWaitChan, message.MessageID)
  74. close(waitCh)
  75. }
  76. }()
  77. err := <-ch
  78. return err
  79. }
  80. func (c *Connection) confirmPublish(MessageID uint16) {
  81. waitCh, exist := c.MessageWaitChan[MessageID]
  82. if exist {
  83. waitCh <- nil
  84. delete(c.MessageWaitChan, MessageID)
  85. close(waitCh)
  86. }
  87. }
  88. // ValidateToken validate token
  89. func (c *Connection) ValidateToken(token []byte) error {
  90. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
  91. if err != nil {
  92. return err
  93. }
  94. c.Token = token
  95. return nil
  96. }
  97. // Close close
  98. func (c *Connection) Close() {
  99. DeviceID := c.DeviceID
  100. server.Log.Infof("closing connection of device %v", DeviceID)
  101. if c.Conn != nil {
  102. c.Conn.Close()
  103. c.Conn = nil
  104. c.Mgr.Provider.OnDeviceOffline(DeviceID)
  105. }
  106. if c.SendChan != nil {
  107. close(c.SendChan)
  108. c.SendChan = nil
  109. }
  110. }
  111. func (c *Connection) RcvMsgFromClient() {
  112. conn := c.Conn
  113. host := conn.RemoteAddr().String()
  114. server.Log.Infof("recieve new connection from %s", host)
  115. for {
  116. msg, err := DecodeOneMessage(conn)
  117. if err != nil {
  118. server.Log.Errorf("read error: %s", err)
  119. c.Close()
  120. return
  121. }
  122. server.Log.Infof("%s, come msg===\n%v\n=====", host, msg)
  123. c.LastHbTime = time.Now().Unix()
  124. switch msg := msg.(type) {
  125. case *Connect:
  126. ret := RetCodeAccepted
  127. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  128. ret = RetCodeUnacceptableProtocolVersion
  129. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  130. ret = RetCodeUnacceptableProtocolVersion
  131. } else if msg.ProtocolVersion > 4 {
  132. ret = RetCodeUnacceptableProtocolVersion
  133. }
  134. if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
  135. server.Log.Warn("invalid ClientID length: %d", len(msg.ClientID))
  136. ret = RetCodeIdentifierRejected
  137. c.Close()
  138. return
  139. }
  140. DeviceID, err := ClientIDToDeviceID(msg.ClientID)
  141. if err != nil {
  142. server.Log.Warn("invalid Identify: %d", ret)
  143. c.Close()
  144. return
  145. }
  146. c.DeviceID = DeviceID
  147. token, err := hex.DecodeString(msg.Password)
  148. if err != nil {
  149. server.Log.Warn("token format error : %v", err)
  150. ret = RetCodeNotAuthorized
  151. c.Close()
  152. return
  153. }
  154. err = c.ValidateToken(token)
  155. if err != nil {
  156. server.Log.Warn("validate token error : %v", err)
  157. ret = RetCodeNotAuthorized
  158. c.Close()
  159. return
  160. }
  161. if ret != RetCodeAccepted {
  162. server.Log.Warn("invalid CON: %d", ret)
  163. c.Close()
  164. return
  165. }
  166. args := rpcs.ArgsGetOnline{
  167. Id: c.DeviceID,
  168. ClientIP: host,
  169. AccessRPCHost: server.GetRPCHost(),
  170. HeartbeatInterval: uint32(c.KeepAlive),
  171. }
  172. c.Mgr.AddConn(c.DeviceID, c)
  173. connack := &ConnAck{
  174. ReturnCode: ret,
  175. }
  176. c.Submit(connack)
  177. c.KeepAlive = msg.KeepAliveTimer
  178. err = c.Mgr.Provider.OnDeviceOnline(args)
  179. if err != nil {
  180. server.Log.Warn("device online error : %v", err)
  181. c.Close()
  182. return
  183. }
  184. server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceID, host)
  185. case *Publish:
  186. server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
  187. c.Mgr.PublishMessage2Server(c.DeviceID, msg)
  188. if msg.QosLevel.IsAtLeastOnce() {
  189. server.Log.Infof("publish ack send now")
  190. publishack := &PubAck{MessageID: msg.MessageID}
  191. c.Submit(publishack)
  192. } else if msg.QosLevel.IsExactlyOnce() {
  193. server.Log.Infof("publish Rec send now")
  194. publishRec := &PubRec{MessageID: msg.MessageID}
  195. c.Submit(publishRec)
  196. }
  197. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  198. if err != nil {
  199. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  200. c.Close()
  201. return
  202. }
  203. case *PubAck:
  204. server.Log.Infof("%s, comes publish ack", host)
  205. c.confirmPublish(msg.MessageID)
  206. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  207. if err != nil {
  208. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  209. c.Close()
  210. return
  211. }
  212. case *PubRec:
  213. server.Log.Infof("%s, comes publish rec", host)
  214. publishRel := &PubRel{MessageID: msg.MessageID}
  215. c.Submit(publishRel)
  216. case *PubRel:
  217. server.Log.Infof("%s, comes publish rel", host)
  218. publishCom := &PubComp{MessageID: msg.MessageID}
  219. c.Submit(publishCom)
  220. case *PubComp:
  221. server.Log.Infof("%s, comes publish comp", host)
  222. c.confirmPublish(msg.MessageID)
  223. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  224. if err != nil {
  225. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  226. c.Close()
  227. return
  228. }
  229. case *PingReq:
  230. server.Log.Infof("%s, ping req comes", host)
  231. pingrsp := &PingResp{}
  232. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
  233. if err != nil {
  234. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  235. c.Close()
  236. return
  237. }
  238. c.Submit(pingrsp)
  239. case *Subscribe:
  240. server.Log.Infof("%s, subscribe topic: %v", host, msg.Topics)
  241. case *Unsubscribe:
  242. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  243. case *Disconnect:
  244. server.Log.Infof("%s, disconnect now, exit...", host)
  245. c.Close()
  246. return
  247. default:
  248. server.Log.Errorf("unknown msg type %T", msg)
  249. c.Close()
  250. return
  251. }
  252. }
  253. }
  254. func (c *Connection) SendMsgToClient() {
  255. host := c.Conn.RemoteAddr()
  256. for {
  257. msg, ok := <-c.SendChan
  258. if !ok {
  259. server.Log.Errorf("%s is end now", host)
  260. return
  261. }
  262. server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
  263. err := msg.Encode(c.Conn)
  264. if err != nil {
  265. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  266. continue
  267. }
  268. }
  269. }