scheduler.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package internal
  2. import (
  3. "encoding/json"
  4. "github.com/gogf/gf/container/gmap"
  5. "github.com/gogf/gf/encoding/gjson"
  6. "github.com/gogf/gf/v2/os/gcron"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. )
  10. // TaskSchedule task schedule 任务调度
  11. type TaskSchedule struct {
  12. tasks *gmap.HashMap // 保存任务名称与任务实体的映射
  13. }
  14. func (t *TaskSchedule) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
  15. return nil
  16. }
  17. func NewTaskSchedule() *TaskSchedule {
  18. return &TaskSchedule{
  19. tasks: gmap.NewHashMap(true),
  20. }
  21. }
  22. func (t *TaskSchedule) DoTask(msg []byte) error {
  23. var task rule.TimerTaskMessage
  24. err := json.Unmarshal([]byte(msg), &task)
  25. if err != nil {
  26. return err
  27. }
  28. newJson := gjson.New(task.Actions)
  29. server.Log.Debugf("do task:actions:%s", newJson.MustToJsonString())
  30. // 创建任务
  31. err = NewTaskExecutor(task.Actions).Do()
  32. if err != nil {
  33. return err
  34. }
  35. return nil
  36. }
  37. func (t *TaskSchedule) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
  38. gcron.Remove(msg.TaskId)
  39. _ = t.tasks.Remove(msg.TaskId)
  40. server.Log.Debugf("RemoveMessageHandle :%s", msg.TaskId)
  41. return nil
  42. }
  43. func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
  44. server.Log.Debugf("UpdateMessageHandle :%s", msg.TaskId)
  45. return nil
  46. }
  47. func (t *TaskSchedule) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
  48. gcron.Stop(msg.TaskId)
  49. server.Log.Debugf("SnapMessageHandle :%s", msg.TaskId)
  50. return nil
  51. }
  52. func (t *TaskSchedule) StartMessageHandle(msg *rule.TaskLifecycleMessage) error {
  53. gcron.Start(msg.TaskId)
  54. server.Log.Debugf("StartMessageHandle :%s", msg.TaskId)
  55. return nil
  56. }
  57. func (t *TaskSchedule) StopMessageHandle(msg *rule.TaskLifecycleMessage) error {
  58. gcron.Stop(msg.TaskId)
  59. server.Log.Debugf("StopMessageHandle :%s", msg.TaskId)
  60. return nil
  61. }