agent.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "sparrow/pkg/klink"
  6. "sparrow/pkg/models"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. )
  11. type Agent struct {
  12. }
  13. // Message 收到设备上报消息处理
  14. func (a *Agent) Message(topic string, payload []byte) error {
  15. topicInfo, err := protocol.GetTopicInfo(topic)
  16. if err != nil {
  17. return err
  18. }
  19. if topicInfo.Direction == protocol.Down {
  20. return nil
  21. }
  22. device := &models.Device{}
  23. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  24. if err != nil {
  25. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  26. return nil
  27. }
  28. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  29. if len(topicInfo.Types) == 0 {
  30. return nil
  31. }
  32. jsonPayload, err := gjson.DecodeToJson(payload)
  33. if err != nil {
  34. return nil
  35. }
  36. switch topicInfo.Types[0] {
  37. case "status":
  38. return a.processStatus(topicInfo.DeviceCode, device.VendorID, jsonPayload)
  39. case "event":
  40. return a.processEvent(topicInfo.DeviceCode, device.VendorID, jsonPayload)
  41. }
  42. return nil
  43. }
  44. func (a *Agent) processEvent(deviceId, vendorId string, message *gjson.Json) error {
  45. reply := rpcs.ReplyOnEvent{}
  46. args := rpcs.ArgsOnEvent{
  47. DeviceId: deviceId,
  48. TimeStamp: message.GetUint64("timestamp"),
  49. SubDeviceId: message.GetString("subDeviceId"),
  50. SubData: message.GetJson("data").MustToJson(),
  51. VendorId: vendorId,
  52. }
  53. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  54. if err != nil {
  55. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  56. return err
  57. }
  58. return nil
  59. }
  60. func (a *Agent) processStatus(deviceId, vendorId string, message *gjson.Json) error {
  61. act := klink.PacketAction(message.GetString("action"))
  62. if act != "" {
  63. switch act {
  64. case klink.DevSendAction:
  65. processReportStatus(deviceId, vendorId, message)
  66. case klink.DevLoginAction:
  67. _ = processDevLogin(deviceId, message.GetString("subDeviceId"))
  68. case klink.DevLogoutAction:
  69. _ = processDevLogout(deviceId, message.GetString("subDeviceId"))
  70. case klink.DevNetConfigAction:
  71. _ = processDevNetConfig(deviceId, message.GetString("md5"))
  72. case klink.ReportFirmwareAction:
  73. _ = processDeviceReportUpgrade(deviceId, message.GetString("version"))
  74. }
  75. }
  76. return nil
  77. }
  78. func processDevLogin(deviceCode, subDeviceId string) error {
  79. var args rpcs.SubDeviceArgs
  80. args.DeviceCode = deviceCode
  81. args.Status = 1
  82. args.SubDeviceId = subDeviceId
  83. var reply *models.SubDevice
  84. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  85. if err != nil {
  86. server.Log.Errorf("子设备上线出错:%s", err.Error())
  87. }
  88. return nil
  89. }
  90. func processDevLogout(deviceCode, subDeviceId string) error {
  91. var args rpcs.SubDeviceArgs
  92. args.DeviceCode = deviceCode
  93. args.Status = 0
  94. args.SubDeviceId = subDeviceId
  95. var reply *models.SubDevice
  96. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  97. if err != nil {
  98. server.Log.Errorf("子设备下线出错:%s", err.Error())
  99. }
  100. return nil
  101. }
  102. // 处理设备配网信息
  103. func processDevNetConfig(deviceCode, md5 string) error {
  104. args := &models.DeviceNetConfig{
  105. DeviceIdentifier: deviceCode,
  106. MD5: md5,
  107. Status: 1,
  108. }
  109. reply := rpcs.ReplyCheckDeviceNetConfig{}
  110. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  111. if err != nil {
  112. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  113. }
  114. return nil
  115. }
  116. // 设备上报固件信息处理
  117. func processDeviceReportUpgrade(deviceId, version string) error {
  118. args := &rpcs.ArgsUpdateDeviceVersion{
  119. DeviceId: deviceId,
  120. Version: version,
  121. }
  122. var reply rpcs.ReplyEmptyResult
  123. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  124. if err != nil {
  125. server.Log.Errorf("更新设备版本号失败:%v", err)
  126. }
  127. return nil
  128. }
  129. func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
  130. reply := rpcs.ReplyOnStatus{}
  131. args := rpcs.ArgsOnStatus{
  132. DeviceId: deviceid,
  133. Timestamp: message.GetUint64("timestamp"),
  134. SubData: message.GetJson("data").MustToJson(),
  135. VendorId: vendorId,
  136. SubDeviceId: message.GetString("subDeviceId"),
  137. Action: klink.PacketAction(message.GetString("action")),
  138. }
  139. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  140. if err != nil {
  141. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  142. return
  143. }
  144. }
  145. // Connected 设备接入时
  146. func (a *Agent) Connected(status *protocol.DevConnectStatus) error {
  147. // 查询设备信息
  148. device := &models.Device{}
  149. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  150. if err != nil {
  151. server.Log.Errorf("device not found %s", status.DeviceId)
  152. return nil
  153. }
  154. args := rpcs.ArgsGetOnline{
  155. Id: device.RecordId,
  156. ClientIP: status.ClientIp,
  157. AccessRPCHost: server.GetRPCHost(),
  158. HeartbeatInterval: 300,
  159. }
  160. reply := rpcs.ReplyGetOnline{}
  161. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  162. if err != nil {
  163. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  164. }
  165. var cReply rpcs.ReplyEmptyResult
  166. var cArgs rpcs.ArgsGetStatus
  167. cArgs.VendorId = device.VendorID
  168. cArgs.Id = args.Id
  169. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  170. return err
  171. }
  172. return nil
  173. }
  174. // Disconnected 设备断开连接时
  175. func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
  176. // 查询设备信息
  177. device := &models.Device{}
  178. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  179. if err != nil {
  180. server.Log.Errorf("device not found %s", status.DeviceId)
  181. return nil
  182. }
  183. args := rpcs.ArgsGetOffline{
  184. Id: device.RecordId,
  185. VendorId: device.VendorID,
  186. }
  187. reply := rpcs.ReplyGetOffline{}
  188. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  189. if err != nil {
  190. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  191. }
  192. return err
  193. }
  194. func NewAgent() *Agent {
  195. return &Agent{}
  196. }