package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/util/gconv"
	"github.com/gogf/gf/v2/encoding/gbinary"

	"sparrow/pkg/klink"
	"sparrow/pkg/models"
	"sparrow/pkg/protocol"
	"sparrow/pkg/rpcs"
	"sparrow/pkg/server"
)

// Access handles messages reported by devices and exposes the RPC methods
// used to push commands and OTA data down to them.
type Access struct {
	client SubDev

	// mu guards lockedDevices. The map is touched both by the handler/RPC
	// methods and by the background UnlockDevice goroutine, and Go maps are
	// not safe for concurrent use.
	mu            sync.Mutex
	lockedDevices map[string]*Device
}

// NewAgent builds an Access around client and starts the background
// goroutine (UnlockDevice) that releases stale OTA locks.
func NewAgent(client SubDev) *Access {
	a := &Access{
		client:        client,
		lockedDevices: make(map[string]*Device),
	}
	go a.UnlockDevice()
	return a
}

// Device is the per-device OTA lock record kept in Access.lockedDevices.
// Locked and LastSeen must only be read or written while Mutex is held.
type Device struct {
	Id       string
	Locked   bool
	LastSeen time.Time
	Mutex    sync.Mutex
}

// Message handles a message reported by a device.
//
// Downstream topics, unknown devices, topics without a type and payloads that
// are not valid JSON are ignored (nil is returned). "status" and "event"
// messages are dispatched to processStatus and processEvent respectively.
func (a *Access) Message(topic string, payload []byte) error {
	topicInfo, err := protocol.GetTopicInfo(topic)
	if err != nil {
		return err
	}
	if topicInfo.Direction == protocol.Down {
		return nil
	}

	device := &models.Device{}
	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", topicInfo.DeviceCode, device)
	if err != nil {
		server.Log.Errorf("device not found %s", topicInfo.DeviceCode)
		return nil
	}
	server.Log.Debugf("device {%v} message {%v} : %s", device.DeviceIdentifier, topicInfo.Types, payload)

	if len(topicInfo.Types) == 0 {
		return nil
	}
	jsonPayload, err := gjson.DecodeToJson(payload)
	if err != nil {
		// Malformed payloads are dropped without reporting an error.
		return nil
	}

	switch topicInfo.Types[0] {
	case "status":
		return a.processStatus(topicInfo, device.VendorID, jsonPayload)
	case "event":
		return a.processEvent(topicInfo, device.VendorID, jsonPayload)
	}
	return nil
}

// processEvent forwards a device event to Controller.OnEvent and returns the
// RPC error, if any.
func (a *Access) processEvent(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
	reply := rpcs.ReplyOnEvent{}
	args := rpcs.ArgsOnEvent{
		DeviceId:    topicInfo.DeviceCode,
		TimeStamp:   message.GetUint64("timestamp"),
		SubDeviceId: message.GetString("subDeviceId"),
		SubData:     message.GetJson("data").MustToJson(),
		VendorId:    vendorId,
		ProductKey:  topicInfo.ProductKey,
	}
	err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
	if err != nil {
		server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
		return err
	}
	return nil
}

// processStatus dispatches a status message on its "action" field. It always
// returns nil: the individual handlers log their own failures.
func (a *Access) processStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) error {
	act := klink.PacketAction(message.GetString("action"))
	if act == "" {
		return nil
	}
	switch act {
	case klink.DevSendAction:
		processReportStatus(topicInfo, vendorId, message)
	case klink.DevLoginAction:
		_ = processDevLogin(topicInfo.DeviceCode, message.GetString("subDeviceId"))
	case klink.DevLogoutAction:
		_ = processDevLogout(topicInfo.DeviceCode, message.GetString("subDeviceId"))
	case klink.DevNetConfigAction:
		_ = processDevNetConfig(topicInfo.DeviceCode, message.GetString("md5"))
	case klink.ReportFirmwareAction:
		_ = processDeviceReportUpgrade(topicInfo.DeviceCode, message.GetString("version"))
	case klink.DevUpgradeAction:
		// Failures are already logged inside processDeviceUpgrade.
		_ = a.processDeviceUpgrade(topicInfo.DeviceCode, message)
	}
	return nil
}

