manager.go 4.9 KB


  1. package coap
  2. import (
  3. "net"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/server"
  6. "sync/atomic"
  7. "time"
  8. )
  9. const (
  10. // ResponseTimeout is the amount of time to wait for a
  11. // response.
  12. ResponseTimeout = time.Second * 2
  13. // ResponseRandomFactor is a multiplier for response backoff.
  14. ResponseRandomFactor = 1.5
  15. // MaxRetransmit is the maximum number of times a message will
  16. // be retransmitted.
  17. MaxRetransmit = 4
  18. maxPktlen = 1500
  19. maxWorkersCount = 10000
  20. idleWorkerTimeout = 10 * time.Second
  21. pubStatusTopic = "s"
  22. pubEventTopic = "e"
  23. subCommandTopic = "c"
  24. )
  25. // Manager manager
  26. type Manager struct {
  27. queue chan *Request
  28. Provider Provider
  29. workersCount int32
  30. }
  31. // NewManager new manager
  32. func NewManager(p Provider) *Manager {
  33. return &Manager{
  34. Provider: p,
  35. queue: make(chan *Request),
  36. }
  37. }
  38. // Handler udp handler
  39. func (m *Manager) Handler(conn *net.UDPConn) {
  40. buf := make([]byte, maxPktlen)
  41. for {
  42. nr, addr, err := conn.ReadFromUDP(buf)
  43. if err != nil {
  44. if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
  45. time.Sleep(5 * time.Millisecond)
  46. continue
  47. }
  48. }
  49. tmp := make([]byte, nr)
  50. copy(tmp, buf)
  51. msg, err := ParseMessage(tmp)
  52. if err != nil {
  53. server.Log.Error(err)
  54. }
  55. m.spawnWorker(&Request{
  56. Msg: msg,
  57. Addr: addr,
  58. Conn: conn,
  59. })
  60. }
  61. }
  62. func (m *Manager) worker(w *Request) {
  63. m.serve(w)
  64. for {
  65. count := atomic.LoadInt32(&m.workersCount)
  66. if count > maxWorkersCount {
  67. return
  68. }
  69. if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
  70. break
  71. }
  72. }
  73. defer atomic.AddInt32(&m.workersCount, -1)
  74. inUse := false
  75. timeout := time.NewTimer(idleWorkerTimeout)
  76. defer timeout.Stop()
  77. for m.workerChannelHandler(inUse, timeout) {
  78. }
  79. }
  80. func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
  81. select {
  82. case w, ok := <-m.queue:
  83. if !ok {
  84. return false
  85. }
  86. inUse = true
  87. m.serve(w)
  88. case <-timeout.C:
  89. if !inUse {
  90. return false
  91. }
  92. inUse = false
  93. timeout.Reset(idleWorkerTimeout)
  94. }
  95. return true
  96. }
  97. // coap://endpoint/$DEVICE_ID/s
  98. func (m *Manager) serve(w *Request) {
  99. msg := w.Msg
  100. server.Log.Debugf("get packet:%#v, workers count :%d", msg, m.workersCount)
  101. if msg.IsConfirmable() && len(msg.Path()) > 1 {
  102. token := msg.GetToken()
  103. // TODO:想别的deviceid的
  104. var err error
  105. deviceid :=""
  106. if err != nil {
  107. server.Log.Errorf("device id error:%s", msg.Path()[0])
  108. return
  109. }
  110. if len(token) != 8 {
  111. res := &BaseMessage{
  112. Code: Unauthorized,
  113. Type: ACK,
  114. MessageID: msg.GetMessageID(),
  115. Token: msg.GetToken(),
  116. }
  117. bytes, _ := res.Encode()
  118. w.Conn.WriteTo(bytes, w.Addr)
  119. server.Log.Errorf("token length error, size :%d", len(token))
  120. return
  121. }
  122. //check token
  123. err = m.Provider.ValidateDeviceToken(deviceid, token)
  124. if err != nil {
  125. res := &BaseMessage{
  126. Code: Unauthorized,
  127. Type: ACK,
  128. MessageID: msg.GetMessageID(),
  129. Token: msg.GetToken(),
  130. }
  131. bytes, _ := res.Encode()
  132. w.Conn.WriteTo(bytes, w.Addr)
  133. server.Log.Warnf("device %d token not validate, token :%v", deviceid, token)
  134. return
  135. }
  136. args := rpcs.ArgsGetOnline{
  137. Id: deviceid,
  138. ClientIP: w.Addr.String(),
  139. AccessRPCHost: server.GetRPCHost(),
  140. HeartbeatInterval: 30,
  141. }
  142. ack := &BaseMessage{
  143. Code: Changed,
  144. Type: ACK,
  145. MessageID: msg.GetMessageID(),
  146. Token: msg.GetToken(),
  147. }
  148. ackbytes, _ := ack.Encode()
  149. w.Conn.WriteTo(ackbytes, w.Addr)
  150. err = m.Provider.OnDeviceOnline(args)
  151. if err != nil {
  152. server.Log.Warnf("device online error :%v", err)
  153. return
  154. }
  155. server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
  156. topic := msg.Path()[1]
  157. switch topic {
  158. case pubStatusTopic, pubEventTopic, subCommandTopic:
  159. server.Log.Infof("%s, publish status", w.Addr.String())
  160. m.Provider.OnDeviceMessage(deviceid, topic, msg.GetPayload())
  161. err := m.Provider.OnDeviceHeartBeat(deviceid)
  162. if err != nil {
  163. server.Log.Warnf("heartbeat set error:%s", w.Addr.String())
  164. return
  165. }
  166. //pub ack
  167. ack := &BaseMessage{
  168. Code: Created,
  169. Type: ACK,
  170. MessageID: msg.GetMessageID(),
  171. Token: msg.GetToken(),
  172. }
  173. ackbytes, _ := ack.Encode()
  174. w.Conn.WriteTo(ackbytes, w.Addr)
  175. default:
  176. //无效主题
  177. server.Log.Errorf("unknown msg type:%s", topic)
  178. ack := &BaseMessage{
  179. Code: BadRequest,
  180. Type: ACK,
  181. MessageID: msg.GetMessageID(),
  182. Token: msg.GetToken(),
  183. }
  184. ackbytes, _ := ack.Encode()
  185. w.Conn.WriteTo(ackbytes, w.Addr)
  186. return
  187. }
  188. } else {
  189. //TODO:无效请求
  190. }
  191. }
  192. func (m *Manager) spawnWorker(req *Request) {
  193. select {
  194. case m.queue <- req:
  195. default:
  196. go m.serve(req)
  197. }
  198. }
  199. // Receive a message.
  200. func Receive(l *net.UDPConn, buf []byte) (Message, error) {
  201. l.SetReadDeadline(time.Now().Add(ResponseTimeout))
  202. nr, _, err := l.ReadFromUDP(buf)
  203. if err != nil {
  204. return &BaseMessage{}, err
  205. }
  206. return ParseMessage(buf[:nr])
  207. }