timer.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package manager
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gogf/gf/container/gmap"
  8. "github.com/gogf/gf/v2/os/gcron"
  9. "sparrow/pkg/server"
  10. )
  11. type TimerSceneConfig struct {
  12. SceneId string `json:"scene_id"`
  13. Conditions []*TimerTaskCondition `json:"conditions"`
  14. Actions []*Action `json:"actions"`
  15. }
  16. type TimerTaskCondition struct {
  17. TaskId string `json:"task_id"`
  18. Times int `json:"times"` // 执行次数 -1 表示无限次
  19. Cron string `json:"cron"`
  20. }
  21. // TimerSceneService 定时场景实现
  22. type TimerSceneService struct {
  23. task gmap.HashMap
  24. cron *gcron.Cron
  25. }
  26. // NewTimerSceneService 创建定时场景
  27. func NewTimerSceneService() *TimerSceneService {
  28. return &TimerSceneService{
  29. cron: gcron.New(),
  30. }
  31. }
  32. func (t *TimerSceneService) Add(config string) error {
  33. var c TimerSceneConfig
  34. err := json.Unmarshal([]byte(config), &c)
  35. if err != nil {
  36. server.Log.Errorf("config to timerConfig error :%s", err.Error())
  37. return err
  38. }
  39. //if len(c.Conditions) == 0 || len(c.Actions) == 0 {
  40. // server.Log.Errorf("timer scene-manager config error")
  41. // return errors.New("timer scene-manager config error")
  42. //}
  43. for _, v := range c.Conditions {
  44. err = t.addTask(c.SceneId, v, c.Actions)
  45. if err != nil {
  46. server.Log.Errorf("add timerTask error:sceneId:%s, taskId: %s, error: %v", c.SceneId, v.TaskId, err)
  47. return err
  48. }
  49. }
  50. t.task.Set(c.SceneId, c)
  51. //server.Log.Debugf("AddTimeScene :%s", c.SceneId)
  52. return nil
  53. }
  54. func (t *TimerSceneService) addTask(sceneId string, c *TimerTaskCondition, actions []*Action) error {
  55. _, err := t.cron.AddTimes(context.Background(), c.Cron, c.Times, func(ctx context.Context) {
  56. taskExecutor := NewTaskExecutor(actions)
  57. if err := taskExecutor.Do(sceneId); err != nil {
  58. server.Log.Errorf("do task :%s error:%s", c.TaskId, err.Error())
  59. }
  60. err := taskExecutor.saveHis(sceneId, []string{c.TaskId})
  61. if err != nil {
  62. server.Log.Errorf("save task history error:sceneId:%s", sceneId)
  63. }
  64. }, c.TaskId)
  65. return err
  66. }
  67. func (t *TimerSceneService) Update(config string) error {
  68. var c TimerSceneConfig
  69. err := json.Unmarshal([]byte(config), &c)
  70. if err != nil {
  71. server.Log.Errorf("config to timerConfig error :%s", err.Error())
  72. }
  73. if t.task.Contains(c.SceneId) {
  74. oldTask := t.task.Get(c.SceneId).(TimerSceneConfig)
  75. for _, v := range oldTask.Conditions {
  76. t.cron.Remove(v.TaskId)
  77. }
  78. }
  79. for _, v := range c.Conditions {
  80. err = t.addTask(c.SceneId, v, c.Actions)
  81. if err != nil {
  82. server.Log.Errorf("add timerTask error:sceneId:%s, taskId: %s, error: %v", c.SceneId, v.TaskId, err)
  83. return err
  84. }
  85. }
  86. t.task.Set(c.SceneId, c)
  87. server.Log.Debugf("UpdateTimeScene :%s", config)
  88. return nil
  89. }
  90. func (t *TimerSceneService) Remove(id string) error {
  91. if !t.task.Contains(id) {
  92. return errors.New("场景不存在\n")
  93. }
  94. scene := t.task.Get(id).(TimerSceneConfig)
  95. for _, v := range scene.Conditions {
  96. t.cron.Remove(v.TaskId)
  97. fmt.Printf("remove taskId:%s\n", v.TaskId)
  98. }
  99. server.Log.Debugf("RemoveTimeScene :%s\n", scene.SceneId)
  100. t.task.Remove(id)
  101. return nil
  102. }
  103. func (t *TimerSceneService) Start(id string) error {
  104. if !t.task.Contains(id) {
  105. return errors.New("场景不存在")
  106. }
  107. scene := t.task.Get(id).(TimerSceneConfig)
  108. for _, v := range scene.Conditions {
  109. t.cron.Start(v.TaskId)
  110. }
  111. server.Log.Debugf("StartTimeScene :%s", scene.SceneId)
  112. return nil
  113. }
  114. func (t *TimerSceneService) Stop(id string) error {
  115. if !t.task.Contains(id) {
  116. return errors.New("场景不存在")
  117. }
  118. scene := t.task.Get(id).(TimerSceneConfig)
  119. for _, v := range scene.Conditions {
  120. t.cron.Stop(v.TaskId)
  121. }
  122. server.Log.Debugf("StopTimeScene :%s", scene.SceneId)
  123. return nil
  124. }