agent.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "github.com/gogf/gf/encoding/gjson"
  8. "github.com/gogf/gf/util/gconv"
  9. "github.com/gogf/gf/v2/encoding/gbinary"
  10. "sparrow/pkg/klink"
  11. "sparrow/pkg/models"
  12. "sparrow/pkg/protocol"
  13. "sparrow/pkg/rpcs"
  14. "sparrow/pkg/server"
  15. "sync"
  16. "time"
  17. )
  18. type Access struct {
  19. client SubDev
  20. lockedDevices map[string]*Device
  21. }
  22. func NewAgent(client SubDev) *Access {
  23. a := &Access{
  24. client: client,
  25. lockedDevices: make(map[string]*Device),
  26. }
  27. go a.UnlockDevice()
  28. return a
  29. }
  // Device tracks the lock state of a single device.
  type Device struct {
  	// Id is the device identifier.
  	Id string
  	// Locked reports whether the device is currently locked; while it is
  	// set, SendCommand refuses to send commands to the device.
  	Locked bool
  	// LastSeen is the time the lock was last refreshed.
  	LastSeen time.Time
  	// Mutex guards Locked and LastSeen.
  	Mutex sync.Mutex
  }
  36. // Message 收到设备上报消息处理
  37. func (a *Access) Message(topic string, payload []byte) error {
  38. topicInfo, err := protocol.GetTopicInfo(topic)
  39. if err != nil {
  40. return err
  41. }
  42. if topicInfo.Direction == protocol.Down {
  43. return nil
  44. }
  45. device := &models.Device{}
  46. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  47. if err != nil {
  48. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  49. return nil
  50. }
  51. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  52. if len(topicInfo.Types) == 0 {
  53. return nil
  54. }
  55. jsonPayload, err := gjson.DecodeToJson(payload)
  56. if err != nil {
  57. return nil
  58. }
  59. switch topicInfo.Types[0] {
  60. case "status":
  61. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  62. case "event":
  63. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  64. }
  65. return nil
  66. }
  67. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  68. reply := rpcs.ReplyOnEvent{}
  69. args := rpcs.ArgsOnEvent{
  70. DeviceId: topicInfo.DeviceCode,
  71. TimeStamp: message.GetUint64("timestamp"),
  72. SubDeviceId: message.GetString("subDeviceId"),
  73. SubData: message.GetJson("data").MustToJson(),
  74. VendorId: vendorId,
  75. ProductKey: topicInfo.ProductKey,
  76. }
  77. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  78. if err != nil {
  79. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  80. return err
  81. }
  82. return nil
  83. }
  84. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  85. act := klink.PacketAction(message.GetString("action"))
  86. if act != "" {
  87. switch act {
  88. case klink.DevSendAction:
  89. processReportStatus(topicInfo, vendorId, message)
  90. case klink.DevLoginAction:
  91. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  92. case klink.DevLogoutAction:
  93. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  94. case klink.DevNetConfigAction:
  95. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  96. case klink.ReportFirmwareAction:
  97. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  98. case klink.DevUpgradeAction:
  99. _ = a.processDeviceUpgrade(topicInfo.DeviceCode, message)
  100. }
  101. }
  102. return nil
  103. }
  104. func processDevLogin(deviceCode, subDeviceId string) error {
  105. var args rpcs.SubDeviceArgs
  106. args.DeviceCode = deviceCode
  107. args.Status = 1
  108. args.SubDeviceId = subDeviceId
  109. var reply *models.SubDevice
  110. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  111. if err != nil {
  112. server.Log.Errorf("子设备上线出错:%s", err.Error())
  113. }
  114. return nil
  115. }
  116. func processDevLogout(deviceCode, subDeviceId string) error {
  117. var args rpcs.SubDeviceArgs
  118. args.DeviceCode = deviceCode
  119. args.Status = 0
  120. args.SubDeviceId = subDeviceId
  121. var reply *models.SubDevice
  122. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  123. if err != nil {
  124. server.Log.Errorf("子设备下线出错:%s", err.Error())
  125. }
  126. return nil
  127. }
  128. // 处理设备配网信息
  129. func processDevNetConfig(deviceCode, md5 string) error {
  130. args := &models.DeviceNetConfig{
  131. DeviceIdentifier: deviceCode,
  132. MD5: md5,
  133. Status: 1,
  134. }
  135. reply := rpcs.ReplyCheckDeviceNetConfig{}
  136. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  137. if err != nil {
  138. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  139. }
  140. return nil
  141. }
  142. // 设备上报固件信息处理
  143. func processDeviceReportUpgrade(deviceId, version string) error {
  144. args := &rpcs.ArgsUpdateDeviceVersion{
  145. DeviceId: deviceId,
  146. Version: version,
  147. }
  148. var reply rpcs.ReplyEmptyResult
  149. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  150. if err != nil {
  151. server.Log.Errorf("更新设备版本号失败:%v", err)
  152. }
  153. return nil
  154. }
  155. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  156. reply := rpcs.ReplyOnStatus{}
  157. args := rpcs.ArgsOnStatus{
  158. DeviceId: topicInfo.DeviceCode,
  159. Timestamp: message.GetUint64("timestamp"),
  160. SubData: message.GetJson("data").MustToJson(),
  161. VendorId: vendorId,
  162. ProductKey: topicInfo.ProductKey,
  163. SubDeviceId: message.GetString("subDeviceId"),
  164. Action: klink.PacketAction(message.GetString("action")),
  165. }
  166. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  167. if err != nil {
  168. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  169. return
  170. }
  171. }
  172. func (a *Access) processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  173. var reply rpcs.ReplyEmptyResult
  174. data := gjson.New(message.GetJson("data").MustToJson())
  175. server.Log.Infof("收到指令:%s", data.MustToJsonString())
  176. switch data.GetString("cmd") {
  177. case "download":
  178. params := gjson.New(data.GetJson("params").MustToJson())
  179. args := &rpcs.ChunkUpgrade{
  180. DeviceId: deviceId,
  181. FileId: params.GetInt("fileId"),
  182. FileSize: params.GetInt64("fileSize"),
  183. Size: params.GetInt64("size"),
  184. Offset: params.GetInt("offset"),
  185. }
  186. err := a.chunkUpgrade(*args)
  187. if err != nil {
  188. server.Log.Errorf("分片下载发送失败:%v", err)
  189. return err
  190. }
  191. case "downProgress":
  192. params := gjson.New(data.GetJson("params").MustToJson())
  193. var args rpcs.ArgsOtaProgress
  194. args.DeviceId = deviceId
  195. args.Progress = params.GetInt("progress")
  196. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  197. if err != nil {
  198. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  199. return err
  200. }
  201. case "finish":
  202. device := a.GetLockDevice(deviceId)
  203. device.Mutex.Lock()
  204. defer device.Mutex.Unlock()
  205. if device != nil {
  206. device.Locked = false
  207. }
  208. server.Log.Infof("OTA升级完成;%s", deviceId)
  209. }
  210. return nil
  211. }
  212. // Connected 设备接入时
  213. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  214. server.Log.Infof("设备上线;%s", status.DeviceId)
  215. // 查询设备信息
  216. device := &models.Device{}
  217. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  218. if err != nil {
  219. server.Log.Errorf("device not found %s", status.DeviceId)
  220. return nil
  221. }
  222. args := rpcs.ArgsGetOnline{
  223. Id: device.DeviceIdentifier,
  224. ClientIP: status.ClientIp,
  225. AccessRPCHost: server.GetRPCHost(),
  226. HeartbeatInterval: 300,
  227. }
  228. reply := rpcs.ReplyGetOnline{}
  229. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  230. if err != nil {
  231. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  232. }
  233. var cReply rpcs.ReplyEmptyResult
  234. var cArgs rpcs.ArgsGetStatus
  235. cArgs.VendorId = device.VendorID
  236. cArgs.Id = args.Id
  237. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  238. return err
  239. }
  240. return nil
  241. }
  242. // Disconnected 设备断开连接时
  243. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  244. // 查询设备信息
  245. device := &models.Device{}
  246. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  247. if err != nil {
  248. server.Log.Errorf("device not found %s", status.DeviceId)
  249. return nil
  250. }
  251. args := rpcs.ArgsGetOffline{
  252. Id: device.DeviceIdentifier,
  253. VendorId: device.VendorID,
  254. }
  255. reply := rpcs.ReplyGetOffline{}
  256. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  257. if err != nil {
  258. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  259. }
  260. return err
  261. }
  262. // SendCommand rpc 发送设备命令
  263. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  264. // 查询设备信息
  265. lockDevice := a.GetLockDevice(args.DeviceId)
  266. lockDevice.Mutex.Lock()
  267. defer lockDevice.Mutex.Unlock()
  268. if lockDevice.Locked {
  269. return errors.New("设备正在进行OTA升级,请稍后重试")
  270. }
  271. device := &models.Device{}
  272. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  273. if err != nil {
  274. server.Log.Errorf("device not found %s", args.DeviceId)
  275. return nil
  276. }
  277. product := &models.Product{}
  278. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  279. if err != nil {
  280. server.Log.Errorf("device not found %s", args.DeviceId)
  281. return nil
  282. }
  283. cmd := &klink.CloudSend{
  284. Action: "cloudSend",
  285. MsgId: 0,
  286. DeviceCode: args.DeviceId,
  287. SubDeviceId: args.SubDevice,
  288. Timestamp: time.Now().Unix(),
  289. Data: &klink.CloudSendData{
  290. Cmd: args.Cmd,
  291. Params: args.Params,
  292. },
  293. }
  294. msg, err := cmd.Marshal()
  295. if err != nil {
  296. return err
  297. }
  298. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  299. }
  300. // GetStatus rpc 获取设备状态
  301. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  302. server.Log.Infof("Access Get Status: %v", args)
  303. // first send a get status command
  304. cmdArgs := rpcs.ArgsSendCommand{
  305. DeviceId: args.Id,
  306. WaitTime: 0,
  307. SubDevice: args.SubDeviceId,
  308. Cmd: "report",
  309. }
  310. cmdReply := rpcs.ReplySendCommand{}
  311. return a.SendCommand(cmdArgs, &cmdReply)
  312. }
  313. func (a *Access) chunkUpgrade(params rpcs.ChunkUpgrade) error {
  314. lockDevice := a.GetLockDevice(params.DeviceId)
  315. lockDevice.Mutex.Lock()
  316. defer lockDevice.Mutex.Unlock()
  317. lockDevice.Locked = true
  318. lockDevice.LastSeen = time.Now()
  319. server.Log.Infof("正在进行OTA升级:%s", params.DeviceId)
  320. buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(params.Offset)))
  321. var fileArgs rpcs.ArgsOtaFile
  322. fileArgs.FileId = params.FileId
  323. var fileReply rpcs.ReplyOtaFile
  324. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &fileReply)
  325. if err != nil {
  326. server.Log.Errorf("OTA升级文件保存失败:%v", err)
  327. return err
  328. }
  329. if fileReply.File == nil {
  330. return errors.New(fmt.Sprintf("文件:%d 获取失败", params.FileId))
  331. }
  332. start := params.Offset * int(params.Size)
  333. stop := (params.Offset + 1) * int(params.Size)
  334. if stop >= len(fileReply.File) {
  335. stop = len(fileReply.File)
  336. }
  337. if stop < start {
  338. start = stop
  339. }
  340. server.Log.Infof("收到ota请求--------fileId:%d ,offset:%d , 文件截取位置: start:%d,stop:%d", params.FileId, params.Offset, start, stop)
  341. data := fileReply.File[start:stop]
  342. buf.Write(gbinary.BeEncodeUint16(gconv.Uint16(len(data))))
  343. buf.Write(data)
  344. var mCrc crc
  345. checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
  346. buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
  347. server.Log.Infof("发送数据-------- %2X", buf.Bytes())
  348. var SendByteArgs rpcs.ArgsSendByteData
  349. SendByteArgs.DeviceId = params.DeviceId
  350. SendByteArgs.Data = buf.Bytes()
  351. replay := new(rpcs.ReplySendCommand)
  352. err = a.SendByteData(SendByteArgs, replay)
  353. return nil
  354. }
  355. // SendByteData rpc 发送byte数组
  356. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  357. // 查询设备信息
  358. device := &models.Device{}
  359. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  360. if err != nil {
  361. server.Log.Errorf("device not found %s", args.DeviceId)
  362. return err
  363. }
  364. product := &models.Product{}
  365. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  366. if err != nil {
  367. server.Log.Errorf("device not found %s", args.DeviceId)
  368. return err
  369. }
  370. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  371. }
  372. func (a *Access) GetLockDevice(id string) *Device {
  373. if d, exists := a.lockedDevices[id]; exists {
  374. return d
  375. }
  376. device := &Device{Id: id, Locked: false}
  377. a.lockedDevices[id] = device
  378. return device
  379. }
  380. func (a *Access) UnlockDevice() {
  381. for {
  382. time.Sleep(5 * time.Second) // 每5秒检查一次
  383. for _, device := range a.lockedDevices {
  384. device.Mutex.Lock()
  385. if device.Locked && time.Since(device.LastSeen) > 1*time.Minute {
  386. device.Locked = false
  387. server.Log.Infof("Device %s unlocked\n", device.Id)
  388. }
  389. device.Mutex.Unlock()
  390. }
  391. }
  392. }