package coap import ( "net" "sparrow/pkg/rpcs" "sparrow/pkg/server" "sync/atomic" "time" ) const ( // ResponseTimeout is the amount of time to wait for a // response. ResponseTimeout = time.Second * 2 // ResponseRandomFactor is a multiplier for response backoff. ResponseRandomFactor = 1.5 // MaxRetransmit is the maximum number of times a message will // be retransmitted. MaxRetransmit = 4 maxPktlen = 1500 maxWorkersCount = 10000 idleWorkerTimeout = 10 * time.Second pubStatusTopic = "s" pubEventTopic = "e" subCommandTopic = "c" ) // Manager manager type Manager struct { queue chan *Request Provider Provider workersCount int32 } // NewManager new manager func NewManager(p Provider) *Manager { return &Manager{ Provider: p, queue: make(chan *Request), } } // Handler udp handler func (m *Manager) Handler(conn *net.UDPConn) { buf := make([]byte, maxPktlen) for { nr, addr, err := conn.ReadFromUDP(buf) if err != nil { if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) { time.Sleep(5 * time.Millisecond) continue } } tmp := make([]byte, nr) copy(tmp, buf) msg, err := ParseMessage(tmp) if err != nil { server.Log.Error(err) } m.spawnWorker(&Request{ Msg: msg, Addr: addr, Conn: conn, }) } } func (m *Manager) worker(w *Request) { m.serve(w) for { count := atomic.LoadInt32(&m.workersCount) if count > maxWorkersCount { return } if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) { break } } defer atomic.AddInt32(&m.workersCount, -1) inUse := false timeout := time.NewTimer(idleWorkerTimeout) defer timeout.Stop() for m.workerChannelHandler(inUse, timeout) { } } func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool { select { case w, ok := <-m.queue: if !ok { return false } inUse = true m.serve(w) case <-timeout.C: if !inUse { return false } inUse = false timeout.Reset(idleWorkerTimeout) } return true } // coap://endpoint/$DEVICE_ID/s func (m *Manager) serve(w *Request) { msg := w.Msg server.Log.Debugf("get packet:%#v, workers count :%d", msg, m.workersCount) if msg.IsConfirmable() && len(msg.Path()) > 1 { token := msg.GetToken() // TODO:想别的deviceid的 var err error deviceid :="" if err != nil { server.Log.Errorf("device id error:%s", msg.Path()[0]) return } if len(token) != 8 { res := &BaseMessage{ Code: Unauthorized, Type: ACK, MessageID: msg.GetMessageID(), Token: msg.GetToken(), } bytes, _ := res.Encode() w.Conn.WriteTo(bytes, w.Addr) server.Log.Errorf("token length error, size :%d", len(token)) return } //check token err = m.Provider.ValidateDeviceToken(deviceid, token) if err != nil { res := &BaseMessage{ Code: Unauthorized, Type: ACK, MessageID: msg.GetMessageID(), Token: msg.GetToken(), } bytes, _ := res.Encode() w.Conn.WriteTo(bytes, w.Addr) server.Log.Warnf("device %d token not validate, token :%v", deviceid, token) return } args := rpcs.ArgsGetOnline{ Id: deviceid, ClientIP: w.Addr.String(), AccessRPCHost: server.GetRPCHost(), HeartbeatInterval: 30, } ack := &BaseMessage{ Code: Changed, Type: ACK, MessageID: msg.GetMessageID(), Token: msg.GetToken(), } ackbytes, _ := ack.Encode() w.Conn.WriteTo(ackbytes, w.Addr) err = m.Provider.OnDeviceOnline(args) if err != nil { server.Log.Warnf("device online error :%v", err) return } server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String()) topic := msg.Path()[1] switch topic { case pubStatusTopic, pubEventTopic, subCommandTopic: server.Log.Infof("%s, publish status", w.Addr.String()) m.Provider.OnDeviceMessage(deviceid, topic, msg.GetPayload()) err := m.Provider.OnDeviceHeartBeat(deviceid) if err != nil { server.Log.Warnf("heartbeat set error:%s", w.Addr.String()) return } //pub ack ack := &BaseMessage{ Code: Created, Type: ACK, MessageID: msg.GetMessageID(), Token: msg.GetToken(), } ackbytes, _ := ack.Encode() w.Conn.WriteTo(ackbytes, w.Addr) default: //无效主题 server.Log.Errorf("unknown msg type:%s", topic) ack := &BaseMessage{ Code: BadRequest, Type: ACK, MessageID: msg.GetMessageID(), Token: msg.GetToken(), } ackbytes, _ := ack.Encode() w.Conn.WriteTo(ackbytes, w.Addr) return } } else { //TODO:无效请求 } } func (m *Manager) spawnWorker(req *Request) { select { case m.queue <- req: default: go m.serve(req) } } // Receive a message. func Receive(l *net.UDPConn, buf []byte) (Message, error) { l.SetReadDeadline(time.Now().Add(ResponseTimeout)) nr, _, err := l.ReadFromUDP(buf) if err != nil { return &BaseMessage{}, err } return ParseMessage(buf[:nr]) }