executer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package internal
  2. import (
  3. "fmt"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/rule"
  6. "sparrow/pkg/server"
  7. "time"
  8. )
  9. // TaskExecutor 任务执行器,用来执行具体的任务动作
  10. type TaskExecutor struct {
  11. Actions []*rule.TaskAction
  12. }
  13. func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
  14. return &TaskExecutor{
  15. Actions: actions,
  16. }
  17. }
  18. func (a *TaskExecutor) Do() error {
  19. for _, action := range a.Actions {
  20. switch action.ActionExecutor {
  21. case "delay":
  22. return a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
  23. case "device_issue":
  24. return a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
  25. }
  26. }
  27. return nil
  28. }
  29. func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  30. // 调用设备接入服务
  31. rpchost, err := getAccessRPCHost(entityId)
  32. if err != nil {
  33. return err
  34. }
  35. fmt.Printf("rpchost:%s", rpchost)
  36. args := rpcs.ArgsSendCommand{
  37. DeviceId: entityId,
  38. SubDevice: subEntityId,
  39. Cmd: action.FunctionCode,
  40. Params: action.FunctionValue,
  41. }
  42. reply := &rpcs.ReplyEmptyResult{}
  43. server.Log.Debugf("do Device Issue task args:%v", args)
  44. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  45. }
  46. // 执行延时任务
  47. func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
  48. time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
  49. return a.doDeviceIssueTask(entityId, subEntityId, action)
  50. }
  51. func getAccessRPCHost(deviceid string) (string, error) {
  52. args := rpcs.ArgsGetDeviceOnlineStatus{
  53. Id: deviceid,
  54. }
  55. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  56. fmt.Println("ggggggggggggggggggggggggggg")
  57. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  58. if err != nil {
  59. return "", err
  60. }
  61. return reply.AccessRPCHost, nil
  62. }