manager.go 4.9 KB


  1. package coap
  2. import (
  3. "net"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/server"
  6. "strconv"
  7. "sync/atomic"
  8. "time"
  9. )
  10. const (
  11. // ResponseTimeout is the amount of time to wait for a
  12. // response.
  13. ResponseTimeout = time.Second * 2
  14. // ResponseRandomFactor is a multiplier for response backoff.
  15. ResponseRandomFactor = 1.5
  16. // MaxRetransmit is the maximum number of times a message will
  17. // be retransmitted.
  18. MaxRetransmit = 4
  19. maxPktlen = 1500
  20. maxWorkersCount = 10000
  21. idleWorkerTimeout = 10 * time.Second
  22. pubStatusTopic = "s"
  23. pubEventTopic = "e"
  24. subCommandTopic = "c"
  25. )
  26. // Manager manager
  27. type Manager struct {
  28. queue chan *Request
  29. Provider Provider
  30. workersCount int32
  31. }
  32. // NewManager new manager
  33. func NewManager(p Provider) *Manager {
  34. return &Manager{
  35. Provider: p,
  36. queue: make(chan *Request),
  37. }
  38. }
  39. // Handler udp handler
  40. func (m *Manager) Handler(conn *net.UDPConn) {
  41. buf := make([]byte, maxPktlen)
  42. for {
  43. nr, addr, err := conn.ReadFromUDP(buf)
  44. if err != nil {
  45. if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
  46. time.Sleep(5 * time.Millisecond)
  47. continue
  48. }
  49. }
  50. tmp := make([]byte, nr)
  51. copy(tmp, buf)
  52. msg, err := ParseMessage(tmp)
  53. if err != nil {
  54. server.Log.Error(err)
  55. }
  56. m.spawnWorker(&Request{
  57. Msg: msg,
  58. Addr: addr,
  59. Conn: conn,
  60. })
  61. }
  62. }
  63. func (m *Manager) worker(w *Request) {
  64. m.serve(w)
  65. for {
  66. count := atomic.LoadInt32(&m.workersCount)
  67. if count > maxWorkersCount {
  68. return
  69. }
  70. if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
  71. break
  72. }
  73. }
  74. defer atomic.AddInt32(&m.workersCount, -1)
  75. inUse := false
  76. timeout := time.NewTimer(idleWorkerTimeout)
  77. defer timeout.Stop()
  78. for m.workerChannelHandler(inUse, timeout) {
  79. }
  80. }
  81. func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
  82. select {
  83. case w, ok := <-m.queue:
  84. if !ok {
  85. return false
  86. }
  87. inUse = true
  88. m.serve(w)
  89. case <-timeout.C:
  90. if !inUse {
  91. return false
  92. }
  93. inUse = false
  94. timeout.Reset(idleWorkerTimeout)
  95. }
  96. return true
  97. }
  98. // coap://endpoint/$DEVICE_ID/s
  99. func (m *Manager) serve(w *Request) {
  100. msg := w.Msg
  101. server.Log.Debugf("get packet:%#v, workers count :%d", msg, m.workersCount)
  102. if msg.IsConfirmable() && len(msg.Path()) > 1 {
  103. token := msg.GetToken()
  104. // TODO:想别的deviceid的
  105. deviceid, err := strconv.ParseUint(msg.Path()[0], 10, 0)
  106. if err != nil {
  107. server.Log.Errorf("device id error:%s", msg.Path()[0])
  108. return
  109. }
  110. if len(token) != 8 {
  111. res := &BaseMessage{
  112. Code: Unauthorized,
  113. Type: ACK,
  114. MessageID: msg.GetMessageID(),
  115. Token: msg.GetToken(),
  116. }
  117. bytes, _ := res.Encode()
  118. w.Conn.WriteTo(bytes, w.Addr)
  119. server.Log.Errorf("token length error, size :%d", len(token))
  120. return
  121. }
  122. //check token
  123. err = m.Provider.ValidateDeviceToken(deviceid, token)
  124. if err != nil {
  125. res := &BaseMessage{
  126. Code: Unauthorized,
  127. Type: ACK,
  128. MessageID: msg.GetMessageID(),
  129. Token: msg.GetToken(),
  130. }
  131. bytes, _ := res.Encode()
  132. w.Conn.WriteTo(bytes, w.Addr)
  133. server.Log.Warnf("device %d token not validate, token :%v", deviceid, token)
  134. return
  135. }
  136. args := rpcs.ArgsGetOnline{
  137. Id: deviceid,
  138. ClientIP: w.Addr.String(),
  139. AccessRPCHost: server.GetRPCHost(),
  140. HeartbeatInterval: 30,
  141. }
  142. ack := &BaseMessage{
  143. Code: Changed,
  144. Type: ACK,
  145. MessageID: msg.GetMessageID(),
  146. Token: msg.GetToken(),
  147. }
  148. ackbytes, _ := ack.Encode()
  149. w.Conn.WriteTo(ackbytes, w.Addr)
  150. err = m.Provider.OnDeviceOnline(args)
  151. if err != nil {
  152. server.Log.Warnf("device online error :%v", err)
  153. return
  154. }
  155. server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
  156. topic := msg.Path()[1]
  157. switch topic {
  158. case pubStatusTopic, pubEventTopic, subCommandTopic:
  159. server.Log.Infof("%s, publish status", w.Addr.String())
  160. m.Provider.OnDeviceMessage(uint64(deviceid), topic, msg.GetPayload())
  161. err := m.Provider.OnDeviceHeartBeat(uint64(deviceid))
  162. if err != nil {
  163. server.Log.Warnf("heartbeat set error:%s", w.Addr.String())
  164. return
  165. }
  166. //pub ack
  167. ack := &BaseMessage{
  168. Code: Created,
  169. Type: ACK,
  170. MessageID: msg.GetMessageID(),
  171. Token: msg.GetToken(),
  172. }
  173. ackbytes, _ := ack.Encode()
  174. w.Conn.WriteTo(ackbytes, w.Addr)
  175. default:
  176. //无效主题
  177. server.Log.Errorf("unknown msg type:%s", topic)
  178. ack := &BaseMessage{
  179. Code: BadRequest,
  180. Type: ACK,
  181. MessageID: msg.GetMessageID(),
  182. Token: msg.GetToken(),
  183. }
  184. ackbytes, _ := ack.Encode()
  185. w.Conn.WriteTo(ackbytes, w.Addr)
  186. return
  187. }
  188. } else {
  189. //无效请求
  190. }
  191. }
  192. func (m *Manager) spawnWorker(req *Request) {
  193. select {
  194. case m.queue <- req:
  195. default:
  196. go m.serve(req)
  197. }
  198. }
  199. // Receive a message.
  200. func Receive(l *net.UDPConn, buf []byte) (Message, error) {
  201. l.SetReadDeadline(time.Now().Add(ResponseTimeout))
  202. nr, _, err := l.ReadFromUDP(buf)
  203. if err != nil {
  204. return &BaseMessage{}, err
  205. }
  206. return ParseMessage(buf[:nr])
  207. }