package main

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"strings"

	"github.com/go-martini/martini"
	"github.com/gogf/gf/encoding/gjson"
	"github.com/martini-contrib/render"
	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"

	"sparrow/pkg/models"
	"sparrow/pkg/productconfig"
	"sparrow/pkg/rpcs"
	"sparrow/pkg/server"
)

// Result codes placed in Common.Code by the handlers in this file.
const (
	ErrOK                 = 0
	ErrSystemFault        = 10001
	ErrProductNotFound    = 10002
	ErrDeviceNotFound     = 10003
	ErrDeviceNotOnline    = 10004
	ErrWrongRequestFormat = 10005
	ErrWrongProductConfig = 10006
	ErrWrongQueryFormat   = 10007
	ErrAccessDenied       = 10008
	ErrIllegalityAction   = 10009 // illegal operation
	ErrWrongSecret        = 10010
)

var (
	// errBadRequestString reports that required request parameters are missing.
	errBadRequestString = errors.New("请求参数不全")
	// errIllegalityString reports an illegal operation.
	errIllegalityString = errors.New("非法操作")
)

const (
	defaultTimeOut = 0 // seconds
)

// renderError logs err and builds a Common response carrying code and the
// error text. A nil err is tolerated (reported as "unknown error") so that a
// caller mistake cannot turn into a nil-pointer panic inside a handler.
func renderError(code int, err error) Common {
	msg := "unknown error"
	if err != nil {
		msg = err.Error()
	}
	server.Log.Error(msg)

	result := Common{}
	result.Code = code
	result.Message = msg
	return result
}

// done wraps result in a Common response with code ErrOK and message "success".
func done(result interface{}) Common {
	return Common{
		Code:    ErrOK,
		Message: "success",
		Result:  result,
	}
}

// triggerStatusReport calls Controller.GetStatus for the given device
// identifier. A failure is only logged: callers go on to read the stored
// status regardless.
func triggerStatusReport(identifier string) {
	args := rpcs.ArgsGetStatus{Id: identifier}
	reply := rpcs.ReplyGetStatus{}
	err := server.RPCCallByName(context.Background(), rpcs.ControllerName, "Controller.GetStatus", args, &reply)
	if err != nil {
		server.Log.Errorf("trigger device status error: %v", err)
	}
}

// fetchDeviceStatus reads the stored status string for the given device
// identifier via DeviceManager.GetDeviceStatus and decodes it as a JSON
// object. An empty status string yields an empty, non-nil map. RPC and JSON
// errors are logged here and returned to the caller.
func fetchDeviceStatus(identifier string) (map[string]interface{}, error) {
	args := rpcs.ArgsGetStatus{Id: identifier}
	reply := rpcs.ReplyStatus{}
	err := server.RPCCallByName(context.Background(), rpcs.DeviceManagerName, "DeviceManager.GetDeviceStatus", args, &reply)
	if err != nil {
		server.Log.Errorf("获取设备状态数据失败: %v", err)
		return nil, err
	}
	if reply.Status == "" {
		return map[string]interface{}{}, nil
	}

	var status map[string]interface{}
	if err := json.Unmarshal([]byte(reply.Status), &status); err != nil {
		server.Log.Errorf("解析设备状态数据失败: %v", err)
		return nil, err
	}
	return status, nil
}

// GetDeviceInfoByKey looks a device up by the "device_key" query parameter
// (Registry.ValidateDevice) and responds with its identifier, name,
// description and version. A failed lookup is reported as ErrDeviceNotFound.
func GetDeviceInfoByKey(params martini.Params, req *http.Request, r render.Render) {
	key := req.URL.Query().Get("device_key")
	server.Log.Printf("ACTION GetDeviceInfoByKey, key:: %v", key)

	device := &models.Device{}

	span, ctx := opentracing.StartSpanFromContext(context.Background(), "GetDeviceInfoByKey")
	defer span.Finish()
	ext.SpanKindRPCClient.Set(span)
	span.SetTag("device_key", key)

	err := server.RPCCallByName(ctx, rpcs.RegistryServerName, "Registry.ValidateDevice", key, device)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
		return
	}

	result := DeviceInfoResponse{
		Data: DeviceInfoData{
			Identifier:  device.DeviceIdentifier,
			Name:        device.DeviceName,
			Description: device.DeviceDescription,
			Version:     device.DeviceVersion,
		},
	}
	r.JSON(http.StatusOK, result)
}

