123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package coap
- import (
- "net"
- "sparrow/pkg/server"
- "sync/atomic"
- "time"
- )
- const (
- // ResponseTimeout is the amount of time to wait for a
- // response.
- ResponseTimeout = time.Second * 2
- // ResponseRandomFactor is a multiplier for response backoff.
- ResponseRandomFactor = 1.5
- // MaxRetransmit is the maximum number of times a message will
- // be retransmitted.
- MaxRetransmit = 4
- maxPktlen = 1500
- maxWorkersCount = 10000
- idleWorkerTimeout = 10 * time.Second
- )
- type Manager struct {
- queue chan *Request
- Provider Provider
- workersCount int32
- }
- func NewManager(p Provider) *Manager {
- return &Manager{
- Provider: p,
- queue: make(chan *Request),
- }
- }
- func (m *Manager) Handler(conn *net.UDPConn) {
- buf := make([]byte, maxPktlen)
- for {
- nr, addr, err := conn.ReadFromUDP(buf)
- if err != nil {
- if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
- time.Sleep(5 * time.Millisecond)
- continue
- }
- }
- tmp := make([]byte, nr)
- copy(tmp, buf)
- msg, err := ParseMessage(tmp)
- if err != nil {
- server.Log.Error(err)
- }
- m.spawnWorker(&Request{
- Msg: msg,
- Addr: addr,
- Conn: conn,
- })
- }
- }
- func (m *Manager) worker(w *Request) {
- m.serve(w)
- for {
- count := atomic.LoadInt32(&m.workersCount)
- if count > maxWorkersCount {
- return
- }
- if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
- break
- }
- }
- defer atomic.AddInt32(&m.workersCount, -1)
- inUse := false
- timeout := time.NewTimer(idleWorkerTimeout)
- defer timeout.Stop()
- for m.workerChannelHandler(inUse, timeout) {
- }
- }
- func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
- select {
- case w, ok := <-m.queue:
- if !ok {
- return false
- }
- inUse = true
- m.serve(w)
- case <-timeout.C:
- if !inUse {
- return false
- }
- inUse = false
- timeout.Reset(idleWorkerTimeout)
- }
- return true
- }
- func (m *Manager) serve(w *Request) {
- msg := w.Msg
- server.Log.Debugf("get packet:%#v", msg)
- // check token
- if msg.IsConfirmable() {
- token := msg.GetToken()
- if len(token) != 8 {
- res := &BaseMessage{
- Code: Unauthorized,
- Type: ACK,
- MessageID: msg.GetMessageID(),
- Token: msg.GetToken(),
- }
- bytes, _ := res.Encode()
- w.Conn.WriteTo(bytes, w.Addr)
- server.Log.Debugf("token length error, size :%d", len(token))
- return
- }
- }
- }
- func (m *Manager) spawnWorker(req *Request) {
- select {
- case m.queue <- req:
- default:
- go m.serve(req)
- }
- }
- // Receive a message.
- func Receive(l *net.UDPConn, buf []byte) (Message, error) {
- l.SetReadDeadline(time.Now().Add(ResponseTimeout))
- nr, _, err := l.ReadFromUDP(buf)
- if err != nil {
- return &BaseMessage{}, err
- }
- return ParseMessage(buf[:nr])
- }
|