agent.go 12 KB


  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/gogf/gf/encoding/gjson"
  9. "github.com/gogf/gf/util/gconv"
  10. "github.com/gogf/gf/v2/encoding/gbinary"
  11. "sparrow/pkg/klink"
  12. "sparrow/pkg/models"
  13. "sparrow/pkg/protocol"
  14. "sparrow/pkg/rpcs"
  15. "sparrow/pkg/server"
  16. "time"
  17. )
  // Access is the device access agent. It holds the SubDev client through which
  // commands and raw byte payloads are published to devices.
  type Access struct {
  	client SubDev // used by SendCommand and SendByteData via PublishToMsgToDev
  }
  21. func NewAgent(client SubDev) *Access {
  22. return &Access{
  23. client: client,
  24. }
  25. }
  26. // Message 收到设备上报消息处理
  27. func (a *Access) Message(topic string, payload []byte) error {
  28. topicInfo, err := protocol.GetTopicInfo(topic)
  29. if err != nil {
  30. return err
  31. }
  32. if topicInfo.Direction == protocol.Down {
  33. return nil
  34. }
  35. device := &models.Device{}
  36. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  37. if err != nil {
  38. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  39. return nil
  40. }
  41. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  42. if len(topicInfo.Types) == 0 {
  43. return nil
  44. }
  45. jsonPayload, err := gjson.DecodeToJson(payload)
  46. if err != nil {
  47. return nil
  48. }
  49. switch topicInfo.Types[0] {
  50. case "status":
  51. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  52. case "event":
  53. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  54. }
  55. return nil
  56. }
  57. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  58. reply := rpcs.ReplyOnEvent{}
  59. args := rpcs.ArgsOnEvent{
  60. DeviceId: topicInfo.DeviceCode,
  61. TimeStamp: message.GetUint64("timestamp"),
  62. SubDeviceId: message.GetString("subDeviceId"),
  63. SubData: message.GetJson("data").MustToJson(),
  64. VendorId: vendorId,
  65. ProductKey: topicInfo.ProductKey,
  66. }
  67. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  68. if err != nil {
  69. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  70. return err
  71. }
  72. return nil
  73. }
  74. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  75. act := klink.PacketAction(message.GetString("action"))
  76. if act != "" {
  77. switch act {
  78. case klink.DevSendAction:
  79. processReportStatus(topicInfo, vendorId, message)
  80. case klink.DevLoginAction:
  81. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  82. case klink.DevLogoutAction:
  83. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  84. case klink.DevNetConfigAction:
  85. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  86. case klink.ReportFirmwareAction:
  87. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  88. case klink.DevUpgradeAction:
  89. _ = a.processDeviceUpgrade(topicInfo.DeviceCode, message)
  90. }
  91. }
  92. return nil
  93. }
  94. func processDevLogin(deviceCode, subDeviceId string) error {
  95. var args rpcs.SubDeviceArgs
  96. args.DeviceCode = deviceCode
  97. args.Status = 1
  98. args.SubDeviceId = subDeviceId
  99. var reply *models.SubDevice
  100. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  101. if err != nil {
  102. server.Log.Errorf("子设备上线出错:%s", err.Error())
  103. }
  104. return nil
  105. }
  106. func processDevLogout(deviceCode, subDeviceId string) error {
  107. var args rpcs.SubDeviceArgs
  108. args.DeviceCode = deviceCode
  109. args.Status = 0
  110. args.SubDeviceId = subDeviceId
  111. var reply *models.SubDevice
  112. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  113. if err != nil {
  114. server.Log.Errorf("子设备下线出错:%s", err.Error())
  115. }
  116. return nil
  117. }
  118. // 处理设备配网信息
  119. func processDevNetConfig(deviceCode, md5 string) error {
  120. args := &models.DeviceNetConfig{
  121. DeviceIdentifier: deviceCode,
  122. MD5: md5,
  123. Status: 1,
  124. }
  125. reply := rpcs.ReplyCheckDeviceNetConfig{}
  126. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  127. if err != nil {
  128. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  129. }
  130. return nil
  131. }
  132. // 设备上报固件信息处理
  133. func processDeviceReportUpgrade(deviceId, version string) error {
  134. args := &rpcs.ArgsUpdateDeviceVersion{
  135. DeviceId: deviceId,
  136. Version: version,
  137. }
  138. var reply rpcs.ReplyEmptyResult
  139. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  140. if err != nil {
  141. server.Log.Errorf("更新设备版本号失败:%v", err)
  142. }
  143. return nil
  144. }
  145. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  146. reply := rpcs.ReplyOnStatus{}
  147. args := rpcs.ArgsOnStatus{
  148. DeviceId: topicInfo.DeviceCode,
  149. Timestamp: message.GetUint64("timestamp"),
  150. SubData: message.GetJson("data").MustToJson(),
  151. VendorId: vendorId,
  152. ProductKey: topicInfo.ProductKey,
  153. SubDeviceId: message.GetString("subDeviceId"),
  154. Action: klink.PacketAction(message.GetString("action")),
  155. }
  156. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  157. if err != nil {
  158. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  159. return
  160. }
  161. }
  162. func (a *Access) processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  163. var reply rpcs.ReplyEmptyResult
  164. data := gjson.New(message.GetJson("data").MustToJson())
  165. switch data.GetString("cmd") {
  166. case "download":
  167. params := gjson.New(data.GetJson("params").MustToJson())
  168. args := &rpcs.ChunkUpgrade{
  169. DeviceId: deviceId,
  170. FileId: params.GetString("fileId"),
  171. FileSize: params.GetInt64("fileSize"),
  172. Size: params.GetInt64("size"),
  173. Offset: params.GetInt("offset"),
  174. }
  175. err := a.chunkUpgrade(*args)
  176. if err != nil {
  177. server.Log.Errorf("分片下载发送失败:%v", err)
  178. }
  179. case "downProgress":
  180. params := gjson.New(data.GetJson("params").MustToJson())
  181. var args rpcs.ArgsOtaProgress
  182. args.DeviceId = deviceId
  183. args.Progress = params.GetInt("progress")
  184. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  185. if err != nil {
  186. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  187. return err
  188. }
  189. }
  190. return nil
  191. }
  192. // Connected 设备接入时
  193. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  194. server.Log.Infof("设备上线;%s", status.DeviceId)
  195. // 查询设备信息
  196. device := &models.Device{}
  197. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  198. if err != nil {
  199. server.Log.Errorf("device not found %s", status.DeviceId)
  200. return nil
  201. }
  202. args := rpcs.ArgsGetOnline{
  203. Id: device.DeviceIdentifier,
  204. ClientIP: status.ClientIp,
  205. AccessRPCHost: server.GetRPCHost(),
  206. HeartbeatInterval: 300,
  207. }
  208. reply := rpcs.ReplyGetOnline{}
  209. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  210. if err != nil {
  211. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  212. }
  213. var cReply rpcs.ReplyEmptyResult
  214. var cArgs rpcs.ArgsGetStatus
  215. cArgs.VendorId = device.VendorID
  216. cArgs.Id = args.Id
  217. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  218. return err
  219. }
  220. return nil
  221. }
  222. // Disconnected 设备断开连接时
  223. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  224. // 查询设备信息
  225. device := &models.Device{}
  226. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  227. if err != nil {
  228. server.Log.Errorf("device not found %s", status.DeviceId)
  229. return nil
  230. }
  231. args := rpcs.ArgsGetOffline{
  232. Id: device.DeviceIdentifier,
  233. VendorId: device.VendorID,
  234. }
  235. reply := rpcs.ReplyGetOffline{}
  236. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  237. if err != nil {
  238. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  239. }
  240. return err
  241. }
  242. // SendCommand rpc 发送设备命令
  243. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  244. // 查询设备信息
  245. device := &models.Device{}
  246. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  247. if err != nil {
  248. server.Log.Errorf("device not found %s", args.DeviceId)
  249. return nil
  250. }
  251. product := &models.Product{}
  252. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  253. if err != nil {
  254. server.Log.Errorf("device not found %s", args.DeviceId)
  255. return nil
  256. }
  257. cmd := &klink.CloudSend{
  258. Action: "cloudSend",
  259. MsgId: 0,
  260. DeviceCode: args.DeviceId,
  261. SubDeviceId: args.SubDevice,
  262. Timestamp: time.Now().Unix(),
  263. Data: &klink.CloudSendData{
  264. Cmd: args.Cmd,
  265. Params: args.Params,
  266. },
  267. }
  268. msg, err := cmd.Marshal()
  269. if err != nil {
  270. return err
  271. }
  272. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  273. }
  274. // GetStatus rpc 获取设备状态
  275. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  276. server.Log.Infof("Access Get Status: %v", args)
  277. // first send a get status command
  278. cmdArgs := rpcs.ArgsSendCommand{
  279. DeviceId: args.Id,
  280. WaitTime: 0,
  281. SubDevice: args.SubDeviceId,
  282. Cmd: "report",
  283. }
  284. cmdReply := rpcs.ReplySendCommand{}
  285. return a.SendCommand(cmdArgs, &cmdReply)
  286. }
  287. func (a *Access) chunkUpgrade(params rpcs.ChunkUpgrade) error {
  288. server.Log.Infof("4G模组OTA升级:%s", params.DeviceId)
  289. cmd := &klink.CloudSend{
  290. Action: "cloudSend",
  291. MsgId: 0,
  292. DeviceCode: params.DeviceId,
  293. Timestamp: time.Now().Unix(),
  294. Data: &klink.CloudSendData{
  295. Cmd: "devUpgrade",
  296. Params: map[string]interface{}{
  297. "fileId": params.FileId,
  298. "fileSize": params.FileSize,
  299. "size": params.Size,
  300. "offset": params.Offset,
  301. },
  302. },
  303. }
  304. byteCmd, err := json.Marshal(cmd)
  305. if err != nil {
  306. return err
  307. }
  308. buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(len(byteCmd))))
  309. server.Log.Infof("1----------填充数据长度:%2X", buf.Bytes())
  310. buf.Write(byteCmd)
  311. server.Log.Infof("2----------填充响应数据:%2X", buf.Bytes())
  312. var fileArgs rpcs.ArgsOtaFile
  313. fileArgs.FileId = params.FileId
  314. var fileReply rpcs.ReplyOtaFile
  315. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &fileReply)
  316. if err != nil {
  317. server.Log.Errorf("OTA升级文件保存失败:%v", err)
  318. return err
  319. }
  320. if fileReply.File == nil {
  321. return errors.New(fmt.Sprintf("文件:%s 获取失败", params.FileId))
  322. }
  323. buf.Write(fileReply.File[params.Offset : params.Offset+int(params.Size)])
  324. server.Log.Infof("3----------填充文件:%2X", buf.Bytes())
  325. var mCrc crc
  326. checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
  327. buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
  328. server.Log.Infof("4----------填充CRC:%2X", buf.Bytes())
  329. var SendByteArgs rpcs.ArgsSendByteData
  330. SendByteArgs.DeviceId = params.DeviceId
  331. SendByteArgs.Data = buf.Bytes()
  332. replay := new(rpcs.ReplySendCommand)
  333. err = a.SendByteData(SendByteArgs, replay)
  334. return nil
  335. }
  336. // SendByteData rpc 发送byte数组
  337. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  338. // 查询设备信息
  339. device := &models.Device{}
  340. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  341. if err != nil {
  342. server.Log.Errorf("device not found %s", args.DeviceId)
  343. return err
  344. }
  345. product := &models.Product{}
  346. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  347. if err != nil {
  348. server.Log.Errorf("device not found %s", args.DeviceId)
  349. return err
  350. }
  351. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  352. }