manager.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package mqtt
  2. import (
  3. "net"
  4. "sparrow/pkg/server"
  5. "sync"
  6. "time"
  7. )
  8. type Manager struct {
  9. Provider Provider
  10. CxtMutex sync.RWMutex
  11. IdToConn map[uint64]*Connection
  12. }
  13. func NewManager(p Provider) *Manager {
  14. m := &Manager{
  15. Provider: p,
  16. IdToConn: make(map[uint64]*Connection),
  17. }
  18. go m.CleanWorker()
  19. return m
  20. }
  21. func (m *Manager) NewConn(conn net.Conn) {
  22. NewConnection(conn, m)
  23. }
  24. func (m *Manager) AddConn(id uint64, c *Connection) {
  25. m.CxtMutex.Lock()
  26. oldSub, exist := m.IdToConn[id]
  27. if exist {
  28. oldSub.Close()
  29. }
  30. m.IdToConn[id] = c
  31. m.CxtMutex.Unlock()
  32. }
  33. func (m *Manager) DelConn(id uint64) {
  34. m.CxtMutex.Lock()
  35. _, exist := m.IdToConn[id]
  36. if exist {
  37. delete(m.IdToConn, id)
  38. }
  39. m.CxtMutex.Unlock()
  40. }
  41. func (m *Manager) GetToken(DeviceID uint64) ([]byte, error) {
  42. m.CxtMutex.RLock()
  43. con, exist := m.IdToConn[DeviceID]
  44. m.CxtMutex.RUnlock()
  45. if !exist {
  46. return nil, errorf("device not exist: %v[%v]", DeviceID, DeviceID)
  47. }
  48. return con.Token, nil
  49. }
  50. func (m *Manager) PublishMessage2Device(DeviceID uint64, msg *Publish, timeout time.Duration) error {
  51. m.CxtMutex.RLock()
  52. con, exist := m.IdToConn[DeviceID]
  53. m.CxtMutex.RUnlock()
  54. if !exist {
  55. return errorf("device not exist: %v", DeviceID)
  56. }
  57. return con.Publish(msg, timeout)
  58. }
  59. func (m *Manager) PublishMessage2Server(DeviceID uint64, msg *Publish) error {
  60. topic := msg.TopicName
  61. payload := msg.Payload.(BytesPayload)
  62. m.Provider.OnDeviceMessage(DeviceID, topic, payload)
  63. return nil
  64. }
  65. func (m *Manager) CleanWorker() {
  66. for {
  67. server.Log.Infoln("scanning and removing inactive connections...")
  68. curTime := time.Now().Unix()
  69. for _, con := range m.IdToConn {
  70. if con.KeepAlive == 0 {
  71. continue
  72. }
  73. if uint16(curTime-con.LastHbTime) > uint16(3*con.KeepAlive/2) {
  74. server.Log.Infof("connection %v inactive , removing", con)
  75. con.Close()
  76. delete(m.IdToConn, con.DeviceID)
  77. }
  78. }
  79. time.Sleep(60 * time.Second)
  80. }
  81. }