// GetDeviceInfoByIdentifier looks a device up by the "identifier" URL
// parameter (Registry.FindDeviceByIdentifier2) and responds with its
// identifier, name, description and version. A failed lookup is reported as
// ErrDeviceNotFound.
func GetDeviceInfoByIdentifier(urlparams martini.Params, r render.Render) {
	identifier := urlparams["identifier"]
	server.Log.Printf("ACTION GetDeviceInfoByIdentifier, identifier:: %v", identifier)

	device := &models.Device{}
	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier2", identifier, device)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
		return
	}

	result := DeviceInfoResponse{
		Data: DeviceInfoData{
			Identifier:  device.DeviceIdentifier,
			Name:        device.DeviceName,
			Description: device.DeviceDescription,
			Version:     device.DeviceVersion,
		},
	}
	r.JSON(http.StatusOK, result)
}

// GetDeviceCurrentStatus first asks the controller to make the device report
// its status (best effort, without waiting for the device's answer) and then
// responds with the status stored in the device manager.
func GetDeviceCurrentStatus(device *models.Device, config *productconfig.ProductConfig,
	urlparams martini.Params, r render.Render) {
	server.Log.Printf("ACTION GetDeviceCurrentStatus, identifier:: %v", device.DeviceIdentifier)

	// 1. Trigger a status report; failure is logged and otherwise ignored.
	triggerStatusReport(device.DeviceIdentifier)

	// 2. Read and decode the status actually stored by the device manager.
	status, err := fetchDeviceStatus(device.DeviceIdentifier)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, DeviceStatusResponse{Data: status})
}

// GetDeviceStatusByFields behaves like GetDeviceCurrentStatus but responds
// only with the status entries named in the comma-separated "fields" query
// parameter. A missing "fields" parameter is reported as ErrWrongQueryFormat;
// requested fields that are absent from the stored status are omitted.
func GetDeviceStatusByFields(device *models.Device, config *productconfig.ProductConfig,
	urlparams martini.Params, req *http.Request, r render.Render) {
	server.Log.Printf("ACTION GetDeviceStatusByFields, identifier:: %v", device.DeviceIdentifier)

	fieldsParam := req.URL.Query().Get("fields")
	if fieldsParam == "" {
		r.JSON(http.StatusOK, renderError(ErrWrongQueryFormat, errBadRequestString))
		return
	}
	requestFields := strings.Split(fieldsParam, ",")

	// Trigger a status report; failure is logged and otherwise ignored.
	triggerStatusReport(device.DeviceIdentifier)

	// Read and decode the status actually stored by the device manager.
	status, err := fetchDeviceStatus(device.DeviceIdentifier)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	keys := make([]string, 0, len(status))
	for k := range status {
		keys = append(keys, k)
	}
	server.Log.Debugf("GetDeviceStatusByFields status keys: %v", keys)

	// Keep only the requested fields.
	filtered := make(map[string]interface{}, len(requestFields))
	for _, field := range requestFields {
		if val, ok := status[field]; ok {
			filtered[field] = val
		}
	}

	r.JSON(http.StatusOK, DeviceStatusFieldResponse{Data: filtered})
}

