// agent.go (9.9 KB)

  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "sparrow/pkg/klink"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  12. type Access struct {
  13. client SubDev
  14. }
  15. // Message 收到设备上报消息处理
  16. func (a *Access) Message(topic string, payload []byte) error {
  17. topicInfo, err := protocol.GetTopicInfo(topic)
  18. if err != nil {
  19. return err
  20. }
  21. if topicInfo.Direction == protocol.Down {
  22. return nil
  23. }
  24. device := &models.Device{}
  25. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  26. if err != nil {
  27. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  28. return nil
  29. }
  30. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  31. if len(topicInfo.Types) == 0 {
  32. return nil
  33. }
  34. jsonPayload, err := gjson.DecodeToJson(payload)
  35. if err != nil {
  36. return nil
  37. }
  38. switch topicInfo.Types[0] {
  39. case "status":
  40. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  41. case "event":
  42. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  43. }
  44. return nil
  45. }
  46. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  47. reply := rpcs.ReplyOnEvent{}
  48. args := rpcs.ArgsOnEvent{
  49. DeviceId: topicInfo.DeviceCode,
  50. TimeStamp: message.GetUint64("timestamp"),
  51. SubDeviceId: message.GetString("subDeviceId"),
  52. SubData: message.GetJson("data").MustToJson(),
  53. VendorId: vendorId,
  54. }
  55. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  56. if err != nil {
  57. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  58. return err
  59. }
  60. return nil
  61. }
  62. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  63. act := klink.PacketAction(message.GetString("action"))
  64. if act != "" {
  65. switch act {
  66. case klink.DevSendAction:
  67. processReportStatus(topicInfo, vendorId, message)
  68. case klink.DevLoginAction:
  69. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  70. case klink.DevLogoutAction:
  71. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  72. case klink.DevNetConfigAction:
  73. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  74. case klink.ReportFirmwareAction:
  75. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  76. case klink.DevUpgradeAction:
  77. _ = processDeviceUpgrade(topicInfo.DeviceCode, message)
  78. }
  79. }
  80. return nil
  81. }
  82. func processDevLogin(deviceCode, subDeviceId string) error {
  83. var args rpcs.SubDeviceArgs
  84. args.DeviceCode = deviceCode
  85. args.Status = 1
  86. args.SubDeviceId = subDeviceId
  87. var reply *models.SubDevice
  88. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  89. if err != nil {
  90. server.Log.Errorf("子设备上线出错:%s", err.Error())
  91. }
  92. return nil
  93. }
  94. func processDevLogout(deviceCode, subDeviceId string) error {
  95. var args rpcs.SubDeviceArgs
  96. args.DeviceCode = deviceCode
  97. args.Status = 0
  98. args.SubDeviceId = subDeviceId
  99. var reply *models.SubDevice
  100. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  101. if err != nil {
  102. server.Log.Errorf("子设备下线出错:%s", err.Error())
  103. }
  104. return nil
  105. }
  106. // 处理设备配网信息
  107. func processDevNetConfig(deviceCode, md5 string) error {
  108. args := &models.DeviceNetConfig{
  109. DeviceIdentifier: deviceCode,
  110. MD5: md5,
  111. Status: 1,
  112. }
  113. reply := rpcs.ReplyCheckDeviceNetConfig{}
  114. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  115. if err != nil {
  116. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  117. }
  118. return nil
  119. }
  120. // 设备上报固件信息处理
  121. func processDeviceReportUpgrade(deviceId, version string) error {
  122. args := &rpcs.ArgsUpdateDeviceVersion{
  123. DeviceId: deviceId,
  124. Version: version,
  125. }
  126. var reply rpcs.ReplyEmptyResult
  127. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  128. if err != nil {
  129. server.Log.Errorf("更新设备版本号失败:%v", err)
  130. }
  131. return nil
  132. }
  133. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  134. reply := rpcs.ReplyOnStatus{}
  135. args := rpcs.ArgsOnStatus{
  136. DeviceId: topicInfo.DeviceCode,
  137. Timestamp: message.GetUint64("timestamp"),
  138. SubData: message.GetJson("data").MustToJson(),
  139. VendorId: vendorId,
  140. ProductKey: topicInfo.ProductKey,
  141. SubDeviceId: message.GetString("subDeviceId"),
  142. Action: klink.PacketAction(message.GetString("action")),
  143. }
  144. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  145. if err != nil {
  146. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  147. return
  148. }
  149. }
  150. func processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  151. var reply rpcs.ReplyEmptyResult
  152. data := gjson.New(message.GetJson("data").MustToJson())
  153. switch data.GetString("cmd") {
  154. case "download":
  155. params := gjson.New(data.GetJson("params").MustToJson())
  156. args := &rpcs.ChunkUpgrade{
  157. DeviceId: deviceId,
  158. FileId: params.GetString("fileId"),
  159. FileSize: params.GetInt64("fileSize"),
  160. Size: params.GetInt64("size"),
  161. Offset: params.GetInt("offset"),
  162. }
  163. err := server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.ChunkUpgrade", args, &reply)
  164. if err != nil {
  165. server.Log.Errorf("分片下载发送失败:%v", err)
  166. }
  167. case "downProgress":
  168. params := gjson.New(data.GetJson("params").MustToJson())
  169. var args rpcs.ArgsOtaProgress
  170. args.DeviceId = deviceId
  171. args.Progress = params.GetInt("progress")
  172. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  173. if err != nil {
  174. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  175. return err
  176. }
  177. }
  178. return nil
  179. }
  180. // Connected 设备接入时
  181. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  182. server.Log.Infof("设备上线;%s", status.DeviceId)
  183. // 查询设备信息
  184. device := &models.Device{}
  185. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  186. if err != nil {
  187. server.Log.Errorf("device not found %s", status.DeviceId)
  188. return nil
  189. }
  190. args := rpcs.ArgsGetOnline{
  191. Id: device.DeviceIdentifier,
  192. ClientIP: status.ClientIp,
  193. AccessRPCHost: server.GetRPCHost(),
  194. HeartbeatInterval: 300,
  195. }
  196. reply := rpcs.ReplyGetOnline{}
  197. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  198. if err != nil {
  199. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  200. }
  201. var cReply rpcs.ReplyEmptyResult
  202. var cArgs rpcs.ArgsGetStatus
  203. cArgs.VendorId = device.VendorID
  204. cArgs.Id = args.Id
  205. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  206. return err
  207. }
  208. return nil
  209. }
  210. // Disconnected 设备断开连接时
  211. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  212. // 查询设备信息
  213. device := &models.Device{}
  214. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  215. if err != nil {
  216. server.Log.Errorf("device not found %s", status.DeviceId)
  217. return nil
  218. }
  219. args := rpcs.ArgsGetOffline{
  220. Id: device.DeviceIdentifier,
  221. VendorId: device.VendorID,
  222. }
  223. reply := rpcs.ReplyGetOffline{}
  224. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  225. if err != nil {
  226. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  227. }
  228. return err
  229. }
  230. // SendCommand rpc 发送设备命令
  231. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  232. // 查询设备信息
  233. device := &models.Device{}
  234. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  235. if err != nil {
  236. server.Log.Errorf("device not found %s", args.DeviceId)
  237. return nil
  238. }
  239. product := &models.Product{}
  240. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  241. if err != nil {
  242. server.Log.Errorf("device not found %s", args.DeviceId)
  243. return nil
  244. }
  245. cmd := &klink.CloudSend{
  246. Action: "cloudSend",
  247. MsgId: 0,
  248. DeviceCode: args.DeviceId,
  249. SubDeviceId: args.SubDevice,
  250. Timestamp: time.Now().Unix(),
  251. Data: &klink.CloudSendData{
  252. Cmd: args.Cmd,
  253. Params: args.Params,
  254. },
  255. }
  256. msg, err := cmd.Marshal()
  257. if err != nil {
  258. return err
  259. }
  260. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  261. }
  262. // GetStatus rpc 获取设备状态
  263. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  264. server.Log.Infof("Access Get Status: %v", args)
  265. // first send a get status command
  266. cmdArgs := rpcs.ArgsSendCommand{
  267. DeviceId: args.Id,
  268. WaitTime: 0,
  269. SubDevice: args.SubDeviceId,
  270. Cmd: "report",
  271. }
  272. cmdReply := rpcs.ReplySendCommand{}
  273. return a.SendCommand(cmdArgs, &cmdReply)
  274. }
  275. func NewAgent(client SubDev) *Access {
  276. return &Access{
  277. client: client,
  278. }
  279. }
  280. // SendByteData rpc 发送byte数组
  281. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  282. // 查询设备信息
  283. device := &models.Device{}
  284. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  285. if err != nil {
  286. server.Log.Errorf("device not found %s", args.DeviceId)
  287. return nil
  288. }
  289. product := &models.Product{}
  290. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  291. if err != nil {
  292. server.Log.Errorf("device not found %s", args.DeviceId)
  293. return nil
  294. }
  295. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  296. }