123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package coap
- import (
- "net"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "strconv"
- "sync/atomic"
- "time"
- )
- const (
- // ResponseTimeout is the amount of time to wait for a
- // response.
- ResponseTimeout = time.Second * 2
- // ResponseRandomFactor is a multiplier for response backoff.
- ResponseRandomFactor = 1.5
- // MaxRetransmit is the maximum number of times a message will
- // be retransmitted.
- MaxRetransmit = 4
- maxPktlen = 1500
- maxWorkersCount = 10000
- idleWorkerTimeout = 10 * time.Second
- pubStatusTopic = "s"
- pubEventTopic = "e"
- subCommandTopic = "c"
- )
- type Manager struct {
- queue chan *Request
- Provider Provider
- workersCount int32
- }
- func NewManager(p Provider) *Manager {
- return &Manager{
- Provider: p,
- queue: make(chan *Request),
- }
- }
- func (m *Manager) Handler(conn *net.UDPConn) {
- buf := make([]byte, maxPktlen)
- for {
- nr, addr, err := conn.ReadFromUDP(buf)
- if err != nil {
- if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
- time.Sleep(5 * time.Millisecond)
- continue
- }
- }
- tmp := make([]byte, nr)
- copy(tmp, buf)
- msg, err := ParseMessage(tmp)
- if err != nil {
- server.Log.Error(err)
- }
- m.spawnWorker(&Request{
- Msg: msg,
- Addr: addr,
- Conn: conn,
- })
- }
- }
- func (m *Manager) worker(w *Request) {
- m.serve(w)
- for {
- count := atomic.LoadInt32(&m.workersCount)
- if count > maxWorkersCount {
- return
- }
- if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
- break
- }
- }
- defer atomic.AddInt32(&m.workersCount, -1)
- inUse := false
- timeout := time.NewTimer(idleWorkerTimeout)
- defer timeout.Stop()
- for m.workerChannelHandler(inUse, timeout) {
- }
- }
- func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
- select {
- case w, ok := <-m.queue:
- if !ok {
- return false
- }
- inUse = true
- m.serve(w)
- case <-timeout.C:
- if !inUse {
- return false
- }
- inUse = false
- timeout.Reset(idleWorkerTimeout)
- }
- return true
- }
- // coap://endpoint/$DEVICE_ID/s
- func (m *Manager) serve(w *Request) {
- msg := w.Msg
- server.Log.Debugf("get packet:%#v", msg)
- if msg.IsConfirmable() && len(msg.Path()) > 1 {
- token := msg.GetToken()
- // TODO:想别的deviceid的
- deviceid, err := strconv.ParseUint(msg.Path()[0], 10, 0)
- if err != nil {
- server.Log.Errorf("device id error:%s", msg.Path()[0])
- return
- }
- if len(token) != 8 {
- res := &BaseMessage{
- Code: Unauthorized,
- Type: ACK,
- MessageID: msg.GetMessageID(),
- Token: msg.GetToken(),
- }
- bytes, _ := res.Encode()
- w.Conn.WriteTo(bytes, w.Addr)
- server.Log.Errorf("token length error, size :%d", len(token))
- return
- }
- //check token
- err = m.Provider.ValidateDeviceToken(deviceid, token)
- if err != nil {
- res := &BaseMessage{
- Code: Unauthorized,
- Type: ACK,
- MessageID: msg.GetMessageID(),
- Token: msg.GetToken(),
- }
- bytes, _ := res.Encode()
- w.Conn.WriteTo(bytes, w.Addr)
- server.Log.Warnf("device %d token not validate, token :%v", deviceid, token)
- return
- }
- args := rpcs.ArgsGetOnline{
- Id: deviceid,
- ClientIP: w.Addr.String(),
- AccessRPCHost: server.GetRPCHost(),
- HeartbeatInterval: 30,
- }
- ack := &BaseMessage{
- Code: Changed,
- Type: ACK,
- MessageID: msg.GetMessageID(),
- Token: msg.GetToken(),
- }
- ackbytes, _ := ack.Encode()
- w.Conn.WriteTo(ackbytes, w.Addr)
- err = m.Provider.OnDeviceOnline(args)
- if err != nil {
- server.Log.Warnf("device online error :%v", err)
- return
- }
- server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
- topic := msg.Path()[1]
- switch topic {
- case pubStatusTopic, pubEventTopic, subCommandTopic:
- server.Log.Infof("%s, publish status", w.Addr.String())
- m.Provider.OnDeviceMessage(uint64(deviceid), topic, msg.GetPayload())
- err := m.Provider.OnDeviceHeartBeat(uint64(deviceid))
- if err != nil {
- server.Log.Warnf("heartbeat set error:%s", w.Addr.String())
- return
- }
- //pub ack
- ack := &BaseMessage{
- Code: Created,
- Type: ACK,
- MessageID: msg.GetMessageID(),
- Token: msg.GetToken(),
- }
- ackbytes, _ := ack.Encode()
- w.Conn.WriteTo(ackbytes, w.Addr)
- default:
- //无效主题
- server.Log.Errorf("unknown msg type:%s", topic)
- ack := &BaseMessage{
- Code: BadRequest,
- Type: ACK,
- MessageID: msg.GetMessageID(),
- Token: msg.GetToken(),
- }
- ackbytes, _ := ack.Encode()
- w.Conn.WriteTo(ackbytes, w.Addr)
- return
- }
- } else {
- //无效请求
- }
- }
- func (m *Manager) spawnWorker(req *Request) {
- select {
- case m.queue <- req:
- default:
- go m.serve(req)
- }
- }
- // Receive a message.
- func Receive(l *net.UDPConn, buf []byte) (Message, error) {
- l.SetReadDeadline(time.Now().Add(ResponseTimeout))
- nr, _, err := l.ReadFromUDP(buf)
- if err != nil {
- return &BaseMessage{}, err
- }
- return ParseMessage(buf[:nr])
- }
|