access.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. MQTT "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/gogf/gf/v2/encoding/gbinary"
  9. "github.com/gogf/gf/v2/util/gconv"
  10. "sparrow/pkg/klink"
  11. "sparrow/pkg/mqtt"
  12. "sparrow/pkg/protocol"
  13. "sparrow/pkg/rpcs"
  14. "sparrow/pkg/server"
  15. "time"
  16. )
  17. const (
  18. defaultTimeoutSecond = 5
  19. commandGetCurrentStatus = uint16(65528)
  20. )
  21. type Access struct {
  22. MqttBroker *mqtt.Broker
  23. MQTT.Client
  24. }
  25. func NewAccess() (*Access, error) {
  26. p := NewMQTTProvider()
  27. return &Access{
  28. MqttBroker: mqtt.NewBroker(p),
  29. }, nil
  30. }
  31. func (a *Access) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  32. server.Log.Infof("Access Set Status: %v", args)
  33. data := &protocol.Data{}
  34. data.Head.Timestamp = uint64(time.Now().Unix())
  35. token, err := a.MqttBroker.GetToken(args.DeviceId)
  36. if err != nil {
  37. return err
  38. }
  39. copy(data.Head.Token[:], token[:16])
  40. data.SubData = args.Status
  41. msg, err := data.Marshal()
  42. if err != nil {
  43. return err
  44. }
  45. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "s", msg, defaultTimeoutSecond*time.Second)
  46. }
  47. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  48. server.Log.Infof("Access Get Status: %v", args)
  49. // first send a get status command
  50. cmdArgs := rpcs.ArgsSendCommand{
  51. DeviceId: args.Id,
  52. WaitTime: 0,
  53. SubDevice: args.SubDeviceId,
  54. Cmd: "report",
  55. }
  56. cmdReply := rpcs.ReplySendCommand{}
  57. return a.SendCommand(cmdArgs, &cmdReply)
  58. }
  59. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  60. cmd := &klink.CloudSend{
  61. Action: "cloudSend",
  62. MsgId: 0,
  63. DeviceCode: args.DeviceId,
  64. SubDeviceId: args.SubDevice,
  65. Timestamp: time.Now().Unix(),
  66. Data: &klink.CloudSendData{
  67. Cmd: args.Cmd,
  68. Params: args.Params,
  69. },
  70. }
  71. msg, err := cmd.Marshal()
  72. if err != nil {
  73. return err
  74. }
  75. print("Access Send Command: %v, %v,%s\r\n", args.DeviceId, args.Cmd, string(msg))
  76. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, time.Duration(args.WaitTime)*time.Second)
  77. }
  78. // Upgrade 设备OTA升级指令
  79. func (a *Access) Upgrade(args rpcs.ArgsDeviceUpgrade, reply *rpcs.ReplyEmptyResult) error {
  80. server.Log.Infof("设备OTA升级:%s, %s", args.DeviceId, args.Version)
  81. cmd := &klink.CloudSend{
  82. Action: "cloudSend",
  83. MsgId: 0,
  84. DeviceCode: args.DeviceId,
  85. Timestamp: time.Now().Unix(),
  86. Data: &klink.CloudSendData{
  87. Cmd: "devUpgrade",
  88. Params: map[string]interface{}{
  89. "md5": args.Md5,
  90. "url": args.Url,
  91. "version": args.Version,
  92. "file_size": args.FileSize,
  93. },
  94. },
  95. SubDeviceId: args.SudDeviceId,
  96. }
  97. msg, err := cmd.Marshal()
  98. if err != nil {
  99. return err
  100. }
  101. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, 5*time.Second)
  102. }
  103. // UpgradeInfo 下发升级包信息
  104. // TODO: 实现
  105. func (a *Access) UpgradeFor4G(args rpcs.ArgsUpgrade4G, reply *rpcs.ReplyEmptyResult) error {
  106. server.Log.Infof("4G模组OTA升级:%s", args.DeviceId)
  107. cmd := &klink.CloudSend{
  108. Action: "cloudSend",
  109. MsgId: 0,
  110. DeviceCode: args.DeviceId,
  111. Timestamp: time.Now().Unix(),
  112. Data: &klink.CloudSendData{
  113. Cmd: "devUpgrade",
  114. Params: map[string]interface{}{
  115. "fileId": args.FileId,
  116. "fileSize": args.FileSize,
  117. },
  118. },
  119. }
  120. msg, err := cmd.Marshal()
  121. if err != nil {
  122. return err
  123. }
  124. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, 5*time.Second)
  125. }
  126. func (a *Access) ChunkUpgrade(args rpcs.ChunkUpgrade, reply *rpcs.ReplyEmptyResult) error {
  127. server.Log.Infof("4G模组OTA升级:%s", args.DeviceId)
  128. cmd := &klink.CloudSend{
  129. Action: "cloudSend",
  130. MsgId: 0,
  131. DeviceCode: args.DeviceId,
  132. Timestamp: time.Now().Unix(),
  133. Data: &klink.CloudSendData{
  134. Cmd: "devUpgrade",
  135. Params: map[string]interface{}{
  136. "fileId": args.FileId,
  137. "fileSize": args.FileSize,
  138. "size": args.Size,
  139. "offset": args.Offset,
  140. },
  141. },
  142. }
  143. byteCmd, err := json.Marshal(cmd)
  144. if err != nil {
  145. return err
  146. }
  147. buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(len(byteCmd))))
  148. buf.Write(byteCmd)
  149. var fileArgs rpcs.ArgsOtaFile
  150. fileArgs.FileId = args.FileId
  151. var fileReply rpcs.ReplyOtaFile
  152. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &reply)
  153. if err != nil {
  154. server.Log.Errorf("OTA升级文件保存失败:%v", err)
  155. return err
  156. }
  157. if fileReply.File == nil {
  158. return errors.New(fmt.Sprintf("文件:%s 获取失败", args.FileId))
  159. }
  160. buf.Write(fileReply.File[args.Offset : args.Offset+int(args.Size)])
  161. var mCrc crc
  162. checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
  163. buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
  164. var SendByteArgs rpcs.ArgsSendByteData
  165. SendByteArgs.DeviceId = args.DeviceId
  166. SendByteArgs.Data = buf.Bytes()
  167. err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.SendByteData", SendByteArgs, &reply)
  168. return nil
  169. }