agent.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "github.com/gogf/gf/encoding/gjson"
  8. "github.com/gogf/gf/util/gconv"
  9. "github.com/gogf/gf/v2/encoding/gbinary"
  10. "sparrow/pkg/klink"
  11. "sparrow/pkg/models"
  12. "sparrow/pkg/protocol"
  13. "sparrow/pkg/rpcs"
  14. "sparrow/pkg/server"
  15. "sync"
  16. "time"
  17. )
  18. type Access struct {
  19. client SubDev
  20. lockedDevices map[string]*Device
  21. }
  22. func NewAgent(client SubDev) *Access {
  23. a := &Access{
  24. client: client,
  25. lockedDevices: make(map[string]*Device),
  26. }
  27. go a.UnlockDevice()
  28. return a
  29. }
  30. type Device struct {
  31. Id string
  32. Locked bool
  33. LastSeen time.Time
  34. Mutex sync.Mutex
  35. }
  36. // Message 收到设备上报消息处理
  37. func (a *Access) Message(topic string, payload []byte) error {
  38. topicInfo, err := protocol.GetTopicInfo(topic)
  39. if err != nil {
  40. return err
  41. }
  42. if topicInfo.Direction == protocol.Down {
  43. return nil
  44. }
  45. device := &models.Device{}
  46. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
  47. if err != nil {
  48. server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
  49. return nil
  50. }
  51. server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)
  52. if len(topicInfo.Types) == 0 {
  53. return nil
  54. }
  55. jsonPayload, err := gjson.DecodeToJson(payload)
  56. if err != nil {
  57. return nil
  58. }
  59. switch topicInfo.Types[0] {
  60. case "status":
  61. return a.processStatus(topicInfo, device.VendorID, jsonPayload)
  62. case "event":
  63. return a.processEvent(topicInfo, device.VendorID, jsonPayload)
  64. }
  65. return nil
  66. }
  67. func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  68. reply := rpcs.ReplyOnEvent{}
  69. args := rpcs.ArgsOnEvent{
  70. DeviceId: topicInfo.DeviceCode,
  71. TimeStamp: message.GetUint64("timestamp"),
  72. SubDeviceId: message.GetString("subDeviceId"),
  73. SubData: message.GetJson("data").MustToJson(),
  74. VendorId: vendorId,
  75. ProductKey: topicInfo.ProductKey,
  76. }
  77. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  78. if err != nil {
  79. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  80. return err
  81. }
  82. return nil
  83. }
  84. func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
  85. act := klink.PacketAction(message.GetString("action"))
  86. if act != "" {
  87. switch act {
  88. case klink.DevSendAction:
  89. processReportStatus(topicInfo, vendorId, message)
  90. case klink.DevLoginAction:
  91. _ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  92. case klink.DevLogoutAction:
  93. _ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
  94. case klink.DevNetConfigAction:
  95. _ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
  96. case klink.ReportFirmwareAction:
  97. _ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
  98. case klink.DevUpgradeAction:
  99. _ = a.processDeviceUpgrade(topicInfo.DeviceCode, message)
  100. }
  101. }
  102. return nil
  103. }
  104. func processDevLogin(deviceCode, subDeviceId string) error {
  105. var args rpcs.SubDeviceArgs
  106. args.DeviceCode = deviceCode
  107. args.Status = 1
  108. args.SubDeviceId = subDeviceId
  109. var reply *models.SubDevice
  110. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  111. if err != nil {
  112. server.Log.Errorf("子设备上线出错:%s", err.Error())
  113. }
  114. return nil
  115. }
  116. func processDevLogout(deviceCode, subDeviceId string) error {
  117. var args rpcs.SubDeviceArgs
  118. args.DeviceCode = deviceCode
  119. args.Status = 0
  120. args.SubDeviceId = subDeviceId
  121. var reply *models.SubDevice
  122. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  123. if err != nil {
  124. server.Log.Errorf("子设备下线出错:%s", err.Error())
  125. }
  126. return nil
  127. }
  128. // 处理设备配网信息
  129. func processDevNetConfig(deviceCode, md5 string) error {
  130. args := &models.DeviceNetConfig{
  131. DeviceIdentifier: deviceCode,
  132. MD5: md5,
  133. Status: 1,
  134. }
  135. reply := rpcs.ReplyCheckDeviceNetConfig{}
  136. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  137. if err != nil {
  138. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  139. }
  140. return nil
  141. }
  142. // 设备上报固件信息处理
  143. func processDeviceReportUpgrade(deviceId, version string) error {
  144. args := &rpcs.ArgsUpdateDeviceVersion{
  145. DeviceId: deviceId,
  146. Version: version,
  147. }
  148. var reply rpcs.ReplyEmptyResult
  149. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  150. if err != nil {
  151. server.Log.Errorf("更新设备版本号失败:%v", err)
  152. }
  153. return nil
  154. }
  155. func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
  156. reply := rpcs.ReplyOnStatus{}
  157. args := rpcs.ArgsOnStatus{
  158. DeviceId: topicInfo.DeviceCode,
  159. Timestamp: message.GetUint64("timestamp"),
  160. SubData: message.GetJson("data").MustToJson(),
  161. VendorId: vendorId,
  162. ProductKey: topicInfo.ProductKey,
  163. SubDeviceId: message.GetString("subDeviceId"),
  164. Action: klink.PacketAction(message.GetString("action")),
  165. }
  166. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  167. if err != nil {
  168. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  169. return
  170. }
  171. }
  172. func (a *Access) processDeviceUpgrade(deviceId string, message *gjson.Json) error {
  173. var reply rpcs.ReplyEmptyResult
  174. data := gjson.New(message.GetJson("data").MustToJson())
  175. switch data.GetString("cmd") {
  176. case "download":
  177. params := gjson.New(data.GetJson("params").MustToJson())
  178. args := &rpcs.ChunkUpgrade{
  179. DeviceId: deviceId,
  180. FileId: params.GetInt("fileId"),
  181. FileSize: params.GetInt64("fileSize"),
  182. Size: params.GetInt64("size"),
  183. Offset: params.GetInt("offset"),
  184. }
  185. err := a.chunkUpgrade(*args)
  186. if err != nil {
  187. server.Log.Errorf("分片下载发送失败:%v", err)
  188. return err
  189. }
  190. case "downProgress":
  191. params := gjson.New(data.GetJson("params").MustToJson())
  192. var args rpcs.ArgsOtaProgress
  193. args.DeviceId = deviceId
  194. args.Progress = params.GetInt("progress")
  195. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
  196. if err != nil {
  197. server.Log.Errorf("OTA升级进度保存失败:%v", err)
  198. return err
  199. }
  200. case "finish":
  201. device := a.GetLockDevice(deviceId)
  202. device.Mutex.Lock()
  203. defer device.Mutex.Unlock()
  204. if device != nil {
  205. device.Locked = false
  206. }
  207. server.Log.Infof("OTA升级完成;%s", deviceId)
  208. }
  209. return nil
  210. }
  211. // Connected 设备接入时
  212. func (a *Access) Connected(status *protocol.DevConnectStatus) error {
  213. server.Log.Infof("设备上线;%s", status.DeviceId)
  214. // 查询设备信息
  215. device := &models.Device{}
  216. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  217. if err != nil {
  218. server.Log.Errorf("device not found %s", status.DeviceId)
  219. return nil
  220. }
  221. args := rpcs.ArgsGetOnline{
  222. Id: device.DeviceIdentifier,
  223. ClientIP: status.ClientIp,
  224. AccessRPCHost: server.GetRPCHost(),
  225. HeartbeatInterval: 300,
  226. }
  227. reply := rpcs.ReplyGetOnline{}
  228. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
  229. if err != nil {
  230. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  231. }
  232. var cReply rpcs.ReplyEmptyResult
  233. var cArgs rpcs.ArgsGetStatus
  234. cArgs.VendorId = device.VendorID
  235. cArgs.Id = args.Id
  236. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  237. return err
  238. }
  239. return nil
  240. }
  241. // Disconnected 设备断开连接时
  242. func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
  243. // 查询设备信息
  244. device := &models.Device{}
  245. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
  246. if err != nil {
  247. server.Log.Errorf("device not found %s", status.DeviceId)
  248. return nil
  249. }
  250. args := rpcs.ArgsGetOffline{
  251. Id: device.DeviceIdentifier,
  252. VendorId: device.VendorID,
  253. }
  254. reply := rpcs.ReplyGetOffline{}
  255. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  256. if err != nil {
  257. server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
  258. }
  259. return err
  260. }
  261. // SendCommand rpc 发送设备命令
  262. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  263. // 查询设备信息
  264. lockDevice := a.GetLockDevice(args.DeviceId)
  265. lockDevice.Mutex.Lock()
  266. defer lockDevice.Mutex.Unlock()
  267. if lockDevice.Locked {
  268. return errors.New("设备正在进行OTA升级,请稍后重试")
  269. }
  270. device := &models.Device{}
  271. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  272. if err != nil {
  273. server.Log.Errorf("device not found %s", args.DeviceId)
  274. return nil
  275. }
  276. product := &models.Product{}
  277. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  278. if err != nil {
  279. server.Log.Errorf("device not found %s", args.DeviceId)
  280. return nil
  281. }
  282. cmd := &klink.CloudSend{
  283. Action: "cloudSend",
  284. MsgId: 0,
  285. DeviceCode: args.DeviceId,
  286. SubDeviceId: args.SubDevice,
  287. Timestamp: time.Now().Unix(),
  288. Data: &klink.CloudSendData{
  289. Cmd: args.Cmd,
  290. Params: args.Params,
  291. },
  292. }
  293. msg, err := cmd.Marshal()
  294. if err != nil {
  295. return err
  296. }
  297. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
  298. }
  299. // GetStatus rpc 获取设备状态
  300. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  301. server.Log.Infof("Access Get Status: %v", args)
  302. // first send a get status command
  303. cmdArgs := rpcs.ArgsSendCommand{
  304. DeviceId: args.Id,
  305. WaitTime: 0,
  306. SubDevice: args.SubDeviceId,
  307. Cmd: "report",
  308. }
  309. cmdReply := rpcs.ReplySendCommand{}
  310. return a.SendCommand(cmdArgs, &cmdReply)
  311. }
  312. func (a *Access) chunkUpgrade(params rpcs.ChunkUpgrade) error {
  313. lockDevice := a.GetLockDevice(params.DeviceId)
  314. lockDevice.Mutex.Lock()
  315. defer lockDevice.Mutex.Unlock()
  316. lockDevice.Locked = true
  317. lockDevice.LastSeen = time.Now()
  318. server.Log.Infof("4G模组OTA升级:%s", params.DeviceId)
  319. buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(params.Offset)))
  320. var fileArgs rpcs.ArgsOtaFile
  321. fileArgs.FileId = params.FileId
  322. var fileReply rpcs.ReplyOtaFile
  323. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &fileReply)
  324. if err != nil {
  325. server.Log.Errorf("OTA升级文件保存失败:%v", err)
  326. return err
  327. }
  328. if fileReply.File == nil {
  329. return errors.New(fmt.Sprintf("文件:%d 获取失败", params.FileId))
  330. }
  331. start := params.Offset * int(params.Size)
  332. stop := (params.Offset + 1) * int(params.Size)
  333. if stop >= len(fileReply.File) {
  334. stop = len(fileReply.File)
  335. }
  336. if stop < start {
  337. start = stop
  338. }
  339. server.Log.Infof("<--------fileId:%d,offset:%d,文件截取位置------: start:%d,stop:%d", params.FileId, params.FileSize, start, stop)
  340. data := fileReply.File[start:stop]
  341. buf.Write(gbinary.BeEncodeUint16(gconv.Uint16(len(data))))
  342. buf.Write(data)
  343. var mCrc crc
  344. checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
  345. buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
  346. server.Log.Infof("--------> %2X", buf.Bytes())
  347. var SendByteArgs rpcs.ArgsSendByteData
  348. SendByteArgs.DeviceId = params.DeviceId
  349. SendByteArgs.Data = buf.Bytes()
  350. replay := new(rpcs.ReplySendCommand)
  351. err = a.SendByteData(SendByteArgs, replay)
  352. return nil
  353. }
  354. // SendByteData rpc 发送byte数组
  355. func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
  356. // 查询设备信息
  357. device := &models.Device{}
  358. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
  359. if err != nil {
  360. server.Log.Errorf("device not found %s", args.DeviceId)
  361. return err
  362. }
  363. product := &models.Product{}
  364. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  365. if err != nil {
  366. server.Log.Errorf("device not found %s", args.DeviceId)
  367. return err
  368. }
  369. return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
  370. }
  371. func (a *Access) GetLockDevice(id string) *Device {
  372. if d, exists := a.lockedDevices[id]; exists {
  373. return d
  374. }
  375. device := &Device{Id: id, Locked: false}
  376. a.lockedDevices[id] = device
  377. return device
  378. }
  379. func (a *Access) UnlockDevice() {
  380. for {
  381. time.Sleep(5 * time.Second) // 每5秒检查一次
  382. for _, device := range a.lockedDevices {
  383. device.Mutex.Lock()
  384. if device.Locked && time.Since(device.LastSeen) > 1*time.Minute {
  385. device.Locked = false
  386. server.Log.Infof("Device %s unlocked\n", device.Id)
  387. }
  388. device.Mutex.Unlock()
  389. }
  390. }
  391. }