浏览代码

增加分体式五恒ota升级、设备信息查看

liuxiulin 8 月之前
父节点
当前提交
9c380d3854

+ 55 - 0
pkg/deviceStatus/deviceStatus.go

@@ -0,0 +1,55 @@
+package deviceStatus
+
+import (
+	"github.com/gogf/gf/database/gredis"
+	"github.com/gogf/gf/encoding/gjson"
+)
+
+const (
+	KeyPrefix   = "device:separate:"
+	dataExpires = 7200
+)
+
+type SplitStatus struct {
+	Power      int `json:"power"`
+	Mode       int `json:"mode"`
+	FanSpeed   int `json:"fan_speed"`
+	SetTemp    int `json:"set_temp"`
+	EnvTemp    int `json:"env_temp"`
+	EnvCo2     int `json:"env_co2"`
+	EnvPm25    int `json:"env_pm25"`
+	StatusCode int `json:"status_code"`
+	AirMode    int `json:"air_mode"`
+	AcType     int `json:"ac_type"`
+	AirType    int `json:"air_type"`
+	HumType    int `json:"hum_type"`
+}
+
+type DevStatusManager struct {
+	redisClient *gredis.Redis
+}
+
+func NewDevStatusManager(host string, port int) *DevStatusManager {
+	red := gredis.New(&gredis.Config{
+		Host:      host,
+		Port:      port,
+		Db:        14,
+		MaxActive: 100,
+	})
+	mgr := &DevStatusManager{
+		redisClient: red,
+	}
+	return mgr
+}
+
+func (mgr *DevStatusManager) GetDeviceStatus(id string) (*gjson.Json, error) {
+	key := KeyPrefix + id
+
+	// get status from redis
+	result, err := mgr.redisClient.DoVar("GET", key)
+	if err != nil {
+		return nil, err
+	}
+
+	return gjson.New(result), nil
+}

+ 119 - 0
pkg/otaUpgrade/ota_upgrade.go

