agent.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "github.com/gogf/gf/encoding/gjson"
  8. "github.com/gogf/gf/util/gconv"
  9. "github.com/gogf/gf/v2/encoding/gbinary"
  10. "sparrow/pkg/klink"
  11. "sparrow/pkg/models"
  12. "sparrow/pkg/protocol"
  13. "sparrow/pkg/rpcs"
  14. "sparrow/pkg/server"
  15. "sync"
  16. "time"
  17. )
  18. type Access struct {
  19. client SubDev
  20. lockedDevices map[string]*Device
  21. }
  22. func NewAgent(client SubDev) *Access {
  23. a := &Access{
  24. client: client,
  25. lockedDevices: make(map[string]*Device),
  26. }
  27. go a.UnlockDevice()
  28. return a
  29. }
  30. type Device struct {
  31. Id string
  32. Locked bool
  33. LastSeen time.Time
  34. Mutex sync.Mutex
  35. }
  36. // Message 收到设备上报消息处理
  37. func (a *Access) Message(topic string, payload []byte) error {
  38. topicInfo, err := protocol.GetTopicInfo(topic)
  39. if err != nil {
  40. return err
  41. }
  42. if topicInfo.Direction == protocol.Down {
  43. return nil
  44. }
  45. device := &models.Device{}
  46. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  47. if err != nil {
  48. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  49. return nil
  50. }
  51. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  52. if len(topicInfo.Types) == 0 {
  53. return nil
  54. }
  55. jsonPayload, err := gjson.DecodeToJson(payload)
  56. if err != nil {
  57. return nil
  58. }
  59. switch topicInfo.Types[0] {
  60. case "status":
  61. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  62. case "event":
  63. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  64. }
  65. return nil
  66. }
  67. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  68. reply := rpcs.ReplyOnEvent{}
  69. args := rpcs.ArgsOnEvent{
  70. DeviceId: topicInfo.DeviceCode,
  71. TimeStamp: message.GetUint64("timestamp"),
  72. SubDeviceId: message.GetString("subDeviceId"),
  73. SubData: message.GetJson("data").MustToJson(),
  74. VendorId: vendorId,
  75. ProductKey: topicInfo.ProductKey,
  76. }
  77. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  78. if err != nil {
  79. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  80. return err
  81. }
  82. return nil
  83. }
  84. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  85. act := klink.PacketAction(message.GetString("action"))
  86. if act != "" {
  87. switch act {
  88. case klink.DevSendAction:
  89. processReportStatus(topicInfo, vendorId, message)
  90. case klink.DevLoginAction:
  91. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  92. case klink.DevLogoutAction:
  93. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  94. case klink.DevNetConfigAction:
  95. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  96. case klink.ReportFirmwareAction:
  97. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  98. case klink.DevUpgradeAction:
  99. _ = a.processDeviceUpgrade(topicInfo.DeviceCode, message)
  100. }
  101. }
  102. return nil
  103. }
  104. func processDevLogin(deviceCode, subDeviceId string) error {
  105. var args rpcs.SubDeviceArgs
  106. args.DeviceCode = deviceCode
  107. args.Status = 1
  108. args.SubDeviceId = subDeviceId
  109. var reply *models.SubDevice
  110. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  111. if err != nil {
  112. server.Log.Errorf("子设备上线出错:%s", err.Error())
  113. }
  114. return nil
  115. }
  116. func processDevLogout(deviceCode, subDeviceId string) error {
  117. var args rpcs.SubDeviceArgs
  118. args.DeviceCode = deviceCode
  119. args.Status = 0
  120. args.SubDeviceId = subDeviceId
  121. var reply *models.SubDevice
  122. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  123. if err != nil {
  124. server.Log.Errorf("子设备下线出错:%s", err.Error())
  125. }
  126. return nil
  127. }
  128. // 处理设备配网信息
  129. func processDevNetConfig(deviceCode, md5 string) error {
  130. args := &models.DeviceNetConfig{
  131. DeviceIdentifier: deviceCode,
  132. MD5: md5,
  133. Status: 1,
  134. }
  135. reply := rpcs.ReplyCheckDeviceNetConfig{}
  136. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  137. if err != nil {
  138. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  139. }
  140. return nil
  141. }
  142. // 设备上报固件信息处理
  143. func processDeviceReportUpgrade(deviceId, version string) error {
  144. args := &rpcs.ArgsUpdateDeviceVersion{
  145. DeviceId: deviceId,
  146. Version: version,
  147. }
  148. var reply rpcs.ReplyEmptyResult
  149. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  150. if err != nil {
  151. server.Log.Errorf("更新设备版本号失败:%v", err)
  152. }
  153. return nil
  154. }
  155. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  156. reply := rpcs.ReplyOnStatus{}
  157. args := rpcs.ArgsOnStatus{
  158. DeviceId: topicInfo.DeviceCode,
  159. Timestamp: message.GetUint64("timestamp"),
  160. SubData: message.GetJson("data").MustToJson(),
  161. VendorId: vendorId,
  162. ProductKey: topicInfo.ProductKey,
  163. SubDeviceId: message.GetString("subDeviceId"),
  164. Action: klink.PacketAction(message.GetString("action")),
  165. }
  166. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  167. if err != nil {
  168. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  169. return
  170. }
  171. }
  172. func (a *Access) processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  173. var reply rpcs.ReplyEmptyResult
  174. data := gjson.New(message.GetJson("data").MustToJson())
  175. switch data.GetString("cmd") {
  176. case "download":
  177. params := gjson.New(data.GetJson("params").MustToJson())
  178. args := &rpcs.ChunkUpgrade{
  179. DeviceId: deviceId,
  180. FileId: params.GetInt("fileId"),
  181. FileSize: params.GetInt64("fileSize"),
  182. Size: params.GetInt64("size"),
  183. Offset: params.GetInt("offset"),
  184. }
  185. err := a.chunkUpgrade(*args)
  186. if err != nil {
  187. server.Log.Errorf("分片下载发送失败:%v", err)
  188. }
  189. case "downProgress":
  190. params := gjson.New(data.GetJson("params").MustToJson())
  191. var args rpcs.ArgsOtaProgress
  192. args.DeviceId = deviceId
  193. args.Progress = params.GetInt("progress")
  194. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  195. if err != nil {
  196. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  197. return err
  198. }
  199. case "finish":
  200. device := a.GetLockDevice(deviceId)
  201. device.Mutex.Lock()
  202. if device != nil {
  203. device.Locked = false
  204. }
  205. device.Mutex.Unlock()
  206. server.Log.Infof("OTA升级完成;%s", deviceId)
  207. }
  208. return nil
  209. }
  210. // Connected 设备接入时
  211. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  212. server.Log.Infof("设备上线;%s", status.DeviceId)
  213. // 查询设备信息
  214. device := &models.Device{}
  215. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  216. if err != nil {
  217. server.Log.Errorf("device not found %s", status.DeviceId)
  218. return nil
  219. }
  220. args := rpcs.ArgsGetOnline{
  221. Id: device.DeviceIdentifier,
  222. ClientIP: status.ClientIp,
  223. AccessRPCHost: server.GetRPCHost(),
  224. HeartbeatInterval: 300,
  225. }
  226. reply := rpcs.ReplyGetOnline{}
  227. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  228. if err != nil {
  229. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  230. }
  231. var cReply rpcs.ReplyEmptyResult
  232. var cArgs rpcs.ArgsGetStatus
  233. cArgs.VendorId = device.VendorID
  234. cArgs.Id = args.Id
  235. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  236. return err
  237. }
  238. return nil
  239. }
  240. // Disconnected 设备断开连接时
  241. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  242. // 查询设备信息
  243. device := &models.Device{}
  244. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  245. if err != nil {
  246. server.Log.Errorf("device not found %s", status.DeviceId)
  247. return nil
  248. }
  249. args := rpcs.ArgsGetOffline{
  250. Id: device.DeviceIdentifier,
  251. VendorId: device.VendorID,
  252. }
  253. reply := rpcs.ReplyGetOffline{}
  254. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  255. if err != nil {
  256. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  257. }
  258. return err
  259. }
  260. // SendCommand rpc 发送设备命令
  261. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  262. // 查询设备信息
  263. lockDevice := a.GetLockDevice(args.DeviceId)
  264. lockDevice.Mutex.Lock()
  265. if lockDevice.Locked {
  266. return errors.New("设备正在进行OTA升级,请稍后重试")
  267. }
  268. lockDevice.Mutex.Unlock()
  269. device := &models.Device{}
  270. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  271. if err != nil {
  272. server.Log.Errorf("device not found %s", args.DeviceId)
  273. return nil
  274. }
  275. product := &models.Product{}
  276. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  277. if err != nil {
  278. server.Log.Errorf("device not found %s", args.DeviceId)
  279. return nil
  280. }
  281. cmd := &klink.CloudSend{
  282. Action: "cloudSend",
  283. MsgId: 0,
  284. DeviceCode: args.DeviceId,
  285. SubDeviceId: args.SubDevice,
  286. Timestamp: time.Now().Unix(),
  287. Data: &klink.CloudSendData{
  288. Cmd: args.Cmd,
  289. Params: args.Params,
  290. },
  291. }
  292. msg, err := cmd.Marshal()
  293. if err != nil {
  294. return err
  295. }
  296. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  297. }
  298. // GetStatus rpc 获取设备状态
  299. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  300. server.Log.Infof("Access Get Status: %v", args)
  301. // first send a get status command
  302. cmdArgs := rpcs.ArgsSendCommand{
  303. DeviceId: args.Id,
  304. WaitTime: 0,
  305. SubDevice: args.SubDeviceId,
  306. Cmd: "report",
  307. }
  308. cmdReply := rpcs.ReplySendCommand{}
  309. return a.SendCommand(cmdArgs, &cmdReply)
  310. }
  311. func (a *Access) chunkUpgrade(params rpcs.ChunkUpgrade) error {
  312. lockDevice := a.GetLockDevice(params.DeviceId)
  313. lockDevice.Mutex.Lock()
  314. lockDevice.Locked = true
  315. lockDevice.LastSeen = time.Now()
  316. lockDevice.Mutex.Unlock()
  317. server.Log.Infof("4G模组OTA升级:%s", params.DeviceId)
  318. buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(params.Offset)))
  319. var fileArgs rpcs.ArgsOtaFile
  320. fileArgs.FileId = params.FileId
  321. var fileReply rpcs.ReplyOtaFile
  322. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &fileReply)
  323. if err != nil {
  324. server.Log.Errorf("OTA升级文件保存失败:%v", err)
  325. return err
  326. }
  327. if fileReply.File == nil {
  328. return errors.New(fmt.Sprintf("文件:%s 获取失败", params.FileId))
  329. }
  330. start := params.Offset * int(params.Size)
  331. stop := (params.Offset + 1) * int(params.Size)
  332. if stop >= len(fileReply.File) {
  333. stop = len(fileReply.File)
  334. }
  335. if stop < start {
  336. start = stop
  337. }
  338. server.Log.Infof("文件截取位置------: start:%d,stop:%d", start, stop)
  339. data := fileReply.File[start:stop]
  340. buf.Write(gbinary.BeEncodeUint16(gconv.Uint16(len(data))))
  341. buf.Write(data)
  342. server.Log.Infof("1----------填充文件:%2X", buf.Bytes())
  343. var mCrc crc
  344. checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
  345. buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
  346. server.Log.Infof("2----------填充CRC:%2X", buf.Bytes())
  347. var SendByteArgs rpcs.ArgsSendByteData
  348. SendByteArgs.DeviceId = params.DeviceId
  349. SendByteArgs.Data = buf.Bytes()
  350. replay := new(rpcs.ReplySendCommand)
  351. err = a.SendByteData(SendByteArgs, replay)
  352. return nil
  353. }
  354. // SendByteData rpc 发送byte数组
  355. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  356. // 查询设备信息
  357. device := &models.Device{}
  358. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  359. if err != nil {
  360. server.Log.Errorf("device not found %s", args.DeviceId)
  361. return err
  362. }
  363. product := &models.Product{}
  364. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  365. if err != nil {
  366. server.Log.Errorf("device not found %s", args.DeviceId)
  367. return err
  368. }
  369. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  370. }
  371. func (a *Access) GetLockDevice(id string) *Device {
  372. if d, exists := a.lockedDevices[id]; exists {
  373. return d
  374. }
  375. device := &Device{Id: id, Locked: false}
  376. a.lockedDevices[id] = device
  377. return device
  378. }
  379. func (a *Access) UnlockDevice() {
  380. for {
  381. time.Sleep(5 * time.Second) // 每5秒检查一次
  382. for _, device := range a.lockedDevices {
  383. device.Mutex.Lock()
  384. if device.Locked && time.Since(device.LastSeen) > 1*time.Minute {
  385. device.Locked = false
  386. server.Log.Infof("Device %s unlocked\n", device.Id)
  387. }
  388. device.Mutex.Unlock()
  389. }
  390. }
  391. }