agent.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "sparrow/pkg/klink"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  12. type Agent struct {
  13. client SubDev
  14. }
  15. // Message 收到设备上报消息处理
  16. func (a *Agent) Message(topic string, payload []byte) error {
  17. topicInfo, err := protocol.GetTopicInfo(topic)
  18. if err != nil {
  19. return err
  20. }
  21. if topicInfo.Direction == protocol.Down {
  22. return nil
  23. }
  24. device := &models.Device{}
  25. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  26. if err != nil {
  27. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  28. return nil
  29. }
  30. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  31. if len(topicInfo.Types) == 0 {
  32. return nil
  33. }
  34. jsonPayload, err := gjson.DecodeToJson(payload)
  35. if err != nil {
  36. return nil
  37. }
  38. switch topicInfo.Types[0] {
  39. case "status":
  40. return a.processStatus(topicInfo.DeviceCode, device.VendorID, jsonPayload)
  41. case "event":
  42. return a.processEvent(topicInfo.DeviceCode, device.VendorID, jsonPayload)
  43. }
  44. return nil
  45. }
  46. func (a *Agent) processEvent(deviceId, vendorId string, message *gjson.Json) error {
  47. reply := rpcs.ReplyOnEvent{}
  48. args := rpcs.ArgsOnEvent{
  49. DeviceId: deviceId,
  50. TimeStamp: message.GetUint64("timestamp"),
  51. SubDeviceId: message.GetString("subDeviceId"),
  52. SubData: message.GetJson("data").MustToJson(),
  53. VendorId: vendorId,
  54. }
  55. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  56. if err != nil {
  57. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  58. return err
  59. }
  60. return nil
  61. }
  62. func (a *Agent) processStatus(deviceId, vendorId string, message *gjson.Json) error {
  63. act := klink.PacketAction(message.GetString("action"))
  64. if act != "" {
  65. switch act {
  66. case klink.DevSendAction:
  67. processReportStatus(deviceId, vendorId, message)
  68. case klink.DevLoginAction:
  69. _ = processDevLogin(deviceId, message.GetString("subDeviceId"))
  70. case klink.DevLogoutAction:
  71. _ = processDevLogout(deviceId, message.GetString("subDeviceId"))
  72. case klink.DevNetConfigAction:
  73. _ = processDevNetConfig(deviceId, message.GetString("md5"))
  74. case klink.ReportFirmwareAction:
  75. _ = processDeviceReportUpgrade(deviceId, message.GetString("version"))
  76. }
  77. }
  78. return nil
  79. }
  80. func processDevLogin(deviceCode, subDeviceId string) error {
  81. var args rpcs.SubDeviceArgs
  82. args.DeviceCode = deviceCode
  83. args.Status = 1
  84. args.SubDeviceId = subDeviceId
  85. var reply *models.SubDevice
  86. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  87. if err != nil {
  88. server.Log.Errorf("子设备上线出错:%s", err.Error())
  89. }
  90. return nil
  91. }
  92. func processDevLogout(deviceCode, subDeviceId string) error {
  93. var args rpcs.SubDeviceArgs
  94. args.DeviceCode = deviceCode
  95. args.Status = 0
  96. args.SubDeviceId = subDeviceId
  97. var reply *models.SubDevice
  98. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  99. if err != nil {
  100. server.Log.Errorf("子设备下线出错:%s", err.Error())
  101. }
  102. return nil
  103. }
  104. // 处理设备配网信息
  105. func processDevNetConfig(deviceCode, md5 string) error {
  106. args := &models.DeviceNetConfig{
  107. DeviceIdentifier: deviceCode,
  108. MD5: md5,
  109. Status: 1,
  110. }
  111. reply := rpcs.ReplyCheckDeviceNetConfig{}
  112. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  113. if err != nil {
  114. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  115. }
  116. return nil
  117. }
  118. // 设备上报固件信息处理
  119. func processDeviceReportUpgrade(deviceId, version string) error {
  120. args := &rpcs.ArgsUpdateDeviceVersion{
  121. DeviceId: deviceId,
  122. Version: version,
  123. }
  124. var reply rpcs.ReplyEmptyResult
  125. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  126. if err != nil {
  127. server.Log.Errorf("更新设备版本号失败:%v", err)
  128. }
  129. return nil
  130. }
  131. func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
  132. reply := rpcs.ReplyOnStatus{}
  133. args := rpcs.ArgsOnStatus{
  134. DeviceId: deviceid,
  135. Timestamp: message.GetUint64("timestamp"),
  136. SubData: message.GetJson("data").MustToJson(),
  137. VendorId: vendorId,
  138. SubDeviceId: message.GetString("subDeviceId"),
  139. Action: klink.PacketAction(message.GetString("action")),
  140. }
  141. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  142. if err != nil {
  143. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  144. return
  145. }
  146. }
  147. // Connected 设备接入时
  148. func (a *Agent) Connected(status *protocol.DevConnectStatus) error {
  149. // 查询设备信息
  150. device := &models.Device{}
  151. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  152. if err != nil {
  153. server.Log.Errorf("device not found %s", status.DeviceId)
  154. return nil
  155. }
  156. args := rpcs.ArgsGetOnline{
  157. Id: device.DeviceIdentifier,
  158. ClientIP: status.ClientIp,
  159. AccessRPCHost: server.GetRPCHost(),
  160. HeartbeatInterval: 300,
  161. }
  162. reply := rpcs.ReplyGetOnline{}
  163. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  164. if err != nil {
  165. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  166. }
  167. var cReply rpcs.ReplyEmptyResult
  168. var cArgs rpcs.ArgsGetStatus
  169. cArgs.VendorId = device.VendorID
  170. cArgs.Id = args.Id
  171. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  172. return err
  173. }
  174. return nil
  175. }
  176. // Disconnected 设备断开连接时
  177. func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
  178. // 查询设备信息
  179. device := &models.Device{}
  180. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  181. if err != nil {
  182. server.Log.Errorf("device not found %s", status.DeviceId)
  183. return nil
  184. }
  185. args := rpcs.ArgsGetOffline{
  186. Id: device.DeviceIdentifier,
  187. VendorId: device.VendorID,
  188. }
  189. reply := rpcs.ReplyGetOffline{}
  190. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  191. if err != nil {
  192. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  193. }
  194. return err
  195. }
  196. // SendCommand rpc 发送设备命令
  197. func (a *Agent) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  198. // 查询设备信息
  199. device := &models.Device{}
  200. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  201. if err != nil {
  202. server.Log.Errorf("device not found %s", args.DeviceId)
  203. return nil
  204. }
  205. product := &models.Product{}
  206. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  207. if err != nil {
  208. server.Log.Errorf("device not found %s", args.DeviceId)
  209. return nil
  210. }
  211. cmd := &klink.CloudSend{
  212. Action: "cloudSend",
  213. MsgId: 0,
  214. DeviceCode: args.DeviceId,
  215. SubDeviceId: args.SubDevice,
  216. Timestamp: time.Now().Unix(),
  217. Data: &klink.CloudSendData{
  218. Cmd: args.Cmd,
  219. Params: args.Params,
  220. },
  221. }
  222. msg, err := cmd.Marshal()
  223. if err != nil {
  224. return err
  225. }
  226. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  227. }
  228. // GetStatus rpc 获取设备状态
  229. func (a *Agent) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  230. server.Log.Infof("Access Get Status: %v", args)
  231. // first send a get status command
  232. cmdArgs := rpcs.ArgsSendCommand{
  233. DeviceId: args.Id,
  234. WaitTime: 0,
  235. SubDevice: args.SubDeviceId,
  236. Cmd: "report",
  237. }
  238. cmdReply := rpcs.ReplySendCommand{}
  239. return a.SendCommand(cmdArgs, &cmdReply)
  240. }
  241. func NewAgent(client SubDev) *Agent {
  242. return &Agent{
  243. client: client,
  244. }
  245. }