package service import ( "sparrow/pkg/rpcs" "sparrow/pkg/server" "time" ) type Action struct { DeviceID string `json:"device_id"` // 设备ID SubDeviceId string `json:"sub_device_id"` // 实体子设备Id,如果需要 ActionExecutor string `json:"action_executor"` // 动作对象类型 ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // 动作执行明细 PlcPubMessage *PlcPubMessage `json:"plc_pub_message"` // PLC消息 } // TaskExecutorProperty 定时任务执行动作执行参数 type TaskExecutorProperty struct { /* 指令 code。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。 */ FunctionCode string `json:"function_code"` /* 指令 value。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。 */ FunctionValue map[string]interface{} `json:"function_value"` /* 延时时间。当 action_executor 是 delay 时,此参数必填。 */ DelaySeconds int64 `json:"delay_seconds"` } type PlcPubMessage struct { Topic string `json:"topic"` Payload []byte `json:"payload"` } // TaskExecutor 任务执行器,用来执行具体的任务动作 type TaskExecutor struct { Actions []*Action } func NewTaskExecutor(actions []*Action) *TaskExecutor { return &TaskExecutor{ Actions: actions, } } func (a *TaskExecutor) Do() error { for _, action := range a.Actions { if err := a.doTask(action); err != nil { return err } } return nil } func (a *TaskExecutor) doTask(action *Action) error { // 调用设备接入服务 rpchost, err := getAccessRPCHost(action.DeviceID) if err != nil { return err } reply := &rpcs.ReplyEmptyResult{} if rpchost != "" { args := rpcs.ArgsSendCommand{ DeviceId: action.DeviceID, SubDevice: action.SubDeviceId, Cmd: action.ExecutorProperty.FunctionCode, Params: action.ExecutorProperty.FunctionValue, } server.Log.Debugf("do Device Issue task args:%v", args) return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply) } var publishArgs rpcs.ArgsPublishMessage publishArgs.Topic = action.PlcPubMessage.Topic publishArgs.Payload = action.PlcPubMessage.Payload err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply) if err != nil { server.Log.Errorf("plc设备发送消息失败:%v", err) } return nil } // 执行延时任务 func (a *TaskExecutor) doDelayTask(action *Action) error { time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second) return a.doTask(action) } func getAccessRPCHost(deviceid string) (string, error) { args := rpcs.ArgsGetDeviceOnlineStatus{ Id: deviceid, } reply := &rpcs.ReplyGetDeviceOnlineStatus{} err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply) if err != nil { return "", err } return reply.AccessRPCHost, nil }