// GetDeviceLatestStatus responds with the status stored in the device manager
// without triggering a new report from the device.
func GetDeviceLatestStatus(device *models.Device, config *productconfig.ProductConfig,
	urlparams martini.Params, r render.Render) {
	server.Log.Printf("ACTION GetDeviceLatestStatus, identifier:: %v", device.DeviceIdentifier)

	status, err := fetchDeviceStatus(device.DeviceIdentifier)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, DeviceStatusResponse{Data: status})
}

// DeviceUpgrade starts an OTA upgrade: it decodes a DeviceUpgradeReq from the
// request body and sends a "devUpgrade" command (md5, url, version, file_size)
// to the device named in the request through Controller.SendCommand.
func DeviceUpgrade(device *models.Device, urlparams martini.Params, req *http.Request, r render.Render) {
	var param DeviceUpgradeReq
	if err := json.NewDecoder(req.Body).Decode(&param); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}
	server.Log.Infof("设备OTA升级:%s, %s", param.DeviceId, param.Version)

	var reply rpcs.ReplyEmptyResult
	args := rpcs.ArgsSendCommand{
		DeviceId:  param.DeviceId,
		SubDevice: param.SubDeviceId,
		Cmd:       "devUpgrade",
		Params: map[string]interface{}{
			"md5":       param.MD5,
			"url":       param.Url,
			"version":   param.Version,
			"file_size": param.FileSize,
		},
	}
	err := server.RPCCallByName(context.Background(), rpcs.ControllerName, "Controller.SendCommand", args, &reply)
	if err != nil {
		server.Log.Errorf("设备OTA升级失败: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// SetDeviceStatus decodes a JSON object from the request body, converts it
// with config.MapToStatus and forwards the result to Controller.SetStatus for
// the given device. A body that is not valid JSON, is not a JSON object, or
// cannot be converted is reported as ErrWrongRequestFormat.
func SetDeviceStatus(device *models.Device, config *productconfig.ProductConfig,
	urlparams martini.Params, req *http.Request, r render.Render) {
	server.Log.Printf("ACTION SetDeviceStatus, identifier:: %v,request: %v", device.DeviceIdentifier, req.Body)

	var args interface{}
	if err := json.NewDecoder(req.Body).Decode(&args); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	m, ok := args.(map[string]interface{})
	if !ok {
		// Decoding succeeded, so there is no decoder error to report here;
		// build an explicit one instead of passing a nil error along.
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, errors.New("request body must be a JSON object")))
		return
	}

	status, err := config.MapToStatus(m)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	statusargs := rpcs.ArgsSetStatus{
		DeviceId: device.RecordId,
		Status:   status,
	}
	statusreply := rpcs.ReplySetStatus{}

	// opentracing
	span, ctx := opentracing.StartSpanFromContext(context.Background(), "SetDeviceStatus")
	defer span.Finish()
	ext.SpanKindRPCClient.Set(span)

	err = server.RPCCallByName(ctx, rpcs.ControllerName, "Controller.SetStatus", statusargs, &statusreply)
	if err != nil {
		server.Log.Errorf("set devie status error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// SendCommandToDevice decodes a command from the request body and forwards it
// to the device through Controller.SendCommand.
//
// Example request body:
/*
	{
	  "deviceCode": "5566",
	  "subDeviceId": "1",
	  "data": {
	    "cmd": "powerControl",
	    "params": {
	      "power": 1,
	      "temp": 2
	    }
	  }
	}
*/
func SendCommandToDevice(device *models.Device, config *productconfig.ProductConfig,
	urlparams martini.Params, req *http.Request, r render.Render) {
	// NOTE(review): the "timeout" query parameter is only logged; WaitTime is
	// always defaultTimeOut. Confirm whether it was meant to be honoured.
	timeout := req.URL.Query().Get("timeout")
	server.Log.Printf("ACTION SendCommandToDevice, identifier:: %v, request: %v, timeout: %v",
		device.DeviceIdentifier, req.Body, timeout)

	var args map[string]interface{}
	if err := json.NewDecoder(req.Body).Decode(&args); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	j := gjson.New(args)
	cmdargs := rpcs.ArgsSendCommand{
		DeviceId:  device.DeviceIdentifier,
		SubDevice: j.GetString("subDeviceId"),
		WaitTime:  uint32(defaultTimeOut),
		Params:    j.GetMap("data.params"),
		Cmd:       j.GetString("data.cmd"),
	}
	cmdreply := rpcs.ReplySendCommand{}

	err := server.RPCCallByName(context.Background(), rpcs.ControllerName, "Controller.SendCommand", cmdargs, &cmdreply)
	if err != nil {
		server.Log.Errorf("send devie command error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// SendCommandToDeviceV2 accepts the same request body as SendCommandToDevice
// but forwards the command through Controller.SendCommandV2.
func SendCommandToDeviceV2(device *models.Device, config *productconfig.ProductConfig,
	urlparams martini.Params, req *http.Request, r render.Render) {
	// NOTE(review): the "timeout" query parameter is only logged; WaitTime is
	// always defaultTimeOut. Confirm whether it was meant to be honoured.
	timeout := req.URL.Query().Get("timeout")
	server.Log.Printf("ACTION SendCommandToDeviceV2, identifier:: %v, request: %v, timeout: %v",
		device.DeviceIdentifier, req.Body, timeout)

	var args map[string]interface{}
	if err := json.NewDecoder(req.Body).Decode(&args); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	j := gjson.New(args)
	cmdargs := rpcs.ArgsSendCommand{
		DeviceId:  device.DeviceIdentifier,
		SubDevice: j.GetString("subDeviceId"),
		WaitTime:  uint32(defaultTimeOut),
		Params:    j.GetMap("data.params"),
		Cmd:       j.GetString("data.cmd"),
	}
	cmdreply := rpcs.ReplySendCommand{}

	err := server.RPCCallByName(context.Background(), rpcs.ControllerName, "Controller.SendCommandV2", cmdargs, &cmdreply)
	if err != nil {
		server.Log.Errorf("send devie command error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// AddRule decodes a CreateRuleRequest from the request body and creates the
// corresponding rule for the given device through Registry.CreateRule.
func AddRule(device *models.Device, req *http.Request, r render.Render) {
	var ruleReq CreateRuleRequest
	if err := json.NewDecoder(req.Body).Decode(&ruleReq); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	rule := &models.Rule{
		DeviceID: device.RecordId,
		RuleType: ruleReq.Type,
		Trigger:  ruleReq.Trigger,
		Target:   ruleReq.Target,
		Action:   ruleReq.Action,
	}
	reply := &rpcs.ReplyEmptyResult{}

	// opentracing
	span, ctx := opentracing.StartSpanFromContext(context.Background(), "AddRule")
	defer span.Finish()
	ext.SpanKindRPCClient.Set(span)

	err := server.RPCCallByName(ctx, rpcs.RegistryServerName, "Registry.CreateRule", rule, reply)
	if err != nil {
		server.Log.Errorf("create device rule error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// AppAuth authenticates an application: it decodes rpcs.ArgsAppAuth from the
// request body, looks the application up by app key and compares the secret
// keys. On success it responds with an access token and its expiry produced
// by TokenMaker; a failed lookup or a mismatching secret is reported as
// ErrWrongSecret.
func AppAuth(req *http.Request, r render.Render) {
	var authReq rpcs.ArgsAppAuth
	if err := json.NewDecoder(req.Body).Decode(&authReq); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	app := &models.Application{}
	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindApplicationByAppKey", authReq, app)
	if err != nil {
		// The client gets a generic message; keep the real cause in the log.
		server.Log.Errorf("find application by app key error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrWrongSecret, errors.New("invalid secret key")))
		return
	}

	if app.SecretKey != authReq.Secretkey {
		// The supplied application secret does not match.
		r.JSON(http.StatusOK, renderError(ErrWrongSecret, errors.New("wrong application secret")))
		return
	}

	token, timeSnap := TokenMaker(app)
	result := AppAuthDataResponse{
		AccessToken: token,
		ExpireAt:    timeSnap,
	}
	r.JSON(http.StatusOK, Common{
		Result: result,
	})
}

// CheckDeviceNetConfig forwards the "device_code" and "md5" query parameters
// to Registry.CheckDeviceNetConfig and responds with the RPC's result.
func CheckDeviceNetConfig(req *http.Request, r render.Render) {
	query := req.URL.Query()

	var params rpcs.ArgsCheckDeviceNetConfig
	params.DeviceCode = query.Get("device_code")
	params.Md5 = query.Get("md5")

	var reply rpcs.ReplyCheckDeviceNetConfig
	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.CheckDeviceNetConfig", &params, &reply)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{
		Result: reply.Result,
	})
}

// CheckDeviceIsOnline looks the device up by the "device_code" query
// parameter and queries its online status from the device manager. It
// responds with Result 2 when that query fails or reports no client IP, and
// with Result 1 otherwise. An unknown device is reported as ErrDeviceNotFound.
func CheckDeviceIsOnline(req *http.Request, r render.Render) {
	identifier := req.URL.Query().Get("device_code")

	device := &models.Device{}
	err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", identifier, device)
	if err != nil {
		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
		return
	}

	onlineargs := rpcs.ArgsGetDeviceOnlineStatus{
		Id: device.DeviceIdentifier,
	}
	onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
	err = server.RPCCallByName(context.Background(), rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
	if err != nil || onlinereply.ClientIP == "" {
		r.JSON(http.StatusOK, Common{
			Result: 2,
		})
		// Stop here so that only one response body is written.
		return
	}

	r.JSON(http.StatusOK, Common{
		Result: 1,
	})
}

// SubmitSceneTask decodes rpcs.ArgsSubmitTask from the request body and
// submits it through SceneService.SubmitTask.
func SubmitSceneTask(req *http.Request, r render.Render) {
	var taskReq rpcs.ArgsSubmitTask
	if err := json.NewDecoder(req.Body).Decode(&taskReq); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	reply := rpcs.ReplySubmitTask{}
	server.Log.Debugf("submit sceneTask %v", taskReq)
	err := server.RPCCallByName(context.Background(), rpcs.SceneAccessServiceName, "SceneService.SubmitTask", taskReq, &reply)
	if err != nil {
		server.Log.Errorf("submit sceneTask error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// SubmitTaskLifecycle decodes rpcs.ArgsSubmitTaskLifecycle from the request
// body and submits it through SceneService.SubmitTaskLifecycle.
func SubmitTaskLifecycle(req *http.Request, r render.Render) {
	var lifecycleReq rpcs.ArgsSubmitTaskLifecycle
	if err := json.NewDecoder(req.Body).Decode(&lifecycleReq); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	reply := rpcs.ReplySubmitTask{}
	err := server.RPCCallByName(context.Background(), rpcs.SceneAccessServiceName, "SceneService.SubmitTaskLifecycle", lifecycleReq, &reply)
	if err != nil {
		server.Log.Errorf("submit taskLifecycle error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}

// SubmitSceneAction decodes rpcs.ArgsSubmitSceneAction from the request body
// and submits it through SceneService.SubmitAction.
func SubmitSceneAction(req *http.Request, r render.Render) {
	var actionReq rpcs.ArgsSubmitSceneAction
	if err := json.NewDecoder(req.Body).Decode(&actionReq); err != nil {
		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
		return
	}

	reply := rpcs.ReplySubmitSceneAction{}
	err := server.RPCCallByName(context.Background(), rpcs.SceneServiceName, "SceneService.SubmitAction", actionReq, &reply)
	if err != nil {
		server.Log.Errorf("submit scene-manager error: %v", err)
		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
		return
	}

	r.JSON(http.StatusOK, Common{})
}