manager.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package coap
  2. import (
  3. "net"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/server"
  6. "sync/atomic"
  7. "time"
  8. )
  // Protocol-level parameters exported for users of the package.
  const (
  	// ResponseTimeout is how long to wait for a response.
  	ResponseTimeout = 2 * time.Second

  	// ResponseRandomFactor is a multiplier for response backoff.
  	ResponseRandomFactor = 1.5

  	// MaxRetransmit is the maximum number of times a message will be
  	// retransmitted.
  	MaxRetransmit = 4
  )

  // Internal tuning values and the topic names understood by the manager.
  const (
  	// maxPktlen is the size of the buffer used to read a single UDP datagram.
  	maxPktlen = 1500

  	// maxWorkersCount is the threshold the worker counter is compared
  	// against before a goroutine registers itself as a pooled worker.
  	maxWorkersCount = 10000

  	// idleWorkerTimeout is the period of the idle timer each pooled worker
  	// runs while waiting for queued requests.
  	idleWorkerTimeout = 10 * time.Second

  	// Topics (first path segment of a request) that serve accepts.
  	pubStatusTopic  = "/s"
  	pubEventTopic   = "/e"
  	subCommandTopic = "/c"
  )
  25. type Manager struct {
  26. queue chan *Request
  27. Provider Provider
  28. workersCount int32
  29. }
  30. func NewManager(p Provider) *Manager {
  31. return &Manager{
  32. Provider: p,
  33. queue: make(chan *Request),
  34. }
  35. }
  36. func (m *Manager) Handler(conn *net.UDPConn) {
  37. buf := make([]byte, maxPktlen)
  38. for {
  39. nr, addr, err := conn.ReadFromUDP(buf)
  40. if err != nil {
  41. if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
  42. time.Sleep(5 * time.Millisecond)
  43. continue
  44. }
  45. }
  46. tmp := make([]byte, nr)
  47. copy(tmp, buf)
  48. msg, err := ParseMessage(tmp)
  49. if err != nil {
  50. server.Log.Error(err)
  51. }
  52. m.spawnWorker(&Request{
  53. Msg: msg,
  54. Addr: addr,
  55. Conn: conn,
  56. })
  57. }
  58. }
  59. func (m *Manager) worker(w *Request) {
  60. m.serve(w)
  61. for {
  62. count := atomic.LoadInt32(&m.workersCount)
  63. if count > maxWorkersCount {
  64. return
  65. }
  66. if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
  67. break
  68. }
  69. }
  70. defer atomic.AddInt32(&m.workersCount, -1)
  71. inUse := false
  72. timeout := time.NewTimer(idleWorkerTimeout)
  73. defer timeout.Stop()
  74. for m.workerChannelHandler(inUse, timeout) {
  75. }
  76. }
  77. func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
  78. select {
  79. case w, ok := <-m.queue:
  80. if !ok {
  81. return false
  82. }
  83. inUse = true
  84. m.serve(w)
  85. case <-timeout.C:
  86. if !inUse {
  87. return false
  88. }
  89. inUse = false
  90. timeout.Reset(idleWorkerTimeout)
  91. }
  92. return true
  93. }
  94. func (m *Manager) serve(w *Request) {
  95. msg := w.Msg
  96. server.Log.Debugf("get packet:%#v", msg)
  97. if msg.IsConfirmable() {
  98. token := msg.GetToken()
  99. // TODO:想别的deviceid的
  100. deviceid := msg.GetMessageID()
  101. if len(token) != 8 {
  102. res := &BaseMessage{
  103. Code: Unauthorized,
  104. Type: ACK,
  105. MessageID: msg.GetMessageID(),
  106. Token: msg.GetToken(),
  107. }
  108. bytes, _ := res.Encode()
  109. w.Conn.WriteTo(bytes, w.Addr)
  110. server.Log.Debugf("token length error, size :%d", len(token))
  111. return
  112. }
  113. //check token
  114. err := m.Provider.ValidateDeviceToken(uint64(deviceid), token)
  115. if err != nil {
  116. res := &BaseMessage{
  117. Code: Unauthorized,
  118. Type: ACK,
  119. MessageID: msg.GetMessageID(),
  120. Token: msg.GetToken(),
  121. }
  122. bytes, _ := res.Encode()
  123. w.Conn.WriteTo(bytes, w.Addr)
  124. server.Log.Warnf("device %d token not validate, token :%v", deviceid, token)
  125. return
  126. }
  127. args := rpcs.ArgsGetOnline{
  128. Id: uint64(deviceid),
  129. ClientIP: w.Addr.String(),
  130. AccessRPCHost: server.GetRPCHost(),
  131. HeartbeatInterval: 30,
  132. }
  133. ack := &BaseMessage{
  134. Code: Changed,
  135. Type: ACK,
  136. MessageID: msg.GetMessageID(),
  137. Token: msg.GetToken(),
  138. }
  139. ackbytes, _ := ack.Encode()
  140. w.Conn.WriteTo(ackbytes, w.Addr)
  141. err = m.Provider.OnDeviceOnline(args)
  142. if err != nil {
  143. server.Log.Warnf("device online error :%v", err)
  144. return
  145. }
  146. server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
  147. topic := msg.Path()[0]
  148. switch topic {
  149. case pubStatusTopic, pubEventTopic, subCommandTopic:
  150. server.Log.Infof("%s, publish status", w.Addr.String())
  151. m.Provider.OnDeviceMessage(uint64(deviceid), topic, msg.GetPayload())
  152. err := m.Provider.OnDeviceHeartBeat(uint64(deviceid))
  153. if err != nil {
  154. server.Log.Warnf("heartbeat set error:%s", w.Addr.String())
  155. return
  156. }
  157. //pub ack
  158. ack := &BaseMessage{
  159. Code: Created,
  160. Type: ACK,
  161. MessageID: msg.GetMessageID(),
  162. Token: msg.GetToken(),
  163. }
  164. ackbytes, _ := ack.Encode()
  165. w.Conn.WriteTo(ackbytes, w.Addr)
  166. default:
  167. //无效主题
  168. server.Log.Errorf("unknown msg type:%s", topic)
  169. ack := &BaseMessage{
  170. Code: BadRequest,
  171. Type: ACK,
  172. MessageID: msg.GetMessageID(),
  173. Token: msg.GetToken(),
  174. }
  175. ackbytes, _ := ack.Encode()
  176. w.Conn.WriteTo(ackbytes, w.Addr)
  177. return
  178. }
  179. }
  180. }
  181. func (m *Manager) spawnWorker(req *Request) {
  182. select {
  183. case m.queue <- req:
  184. default:
  185. go m.serve(req)
  186. }
  187. }
  188. // Receive a message.
  189. func Receive(l *net.UDPConn, buf []byte) (Message, error) {
  190. l.SetReadDeadline(time.Now().Add(ResponseTimeout))
  191. nr, _, err := l.ReadFromUDP(buf)
  192. if err != nil {
  193. return &BaseMessage{}, err
  194. }
  195. return ParseMessage(buf[:nr])
  196. }