// processDevLogin marks a sub-device as online (Status 1) through
// Registry.UpdateSubDevice. RPC failures are logged; nil is always returned.
func processDevLogin(deviceCode, subDeviceId string) error {
	var args rpcs.SubDeviceArgs
	args.DeviceCode = deviceCode
	args.Status = 1
	args.SubDeviceId = subDeviceId
	var reply *models.SubDevice
	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
	if err != nil {
		server.Log.Errorf("子设备上线出错:%s", err.Error())
	}
	return nil
}

// processDevLogout marks a sub-device as offline (Status 0) through
// Registry.UpdateSubDevice. RPC failures are logged; nil is always returned.
func processDevLogout(deviceCode, subDeviceId string) error {
	var args rpcs.SubDeviceArgs
	args.DeviceCode = deviceCode
	args.Status = 0
	args.SubDeviceId = subDeviceId
	var reply *models.SubDevice
	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
	if err != nil {
		server.Log.Errorf("子设备下线出错:%s", err.Error())
	}
	return nil
}

// processDevNetConfig records the device's network-configuration info (its
// md5, with Status 1) through Registry.CreateDeviceNetConfig. RPC failures
// are logged; nil is always returned.
func processDevNetConfig(deviceCode, md5 string) error {
	args := &models.DeviceNetConfig{
		DeviceIdentifier: deviceCode,
		MD5:              md5,
		Status:           1,
	}
	reply := rpcs.ReplyCheckDeviceNetConfig{}
	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
	if err != nil {
		server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
	}
	return nil
}

// processDeviceReportUpgrade stores the firmware version reported by a device
// through Registry.UpdateDeviceVersion. RPC failures are logged; nil is
// always returned.
func processDeviceReportUpgrade(deviceId, version string) error {
	args := &rpcs.ArgsUpdateDeviceVersion{
		DeviceId: deviceId,
		Version:  version,
	}
	var reply rpcs.ReplyEmptyResult
	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
	if err != nil {
		server.Log.Errorf("更新设备版本号失败:%v", err)
	}
	return nil
}

// processReportStatus forwards a reported device status to
// Controller.OnStatus. RPC failures are only logged.
func processReportStatus(topicInfo *protocol.TopicInfo, vendorId string, message *gjson.Json) {
	reply := rpcs.ReplyOnStatus{}
	args := rpcs.ArgsOnStatus{
		DeviceId:    topicInfo.DeviceCode,
		Timestamp:   message.GetUint64("timestamp"),
		SubData:     message.GetJson("data").MustToJson(),
		VendorId:    vendorId,
		ProductKey:  topicInfo.ProductKey,
		SubDeviceId: message.GetString("subDeviceId"),
		Action:      klink.PacketAction(message.GetString("action")),
	}
	err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
	if err != nil {
		server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
	}
}

// processDeviceUpgrade handles an OTA message from a device, dispatching on
// the "cmd" field of its "data" object:
//
//   - "download":     send the requested firmware chunk (chunkUpgrade)
//   - "downProgress": persist the progress via DeviceManager.UpdateProgress
//   - "finish":       release the device's OTA lock
//
// Any other cmd is ignored. Errors from the first two commands are logged and
// returned.
func (a *Access) processDeviceUpgrade(deviceId string, message *gjson.Json) error {
	var reply rpcs.ReplyEmptyResult
	data := gjson.New(message.GetJson("data").MustToJson())
	server.Log.Infof("收到指令:%s", data.MustToJsonString())

	switch data.GetString("cmd") {
	case "download":
		params := gjson.New(data.GetJson("params").MustToJson())
		args := rpcs.ChunkUpgrade{
			DeviceId: deviceId,
			FileId:   params.GetInt("fileId"),
			FileSize: params.GetInt64("fileSize"),
			Size:     params.GetInt64("size"),
			Offset:   params.GetInt("offset"),
		}
		if err := a.chunkUpgrade(args); err != nil {
			server.Log.Errorf("分片下载发送失败:%v", err)
			return err
		}
	case "downProgress":
		params := gjson.New(data.GetJson("params").MustToJson())
		var args rpcs.ArgsOtaProgress
		args.DeviceId = deviceId
		args.Progress = params.GetInt("progress")
		err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.UpdateProgress", args, &reply)
		if err != nil {
			server.Log.Errorf("OTA升级进度保存失败:%v", err)
			return err
		}
	case "finish":
		// GetLockDevice never returns nil, so no nil check is needed.
		device := a.GetLockDevice(deviceId)
		device.Mutex.Lock()
		device.Locked = false
		device.Mutex.Unlock()
		server.Log.Infof("OTA升级完成;%s", deviceId)
	}
	return nil
}

