agent.go 7.9 KB


  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "sparrow/pkg/klink"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  12. type Access struct {
  13. client SubDev
  14. }
  15. // Message 收到设备上报消息处理
  16. func (a *Access) Message(topic string, payload []byte) error {
  17. topicInfo, err := protocol.GetTopicInfo(topic)
  18. if err != nil {
  19. return err
  20. }
  21. if topicInfo.Direction == protocol.Down {
  22. return nil
  23. }
  24. device := &models.Device{}
  25. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  26. if err != nil {
  27. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  28. return nil
  29. }
  30. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  31. if len(topicInfo.Types) == 0 {
  32. return nil
  33. }
  34. jsonPayload, err := gjson.DecodeToJson(payload)
  35. if err != nil {
  36. return nil
  37. }
  38. switch topicInfo.Types[0] {
  39. case "status":
  40. return a.processStatus(topicInfo.DeviceCode, device.VendorID, jsonPayload)
  41. case "event":
  42. return a.processEvent(topicInfo.DeviceCode, device.VendorID, jsonPayload)
  43. }
  44. return nil
  45. }
  46. func (a *Access) processEvent(deviceId, vendorId string, message *gjson.Json) error {
  47. reply := rpcs.ReplyOnEvent{}
  48. args := rpcs.ArgsOnEvent{
  49. DeviceId: deviceId,
  50. TimeStamp: message.GetUint64("timestamp"),
  51. SubDeviceId: message.GetString("subDeviceId"),
  52. SubData: message.GetJson("data").MustToJson(),
  53. VendorId: vendorId,
  54. }
  55. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  56. if err != nil {
  57. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  58. return err
  59. }
  60. return nil
  61. }
  62. func (a *Access) processStatus(deviceId, vendorId string, message *gjson.Json) error {
  63. act := klink.PacketAction(message.GetString("action"))
  64. if act != "" {
  65. switch act {
  66. case klink.DevSendAction:
  67. processReportStatus(deviceId, vendorId, message)
  68. case klink.DevLoginAction:
  69. _ = processDevLogin(deviceId, message.GetString("subDeviceId"))
  70. case klink.DevLogoutAction:
  71. _ = processDevLogout(deviceId, message.GetString("subDeviceId"))
  72. case klink.DevNetConfigAction:
  73. _ = processDevNetConfig(deviceId, message.GetString("md5"))
  74. case klink.ReportFirmwareAction:
  75. _ = processDeviceReportUpgrade(deviceId, message.GetString("version"))
  76. }
  77. }
  78. return nil
  79. }
  80. func processDevLogin(deviceCode, subDeviceId string) error {
  81. var args rpcs.SubDeviceArgs
  82. args.DeviceCode = deviceCode
  83. args.Status = 1
  84. args.SubDeviceId = subDeviceId
  85. var reply *models.SubDevice
  86. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  87. if err != nil {
  88. server.Log.Errorf("子设备上线出错:%s", err.Error())
  89. }
  90. return nil
  91. }
  92. func processDevLogout(deviceCode, subDeviceId string) error {
  93. var args rpcs.SubDeviceArgs
  94. args.DeviceCode = deviceCode
  95. args.Status = 0
  96. args.SubDeviceId = subDeviceId
  97. var reply *models.SubDevice
  98. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  99. if err != nil {
  100. server.Log.Errorf("子设备下线出错:%s", err.Error())
  101. }
  102. return nil
  103. }
  104. // 处理设备配网信息
  105. func processDevNetConfig(deviceCode, md5 string) error {
  106. args := &models.DeviceNetConfig{
  107. DeviceIdentifier: deviceCode,
  108. MD5: md5,
  109. Status: 1,
  110. }
  111. reply := rpcs.ReplyCheckDeviceNetConfig{}
  112. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  113. if err != nil {
  114. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  115. }
  116. return nil
  117. }
  118. // 设备上报固件信息处理
  119. func processDeviceReportUpgrade(deviceId, version string) error {
  120. args := &rpcs.ArgsUpdateDeviceVersion{
  121. DeviceId: deviceId,
  122. Version: version,
  123. }
  124. var reply rpcs.ReplyEmptyResult
  125. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  126. if err != nil {
  127. server.Log.Errorf("更新设备版本号失败:%v", err)
  128. }
  129. return nil
  130. }
  131. func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
  132. reply := rpcs.ReplyOnStatus{}
  133. args := rpcs.ArgsOnStatus{
  134. DeviceId: deviceid,
  135. Timestamp: message.GetUint64("timestamp"),
  136. SubData: message.GetJson("data").MustToJson(),
  137. VendorId: vendorId,
  138. SubDeviceId: message.GetString("subDeviceId"),
  139. Action: klink.PacketAction(message.GetString("action")),
  140. }
  141. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  142. if err != nil {
  143. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  144. return
  145. }
  146. }
  147. // Connected 设备接入时
  148. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  149. server.Log.Infof("设备上线;%s", status.DeviceId)
  150. // 查询设备信息
  151. device := &models.Device{}
  152. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  153. if err != nil {
  154. server.Log.Errorf("device not found %s", status.DeviceId)
  155. return nil
  156. }
  157. args := rpcs.ArgsGetOnline{
  158. Id: device.DeviceIdentifier,
  159. ClientIP: status.ClientIp,
  160. AccessRPCHost: server.GetRPCHost(),
  161. HeartbeatInterval: 300,
  162. }
  163. reply := rpcs.ReplyGetOnline{}
  164. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  165. if err != nil {
  166. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  167. }
  168. var cReply rpcs.ReplyEmptyResult
  169. var cArgs rpcs.ArgsGetStatus
  170. cArgs.VendorId = device.VendorID
  171. cArgs.Id = args.Id
  172. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  173. return err
  174. }
  175. return nil
  176. }
  177. // Disconnected 设备断开连接时
  178. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  179. // 查询设备信息
  180. device := &models.Device{}
  181. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  182. if err != nil {
  183. server.Log.Errorf("device not found %s", status.DeviceId)
  184. return nil
  185. }
  186. args := rpcs.ArgsGetOffline{
  187. Id: device.DeviceIdentifier,
  188. VendorId: device.VendorID,
  189. }
  190. reply := rpcs.ReplyGetOffline{}
  191. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  192. if err != nil {
  193. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  194. }
  195. return err
  196. }
  197. // SendCommand rpc 发送设备命令
  198. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  199. // 查询设备信息
  200. device := &models.Device{}
  201. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  202. if err != nil {
  203. server.Log.Errorf("device not found %s", args.DeviceId)
  204. return nil
  205. }
  206. product := &models.Product{}
  207. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  208. if err != nil {
  209. server.Log.Errorf("device not found %s", args.DeviceId)
  210. return nil
  211. }
  212. cmd := &klink.CloudSend{
  213. Action: "cloudSend",
  214. MsgId: 0,
  215. DeviceCode: args.DeviceId,
  216. SubDeviceId: args.SubDevice,
  217. Timestamp: time.Now().Unix(),
  218. Data: &klink.CloudSendData{
  219. Cmd: args.Cmd,
  220. Params: args.Params,
  221. },
  222. }
  223. msg, err := cmd.Marshal()
  224. if err != nil {
  225. return err
  226. }
  227. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  228. }
  229. // GetStatus rpc 获取设备状态
  230. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  231. server.Log.Infof("Access Get Status: %v", args)
  232. // first send a get status command
  233. cmdArgs := rpcs.ArgsSendCommand{
  234. DeviceId: args.Id,
  235. WaitTime: 0,
  236. SubDevice: args.SubDeviceId,
  237. Cmd: "report",
  238. }
  239. cmdReply := rpcs.ReplySendCommand{}
  240. return a.SendCommand(cmdArgs, &cmdReply)
  241. }
  242. func NewAgent(client SubDev) *Access {
  243. return &Access{
  244. client: client,
  245. }
  246. }