@@ -0,0 +1,119 @@
+package otaUpgrade
+
+import (
+	"github.com/gogf/gf/database/gredis"
+)
+
+const (
+	FileKeyPrefix     = "ota:file:"
+	ProgressKeyPrefix = "ota:progress:"
+	dataExpires       = 7200
+)
+
+type File struct {
+	FileId   string
+	FileData []byte
+}
+
+type Progress struct {
+	DeviceId string
+	Progress int
+}
+type OtaManager struct {
+	redisClient *gredis.Redis
+}
+
+func NewOtaManager(host string, port, db int) *OtaManager {
+	red := gredis.New(&gredis.Config{
+		Host:      host,
+		Port:      port,
+		Db:        db,
+		MaxActive: 100,
+	})
+	mgr := &OtaManager{
+		redisClient: red,
+	}
+	return mgr
+}
+
+func (mgr *OtaManager) SavaFile(id string, fileData []byte) error {
+	key := FileKeyPrefix + id
+	file := new(File)
+	file.FileId = id
+	file.FileData = fileData
+	_, err := mgr.redisClient.Do("SET", key, file)
+	if err != nil {
+		return err
+	}
+	_, err = mgr.redisClient.Do("EXPIRE", key, dataExpires)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (mgr *OtaManager) GetFile(id string) (*File, error) {
+	key := FileKeyPrefix + id
+	file := new(File)
+	// get status from redis
+	result, err := mgr.redisClient.DoVar("GET", key)
+	if err != nil {
+		return nil, err
+	}
+	err = result.Struct(file)
+	if err != nil {
+		return nil, err
+	}
+	return file, nil
+}
+
+func (mgr *OtaManager) DelFile(id string) error {
+	key := FileKeyPrefix + id
+	_, err := mgr.redisClient.Do("DEL", key)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (mgr *OtaManager) GetProgress(id string) (*Progress, error) {
+	key := ProgressKeyPrefix + id
+	progress := new(Progress)
+	result, err := mgr.redisClient.DoVar("GET", key)
+	if err != nil {
+		return nil, err
+	}
+	_, err = mgr.redisClient.Do("EXPIRE", key, dataExpires)
+	if err != nil {
+		return nil, err
+	}
+	err = result.Struct(progress)
+	if err != nil {
+		return nil, err
+	}
+	return progress, nil
+}
+
+func (mgr *OtaManager) UpdateProgress(id string, progress int) error {
+	key := ProgressKeyPrefix + id
+	_, err := mgr.redisClient.Do("SET", key, progress)
+	if err != nil {
+		return err
+	}
+	_, err = mgr.redisClient.Do("EXPIRE", key, dataExpires)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (mgr *OtaManager) DelProgress(id string) error {
+	key := ProgressKeyPrefix + id
+	_, err := mgr.redisClient.Do("DEL", key)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

+ 28 - 0
pkg/rpcs/access.go

@@ -1,6 +1,7 @@
 package rpcs
 
 import (
+	"github.com/gogf/gf/encoding/gjson"
 	"sparrow/pkg/protocol"
 )
 
@@ -36,3 +37,30 @@ type ArgsDeviceUpgrade struct {
 	Version     string
 	FileSize    int64
 }
+
+type ArgsSendByteData struct {
+	DeviceId  string
+	SubDevice string
+	Cmd       string
+	WaitTime  uint32
+	Data      []byte
+	Params    map[string]interface{}
+}
+
+type ArgsUpgrade4G struct {
+	DeviceId string
+	FileId   string
+	FileSize int64
+}
+
+type ChunkUpgrade struct {
+	DeviceId string
+	FileId   string
+	FileSize int64
+	Size     int64
+	Offset   int
+}
+
+type ReplyStatus struct {
+	Status gjson.Json
+}

+ 22 - 0
pkg/rpcs/devicemanager.go

@@ -36,3 +36,25 @@ type ReplyHeartBeat ReplyEmptyResult
 
 type ArgsGetDeviceOnlineStatus ArgsDeviceId
 type ReplyGetDeviceOnlineStatus online.Status
+
+type ArgsOtaFile struct {
+	FileId   string
+	FileData []byte
+}
+
+type ReplyOtaFile struct {
+	File []byte
+}
+
+type ArgsOtaProgress struct {
+	DeviceId string
+	Progress int
+}
+
+type ReplyOtaProgress struct {
+	Progress int
+}
+
+type ReplyDeviceStatus struct {
+	Status interface{}
+}

+ 57 - 0
services/devicemanager/manager.go

@@ -1,7 +1,9 @@
 package main
 
 import (
+	"sparrow/pkg/deviceStatus"
 	"sparrow/pkg/online"
+	"sparrow/pkg/otaUpgrade"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/token"
 )
@@ -9,14 +11,20 @@ import (
 type DeviceManager struct {
 	onlineManager *online.Manager
 	tokenHelper   *token.Helper
+	otaManager    *otaUpgrade.OtaManager
+	statusManager *deviceStatus.DevStatusManager
 }
 
 func NewDeviceManager(redishost string, port, db int) *DeviceManager {
 	mgr := online.NewManager(redishost, port, db)
 	helper := token.NewHelper(redishost, port, db)
+	otaMgr := otaUpgrade.NewOtaManager(redishost, port, db)
+	statusMgr := deviceStatus.NewDevStatusManager(redishost, port)
 	return &DeviceManager{
 		onlineManager: mgr,
 		tokenHelper:   helper,
+		otaManager:    otaMgr,
+		statusManager: statusMgr,
 	}
 }
 
@@ -72,3 +80,52 @@ func (dm *DeviceManager) GetDeviceOnlineStatus(args rpcs.ArgsGetDeviceOnlineStat
 
 	return nil
 }
+
+func (dm *DeviceManager) SavaFile(args rpcs.ArgsOtaFile, reply *rpcs.ReplyGetOffline) error {
+	err := dm.otaManager.SavaFile(args.FileId, args.FileData)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (dm *DeviceManager) GetFile(args rpcs.ArgsOtaFile, reply *rpcs.ReplyOtaFile) error {
+	file, err := dm.otaManager.GetFile(args.FileId)
+	if err != nil {
+		return err
+	}
+	if file != nil {
+		reply.File = file.FileData
+	}
+	return nil
+}
+
+func (dm *DeviceManager) UpdateProgress(args rpcs.ArgsOtaProgress, reply *rpcs.ReplyGetOffline) error {
+	err := dm.otaManager.UpdateProgress(args.DeviceId, args.Progress)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (dm *DeviceManager) GetProgress(args rpcs.ArgsOtaProgress, reply *rpcs.ReplyOtaProgress) error {
+	progress, err := dm.otaManager.GetProgress(args.DeviceId)
+	if err != nil {
+		return err
+	}
+	if progress != nil {
+		reply.Progress = progress.Progress
+	}
+	return nil
+}
+
+func (dm *DeviceManager) GetDeviceStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyStatus) error {
+	status, err := dm.statusManager.GetDeviceStatus(args.Id)
+	if err != nil {
+		return err
+	}
+	if status != nil {
+		reply.Status = *status
+	}
+	return nil
+}

+ 19 - 0
services/emqx-agent/agent.go

@@ -259,3 +259,22 @@ func NewAgent(client SubDev) *Access {
 		client: client,
 	}
 }
+
+// SendByteData rpc 发送byte数组
+func (a *Access) SendByteData(args rpcs.ArgsSendByteData, reply *rpcs.ReplySendCommand) error {
+	// 查询设备信息
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", args.DeviceId)
+		return nil
+	}
+	product := &models.Product{}
+	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
+	if err != nil {
+		server.Log.Errorf("device not found %s", args.DeviceId)
+		return nil
+	}
+
+	return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.ProductKey), args.Data)
+}

+ 77 - 42
services/knowoapi/controllers/device.go

@@ -3,6 +3,8 @@ package controllers
 import (
 	"errors"
 	"fmt"
+	"io"
+	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 	"sparrow/services/knowoapi/services"
@@ -217,46 +219,79 @@ func (a *DeviceController) GetLivechart() {
 }
 
 // Upgrade ota升级
-// POST /devices/upgrade
-//func (a *DeviceController) Upgrade() {
-//	params := new(models.UpgradeParams)
-//	if err := parseBody(a.Ctx, params); err != nil {
-//		badRequest(a.Ctx, err)
-//		return
-//	}
-//	params.VendorID = a.Token.getVendorID(a.Ctx)
-//	file, header, err := a.Ctx.FormFile("file")
-//	if err != nil {
-//		responseError(a.Ctx, ErrNormal, err.Error())
-//		return
-//	}
-//	fileBytes, err := io.ReadAll(file)
-//	if err != nil {
-//		responseError(a.Ctx, ErrNormal, err.Error())
-//		return
-//	}
-//	params.FileSize = header.Size
-//	params.File = fileBytes
-//	params.FileName = header.Filename
-//	err = a.Service.Upgrade(params)
-//	if err != nil {
-//		responseError(a.Ctx, ErrNormal, err.Error())
-//		return
-//	}
-//	done(a.Ctx, params)
-//}
+// POST /device/upgrade
+func (a *DeviceController) Upgrade() {
+	params := new(models.UpgradeParams)
+	if err := parseBody(a.Ctx, params); err != nil {
+		badRequest(a.Ctx, err)
+		return
+	}
+	params.VendorID = a.Token.getVendorID(a.Ctx)
+	file, header, err := a.Ctx.FormFile("file")
+	if err != nil {
+		responseError(a.Ctx, ErrNormal, err.Error())
+		return
+	}
+	fileBytes, err := io.ReadAll(file)
+	if err != nil {
+		responseError(a.Ctx, ErrNormal, err.Error())
+		return
+	}
+	params.FileSize = header.Size
+	params.File = fileBytes
+	params.FileName = header.Filename
+	err = a.Service.Upgrade(params)
+	if err != nil {
+		responseError(a.Ctx, ErrNormal, err.Error())
+		return
+	}
+	done(a.Ctx, params)
+}
+
+// OtaProgress ota升级
+// GET /device/ota/progress?device_id=
+func (a *DeviceController) OtaProgress() {
+	deviceId := a.Ctx.URLParam("device_id")
 
-//// OtaProgress ota升级
-//// GET /devices/ota/progress?deviceId=
-//func (a *DeviceController) OtaProgress() {
-//	deviceId := a.Ctx.URLParam("deviceId")
-//
-//	data, err := a.Service.GetUpgradeProgress(deviceId)
-//	if err != nil {
-//		responseError(a.Ctx, ErrDatabase, err.Error())
-//		return
-//	}
-//	done(a.Ctx, map[string]interface{}{
-//		"progress": data.Progress,
-//	})
-//}
+	data, err := a.Service.GetUpgradeProgress(deviceId)
+	if err != nil {
+		responseError(a.Ctx, ErrDatabase, err.Error())
+		return
+	}
+	done(a.Ctx, map[string]interface{}{
+		"progress": data.Progress,
+	})
+}
+
+// GetStatus 获取设备信息
+// GET /device/status?device_id=
+func (a *DeviceController) GetStatus() {
+	deviceId := a.Ctx.URLParam("device_id")
+
+	data, err := a.Service.GetDeviceStatus(deviceId)
+	if err != nil {
+		responseError(a.Ctx, ErrDatabase, err.Error())
+		return
+	}
+	done(a.Ctx, map[string]interface{}{
+		"data": data.MustToJsonString(),
+	})
+}
+
+// UpdateSplitInfo
+// POST /device/split_info
+func (a *DeviceController) UpdateSplitInfo() {
+
+	d := new(models.UpgradeParams)
+	if err := parseBody(a.Ctx, d); err != nil {
+		badRequest(a.Ctx, err)
+		return
+	}
+
+	err := a.Service.GetSplitInfo(d.DeviceID)
+	if err != nil {
+		responseError(a.Ctx, ErrDatabase, err.Error())
+		return
+	}
+	done(a.Ctx, d.DeviceID)
+}

+ 80 - 45
services/knowoapi/services/device.go

@@ -1,6 +1,8 @@
 package services
 
 import (
+	"github.com/gogf/gf/encoding/gjson"
+	"github.com/gogf/gf/util/guid"
 	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
@@ -23,10 +25,14 @@ type DeviceService interface {
 	GetDevices(vendorid, proid string, pi, ps int, deviceid string) ([]*models.Devices, int, error)
 	//获取用户下所有设备的数量,在线设备的数量,离线设备的数量
 	GetDevicesCountByVenderId(vendorid string) (map[string]interface{}, error)
-	//// 发起设备OTA升级
-	//Upgrade(params *models.UpgradeParams) error
-	//// GetUpgradeProgress 获取ota升级进度
-	//GetUpgradeProgress(deviceId string) (rpcs.ReplyOtaProgress, error)
+	// 发起设备OTA升级
+	Upgrade(params *models.UpgradeParams) error
+	// GetUpgradeProgress 获取ota升级进度
+	GetUpgradeProgress(deviceId string) (rpcs.ReplyOtaProgress, error)
+	// 获取设备状态数据
+	GetDeviceStatus(deviceId string) (gjson.Json, error)
+	// 获取分体式五恒获取网关信息
+	GetSplitInfo(deviceId string) error
 }
 
 type deviceservice struct {
@@ -128,44 +134,73 @@ func (a deviceservice) GetDevicesCountByVenderId(vendorid string) (map[string]in
 	return deviceCount, nil
 }
 
-//func (a deviceservice) Upgrade(param *models.UpgradeParams) error {
-//
-//	var fileArgs rpcs.ArgsOtaFile
-//	fileArgs.FileData = param.File
-//	fileArgs.FileId = guid.S()
-//	var reply rpcs.ReplyEmptyResult
-//
-//	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SavaFile", fileArgs, &reply)
-//	if err != nil {
-//		server.Log.Errorf("OTA升级文件保存失败:%v", err)
-//		return err
-//	}
-//
-//	var args rpcs.ArgsUpgrade4G
-//	args.DeviceId = param.DeviceID
-//	args.FileId = fileArgs.FileId
-//	args.FileSize = param.FileSize
-//
-//	err = server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.UpgradeFor4G", args, &reply)
-//	if err != nil {
-//		server.Log.Errorf("4G模组OTA升级失败:%v", err)
-//		return err
-//	}
-//
-//	return nil
-//}
-//
-//func (a deviceservice) GetUpgradeProgress(deviceId string) (rpcs.ReplyOtaProgress, error) {
-//	var args rpcs.ArgsOtaProgress
-//	args.DeviceId = deviceId
-//
-//	var reply rpcs.ReplyOtaProgress
-//
-//	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetProgress", args, &reply)
-//	if err != nil {
-//		server.Log.Errorf("OTA升级进度获取失败:%v", err)
-//		return reply, err
-//	}
-//
-//	return reply, nil
-//}
+func (a deviceservice) Upgrade(param *models.UpgradeParams) error {
+
+	var fileArgs rpcs.ArgsOtaFile
+	fileArgs.FileData = param.File
+	fileArgs.FileId = guid.S()
+	var reply rpcs.ReplyEmptyResult
+
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SavaFile", fileArgs, &reply)
+	if err != nil {
+		server.Log.Errorf("OTA升级文件保存失败:%v", err)
+		return err
+	}
+
+	var args rpcs.ArgsUpgrade4G
+	args.DeviceId = param.DeviceID
+	args.FileId = fileArgs.FileId
+	args.FileSize = param.FileSize
+
+	err = server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.UpgradeFor4G", args, &reply)
+	if err != nil {
+		server.Log.Errorf("4G模组OTA升级失败:%v", err)
+		return err
+	}
+
+	return nil
+}
+
+func (a deviceservice) GetUpgradeProgress(deviceId string) (rpcs.ReplyOtaProgress, error) {
+	var args rpcs.ArgsOtaProgress
+	args.DeviceId = deviceId
+
+	var reply rpcs.ReplyOtaProgress
+
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetProgress", args, &reply)
+	if err != nil {
+		server.Log.Errorf("OTA升级进度获取失败:%v", err)
+		return reply, err
+	}
+
+	return reply, nil
+}
+
+func (a deviceservice) GetDeviceStatus(deviceId string) (gjson.Json, error) {
+	var args rpcs.ArgsGetStatus
+	args.Id = deviceId
+
+	var reply rpcs.ReplyStatus
+
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceStatus", args, &reply)
+	if err != nil {
+		server.Log.Errorf("设备状态数据获取失败:%v", err)
+		return reply.Status, err
+	}
+
+	return reply.Status, nil
+}
+
+func (a deviceservice) GetSplitInfo(deviceId string) error {
+	var args rpcs.ArgsSendCommand
+	args.DeviceId = deviceId
+	args.Cmd = "getInfo"
+	var reply rpcs.ReplySendCommand
+	err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.SendCommand", args, &reply)
+	if err != nil {
+		server.Log.Errorf("设备状态数据获取失败:%v", err)
+		return err
+	}
+
+	return nil
+}

+ 89 - 1
services/mqttaccess/access.go

@@ -1,6 +1,13 @@
 package main
 
 import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	MQTT "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gogf/gf/v2/encoding/gbinary"
+	"github.com/gogf/gf/v2/util/gconv"
 	"sparrow/pkg/klink"
 	"sparrow/pkg/mqtt"
 	"sparrow/pkg/protocol"
@@ -17,12 +24,13 @@ const (
 
 type Access struct {
 	MqttBroker *mqtt.Broker
+	MQTT.Client
 }
 
 func NewAccess() (*Access, error) {
 	p := NewMQTTProvider()
 	return &Access{
-		mqtt.NewBroker(p),
+		MqttBroker: mqtt.NewBroker(p),
 	}, nil
 }
 
@@ -105,3 +113,83 @@ func (a *Access) Upgrade(args rpcs.ArgsDeviceUpgrade, reply *rpcs.ReplyEmptyResu
 	}
 	return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, 5*time.Second)
 }
+
+// UpgradeInfo 下发升级包信息
+// TODO: 实现
+func (a *Access) UpgradeFor4G(args rpcs.ArgsUpgrade4G, reply *rpcs.ReplyEmptyResult) error {
+	server.Log.Infof("4G模组OTA升级:%s", args.DeviceId)
+
+	cmd := &klink.CloudSend{
+		Action:     "cloudSend",
+		MsgId:      0,
+		DeviceCode: args.DeviceId,
+		Timestamp:  time.Now().Unix(),
+		Data: &klink.CloudSendData{
+			Cmd: "devUpgrade",
+			Params: map[string]interface{}{
+				"fileId":   args.FileId,
+				"fileSize": args.FileSize,
+			},
+		},
+	}
+
+	msg, err := cmd.Marshal()
+	if err != nil {
+		return err
+	}
+	return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, 5*time.Second)
+}
+
+func (a *Access) ChunkUpgrade(args rpcs.ChunkUpgrade, reply *rpcs.ReplyEmptyResult) error {
+	server.Log.Infof("4G模组OTA升级:%s", args.DeviceId)
+
+	cmd := &klink.CloudSend{
+		Action:     "cloudSend",
+		MsgId:      0,
+		DeviceCode: args.DeviceId,
+		Timestamp:  time.Now().Unix(),
+		Data: &klink.CloudSendData{
+			Cmd: "devUpgrade",
+			Params: map[string]interface{}{
+				"fileId":   args.FileId,
+				"fileSize": args.FileSize,
+				"size":     args.Size,
+				"offset":   args.Offset,
+			},
+		},
+	}
+
+	byteCmd, err := json.Marshal(cmd)
+	if err != nil {
+		return err
+	}
+
+	buf := bytes.NewBuffer(gbinary.BeEncodeUint16(gconv.Uint16(len(byteCmd))))
+	buf.Write(byteCmd)
+
+	var fileArgs rpcs.ArgsOtaFile
+	fileArgs.FileId = args.FileId
+
+	var fileReply rpcs.ReplyOtaFile
+	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetFile", fileArgs, &reply)
+	if err != nil {
+		server.Log.Errorf("OTA升级文件保存失败:%v", err)
+		return err
+	}
+
+	if fileReply.File == nil {
+		return errors.New(fmt.Sprintf("文件:%s 获取失败", args.FileId))
+	}
+	buf.Write(fileReply.File[args.Offset : args.Offset+int(args.Size)])
+
+	var mCrc crc
+	checkSum := mCrc.reset().pushBytes(buf.Bytes()).value()
+	buf.Write([]byte{byte(checkSum), byte(checkSum >> 8)})
+
+	var SendByteArgs rpcs.ArgsSendByteData
+	SendByteArgs.DeviceId = args.DeviceId
+	SendByteArgs.Data = buf.Bytes()
+	err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.SendByteData", SendByteArgs, &reply)
+
+	return nil
+}

+ 68 - 0
services/mqttaccess/crc.go

@@ -0,0 +1,68 @@
+package main
+
+// Table of CRC values for high–order byte
+var crcHighBytes = []byte{
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41,
+	0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40,
+}
+
+// Table of CRC values for low-order byte
+var crcLowBytes = []byte{
+	0x00, 0xC0, 0xC1, 0x01, 0xC3, 0x03, 0x02, 0xC2, 0xC6, 0x06, 0x07, 0xC7, 0x05, 0xC5, 0xC4, 0x04,
+	0xCC, 0x0C, 0x0D, 0xCD, 0x0F, 0xCF, 0xCE, 0x0E, 0x0A, 0xCA, 0xCB, 0x0B, 0xC9, 0x09, 0x08, 0xC8,
+	0xD8, 0x18, 0x19, 0xD9, 0x1B, 0xDB, 0xDA, 0x1A, 0x1E, 0xDE, 0xDF, 0x1F, 0xDD, 0x1D, 0x1C, 0xDC,
+	0x14, 0xD4, 0xD5, 0x15, 0xD7, 0x17, 0x16, 0xD6, 0xD2, 0x12, 0x13, 0xD3, 0x11, 0xD1, 0xD0, 0x10,
+	0xF0, 0x30, 0x31, 0xF1, 0x33, 0xF3, 0xF2, 0x32, 0x36, 0xF6, 0xF7, 0x37, 0xF5, 0x35, 0x34, 0xF4,
+	0x3C, 0xFC, 0xFD, 0x3D, 0xFF, 0x3F, 0x3E, 0xFE, 0xFA, 0x3A, 0x3B, 0xFB, 0x39, 0xF9, 0xF8, 0x38,
+	0x28, 0xE8, 0xE9, 0x29, 0xEB, 0x2B, 0x2A, 0xEA, 0xEE, 0x2E, 0x2F, 0xEF, 0x2D, 0xED, 0xEC, 0x2C,
+	0xE4, 0x24, 0x25, 0xE5, 0x27, 0xE7, 0xE6, 0x26, 0x22, 0xE2, 0xE3, 0x23, 0xE1, 0x21, 0x20, 0xE0,
+	0xA0, 0x60, 0x61, 0xA1, 0x63, 0xA3, 0xA2, 0x62, 0x66, 0xA6, 0xA7, 0x67, 0xA5, 0x65, 0x64, 0xA4,
+	0x6C, 0xAC, 0xAD, 0x6D, 0xAF, 0x6F, 0x6E, 0xAE, 0xAA, 0x6A, 0x6B, 0xAB, 0x69, 0xA9, 0xA8, 0x68,
+	0x78, 0xB8, 0xB9, 0x79, 0xBB, 0x7B, 0x7A, 0xBA, 0xBE, 0x7E, 0x7F, 0xBF, 0x7D, 0xBD, 0xBC, 0x7C,
+	0xB4, 0x74, 0x75, 0xB5, 0x77, 0xB7, 0xB6, 0x76, 0x72, 0xB2, 0xB3, 0x73, 0xB1, 0x71, 0x70, 0xB0,
+	0x50, 0x90, 0x91, 0x51, 0x93, 0x53, 0x52, 0x92, 0x96, 0x56, 0x57, 0x97, 0x55, 0x95, 0x94, 0x54,
+	0x9C, 0x5C, 0x5D, 0x9D, 0x5F, 0x9F, 0x9E, 0x5E, 0x5A, 0x9A, 0x9B, 0x5B, 0x99, 0x59, 0x58, 0x98,
+	0x88, 0x48, 0x49, 0x89, 0x4B, 0x8B, 0x8A, 0x4A, 0x4E, 0x8E, 0x8F, 0x4F, 0x8D, 0x4D, 0x4C, 0x8C,
+	0x44, 0x84, 0x85, 0x45, 0x87, 0x47, 0x46, 0x86, 0x82, 0x42, 0x43, 0x83, 0x41, 0x81, 0x80, 0x40,
+}
+
+// Cyclical Redundancy Checking
+type crc struct {
+	high byte
+	low  byte
+}
+
+func (crc *crc) reset() *crc {
+	crc.high = 0xFF
+	crc.low = 0xFF
+	return crc
+}
+
+func (crc *crc) pushBytes(bs []byte) *crc {
+	var idx, b byte
+
+	for _, b = range bs {
+		idx = crc.low ^ b
+		crc.low = crc.high ^ crcHighBytes[idx]
+		crc.high = crcLowBytes[idx]
+	}
+	return crc
+}
+
+func (crc *crc) value() uint16 {
+	return uint16(crc.high)<<8 | uint16(crc.low)
+}

+ 39 - 1
services/mqttaccess/mqtt_provider.go

@@ -110,6 +110,8 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 				_ = processDevNetConfig(deviceid, message.GetString("md5"))
 			case klink.ReportFirmwareAction:
 				_ = processDeviceReportUpgrade(deviceid, message.GetString("version"))
+			case klink.DevUpgradeAction:
+				_ = processDeviceUpgrade(deviceid, message)
 			}
 		}
 	case "e":
@@ -198,7 +200,43 @@ func processDeviceReportUpgrade(deviceId, version string) error {
 	var reply rpcs.ReplyEmptyResult
 	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
 	if err != nil {
-		server.Log.Errorf("更新设备版本号失败:%v", args, err)
+		server.Log.Errorf("更新设备版本号失败:%v", err)
 	}
 	return nil
 }
+
+func processDeviceUpgrade(deviceId string, message *gjson.Json) error {
+	var reply rpcs.ReplyEmptyResult
+	data := gjson.New(message.GetJson("data").MustToJson())
+	switch data.GetString("cmd") {
+	case "download":
+		params := gjson.New(data.GetJson("params").MustToJson())
+
+		args := &rpcs.ChunkUpgrade{
+			DeviceId: deviceId,
+			FileId:   params.GetString("fileId"),
+			FileSize: params.GetInt64("fileSize"),
+			Size:     params.GetInt64("size"),
+			Offset:   params.GetInt("offset"),
+		}
+
+		err := server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.ChunkUpgrade", args, &reply)
+		if err != nil {
+			server.Log.Errorf("分片下载发送失败:%v", err)
+		}
+
+	case "downProgress":
+		params := gjson.New(data.GetJson("params").MustToJson())
+		var args rpcs.ArgsOtaProgress
+		args.DeviceId = deviceId
+		args.Progress = params.GetInt("progress")
+
+		err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetProgress", args, &reply)
+		if err != nil {
+			server.Log.Errorf("OTA升级进度获取失败:%v", err)
+			return err
+		}
+	}
+
+	return nil
+}