// Connected is called when a device connects. It looks the device up in the
// registry, reports it online to the device manager (a failure there is only
// logged) and then notifies the controller, whose error is returned. An
// unknown device is logged and ignored.
func (a *Access) Connected(status *protocol.DevConnectStatus) error {
	server.Log.Infof("设备上线;%s", status.DeviceId)

	// Look up the device.
	device := &models.Device{}
	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
	if err != nil {
		server.Log.Errorf("device not found %s", status.DeviceId)
		return nil
	}

	args := rpcs.ArgsGetOnline{
		Id:                device.DeviceIdentifier,
		ClientIP:          status.ClientIp,
		AccessRPCHost:     server.GetRPCHost(),
		HeartbeatInterval: 300,
	}
	reply := rpcs.ReplyGetOnline{}
	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
	if err != nil {
		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
	}

	var cReply rpcs.ReplyEmptyResult
	var cArgs rpcs.ArgsGetStatus
	cArgs.VendorId = device.VendorID
	cArgs.Id = args.Id
	return server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply)
}

// Disconnected is called when a device disconnects. It looks the device up in
// the registry and reports it offline to the device manager, returning that
// RPC's error. An unknown device is logged and ignored.
func (a *Access) Disconnected(status *protocol.DevConnectStatus) error {
	// Look up the device.
	device := &models.Device{}
	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
	if err != nil {
		server.Log.Errorf("device not found %s", status.DeviceId)
		return nil
	}

	args := rpcs.ArgsGetOffline{
		Id:       device.DeviceIdentifier,
		VendorId: device.VendorID,
	}
	reply := rpcs.ReplyGetOffline{}
	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
	if err != nil {
		server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
	}
	return err
}

// SendCommand is the RPC that publishes a "cloudSend" command to a device.
//
// It fails while the device is locked for an OTA upgrade, and when the device
// or its product cannot be resolved, so the caller is never told a command
// was sent when it was not. The device's lock is held for the whole call.
func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
	lockDevice := a.GetLockDevice(args.DeviceId)
	lockDevice.Mutex.Lock()
	defer lockDevice.Mutex.Unlock()
	if lockDevice.Locked {
		return errors.New("设备正在进行OTA升级,请稍后重试")
	}

	// Look up the device and its product.
	device := &models.Device{}
	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
	if err != nil {
		server.Log.Errorf("device not found %s", args.DeviceId)
		return err
	}
	product := &models.Product{}
	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
	if err != nil {
		server.Log.Errorf("product not found for device %s", args.DeviceId)
		return err
	}

	cmd := &klink.CloudSend{
		Action:      "cloudSend",
		MsgId:       0,
		DeviceCode:  args.DeviceId,
		SubDeviceId: args.SubDevice,
		Timestamp:   time.Now().Unix(),
		Data: &klink.CloudSendData{
			Cmd:    args.Cmd,
			Params: args.Params,
		},
	}
	msg, err := cmd.Marshal()
	if err != nil {
		return err
	}
	return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), msg)
}

// GetStatus is the RPC that asks a device to report its status by sending it
// a "report" command through SendCommand. reply is not populated here.
func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
	server.Log.Infof("Access Get Status: %v", args)
	cmdArgs := rpcs.ArgsSendCommand{
		DeviceId:  args.Id,
		WaitTime:  0,
		SubDevice: args.SubDeviceId,
		Cmd:       "report",
	}
	cmdReply := rpcs.ReplySendCommand{}
	return a.SendCommand(cmdArgs, &cmdReply)
}

