123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package mqtt
- import (
- "net"
- "sparrow/pkg/server"
- "sync"
- "time"
- )
- type Manager struct {
- Provider Provider
- CxtMutex sync.RWMutex
- IdToConn map[uint64]*Connection
- }
- func NewManager(p Provider) *Manager {
- m := &Manager{
- Provider: p,
- IdToConn: make(map[uint64]*Connection),
- }
- go m.CleanWorker()
- return m
- }
- func (m *Manager) NewConn(conn net.Conn) {
- NewConnection(conn, m)
- }
- func (m *Manager) AddConn(id uint64, c *Connection) {
- m.CxtMutex.Lock()
- oldSub, exist := m.IdToConn[id]
- if exist {
- oldSub.Close()
- }
- m.IdToConn[id] = c
- m.CxtMutex.Unlock()
- }
- func (m *Manager) DelConn(id uint64) {
- m.CxtMutex.Lock()
- _, exist := m.IdToConn[id]
- if exist {
- delete(m.IdToConn, id)
- }
- m.CxtMutex.Unlock()
- }
- func (m *Manager) GetToken(DeviceID uint64) ([]byte, error) {
- m.CxtMutex.RLock()
- con, exist := m.IdToConn[DeviceID]
- m.CxtMutex.RUnlock()
- if !exist {
- return nil, errorf("device not exist: %v[%v]", DeviceID, DeviceID)
- }
- return con.Token, nil
- }
- func (m *Manager) PublishMessage2Device(DeviceID uint64, msg *Publish, timeout time.Duration) error {
- m.CxtMutex.RLock()
- con, exist := m.IdToConn[DeviceID]
- m.CxtMutex.RUnlock()
- if !exist {
- return errorf("device not exist: %v", DeviceID)
- }
- return con.Publish(msg, timeout)
- }
- func (m *Manager) PublishMessage2Server(DeviceID uint64, msg *Publish) error {
- topic := msg.TopicName
- payload := msg.Payload.(BytesPayload)
- m.Provider.OnDeviceMessage(DeviceID, topic, payload)
- return nil
- }
- func (m *Manager) CleanWorker() {
- for {
- server.Log.Infoln("scanning and removing inactive connections...")
- curTime := time.Now().Unix()
- for _, con := range m.IdToConn {
- if con.KeepAlive == 0 {
- continue
- }
- if uint16(curTime-con.LastHbTime) > uint16(3*con.KeepAlive/2) {
- server.Log.Infof("connection %v inactive , removing", con)
- con.Close()
- delete(m.IdToConn, con.DeviceID)
- }
- }
- time.Sleep(60 * time.Second)
- }
- }
|