executer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package internal
  2. import (
  3. "fmt"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/rule"
  6. "sparrow/pkg/server"
  7. "time"
  8. )
  9. // TaskExecutor 任务执行器,用来执行具体的任务动作
  10. type TaskExecutor struct {
  11. Actions []*rule.TaskAction
  12. }
  13. func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
  14. return &TaskExecutor{
  15. Actions: actions,
  16. }
  17. }
  18. func (a *TaskExecutor) Do() error {
  19. for _, action := range a.Actions {
  20. switch action.ActionExecutor {
  21. case "delay":
  22. if err := a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty); err != nil {
  23. return err
  24. }
  25. case "device_issue":
  26. if err := a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty); err != nil {
  27. return err
  28. }
  29. }
  30. }
  31. return nil
  32. }
  33. func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  34. // 调用设备接入服务
  35. rpchost, err := getAccessRPCHost(entityId)
  36. if err != nil {
  37. return err
  38. }
  39. fmt.Printf("rpchost:%s", rpchost)
  40. args := rpcs.ArgsSendCommand{
  41. DeviceId: entityId,
  42. SubDevice: subEntityId,
  43. Cmd: action.FunctionCode,
  44. Params: action.FunctionValue,
  45. }
  46. reply := &rpcs.ReplyEmptyResult{}
  47. server.Log.Debugf("do Device Issue task args:%v", args)
  48. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  49. }
  50. // 执行延时任务
  51. func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  52. time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
  53. return a.doDeviceIssueTask(entityId, subEntityId, action)
  54. }
  55. func getAccessRPCHost(deviceid string) (string, error) {
  56. args := rpcs.ArgsGetDeviceOnlineStatus{
  57. Id: deviceid,
  58. }
  59. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  60. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  61. if err != nil {
  62. return "", err
  63. }
  64. return reply.AccessRPCHost, nil
  65. }