package mqtt import ( "net" "sparrow/pkg/server" "sync" "time" ) type Manager struct { Provider Provider CxtMutex sync.RWMutex IdToConn map[string]*Connection } func NewManager(p Provider) *Manager { m := &Manager{ Provider: p, IdToConn: make(map[string]*Connection), } go m.CleanWorker() return m } func (m *Manager) NewConn(conn net.Conn) { NewConnection(conn, m) } func (m *Manager) AddConn(id string, c *Connection) { m.CxtMutex.Lock() oldSub, exist := m.IdToConn[id] if exist { oldSub.Close() } m.IdToConn[id] = c m.CxtMutex.Unlock() } func (m *Manager) DelConn(id string) { m.CxtMutex.Lock() _, exist := m.IdToConn[id] if exist { delete(m.IdToConn, id) } m.CxtMutex.Unlock() } func (m *Manager) GetToken(DeviceID string) ([]byte, error) { m.CxtMutex.RLock() con, exist := m.IdToConn[DeviceID] m.CxtMutex.RUnlock() if !exist { return nil, errorf("device not exist: %v[%v]", DeviceID, DeviceID) } return con.Token, nil } func (m *Manager) PublishMessage2Device(DeviceID string, msg *Publish, timeout time.Duration) error { m.CxtMutex.RLock() con, exist := m.IdToConn[DeviceID] m.CxtMutex.RUnlock() if !exist { return errorf("device not exist: %v", DeviceID) } return con.Publish(msg, timeout) } func (m *Manager) PublishMessage2Server(DeviceID string, vendorId string, msg *Publish) error { topic := msg.TopicName payload := msg.Payload.(BytesPayload) m.Provider.OnDeviceMessage(DeviceID, vendorId, topic, payload) return nil } func (m *Manager) CleanWorker() { for { server.Log.Infoln("scanning and removing inactive connections...") curTime := time.Now().Unix() for _, con := range m.IdToConn { if con.KeepAlive == 0 { continue } if uint16(curTime-con.LastHbTime) > uint16(3*con.KeepAlive/2) { server.Log.Infof("connection %v inactive , removing", con) con.Close() delete(m.IdToConn, con.DeviceID) } } time.Sleep(60 * time.Second) } }