timer.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package manager
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/gogf/gf/container/gmap"
  8. "github.com/gogf/gf/v2/os/gcron"
  9. "sparrow/pkg/server"
  10. )
  11. type TimerSceneConfig struct {
  12. SceneId string `json:"scene_id"`
  13. Conditions []*TimerTaskCondition `json:"conditions"`
  14. Actions []*Action `json:"actions"`
  15. }
  16. type TimerTaskCondition struct {
  17. TaskId string `json:"task_id"`
  18. Times int `json:"times"` // 执行次数 -1 表示无限次
  19. Cron string `json:"cron"`
  20. }
  21. // TimerSceneService 定时场景实现
  22. type TimerSceneService struct {
  23. task *gmap.HashMap
  24. cron *gcron.Cron
  25. }
  26. // NewTimerSceneService 创建定时场景
  27. func NewTimerSceneService() *TimerSceneService {
  28. return &TimerSceneService{
  29. task: gmap.NewHashMap(true),
  30. cron: gcron.New(),
  31. }
  32. }
  33. func (t *TimerSceneService) Add(config string) error {
  34. var c TimerSceneConfig
  35. err := json.Unmarshal([]byte(config), &c)
  36. if err != nil {
  37. server.Log.Errorf("config to timerConfig error :%s", err.Error())
  38. return err
  39. }
  40. //if len(c.Conditions) == 0 || len(c.Actions) == 0 {
  41. // server.Log.Errorf("timer scene-manager config error")
  42. // return errors.New("timer scene-manager config error")
  43. //}
  44. if len(c.Conditions) == 0 || len(c.Actions) == 0 {
  45. return errors.New("timer scene-manager config error:conditions or actions is empty")
  46. }
  47. var addedTaskIds []string
  48. defer func() {
  49. // 如果添加失败,回滚已添加的Cron任务
  50. if err != nil && len(addedTaskIds) > 0 {
  51. for _, taskId := range addedTaskIds {
  52. t.cron.Remove(taskId)
  53. }
  54. server.Log.Errorf("add scene %s failed, rollback cron tasks: %v", c.SceneId, addedTaskIds)
  55. }
  56. }()
  57. for _, v := range c.Conditions {
  58. err = t.addTask(c.SceneId, v, c.Actions)
  59. if err != nil {
  60. server.Log.Errorf("add timerTask error:sceneId:%s, taskId: %s, error: %v", c.SceneId, v.TaskId, err)
  61. return err
  62. }
  63. addedTaskIds = append(addedTaskIds, v.TaskId)
  64. }
  65. t.task.Set(c.SceneId, c)
  66. server.Log.Debugf("AddTimeScene :%s", c.SceneId)
  67. return nil
  68. }
  69. func (t *TimerSceneService) addTask(sceneId string, c *TimerTaskCondition, actions []*Action) error {
  70. _, err := t.cron.AddTimes(context.Background(), c.Cron, c.Times, func(ctx context.Context) {
  71. taskExecutor := NewTaskExecutor(actions)
  72. if err := taskExecutor.Do(sceneId); err != nil {
  73. server.Log.Errorf("do task :%s error:%s", c.TaskId, err.Error())
  74. }
  75. err := taskExecutor.saveHis(sceneId, []string{c.TaskId})
  76. if err != nil {
  77. server.Log.Errorf("save task history error:sceneId:%s", sceneId)
  78. }
  79. }, c.TaskId)
  80. return err
  81. }
  82. func (t *TimerSceneService) Update(config string) error {
  83. var c TimerSceneConfig
  84. err := json.Unmarshal([]byte(config), &c)
  85. if err != nil {
  86. server.Log.Errorf("config to timerConfig error :%s", err.Error())
  87. return errors.New("timer scene-manager config error")
  88. }
  89. if t.task.Contains(c.SceneId) {
  90. oldTask := t.task.Get(c.SceneId).(TimerSceneConfig)
  91. for _, v := range oldTask.Conditions {
  92. t.cron.Remove(v.TaskId)
  93. }
  94. }
  95. var addedTaskIds []string
  96. defer func() {
  97. if err != nil && len(addedTaskIds) > 0 {
  98. for _, taskId := range addedTaskIds {
  99. t.cron.Remove(taskId)
  100. }
  101. server.Log.Errorf("update scene %s failed, rollback new cron tasks: %v", c.SceneId, addedTaskIds)
  102. }
  103. }()
  104. for _, v := range c.Conditions {
  105. err = t.addTask(c.SceneId, v, c.Actions)
  106. if err != nil {
  107. server.Log.Errorf("add timerTask error:sceneId:%s, taskId: %s, error: %v", c.SceneId, v.TaskId, err)
  108. return err
  109. }
  110. addedTaskIds = append(addedTaskIds, v.TaskId)
  111. }
  112. t.task.Set(c.SceneId, c)
  113. server.Log.Debugf("UpdateTimeScene :%s", config)
  114. return nil
  115. }
  116. func (t *TimerSceneService) Remove(id string) error {
  117. if !t.task.Contains(id) {
  118. return errors.New("场景不存在\n")
  119. }
  120. scene := t.task.Get(id).(TimerSceneConfig)
  121. for _, v := range scene.Conditions {
  122. t.cron.Remove(v.TaskId)
  123. fmt.Printf("remove taskId:%s\n", v.TaskId)
  124. }
  125. server.Log.Debugf("RemoveTimeScene :%s\n", scene.SceneId)
  126. t.task.Remove(id)
  127. return nil
  128. }
  129. func (t *TimerSceneService) Start(id string) error {
  130. if !t.task.Contains(id) {
  131. return errors.New("场景不存在\n")
  132. }
  133. scene := t.task.Get(id).(TimerSceneConfig)
  134. for _, v := range scene.Conditions {
  135. t.cron.Start(v.TaskId)
  136. }
  137. server.Log.Debugf("StartTimeScene :%s", scene.SceneId)
  138. return nil
  139. }
  140. func (t *TimerSceneService) Stop(id string) error {
  141. if !t.task.Contains(id) {
  142. return errors.New("场景不存在\n")
  143. }
  144. scene := t.task.Get(id).(TimerSceneConfig)
  145. for _, v := range scene.Conditions {
  146. t.cron.Stop(v.TaskId)
  147. }
  148. server.Log.Debugf("StopTimeScene :%s", scene.SceneId)
  149. return nil
  150. }