// chunkUpgrade sends one firmware chunk to a device.
//
// The device is marked as locked for OTA (and its LastSeen refreshed) before
// anything else; the lock is released by a "finish" message or by the
// UnlockDevice watchdog. The frame written to the device is:
//
//	offset (uint16, gbinary.BeEncodeUint16) | length (uint16) | data | checksum (low byte, high byte)
//
// where data is file[offset*size : (offset+1)*size] clamped to the file
// length. An error is returned when the request is malformed, the file cannot
// be fetched, or the frame cannot be published.
func (a *Access) chunkUpgrade(params rpcs.ChunkUpgrade) error {
	lockDevice := a.GetLockDevice(params.DeviceId)
	lockDevice.Mutex.Lock()
	defer lockDevice.Mutex.Unlock()
	lockDevice.Locked = true
	lockDevice.LastSeen = time.Now()
	server.Log.Infof("正在进行OTA升级:%s", params.DeviceId)

	// Offset and Size come straight from the device; a negative value would
	// produce a negative slice index below and panic.
	if params.Offset < 0 || params.Size < 0 {
		return fmt.Errorf("invalid ota chunk request: offset %d, size %d", params.Offset, params.Size)
	}

	buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(params.Offset)))

	var fileArgs rpcs.ArgsOtaFile
	fileArgs.FileId = params.FileId
	var fileReply rpcs.ReplyOtaFile
	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &fileReply)
	if err != nil {
		server.Log.Errorf("OTA升级文件保存失败:%v", err)
		return err
	}
	if fileReply.File == nil {
		return fmt.Errorf("文件:%d 获取失败", params.FileId)
	}

	// Clamp the requested window to the file; a window past the end of the
	// file yields an empty chunk.
	start := params.Offset * int(params.Size)
	stop := (params.Offset + 1) * int(params.Size)
	if stop >= len(fileReply.File) {
		stop = len(fileReply.File)
	}
	if stop < start {
		start = stop
	}
	server.Log.Infof("收到ota请求--------fileId:%d ,offset:%d , 文件截取位置: start:%d,stop:%d", params.FileId, params.Offset, start, stop)

	data := fileReply.File[start:stop]
	buf.Write(gbinary.BeEncodeUint16(gconv.Uint16(len(data))))
	buf.Write(data)

	// The checksum covers everything written so far and is appended low byte
	// first.
	var mCrc crc
	checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
	buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
	server.Log.Infof("发送数据-------- %2X", buf.Bytes())

	var sendArgs rpcs.ArgsSendByteData
	sendArgs.DeviceId = params.DeviceId
	sendArgs.Data = buf.Bytes()
	// Propagate publish failures instead of reporting success.
	return a.SendByteData(sendArgs, new(rpcs.ReplySendCommand))
}

// SendByteData is the RPC that publishes a raw byte payload to a device's
// command topic. It returns an error when the device or its product cannot be
// resolved, or when publishing fails.
func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
	// Look up the device and its product.
	device := &models.Device{}
	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
	if err != nil {
		server.Log.Errorf("device not found %s", args.DeviceId)
		return err
	}
	product := &models.Product{}
	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
	if err != nil {
		server.Log.Errorf("product not found for device %s", args.DeviceId)
		return err
	}
	return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
}

// GetLockDevice returns the OTA lock record for id, creating an unlocked one
// on first use. It never returns nil and is safe for concurrent use.
func (a *Access) GetLockDevice(id string) *Device {
	a.mu.Lock()
	defer a.mu.Unlock()

	if d, exists := a.lockedDevices[id]; exists {
		return d
	}
	if a.lockedDevices == nil {
		// Tolerate an Access that was not built through NewAgent.
		a.lockedDevices = make(map[string]*Device)
	}
	device := &Device{Id: id, Locked: false}
	a.lockedDevices[id] = device
	return device
}

// UnlockDevice runs forever, checking every five seconds and releasing the
// OTA lock of any device whose LastSeen is more than one minute old. It is
// started as a goroutine by NewAgent and never returns.
func (a *Access) UnlockDevice() {
	const (
		checkInterval = 5 * time.Second
		lockTimeout   = 1 * time.Minute
	)
	for {
		time.Sleep(checkInterval)

		// Snapshot the records under a.mu so the map is never iterated while
		// GetLockDevice inserts into it, and so a.mu is not held while
		// waiting on a per-device mutex.
		a.mu.Lock()
		devices := make([]*Device, 0, len(a.lockedDevices))
		for _, device := range a.lockedDevices {
			devices = append(devices, device)
		}
		a.mu.Unlock()

		for _, device := range devices {
			device.Mutex.Lock()
			if device.Locked && time.Since(device.LastSeen) > lockTimeout {
				device.Locked = false
				server.Log.Infof("Device %s unlocked\n", device.Id)
			}
			device.Mutex.Unlock()
		}
	}
}