executer.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package internal
  2. import (
  3. "fmt"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/rule"
  6. "sparrow/pkg/server"
  7. "time"
  8. )
  9. // TaskExecutor 任务执行器,用来执行具体的任务动作
  10. type TaskExecutor struct {
  11. Actions []*rule.TaskAction
  12. }
  13. func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
  14. return &TaskExecutor{
  15. Actions: actions,
  16. }
  17. }
  18. func (a *TaskExecutor) Do() error {
  19. for _, action := range a.Actions {
  20. switch action.ActionExecutor {
  21. case "delay":
  22. return a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
  23. case "device_issue":
  24. return a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
  25. }
  26. }
  27. return nil
  28. }
  29. func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  30. // 调用设备接入服务
  31. rpchost, err := getAccessRPCHost(entityId)
  32. if err != nil {
  33. return err
  34. }
  35. fmt.Printf("rpchost:%s", rpchost)
  36. args := rpcs.ArgsSendCommand{
  37. DeviceId: entityId,
  38. SubDevice: subEntityId,
  39. Cmd: action.FunctionCode,
  40. Params: action.FunctionValue,
  41. }
  42. reply := &rpcs.ReplyEmptyResult{}
  43. server.Log.Debugf("do Device Issue task args:%v", args)
  44. err = server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  45. if err != nil {
  46. return err
  47. }
  48. time.Sleep(100 * time.Millisecond)
  49. return nil
  50. }
  51. // 执行延时任务
  52. func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  53. time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
  54. return a.doDeviceIssueTask(entityId, subEntityId, action)
  55. }
  56. func getAccessRPCHost(deviceid string) (string, error) {
  57. args := rpcs.ArgsGetDeviceOnlineStatus{
  58. Id: deviceid,
  59. }
  60. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  61. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  62. if err != nil {
  63. return "", err
  64. }
  65. return reply.AccessRPCHost, nil
  66. }