package manager

import (
	"fmt"
	"time"

	"github.com/gogf/gf/util/guid"
	"github.com/gogf/gf/v2/encoding/gjson"

	"sparrow/pkg/models"
	"sparrow/pkg/rpcs"
	"sparrow/pkg/server"
	"sparrow/pkg/utils"
)

// Action describes a single step executed by a TaskExecutor.
type Action struct {
	DeviceID        string                `json:"device_id"`         // device ID
	SubDeviceId     string                `json:"sub_device_id"`     // physical sub-device ID, if needed
	ActionExecutor  string                `json:"action_executor"`   // kind of action target
	ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // action execution details; may be nil
	PlcPubMessage   *PlcPubMessage        `json:"plc_pub_message"`   // PLC message; may be nil
}

// TaskExecutorProperty holds the execution parameters of a scheduled-task action.
type TaskExecutorProperty struct {
	// FunctionCode is the command code. Required when action_executor is
	// device_issue or device_group_issue.
	FunctionCode string `json:"function_code"`
	// FunctionValue is the command value. Required when action_executor is
	// device_issue or device_group_issue.
	FunctionValue map[string]interface{} `json:"function_value"`
	// DelaySeconds is the delay in seconds. Required when action_executor is delay.
	DelaySeconds int64 `json:"delay_seconds"`
}

// PlcPubMessage is a message published to a topic on behalf of a PLC device.
type PlcPubMessage struct {
	Topic   string `json:"topic"`
	Payload []byte `json:"payload"`
}

// TaskExecutor executes the concrete actions of a task.
type TaskExecutor struct {
	Actions []*Action
}

// NewTaskExecutor returns a TaskExecutor that will run the given actions.
func NewTaskExecutor(actions []*Action) *TaskExecutor {
	return &TaskExecutor{
		Actions: actions,
	}
}

// Do runs every action in order and, after each action that succeeds, stores
// a history record tagged with id. It stops and returns the first error from
// either executing an action or saving its history.
func (a *TaskExecutor) Do(id string) error {
	for _, action := range a.Actions {
		if err := a.doTask(action); err != nil {
			return err
		}
		if err := a.saveHis(id, action); err != nil {
			return err
		}
	}
	return nil
}

// doTask executes a single action.
//
// It first asks the device manager for the access RPC host of the action's
// device. When a host is returned, the command described by ExecutorProperty
// is sent to that host via "Access.SendCommand". When no host is returned,
// the action's PlcPubMessage is published through the service registered as
// rpcs.EmqxAgentServiceName.
//
// An error is returned when action is nil, when the data required by the
// selected branch is missing, or when any RPC fails.
func (a *TaskExecutor) doTask(action *Action) error {
	if action == nil {
		return fmt.Errorf("task action is nil")
	}

	// Look up which access server (if any) currently serves the device.
	rpchost, err := getAccessRPCHost(action.DeviceID)
	if err != nil {
		return err
	}

	reply := &rpcs.ReplyEmptyResult{}
	if rpchost != "" {
		// ExecutorProperty is optional on Action; fail cleanly instead of
		// panicking on a nil dereference.
		if action.ExecutorProperty == nil {
			return fmt.Errorf("device %s: executor property is required to send a command", action.DeviceID)
		}
		args := rpcs.ArgsSendCommand{
			DeviceId:  action.DeviceID,
			SubDevice: action.SubDeviceId,
			Cmd:       action.ExecutorProperty.FunctionCode,
			Params:    action.ExecutorProperty.FunctionValue,
		}
		server.Log.Debugf("do Device Issue task args:%v", args)
		return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
	}

	// No access host: fall back to publishing the PLC message.
	if action.PlcPubMessage == nil {
		return fmt.Errorf("device %s: no access host and no plc message to publish", action.DeviceID)
	}
	var publishArgs rpcs.ArgsPublishMessage
	publishArgs.Topic = action.PlcPubMessage.Topic
	publishArgs.Payload = action.PlcPubMessage.Payload
	err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply)
	if err != nil {
		server.Log.Errorf("plc设备发送消息失败:%v", err)
		// Propagate the failure so the caller neither reports success nor
		// records history for an action that was never delivered.
		return err
	}
	fmt.Printf("plc设备发送消息成功:topic:%s,payload:%s", publishArgs.Topic, publishArgs.Payload)
	return nil
}

// doDelayTask waits for ExecutorProperty.DelaySeconds seconds and then
// executes the action via doTask. It returns an error when the action or its
// ExecutorProperty (which carries the delay) is nil.
func (a *TaskExecutor) doDelayTask(action *Action) error {
	if action == nil || action.ExecutorProperty == nil {
		return fmt.Errorf("delay task requires an action with executor property")
	}
	time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second)
	return a.doTask(action)
}

// getAccessRPCHost calls "DeviceManager.GetDeviceOnlineStatus" for deviceid
// and returns the AccessRPCHost field of the reply. The host may be empty;
// presumably that means the device is not attached to any access server
// (NOTE(review): confirm against the device manager).
func getAccessRPCHost(deviceid string) (string, error) {
	args := rpcs.ArgsGetDeviceOnlineStatus{
		Id: deviceid,
	}
	reply := &rpcs.ReplyGetDeviceOnlineStatus{}
	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
	if err != nil {
		return "", err
	}
	return reply.AccessRPCHost, nil
}

// saveHis records that action was executed as part of the scene id.
//
// It first POSTs {scene_id, time} to the local scene-history HTTP endpoint;
// that call is best-effort and a failure is only logged. It then creates a
// models.SceneHis record through "Registry.CreateSceneHis" and returns that
// RPC's error. Command fields are filled only when ExecutorProperty carries a
// function code, and topic/payload only when a PLC message is present.
func (a *TaskExecutor) saveHis(id string, action *Action) error {
	// NOTE(review): endpoint is hard-coded to localhost; consider making it
	// configurable.
	const sceneHistoryURL = "http://127.0.0.1:8199/iot/v1/scene_history"

	if action == nil {
		return fmt.Errorf("task action is nil")
	}

	client := utils.NewHttpClient()
	body := make(map[string]interface{})
	body["scene_id"] = id
	body["time"] = time.Now()
	// Best-effort sync: log the failure and still write the registry record.
	if _, err := client.Post(sceneHistoryURL, "application/json", gjson.New(body)); err != nil {
		server.Log.Errorf("sync his error:%s", err.Error())
	}

	args := models.SceneHis{
		RecordId:    guid.S(),
		SceneID:     id,
		DeviceId:    action.DeviceID,
		SubDeviceId: action.SubDeviceId,
	}
	// ExecutorProperty is nil for PLC-only actions; guard before reading it.
	if action.ExecutorProperty != nil && action.ExecutorProperty.FunctionCode != "" {
		args.Cmd = action.ExecutorProperty.FunctionCode
		args.Params = gjson.New(action.ExecutorProperty.FunctionValue).MustToJsonString()
	}
	if action.PlcPubMessage != nil {
		args.Topic = action.PlcPubMessage.Topic
		args.Payload = string(action.PlcPubMessage.Payload)
	}
	return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateSceneHis", args, &rpcs.ReplyEmptyResult{})
}