manager.go 4.8 KB


  1. package coap
  2. import (
  3. "net"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/server"
  6. "strconv"
  7. "sync/atomic"
  8. "time"
  9. )
  10. const (
  11. // ResponseTimeout is the amount of time to wait for a
  12. // response.
  13. ResponseTimeout = time.Second * 2
  14. // ResponseRandomFactor is a multiplier for response backoff.
  15. ResponseRandomFactor = 1.5
  16. // MaxRetransmit is the maximum number of times a message will
  17. // be retransmitted.
  18. MaxRetransmit = 4
  19. maxPktlen = 1500
  20. maxWorkersCount = 10000
  21. idleWorkerTimeout = 10 * time.Second
  22. pubStatusTopic = "s"
  23. pubEventTopic = "e"
  24. subCommandTopic = "c"
  25. )
  26. type Manager struct {
  27. queue chan *Request
  28. Provider Provider
  29. workersCount int32
  30. }
  31. func NewManager(p Provider) *Manager {
  32. return &Manager{
  33. Provider: p,
  34. queue: make(chan *Request),
  35. }
  36. }
  37. func (m *Manager) Handler(conn *net.UDPConn) {
  38. buf := make([]byte, maxPktlen)
  39. for {
  40. nr, addr, err := conn.ReadFromUDP(buf)
  41. if err != nil {
  42. if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
  43. time.Sleep(5 * time.Millisecond)
  44. continue
  45. }
  46. }
  47. tmp := make([]byte, nr)
  48. copy(tmp, buf)
  49. msg, err := ParseMessage(tmp)
  50. if err != nil {
  51. server.Log.Error(err)
  52. }
  53. m.spawnWorker(&Request{
  54. Msg: msg,
  55. Addr: addr,
  56. Conn: conn,
  57. })
  58. }
  59. }
  60. func (m *Manager) worker(w *Request) {
  61. m.serve(w)
  62. for {
  63. count := atomic.LoadInt32(&m.workersCount)
  64. if count > maxWorkersCount {
  65. return
  66. }
  67. if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
  68. break
  69. }
  70. }
  71. defer atomic.AddInt32(&m.workersCount, -1)
  72. inUse := false
  73. timeout := time.NewTimer(idleWorkerTimeout)
  74. defer timeout.Stop()
  75. for m.workerChannelHandler(inUse, timeout) {
  76. }
  77. }
  78. func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
  79. select {
  80. case w, ok := <-m.queue:
  81. if !ok {
  82. return false
  83. }
  84. inUse = true
  85. m.serve(w)
  86. case <-timeout.C:
  87. if !inUse {
  88. return false
  89. }
  90. inUse = false
  91. timeout.Reset(idleWorkerTimeout)
  92. }
  93. return true
  94. }
  95. // coap://endpoint/$DEVICE_ID/s
  96. func (m *Manager) serve(w *Request) {
  97. msg := w.Msg
  98. server.Log.Debugf("get packet:%#v", msg)
  99. if msg.IsConfirmable() && len(msg.Path()) > 1 {
  100. token := msg.GetToken()
  101. // TODO:想别的deviceid的
  102. deviceid, err := strconv.ParseUint(msg.Path()[0], 10, 0)
  103. if err != nil {
  104. server.Log.Errorf("device id error:%s", msg.Path()[0])
  105. return
  106. }
  107. if len(token) != 8 {
  108. res := &BaseMessage{
  109. Code: Unauthorized,
  110. Type: ACK,
  111. MessageID: msg.GetMessageID(),
  112. Token: msg.GetToken(),
  113. }
  114. bytes, _ := res.Encode()
  115. w.Conn.WriteTo(bytes, w.Addr)
  116. server.Log.Errorf("token length error, size :%d", len(token))
  117. return
  118. }
  119. //check token
  120. err = m.Provider.ValidateDeviceToken(deviceid, token)
  121. if err != nil {
  122. res := &BaseMessage{
  123. Code: Unauthorized,
  124. Type: ACK,
  125. MessageID: msg.GetMessageID(),
  126. Token: msg.GetToken(),
  127. }
  128. bytes, _ := res.Encode()
  129. w.Conn.WriteTo(bytes, w.Addr)
  130. server.Log.Warnf("device %d token not validate, token :%v", deviceid, token)
  131. return
  132. }
  133. args := rpcs.ArgsGetOnline{
  134. Id: deviceid,
  135. ClientIP: w.Addr.String(),
  136. AccessRPCHost: server.GetRPCHost(),
  137. HeartbeatInterval: 30,
  138. }
  139. ack := &BaseMessage{
  140. Code: Changed,
  141. Type: ACK,
  142. MessageID: msg.GetMessageID(),
  143. Token: msg.GetToken(),
  144. }
  145. ackbytes, _ := ack.Encode()
  146. w.Conn.WriteTo(ackbytes, w.Addr)
  147. err = m.Provider.OnDeviceOnline(args)
  148. if err != nil {
  149. server.Log.Warnf("device online error :%v", err)
  150. return
  151. }
  152. server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
  153. topic := msg.Path()[1]
  154. switch topic {
  155. case pubStatusTopic, pubEventTopic, subCommandTopic:
  156. server.Log.Infof("%s, publish status", w.Addr.String())
  157. m.Provider.OnDeviceMessage(uint64(deviceid), topic, msg.GetPayload())
  158. err := m.Provider.OnDeviceHeartBeat(uint64(deviceid))
  159. if err != nil {
  160. server.Log.Warnf("heartbeat set error:%s", w.Addr.String())
  161. return
  162. }
  163. //pub ack
  164. ack := &BaseMessage{
  165. Code: Created,
  166. Type: ACK,
  167. MessageID: msg.GetMessageID(),
  168. Token: msg.GetToken(),
  169. }
  170. ackbytes, _ := ack.Encode()
  171. w.Conn.WriteTo(ackbytes, w.Addr)
  172. default:
  173. //无效主题
  174. server.Log.Errorf("unknown msg type:%s", topic)
  175. ack := &BaseMessage{
  176. Code: BadRequest,
  177. Type: ACK,
  178. MessageID: msg.GetMessageID(),
  179. Token: msg.GetToken(),
  180. }
  181. ackbytes, _ := ack.Encode()
  182. w.Conn.WriteTo(ackbytes, w.Addr)
  183. return
  184. }
  185. } else {
  186. //无效请求
  187. }
  188. }
  189. func (m *Manager) spawnWorker(req *Request) {
  190. select {
  191. case m.queue <- req:
  192. default:
  193. go m.serve(req)
  194. }
  195. }
  196. // Receive a message.
  197. func Receive(l *net.UDPConn, buf []byte) (Message, error) {
  198. l.SetReadDeadline(time.Now().Add(ResponseTimeout))
  199. nr, _, err := l.ReadFromUDP(buf)
  200. if err != nil {
  201. return &BaseMessage{}, err
  202. }
  203. return ParseMessage(buf[:nr])
  204. }