package manager import ( "bytes" "encoding/json" "fmt" "sparrow/pkg/rpcs" "sparrow/pkg/server" "sparrow/pkg/utils" "strings" "time" ) type Action struct { DeviceID string `json:"device_id"` // 设备ID SubDeviceId string `json:"sub_device_id"` // 实体子设备Id,如果需要 ActionExecutor string `json:"action_executor"` // 动作对象类型 ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // 动作执行明细 PlcPubMessage *PlcPubMessage `json:"plc_pub_message"` // PLC消息 } // TaskExecutorProperty 定时任务执行动作执行参数 type TaskExecutorProperty struct { /* 指令 code。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。 */ FunctionCode string `json:"function_code"` /* 指令 value。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。 */ FunctionValue map[string]interface{} `json:"function_value"` /* 延时时间。当 action_executor 是 delay 时,此参数必填。 */ DelaySeconds int64 `json:"delay_seconds"` } type PlcPubMessage struct { Topic string `json:"topic"` Payload []byte `json:"payload"` } // TaskExecutor 任务执行器,用来执行具体的任务动作 type TaskExecutor struct { Actions []*Action client *utils.HttpClient } func NewTaskExecutor(actions []*Action) *TaskExecutor { client := utils.NewHttpClient() client.SetLogger(server.Log) return &TaskExecutor{ Actions: actions, client: client, } } func (a *TaskExecutor) Do(id string) error { for _, action := range a.Actions { if err := a.doTask(action); err != nil { return err } } return nil } func (a *TaskExecutor) doTask(action *Action) error { // 调用设备接入服务 rpchost, err := getAccessRPCHost(action.DeviceID) if err != nil { return err } reply := &rpcs.ReplyEmptyResult{} if rpchost != "" { args := rpcs.ArgsSendCommand{ DeviceId: action.DeviceID, SubDevice: action.SubDeviceId, Cmd: action.ExecutorProperty.FunctionCode, Params: action.ExecutorProperty.FunctionValue, } server.Log.Debugf("do Device Issue task args:%v", args) return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply) } var publishArgs rpcs.ArgsPublishMessage publishArgs.Topic = action.PlcPubMessage.Topic publishArgs.Payload = action.PlcPubMessage.Payload err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply) if err != nil { server.Log.Errorf("plc设备发送消息失败:%v", err) } fmt.Printf("plc设备发送消息成功:topic:%s,payload:%s", publishArgs.Topic, publishArgs.Payload) return nil } // 执行延时任务 func (a *TaskExecutor) doDelayTask(action *Action) error { time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second) return a.doTask(action) } func getAccessRPCHost(deviceid string) (string, error) { args := rpcs.ArgsGetDeviceOnlineStatus{ Id: deviceid, } reply := &rpcs.ReplyGetDeviceOnlineStatus{} err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply) if err != nil { return "", err } return reply.AccessRPCHost, nil } func (a *TaskExecutor) saveHis(id string, conditionId []string) error { url := "http://127.0.0.1:8199/iot/v1/scene_history" body := make(map[string]interface{}) body["scene_id"] = id if len(conditionId) > 0 { body["condition_id"] = strings.Join(conditionId, ",") } w := new(bytes.Buffer) if err := json.NewEncoder(w).Encode(body); err != nil { return err } req, err := utils.NewRequest("POST", url, w) if err != nil { server.Log.Error(err) return err } req.Header.Add("Content-Type", "application/json") _, err = a.client.Do(req) if err != nil { server.Log.Errorf("请求出错%s", err.Error()) } //_, err := a.client.Post(url, "application/json", gjson.New(body)) //if err != nil { // server.Log.Errorf("sync his error:%s", err.Error()) //} return err //args := models.SceneHis{ // RecordId: guid.S(), // SceneID: id, // DeviceId: action.DeviceID, // SubDeviceId: action.SubDeviceId, //} //if action.ExecutorProperty.FunctionCode != "" { // args.Cmd = action.ExecutorProperty.FunctionCode // args.Params = gjson.New(action.ExecutorProperty.FunctionValue).MustToJsonString() //} //if action.PlcPubMessage != nil { // args.Topic = action.PlcPubMessage.Topic // args.Payload = string(action.PlcPubMessage.Payload) //} //return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateSceneHis", args, &rpcs.ReplyEmptyResult{}) }