executer.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package internal
  2. import (
  3. "fmt"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/rule"
  6. "sparrow/pkg/server"
  7. "time"
  8. )
  9. // TaskExecutor 任务执行器,用来执行具体的任务动作
  10. type TaskExecutor struct {
  11. Actions []*rule.TaskAction
  12. }
  13. func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
  14. return &TaskExecutor{
  15. Actions: actions,
  16. }
  17. }
  18. func (a *TaskExecutor) Do() error {
  19. for _, action := range a.Actions {
  20. switch action.ActionExecutor {
  21. case "delay":
  22. if err := a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty); err != nil {
  23. return err
  24. }
  25. case "device_issue":
  26. if err := a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty); err != nil {
  27. return err
  28. }
  29. }
  30. time.Sleep(100 * time.Millisecond)
  31. }
  32. return nil
  33. }
  34. func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  35. // 调用设备接入服务
  36. rpchost, err := getAccessRPCHost(entityId)
  37. if err != nil {
  38. return err
  39. }
  40. fmt.Printf("rpchost:%s", rpchost)
  41. args := rpcs.ArgsSendCommand{
  42. DeviceId: entityId,
  43. SubDevice: subEntityId,
  44. Cmd: action.FunctionCode,
  45. Params: action.FunctionValue,
  46. }
  47. reply := &rpcs.ReplyEmptyResult{}
  48. server.Log.Debugf("do Device Issue task args:%v", args)
  49. err = server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  50. if err != nil {
  51. return err
  52. }
  53. return nil
  54. }
  55. // 执行延时任务
  56. func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  57. time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
  58. return a.doDeviceIssueTask(entityId, subEntityId, action)
  59. }
  60. func getAccessRPCHost(deviceid string) (string, error) {
  61. args := rpcs.ArgsGetDeviceOnlineStatus{
  62. Id: deviceid,
  63. }
  64. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  65. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  66. if err != nil {
  67. return "", err
  68. }
  69. return reply.AccessRPCHost, nil
  70. }