agent.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "github.com/gogf/gf/encoding/gjson"
  8. "github.com/gogf/gf/util/gconv"
  9. "github.com/gogf/gf/v2/encoding/gbinary"
  10. "sparrow/pkg/klink"
  11. "sparrow/pkg/models"
  12. "sparrow/pkg/protocol"
  13. "sparrow/pkg/rpcs"
  14. "sparrow/pkg/server"
  15. "time"
  16. )
  17. type Access struct {
  18. client SubDev
  19. }
  20. func NewAgent(client SubDev) *Access {
  21. return &Access{
  22. client: client,
  23. }
  24. }
  25. // Message 收到设备上报消息处理
  26. func (a *Access) Message(topic string, payload []byte) error {
  27. topicInfo, err := protocol.GetTopicInfo(topic)
  28. if err != nil {
  29. return err
  30. }
  31. if topicInfo.Direction == protocol.Down {
  32. return nil
  33. }
  34. device := &models.Device{}
  35. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  36. if err != nil {
  37. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  38. return nil
  39. }
  40. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  41. if len(topicInfo.Types) == 0 {
  42. return nil
  43. }
  44. jsonPayload, err := gjson.DecodeToJson(payload)
  45. if err != nil {
  46. return nil
  47. }
  48. switch topicInfo.Types[0] {
  49. case "status":
  50. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  51. case "event":
  52. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  53. }
  54. return nil
  55. }
  56. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  57. reply := rpcs.ReplyOnEvent{}
  58. args := rpcs.ArgsOnEvent{
  59. DeviceId: topicInfo.DeviceCode,
  60. TimeStamp: message.GetUint64("timestamp"),
  61. SubDeviceId: message.GetString("subDeviceId"),
  62. SubData: message.GetJson("data").MustToJson(),
  63. VendorId: vendorId,
  64. ProductKey: topicInfo.ProductKey,
  65. }
  66. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  67. if err != nil {
  68. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  69. return err
  70. }
  71. return nil
  72. }
  73. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  74. act := klink.PacketAction(message.GetString("action"))
  75. if act != "" {
  76. switch act {
  77. case klink.DevSendAction:
  78. processReportStatus(topicInfo, vendorId, message)
  79. case klink.DevLoginAction:
  80. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  81. case klink.DevLogoutAction:
  82. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  83. case klink.DevNetConfigAction:
  84. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  85. case klink.ReportFirmwareAction:
  86. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  87. case klink.DevUpgradeAction:
  88. _ = a.processDeviceUpgrade(topicInfo.DeviceCode, message)
  89. }
  90. }
  91. return nil
  92. }
  93. func processDevLogin(deviceCode, subDeviceId string) error {
  94. var args rpcs.SubDeviceArgs
  95. args.DeviceCode = deviceCode
  96. args.Status = 1
  97. args.SubDeviceId = subDeviceId
  98. var reply *models.SubDevice
  99. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  100. if err != nil {
  101. server.Log.Errorf("子设备上线出错:%s", err.Error())
  102. }
  103. return nil
  104. }
  105. func processDevLogout(deviceCode, subDeviceId string) error {
  106. var args rpcs.SubDeviceArgs
  107. args.DeviceCode = deviceCode
  108. args.Status = 0
  109. args.SubDeviceId = subDeviceId
  110. var reply *models.SubDevice
  111. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  112. if err != nil {
  113. server.Log.Errorf("子设备下线出错:%s", err.Error())
  114. }
  115. return nil
  116. }
  117. // 处理设备配网信息
  118. func processDevNetConfig(deviceCode, md5 string) error {
  119. args := &models.DeviceNetConfig{
  120. DeviceIdentifier: deviceCode,
  121. MD5: md5,
  122. Status: 1,
  123. }
  124. reply := rpcs.ReplyCheckDeviceNetConfig{}
  125. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  126. if err != nil {
  127. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  128. }
  129. return nil
  130. }
  131. // 设备上报固件信息处理
  132. func processDeviceReportUpgrade(deviceId, version string) error {
  133. args := &rpcs.ArgsUpdateDeviceVersion{
  134. DeviceId: deviceId,
  135. Version: version,
  136. }
  137. var reply rpcs.ReplyEmptyResult
  138. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  139. if err != nil {
  140. server.Log.Errorf("更新设备版本号失败:%v", err)
  141. }
  142. return nil
  143. }
  144. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  145. reply := rpcs.ReplyOnStatus{}
  146. args := rpcs.ArgsOnStatus{
  147. DeviceId: topicInfo.DeviceCode,
  148. Timestamp: message.GetUint64("timestamp"),
  149. SubData: message.GetJson("data").MustToJson(),
  150. VendorId: vendorId,
  151. ProductKey: topicInfo.ProductKey,
  152. SubDeviceId: message.GetString("subDeviceId"),
  153. Action: klink.PacketAction(message.GetString("action")),
  154. }
  155. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  156. if err != nil {
  157. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  158. return
  159. }
  160. }
  161. func (a *Access) processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  162. var reply rpcs.ReplyEmptyResult
  163. data := gjson.New(message.GetJson("data").MustToJson())
  164. switch data.GetString("cmd") {
  165. case "download":
  166. params := gjson.New(data.GetJson("params").MustToJson())
  167. args := &rpcs.ChunkUpgrade{
  168. DeviceId: deviceId,
  169. FileId: params.GetString("fileId"),
  170. FileSize: params.GetInt64("fileSize"),
  171. Size: params.GetInt64("size"),
  172. Offset: params.GetInt("offset"),
  173. }
  174. err := a.chunkUpgrade(*args)
  175. if err != nil {
  176. server.Log.Errorf("分片下载发送失败:%v", err)
  177. }
  178. case "downProgress":
  179. params := gjson.New(data.GetJson("params").MustToJson())
  180. var args rpcs.ArgsOtaProgress
  181. args.DeviceId = deviceId
  182. args.Progress = params.GetInt("progress")
  183. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  184. if err != nil {
  185. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  186. return err
  187. }
  188. }
  189. return nil
  190. }
  191. // Connected 设备接入时
  192. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  193. server.Log.Infof("设备上线;%s", status.DeviceId)
  194. // 查询设备信息
  195. device := &models.Device{}
  196. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  197. if err != nil {
  198. server.Log.Errorf("device not found %s", status.DeviceId)
  199. return nil
  200. }
  201. args := rpcs.ArgsGetOnline{
  202. Id: device.DeviceIdentifier,
  203. ClientIP: status.ClientIp,
  204. AccessRPCHost: server.GetRPCHost(),
  205. HeartbeatInterval: 300,
  206. }
  207. reply := rpcs.ReplyGetOnline{}
  208. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  209. if err != nil {
  210. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  211. }
  212. var cReply rpcs.ReplyEmptyResult
  213. var cArgs rpcs.ArgsGetStatus
  214. cArgs.VendorId = device.VendorID
  215. cArgs.Id = args.Id
  216. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  217. return err
  218. }
  219. return nil
  220. }
  221. // Disconnected 设备断开连接时
  222. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  223. // 查询设备信息
  224. device := &models.Device{}
  225. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  226. if err != nil {
  227. server.Log.Errorf("device not found %s", status.DeviceId)
  228. return nil
  229. }
  230. args := rpcs.ArgsGetOffline{
  231. Id: device.DeviceIdentifier,
  232. VendorId: device.VendorID,
  233. }
  234. reply := rpcs.ReplyGetOffline{}
  235. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  236. if err != nil {
  237. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  238. }
  239. return err
  240. }
  241. // SendCommand rpc 发送设备命令
  242. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  243. // 查询设备信息
  244. device := &models.Device{}
  245. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  246. if err != nil {
  247. server.Log.Errorf("device not found %s", args.DeviceId)
  248. return nil
  249. }
  250. product := &models.Product{}
  251. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  252. if err != nil {
  253. server.Log.Errorf("device not found %s", args.DeviceId)
  254. return nil
  255. }
  256. cmd := &klink.CloudSend{
  257. Action: "cloudSend",
  258. MsgId: 0,
  259. DeviceCode: args.DeviceId,
  260. SubDeviceId: args.SubDevice,
  261. Timestamp: time.Now().Unix(),
  262. Data: &klink.CloudSendData{
  263. Cmd: args.Cmd,
  264. Params: args.Params,
  265. },
  266. }
  267. msg, err := cmd.Marshal()
  268. if err != nil {
  269. return err
  270. }
  271. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  272. }
  273. // GetStatus rpc 获取设备状态
  274. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  275. server.Log.Infof("Access Get Status: %v", args)
  276. // first send a get status command
  277. cmdArgs := rpcs.ArgsSendCommand{
  278. DeviceId: args.Id,
  279. WaitTime: 0,
  280. SubDevice: args.SubDeviceId,
  281. Cmd: "report",
  282. }
  283. cmdReply := rpcs.ReplySendCommand{}
  284. return a.SendCommand(cmdArgs, &cmdReply)
  285. }
  286. func (a *Access) chunkUpgrade(params rpcs.ChunkUpgrade) error {
  287. server.Log.Infof("4G模组OTA升级:%s", params.DeviceId)
  288. buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(params.Offset)))
  289. var fileArgs rpcs.ArgsOtaFile
  290. fileArgs.FileId = params.FileId
  291. var fileReply rpcs.ReplyOtaFile
  292. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &fileReply)
  293. if err != nil {
  294. server.Log.Errorf("OTA升级文件保存失败:%v", err)
  295. return err
  296. }
  297. if fileReply.File == nil {
  298. return errors.New(fmt.Sprintf("文件:%s 获取失败", params.FileId))
  299. }
  300. start := params.Offset * int(params.Size)
  301. stop := (params.Offset + 1) + int(params.Size)
  302. if stop >= len(fileArgs.FileData) {
  303. stop = len(fileArgs.FileData)
  304. }
  305. data := fileReply.File[start:stop]
  306. buf.Write(gbinary.BeEncodeUint16(gconv.Uint16(len(data))))
  307. buf.Write(data)
  308. // 生成随机的4位数字
  309. //randomNumber := rand.Intn(9000) + 1000
  310. server.Log.Infof("1----------填充文件:%2X", buf.Bytes())
  311. var mCrc crc
  312. checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
  313. buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
  314. server.Log.Infof("2----------填充CRC:%2X", buf.Bytes())
  315. var SendByteArgs rpcs.ArgsSendByteData
  316. SendByteArgs.DeviceId = params.DeviceId
  317. SendByteArgs.Data = buf.Bytes()
  318. replay := new(rpcs.ReplySendCommand)
  319. err = a.SendByteData(SendByteArgs, replay)
  320. return nil
  321. }
  322. // SendByteData rpc 发送byte数组
  323. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  324. // 查询设备信息
  325. device := &models.Device{}
  326. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  327. if err != nil {
  328. server.Log.Errorf("device not found %s", args.DeviceId)
  329. return err
  330. }
  331. product := &models.Product{}
  332. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  333. if err != nil {
  334. server.Log.Errorf("device not found %s", args.DeviceId)
  335. return err
  336. }
  337. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  338. }