agent.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "sparrow/pkg/klink"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  12. type Access struct {
  13. client SubDev
  14. }
  15. // Message 收到设备上报消息处理
  16. func (a *Access) Message(topic string, payload []byte) error {
  17. topicInfo, err := protocol.GetTopicInfo(topic)
  18. if err != nil {
  19. return err
  20. }
  21. if topicInfo.Direction == protocol.Down {
  22. return nil
  23. }
  24. device := &models.Device{}
  25. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  26. if err != nil {
  27. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  28. return nil
  29. }
  30. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  31. if len(topicInfo.Types) == 0 {
  32. return nil
  33. }
  34. jsonPayload, err := gjson.DecodeToJson(payload)
  35. if err != nil {
  36. return nil
  37. }
  38. switch topicInfo.Types[0] {
  39. case "status":
  40. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  41. case "event":
  42. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  43. }
  44. return nil
  45. }
  46. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  47. reply := rpcs.ReplyOnEvent{}
  48. args := rpcs.ArgsOnEvent{
  49. DeviceId: topicInfo.DeviceCode,
  50. TimeStamp: message.GetUint64("timestamp"),
  51. SubDeviceId: message.GetString("subDeviceId"),
  52. SubData: message.GetJson("data").MustToJson(),
  53. VendorId: vendorId,
  54. }
  55. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  56. if err != nil {
  57. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  58. return err
  59. }
  60. return nil
  61. }
  62. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  63. act := klink.PacketAction(message.GetString("action"))
  64. if act != "" {
  65. switch act {
  66. case klink.DevSendAction:
  67. processReportStatus(topicInfo.DeviceCode, vendorId, message)
  68. case klink.DevLoginAction:
  69. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  70. case klink.DevLogoutAction:
  71. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  72. case klink.DevNetConfigAction:
  73. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  74. case klink.ReportFirmwareAction:
  75. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  76. case klink.DevUpgradeAction:
  77. _ = processDeviceUpgrade(topicInfo.DeviceCode, message)
  78. }
  79. }
  80. return nil
  81. }
  82. func processDevLogin(deviceCode, subDeviceId string) error {
  83. var args rpcs.SubDeviceArgs
  84. args.DeviceCode = deviceCode
  85. args.Status = 1
  86. args.SubDeviceId = subDeviceId
  87. var reply *models.SubDevice
  88. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  89. if err != nil {
  90. server.Log.Errorf("子设备上线出错:%s", err.Error())
  91. }
  92. return nil
  93. }
  94. func processDevLogout(deviceCode, subDeviceId string) error {
  95. var args rpcs.SubDeviceArgs
  96. args.DeviceCode = deviceCode
  97. args.Status = 0
  98. args.SubDeviceId = subDeviceId
  99. var reply *models.SubDevice
  100. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  101. if err != nil {
  102. server.Log.Errorf("子设备下线出错:%s", err.Error())
  103. }
  104. return nil
  105. }
  106. // 处理设备配网信息
  107. func processDevNetConfig(deviceCode, md5 string) error {
  108. args := &models.DeviceNetConfig{
  109. DeviceIdentifier: deviceCode,
  110. MD5: md5,
  111. Status: 1,
  112. }
  113. reply := rpcs.ReplyCheckDeviceNetConfig{}
  114. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  115. if err != nil {
  116. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  117. }
  118. return nil
  119. }
  120. // 设备上报固件信息处理
  121. func processDeviceReportUpgrade(deviceId, version string) error {
  122. args := &rpcs.ArgsUpdateDeviceVersion{
  123. DeviceId: deviceId,
  124. Version: version,
  125. }
  126. var reply rpcs.ReplyEmptyResult
  127. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  128. if err != nil {
  129. server.Log.Errorf("更新设备版本号失败:%v", err)
  130. }
  131. return nil
  132. }
  133. func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
  134. reply := rpcs.ReplyOnStatus{}
  135. args := rpcs.ArgsOnStatus{
  136. DeviceId: deviceid,
  137. Timestamp: message.GetUint64("timestamp"),
  138. SubData: message.GetJson("data").MustToJson(),
  139. VendorId: vendorId,
  140. SubDeviceId: message.GetString("subDeviceId"),
  141. Action: klink.PacketAction(message.GetString("action")),
  142. }
  143. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  144. if err != nil {
  145. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  146. return
  147. }
  148. }
  149. func processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  150. var reply rpcs.ReplyEmptyResult
  151. data := gjson.New(message.GetJson("data").MustToJson())
  152. switch data.GetString("cmd") {
  153. case "download":
  154. params := gjson.New(data.GetJson("params").MustToJson())
  155. args := &rpcs.ChunkUpgrade{
  156. DeviceId: deviceId,
  157. FileId: params.GetString("fileId"),
  158. FileSize: params.GetInt64("fileSize"),
  159. Size: params.GetInt64("size"),
  160. Offset: params.GetInt("offset"),
  161. }
  162. err := server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.ChunkUpgrade", args, &reply)
  163. if err != nil {
  164. server.Log.Errorf("分片下载发送失败:%v", err)
  165. }
  166. case "downProgress":
  167. params := gjson.New(data.GetJson("params").MustToJson())
  168. var args rpcs.ArgsOtaProgress
  169. args.DeviceId = deviceId
  170. args.Progress = params.GetInt("progress")
  171. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  172. if err != nil {
  173. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  174. return err
  175. }
  176. }
  177. return nil
  178. }
  179. // Connected 设备接入时
  180. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  181. server.Log.Infof("设备上线;%s", status.DeviceId)
  182. // 查询设备信息
  183. device := &models.Device{}
  184. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  185. if err != nil {
  186. server.Log.Errorf("device not found %s", status.DeviceId)
  187. return nil
  188. }
  189. args := rpcs.ArgsGetOnline{
  190. Id: device.DeviceIdentifier,
  191. ClientIP: status.ClientIp,
  192. AccessRPCHost: server.GetRPCHost(),
  193. HeartbeatInterval: 300,
  194. }
  195. reply := rpcs.ReplyGetOnline{}
  196. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  197. if err != nil {
  198. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  199. }
  200. var cReply rpcs.ReplyEmptyResult
  201. var cArgs rpcs.ArgsGetStatus
  202. cArgs.VendorId = device.VendorID
  203. cArgs.Id = args.Id
  204. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  205. return err
  206. }
  207. return nil
  208. }
  209. // Disconnected 设备断开连接时
  210. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  211. // 查询设备信息
  212. device := &models.Device{}
  213. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  214. if err != nil {
  215. server.Log.Errorf("device not found %s", status.DeviceId)
  216. return nil
  217. }
  218. args := rpcs.ArgsGetOffline{
  219. Id: device.DeviceIdentifier,
  220. VendorId: device.VendorID,
  221. }
  222. reply := rpcs.ReplyGetOffline{}
  223. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  224. if err != nil {
  225. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  226. }
  227. return err
  228. }
  229. // SendCommand rpc 发送设备命令
  230. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  231. // 查询设备信息
  232. device := &models.Device{}
  233. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  234. if err != nil {
  235. server.Log.Errorf("device not found %s", args.DeviceId)
  236. return nil
  237. }
  238. product := &models.Product{}
  239. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  240. if err != nil {
  241. server.Log.Errorf("device not found %s", args.DeviceId)
  242. return nil
  243. }
  244. cmd := &klink.CloudSend{
  245. Action: "cloudSend",
  246. MsgId: 0,
  247. DeviceCode: args.DeviceId,
  248. SubDeviceId: args.SubDevice,
  249. Timestamp: time.Now().Unix(),
  250. Data: &klink.CloudSendData{
  251. Cmd: args.Cmd,
  252. Params: args.Params,
  253. },
  254. }
  255. msg, err := cmd.Marshal()
  256. if err != nil {
  257. return err
  258. }
  259. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  260. }
  261. // GetStatus rpc 获取设备状态
  262. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  263. server.Log.Infof("Access Get Status: %v", args)
  264. // first send a get status command
  265. cmdArgs := rpcs.ArgsSendCommand{
  266. DeviceId: args.Id,
  267. WaitTime: 0,
  268. SubDevice: args.SubDeviceId,
  269. Cmd: "report",
  270. }
  271. cmdReply := rpcs.ReplySendCommand{}
  272. return a.SendCommand(cmdArgs, &cmdReply)
  273. }
  274. func NewAgent(client SubDev) *Access {
  275. return &Access{
  276. client: client,
  277. }
  278. }
  279. // SendByteData rpc 发送byte数组
  280. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  281. // 查询设备信息
  282. device := &models.Device{}
  283. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  284. if err != nil {
  285. server.Log.Errorf("device not found %s", args.DeviceId)
  286. return nil
  287. }
  288. product := &models.Product{}
  289. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  290. if err != nil {
  291. server.Log.Errorf("device not found %s", args.DeviceId)
  292. return nil
  293. }
  294. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  295. }