manager.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package mqtt
  2. import (
  3. "github.com/gogf/gf/encoding/gjson"
  4. "net"
  5. "sparrow/pkg/server"
  6. "sync"
  7. "time"
  8. )
  9. type Manager struct {
  10. Provider Provider
  11. CxtMutex sync.RWMutex
  12. IdToConn map[string]*Connection
  13. }
  14. func NewManager(p Provider) *Manager {
  15. m := &Manager{
  16. Provider: p,
  17. IdToConn: make(map[string]*Connection),
  18. }
  19. go m.CleanWorker()
  20. return m
  21. }
  22. func (m *Manager) NewConn(conn net.Conn) {
  23. NewConnection(conn, m)
  24. }
  25. func (m *Manager) AddConn(id string, c *Connection) {
  26. m.CxtMutex.Lock()
  27. oldSub, exist := m.IdToConn[id]
  28. if exist {
  29. oldSub.Close()
  30. }
  31. m.IdToConn[id] = c
  32. m.CxtMutex.Unlock()
  33. }
  34. func (m *Manager) DelConn(id string) {
  35. m.CxtMutex.Lock()
  36. _, exist := m.IdToConn[id]
  37. if exist {
  38. delete(m.IdToConn, id)
  39. }
  40. m.CxtMutex.Unlock()
  41. }
  42. func (m *Manager) GetToken(DeviceID string) ([]byte, error) {
  43. m.CxtMutex.RLock()
  44. con, exist := m.IdToConn[DeviceID]
  45. m.CxtMutex.RUnlock()
  46. if !exist {
  47. return nil, errorf("device not exist: %v[%v]", DeviceID, DeviceID)
  48. }
  49. return con.Token, nil
  50. }
  51. func (m *Manager) PublishMessage2Device(DeviceID string, msg *Publish, timeout time.Duration) error {
  52. m.CxtMutex.RLock()
  53. con, exist := m.IdToConn[DeviceID]
  54. m.CxtMutex.RUnlock()
  55. if !exist {
  56. return errorf("device not exist: %v", DeviceID)
  57. }
  58. return con.Publish(msg, timeout)
  59. }
  60. func (m *Manager) PublishMessage2Server(DeviceID string, vendorId string, msg *Publish) error {
  61. topic := msg.TopicName
  62. payload := msg.Payload.(BytesPayload)
  63. if j, err := gjson.DecodeToJson(payload); err == nil {
  64. m.Provider.OnDeviceMessage(DeviceID, vendorId, topic, j)
  65. } else {
  66. return err
  67. }
  68. return nil
  69. }
  70. func (m *Manager) CleanWorker() {
  71. for {
  72. server.Log.Infoln("scanning and removing inactive connections...")
  73. curTime := time.Now().Unix()
  74. for _, con := range m.IdToConn {
  75. if con.KeepAlive == 0 {
  76. continue
  77. }
  78. if uint16(curTime-con.LastHbTime) > 3*con.KeepAlive/2 {
  79. server.Log.Infof("connection %v inactive , removing", con)
  80. con.Close()
  81. delete(m.IdToConn, con.DeviceID)
  82. }
  83. }
  84. time.Sleep(60 * time.Second)
  85. }
  86. }