connection.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package mqtt
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "sparrow/pkg/rpcs"
  6. "sparrow/pkg/server"
  7. "net"
  8. "time"
  9. )
  10. const (
  11. SendChanLen = 16
  12. defaultKeepAlive = 30
  13. )
  14. type ResponseType struct {
  15. SendTime uint8
  16. PublishType uint8
  17. DataType string
  18. }
  19. type Connection struct {
  20. Mgr *Manager
  21. DeviceId uint64
  22. Conn net.Conn
  23. SendChan chan Message
  24. MessageId uint16
  25. MessageWaitChan map[uint16]chan error
  26. KeepAlive uint16
  27. LastHbTime int64
  28. Token []byte
  29. }
  30. func NewConnection(conn net.Conn, mgr *Manager) *Connection {
  31. sendchan := make(chan Message, SendChanLen)
  32. c := &Connection{
  33. Conn: conn,
  34. SendChan: sendchan,
  35. Mgr: mgr,
  36. KeepAlive: defaultKeepAlive,
  37. MessageWaitChan: make(map[uint16]chan error),
  38. }
  39. go c.SendMsgToClient()
  40. go c.RcvMsgFromClient()
  41. return c
  42. }
  43. func (c *Connection) Submit(msg Message) {
  44. if c.Conn != nil {
  45. c.SendChan <- msg
  46. }
  47. }
  48. // Publish will publish a message , and return a chan to wait for completion.
  49. func (c *Connection) Publish(msg Message, timeout time.Duration) error {
  50. server.Log.Debugf("publishing message : %v, timeout %v", msg, timeout)
  51. message := msg.(*Publish)
  52. message.MessageId = c.MessageId
  53. c.MessageId++
  54. c.Submit(message)
  55. ch := make(chan error)
  56. // we don't wait for confirm.
  57. if timeout == 0 {
  58. return nil
  59. }
  60. c.MessageWaitChan[message.MessageId] = ch
  61. // wait for timeout and
  62. go func() {
  63. timer := time.NewTimer(timeout)
  64. <-timer.C
  65. waitCh, exist := c.MessageWaitChan[message.MessageId]
  66. if exist {
  67. waitCh <- errors.New("timeout pushlishing message.")
  68. delete(c.MessageWaitChan, message.MessageId)
  69. close(waitCh)
  70. }
  71. }()
  72. err := <-ch
  73. return err
  74. }
  75. func (c *Connection) confirmPublish(messageid uint16) {
  76. waitCh, exist := c.MessageWaitChan[messageid]
  77. if exist {
  78. waitCh <- nil
  79. delete(c.MessageWaitChan, messageid)
  80. close(waitCh)
  81. }
  82. }
  83. func (c *Connection) ValidateToken(token []byte) error {
  84. err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceId, token)
  85. if err != nil {
  86. return err
  87. }
  88. c.Token = token
  89. return nil
  90. }
  91. func (c *Connection) Close() {
  92. deviceid := c.DeviceId
  93. server.Log.Infof("closing connection of device %v", deviceid)
  94. if c.Conn != nil {
  95. c.Conn.Close()
  96. c.Conn = nil
  97. c.Mgr.Provider.OnDeviceOffline(deviceid)
  98. }
  99. if c.SendChan != nil {
  100. close(c.SendChan)
  101. c.SendChan = nil
  102. }
  103. }
  104. func (c *Connection) RcvMsgFromClient() {
  105. conn := c.Conn
  106. host := conn.RemoteAddr().String()
  107. server.Log.Infof("recieve new connection from %s", host)
  108. for {
  109. msg, err := DecodeOneMessage(conn)
  110. if err != nil {
  111. server.Log.Errorf("read error: %s", err)
  112. c.Close()
  113. return
  114. }
  115. server.Log.Infof("%s, come msg===\n%v\n=====", host, msg)
  116. c.LastHbTime = time.Now().Unix()
  117. switch msg := msg.(type) {
  118. case *Connect:
  119. ret := RetCodeAccepted
  120. if msg.ProtocolVersion == 3 && msg.ProtocolName != "MQIsdp" {
  121. ret = RetCodeUnacceptableProtocolVersion
  122. } else if msg.ProtocolVersion == 4 && msg.ProtocolName != "MQTT" {
  123. ret = RetCodeUnacceptableProtocolVersion
  124. } else if msg.ProtocolVersion > 4 {
  125. ret = RetCodeUnacceptableProtocolVersion
  126. }
  127. if len(msg.ClientId) < 1 || len(msg.ClientId) > 23 {
  128. server.Log.Warn("invalid clientid length: %d", len(msg.ClientId))
  129. ret = RetCodeIdentifierRejected
  130. c.Close()
  131. return
  132. }
  133. deviceid, err := ClientIdToDeviceId(msg.ClientId)
  134. if err != nil {
  135. server.Log.Warn("invalid Identify: %d", ret)
  136. c.Close()
  137. return
  138. }
  139. c.DeviceId = deviceid
  140. token, err := hex.DecodeString(msg.Password)
  141. if err != nil {
  142. server.Log.Warn("token format error : %v", err)
  143. ret = RetCodeNotAuthorized
  144. c.Close()
  145. return
  146. }
  147. err = c.ValidateToken(token)
  148. if err != nil {
  149. server.Log.Warn("validate token error : %v", err)
  150. ret = RetCodeNotAuthorized
  151. c.Close()
  152. return
  153. }
  154. if ret != RetCodeAccepted {
  155. server.Log.Warn("invalid CON: %d", ret)
  156. c.Close()
  157. return
  158. }
  159. args := rpcs.ArgsGetOnline{
  160. Id: c.DeviceId,
  161. ClientIP: host,
  162. AccessRPCHost: server.GetRPCHost(),
  163. HeartbeatInterval: uint32(c.KeepAlive),
  164. }
  165. c.Mgr.AddConn(c.DeviceId, c)
  166. connack := &ConnAck{
  167. ReturnCode: ret,
  168. }
  169. c.Submit(connack)
  170. c.KeepAlive = msg.KeepAliveTimer
  171. err = c.Mgr.Provider.OnDeviceOnline(args)
  172. if err != nil {
  173. server.Log.Warn("device online error : %v", err)
  174. c.Close()
  175. return
  176. }
  177. server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceId, host)
  178. case *Publish:
  179. server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
  180. c.Mgr.PublishMessage2Server(c.DeviceId, msg)
  181. if msg.QosLevel.IsAtLeastOnce() {
  182. server.Log.Infof("publish ack send now")
  183. publishack := &PubAck{MessageId: msg.MessageId}
  184. c.Submit(publishack)
  185. } else if msg.QosLevel.IsExactlyOnce() {
  186. server.Log.Infof("publish Rec send now")
  187. publishRec := &PubRec{MessageId: msg.MessageId}
  188. c.Submit(publishRec)
  189. }
  190. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
  191. if err != nil {
  192. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  193. c.Close()
  194. return
  195. }
  196. case *PubAck:
  197. server.Log.Infof("%s, comes publish ack", host)
  198. c.confirmPublish(msg.MessageId)
  199. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
  200. if err != nil {
  201. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  202. c.Close()
  203. return
  204. }
  205. case *PubRec:
  206. server.Log.Infof("%s, comes publish rec", host)
  207. publishRel := &PubRel{MessageId: msg.MessageId}
  208. c.Submit(publishRel)
  209. case *PubRel:
  210. server.Log.Infof("%s, comes publish rel", host)
  211. publishCom := &PubComp{MessageId: msg.MessageId}
  212. c.Submit(publishCom)
  213. case *PubComp:
  214. server.Log.Infof("%s, comes publish comp", host)
  215. c.confirmPublish(msg.MessageId)
  216. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
  217. if err != nil {
  218. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  219. c.Close()
  220. return
  221. }
  222. case *PingReq:
  223. server.Log.Infof("%s, ping req comes", host)
  224. pingrsp := &PingResp{}
  225. err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
  226. if err != nil {
  227. server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
  228. c.Close()
  229. return
  230. }
  231. c.Submit(pingrsp)
  232. case *Subscribe:
  233. server.Log.Infof("%s, subscribe topic: %v", host, msg.Topics)
  234. case *Unsubscribe:
  235. server.Log.Infof("%s, unsubscribe topic: %v", host, msg.Topics)
  236. case *Disconnect:
  237. server.Log.Infof("%s, disconnect now, exit...", host)
  238. c.Close()
  239. return
  240. default:
  241. server.Log.Errorf("unknown msg type %T", msg)
  242. c.Close()
  243. return
  244. }
  245. }
  246. }
  247. func (c *Connection) SendMsgToClient() {
  248. host := c.Conn.RemoteAddr()
  249. for {
  250. msg, ok := <-c.SendChan
  251. if !ok {
  252. server.Log.Errorf("%s is end now", host)
  253. return
  254. }
  255. server.Log.Debugf("send msg to %s=======\n%v\n=========", host, msg)
  256. err := msg.Encode(c.Conn)
  257. if err != nil {
  258. server.Log.Errorf("send msg err: %s=====\n%v\n=====", err, msg)
  259. continue
  260. }
  261. }
  262. }