12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package internal
import (
	"fmt"
	"time"

	"sparrow/pkg/rpcs"
	"sparrow/pkg/rule"
	"sparrow/pkg/server"
)
- // TaskExecutor 任务执行器,用来执行具体的任务动作
- type TaskExecutor struct {
- Actions []*rule.TaskAction
- }
- func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
- return &TaskExecutor{
- Actions: actions,
- }
- }
- func (a *TaskExecutor) Do() error {
- for _, action := range a.Actions {
- switch action.ActionExecutor {
- case "delay":
- return a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
- case "device_issue":
- return a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
- }
- }
- return nil
- }
- func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
- // 调用设备接入服务
- rpchost, err := getAccessRPCHost(entityId)
- if err != nil {
- return err
- }
- args := rpcs.ArgsSendCommand{
- DeviceId: entityId,
- SubDevice: subEntityId,
- Cmd: action.FunctionCode,
- Params: action.FunctionValue,
- }
- reply := &rpcs.ReplyEmptyResult{}
- server.Log.Debugf("do Device Issue task args:%v", args)
- return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
- }
- // 执行延时任务
- func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
- time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
- return a.doDeviceIssueTask(entityId, subEntityId, action)
- }
- func getAccessRPCHost(deviceid string) (string, error) {
- args := rpcs.ArgsGetDeviceOnlineStatus{
- Id: deviceid,
- }
- reply := &rpcs.ReplyGetDeviceOnlineStatus{}
- err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
- if err != nil {
- return "", err
- }
- return reply.AccessRPCHost, nil
- }
|