package internal import ( "fmt" "sparrow/pkg/rpcs" "sparrow/pkg/rule" "sparrow/pkg/server" "time" ) // TaskExecutor 任务执行器,用来执行具体的任务动作 type TaskExecutor struct { Actions []*rule.TaskAction } func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor { return &TaskExecutor{ Actions: actions, } } func (a *TaskExecutor) Do() error { for _, action := range a.Actions { switch action.ActionExecutor { case "delay": if err := a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty); err != nil { return err } case "device_issue": if err := a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty); err != nil { return err } } } return nil } func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error { // 调用设备接入服务 rpchost, err := getAccessRPCHost(entityId) if err != nil { return err } fmt.Printf("rpchost:%s", rpchost) args := rpcs.ArgsSendCommand{ DeviceId: entityId, SubDevice: subEntityId, Cmd: action.FunctionCode, Params: action.FunctionValue, } reply := &rpcs.ReplyEmptyResult{} server.Log.Debugf("do Device Issue task args:%v", args) return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply) } // 执行延时任务 func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error { time.Sleep(time.Duration(action.DelaySeconds) * time.Second) return a.doDeviceIssueTask(entityId, subEntityId, action) } func getAccessRPCHost(deviceid string) (string, error) { args := rpcs.ArgsGetDeviceOnlineStatus{ Id: deviceid, } reply := &rpcs.ReplyGetDeviceOnlineStatus{} err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply) if err != nil { return "", err } return reply.AccessRPCHost, nil }