manager.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package coap
  2. import (
  3. "net"
  4. "sparrow/pkg/server"
  5. "sync/atomic"
  6. "time"
  7. )
  // Protocol parameters exposed to users of the package.
  const (
  	// ResponseTimeout is the amount of time to wait for a response.
  	// Receive uses it as the read deadline for a single datagram.
  	ResponseTimeout = 2 * time.Second

  	// ResponseRandomFactor is a multiplier for response backoff.
  	ResponseRandomFactor = 1.5

  	// MaxRetransmit is the maximum number of times a message will
  	// be retransmitted.
  	MaxRetransmit = 4
  )

  // Internal tuning values for the packet reader and the worker pool.
  const (
  	// maxPktlen is the size in bytes of the buffer Handler reads each
  	// datagram into.
  	maxPktlen = 1500

  	// maxWorkersCount is the limit the worker counter is compared
  	// against before a goroutine registers itself as a pooled worker.
  	maxWorkersCount = 10000

  	// idleWorkerTimeout is the period of the idle timer a pooled worker
  	// arms while waiting for queued requests.
  	idleWorkerTimeout = 10 * time.Second
  )
  21. type Manager struct {
  22. queue chan *Request
  23. Provider Provider
  24. workersCount int32
  25. }
  26. func NewManager(p Provider) *Manager {
  27. return &Manager{
  28. Provider: p,
  29. queue: make(chan *Request),
  30. }
  31. }
  32. func (m *Manager) Handler(conn *net.UDPConn) {
  33. buf := make([]byte, maxPktlen)
  34. for {
  35. nr, addr, err := conn.ReadFromUDP(buf)
  36. if err != nil {
  37. if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
  38. time.Sleep(5 * time.Millisecond)
  39. continue
  40. }
  41. }
  42. tmp := make([]byte, nr)
  43. copy(tmp, buf)
  44. msg, err := ParseMessage(tmp)
  45. if err != nil {
  46. server.Log.Error(err)
  47. }
  48. m.spawnWorker(&Request{
  49. Msg: msg,
  50. Addr: addr,
  51. Conn: conn,
  52. })
  53. }
  54. }
  55. func (m *Manager) worker(w *Request) {
  56. m.serve(w)
  57. for {
  58. count := atomic.LoadInt32(&m.workersCount)
  59. if count > maxWorkersCount {
  60. return
  61. }
  62. if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
  63. break
  64. }
  65. }
  66. defer atomic.AddInt32(&m.workersCount, -1)
  67. inUse := false
  68. timeout := time.NewTimer(idleWorkerTimeout)
  69. defer timeout.Stop()
  70. for m.workerChannelHandler(inUse, timeout) {
  71. }
  72. }
  73. func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
  74. select {
  75. case w, ok := <-m.queue:
  76. if !ok {
  77. return false
  78. }
  79. inUse = true
  80. m.serve(w)
  81. case <-timeout.C:
  82. if !inUse {
  83. return false
  84. }
  85. inUse = false
  86. timeout.Reset(idleWorkerTimeout)
  87. }
  88. return true
  89. }
  90. func (m *Manager) serve(w *Request) {
  91. msg := w.Msg
  92. server.Log.Debugf("get packet:%#v", msg)
  93. // check token
  94. if msg.IsConfirmable() {
  95. token := msg.GetToken()
  96. if len(token) != 8 {
  97. res := &BaseMessage{
  98. Code: Unauthorized,
  99. Type: ACK,
  100. MessageID: msg.GetMessageID(),
  101. Token: msg.GetToken(),
  102. }
  103. bytes, _ := res.Encode()
  104. w.Conn.WriteTo(bytes, w.Addr)
  105. server.Log.Debugf("token length error, size :%d", len(token))
  106. return
  107. }
  108. }
  109. }
  110. func (m *Manager) spawnWorker(req *Request) {
  111. select {
  112. case m.queue <- req:
  113. default:
  114. go m.serve(req)
  115. }
  116. }
  117. // Receive a message.
  118. func Receive(l *net.UDPConn, buf []byte) (Message, error) {
  119. l.SetReadDeadline(time.Now().Add(ResponseTimeout))
  120. nr, _, err := l.ReadFromUDP(buf)
  121. if err != nil {
  122. return &BaseMessage{}, err
  123. }
  124. return ParseMessage(buf[:nr])
  125. }