executer.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package internal
  2. import (
  3. "sparrow/pkg/rpcs"
  4. "sparrow/pkg/rule"
  5. "sparrow/pkg/server"
  6. "time"
  7. )
  8. // TaskExecutor 任务执行器,用来执行具体的任务动作
  9. type TaskExecutor struct {
  10. Actions []*rule.TaskAction
  11. }
  12. func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
  13. return &TaskExecutor{
  14. Actions: actions,
  15. }
  16. }
  17. func (a *TaskExecutor) Do() error {
  18. for _, action := range a.Actions {
  19. switch action.ActionExecutor {
  20. case "delay":
  21. return a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
  22. case "device_issue":
  23. return a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
  24. }
  25. }
  26. return nil
  27. }
  28. func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  29. // 调用设备接入服务
  30. rpchost, err := getAccessRPCHost(entityId)
  31. if err != nil {
  32. return err
  33. }
  34. args := rpcs.ArgsSendCommand{
  35. DeviceId: entityId,
  36. SubDevice: subEntityId,
  37. Cmd: action.FunctionCode,
  38. Params: action.FunctionValue,
  39. }
  40. reply := &rpcs.ReplyEmptyResult{}
  41. server.Log.Debugf("do Device Issue task args:%v", args)
  42. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  43. }
  44. // 执行延时任务
  45. func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  46. time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
  47. return a.doDeviceIssueTask(entityId, subEntityId, action)
  48. }
  49. func getAccessRPCHost(deviceid string) (string, error) {
  50. args := rpcs.ArgsGetDeviceOnlineStatus{
  51. Id: deviceid,
  52. }
  53. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  54. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  55. if err != nil {
  56. return "", err
  57. }
  58. return reply.AccessRPCHost, nil
  59. }