agent.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "sparrow/pkg/klink"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  12. type Access struct {
  13. client SubDev
  14. }
  15. // Message 收到设备上报消息处理
  16. func (a *Access) Message(topic string, payload []byte) error {
  17. topicInfo, err := protocol.GetTopicInfo(topic)
  18. if err != nil {
  19. return err
  20. }
  21. if topicInfo.Direction == protocol.Down {
  22. return nil
  23. }
  24. device := &models.Device{}
  25. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  26. if err != nil {
  27. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  28. return nil
  29. }
  30. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  31. if len(topicInfo.Types) == 0 {
  32. return nil
  33. }
  34. jsonPayload, err := gjson.DecodeToJson(payload)
  35. if err != nil {
  36. return nil
  37. }
  38. switch topicInfo.Types[0] {
  39. case "status":
  40. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  41. case "event":
  42. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  43. }
  44. return nil
  45. }
  46. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  47. reply := rpcs.ReplyOnEvent{}
  48. args := rpcs.ArgsOnEvent{
  49. DeviceId: topicInfo.DeviceCode,
  50. TimeStamp: message.GetUint64("timestamp"),
  51. SubDeviceId: message.GetString("subDeviceId"),
  52. SubData: message.GetJson("data").MustToJson(),
  53. VendorId: vendorId,
  54. ProductKey: topicInfo.ProductKey,
  55. }
  56. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  57. if err != nil {
  58. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  59. return err
  60. }
  61. return nil
  62. }
  63. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  64. act := klink.PacketAction(message.GetString("action"))
  65. if act != "" {
  66. switch act {
  67. case klink.DevSendAction:
  68. processReportStatus(topicInfo, vendorId, message)
  69. case klink.DevLoginAction:
  70. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  71. case klink.DevLogoutAction:
  72. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  73. case klink.DevNetConfigAction:
  74. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  75. case klink.ReportFirmwareAction:
  76. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  77. case klink.DevUpgradeAction:
  78. _ = processDeviceUpgrade(topicInfo.DeviceCode, message)
  79. }
  80. }
  81. return nil
  82. }
  83. func processDevLogin(deviceCode, subDeviceId string) error {
  84. var args rpcs.SubDeviceArgs
  85. args.DeviceCode = deviceCode
  86. args.Status = 1
  87. args.SubDeviceId = subDeviceId
  88. var reply *models.SubDevice
  89. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  90. if err != nil {
  91. server.Log.Errorf("子设备上线出错:%s", err.Error())
  92. }
  93. return nil
  94. }
  95. func processDevLogout(deviceCode, subDeviceId string) error {
  96. var args rpcs.SubDeviceArgs
  97. args.DeviceCode = deviceCode
  98. args.Status = 0
  99. args.SubDeviceId = subDeviceId
  100. var reply *models.SubDevice
  101. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  102. if err != nil {
  103. server.Log.Errorf("子设备下线出错:%s", err.Error())
  104. }
  105. return nil
  106. }
  107. // 处理设备配网信息
  108. func processDevNetConfig(deviceCode, md5 string) error {
  109. args := &models.DeviceNetConfig{
  110. DeviceIdentifier: deviceCode,
  111. MD5: md5,
  112. Status: 1,
  113. }
  114. reply := rpcs.ReplyCheckDeviceNetConfig{}
  115. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  116. if err != nil {
  117. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  118. }
  119. return nil
  120. }
  121. // 设备上报固件信息处理
  122. func processDeviceReportUpgrade(deviceId, version string) error {
  123. args := &rpcs.ArgsUpdateDeviceVersion{
  124. DeviceId: deviceId,
  125. Version: version,
  126. }
  127. var reply rpcs.ReplyEmptyResult
  128. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  129. if err != nil {
  130. server.Log.Errorf("更新设备版本号失败:%v", err)
  131. }
  132. return nil
  133. }
  134. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  135. reply := rpcs.ReplyOnStatus{}
  136. args := rpcs.ArgsOnStatus{
  137. DeviceId: topicInfo.DeviceCode,
  138. Timestamp: message.GetUint64("timestamp"),
  139. SubData: message.GetJson("data").MustToJson(),
  140. VendorId: vendorId,
  141. ProductKey: topicInfo.ProductKey,
  142. SubDeviceId: message.GetString("subDeviceId"),
  143. Action: klink.PacketAction(message.GetString("action")),
  144. }
  145. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  146. if err != nil {
  147. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  148. return
  149. }
  150. }
  151. func processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  152. var reply rpcs.ReplyEmptyResult
  153. data := gjson.New(message.GetJson("data").MustToJson())
  154. switch data.GetString("cmd") {
  155. case "download":
  156. params := gjson.New(data.GetJson("params").MustToJson())
  157. args := &rpcs.ChunkUpgrade{
  158. DeviceId: deviceId,
  159. FileId: params.GetString("fileId"),
  160. FileSize: params.GetInt64("fileSize"),
  161. Size: params.GetInt64("size"),
  162. Offset: params.GetInt("offset"),
  163. }
  164. err := server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.ChunkUpgrade", args, &reply)
  165. if err != nil {
  166. server.Log.Errorf("分片下载发送失败:%v", err)
  167. }
  168. case "downProgress":
  169. params := gjson.New(data.GetJson("params").MustToJson())
  170. var args rpcs.ArgsOtaProgress
  171. args.DeviceId = deviceId
  172. args.Progress = params.GetInt("progress")
  173. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  174. if err != nil {
  175. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  176. return err
  177. }
  178. }
  179. return nil
  180. }
  181. // Connected 设备接入时
  182. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  183. server.Log.Infof("设备上线;%s", status.DeviceId)
  184. // 查询设备信息
  185. device := &models.Device{}
  186. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  187. if err != nil {
  188. server.Log.Errorf("device not found %s", status.DeviceId)
  189. return nil
  190. }
  191. args := rpcs.ArgsGetOnline{
  192. Id: device.DeviceIdentifier,
  193. ClientIP: status.ClientIp,
  194. AccessRPCHost: server.GetRPCHost(),
  195. HeartbeatInterval: 300,
  196. }
  197. reply := rpcs.ReplyGetOnline{}
  198. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  199. if err != nil {
  200. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  201. }
  202. var cReply rpcs.ReplyEmptyResult
  203. var cArgs rpcs.ArgsGetStatus
  204. cArgs.VendorId = device.VendorID
  205. cArgs.Id = args.Id
  206. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  207. return err
  208. }
  209. return nil
  210. }
  211. // Disconnected 设备断开连接时
  212. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  213. // 查询设备信息
  214. device := &models.Device{}
  215. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  216. if err != nil {
  217. server.Log.Errorf("device not found %s", status.DeviceId)
  218. return nil
  219. }
  220. args := rpcs.ArgsGetOffline{
  221. Id: device.DeviceIdentifier,
  222. VendorId: device.VendorID,
  223. }
  224. reply := rpcs.ReplyGetOffline{}
  225. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  226. if err != nil {
  227. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  228. }
  229. return err
  230. }
  231. // SendCommand rpc 发送设备命令
  232. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  233. // 查询设备信息
  234. device := &models.Device{}
  235. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  236. if err != nil {
  237. server.Log.Errorf("device not found %s", args.DeviceId)
  238. return nil
  239. }
  240. product := &models.Product{}
  241. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  242. if err != nil {
  243. server.Log.Errorf("device not found %s", args.DeviceId)
  244. return nil
  245. }
  246. cmd := &klink.CloudSend{
  247. Action: "cloudSend",
  248. MsgId: 0,
  249. DeviceCode: args.DeviceId,
  250. SubDeviceId: args.SubDevice,
  251. Timestamp: time.Now().Unix(),
  252. Data: &klink.CloudSendData{
  253. Cmd: args.Cmd,
  254. Params: args.Params,
  255. },
  256. }
  257. msg, err := cmd.Marshal()
  258. if err != nil {
  259. return err
  260. }
  261. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  262. }
  263. // GetStatus rpc 获取设备状态
  264. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  265. server.Log.Infof("Access Get Status: %v", args)
  266. // first send a get status command
  267. cmdArgs := rpcs.ArgsSendCommand{
  268. DeviceId: args.Id,
  269. WaitTime: 0,
  270. SubDevice: args.SubDeviceId,
  271. Cmd: "report",
  272. }
  273. cmdReply := rpcs.ReplySendCommand{}
  274. return a.SendCommand(cmdArgs, &cmdReply)
  275. }
  276. func NewAgent(client SubDev) *Access {
  277. return &Access{
  278. client: client,
  279. }
  280. }
  281. // SendByteData rpc 发送byte数组
  282. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  283. // 查询设备信息
  284. device := &models.Device{}
  285. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  286. if err != nil {
  287. server.Log.Errorf("device not found %s", args.DeviceId)
  288. return nil
  289. }
  290. product := &models.Product{}
  291. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  292. if err != nil {
  293. server.Log.Errorf("device not found %s", args.DeviceId)
  294. return nil
  295. }
  296. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  297. }