123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package manager
- import (
- "context"
- "encoding/json"
- "errors"
- "github.com/gogf/gf/container/gmap"
- "github.com/gogf/gf/v2/os/gcron"
- "sparrow/pkg/server"
- "sparrow/services/scene-service/internal/service"
- )
- type TimerSceneConfig struct {
- SceneId string `json:"scene_id"`
- Conditions []*TimerTaskCondition `json:"conditions"`
- Actions []*service.Action `json:"actions"`
- }
// TimerTaskCondition describes one cron-scheduled trigger of a timer scene.
type TimerTaskCondition struct {
	// TaskId names the task.
	TaskId string `json:"task_id"`
	// Times is the number of executions; -1 means unlimited.
	Times int `json:"times"`
	// Cron is the cron pattern of the task.
	Cron string `json:"cron"`
}
- // TimerSceneService 定时场景实现
- type TimerSceneService struct {
- task gmap.HashMap
- cron *gcron.Cron
- }
- // NewTimerSceneService 创建定时场景
- func NewTimerSceneService() *TimerSceneService {
- return &TimerSceneService{
- cron: gcron.New(),
- }
- }
- func (t *TimerSceneService) Add(config string) error {
- var c TimerSceneConfig
- err := json.Unmarshal([]byte(config), &c)
- if err != nil {
- server.Log.Errorf("config to timerConfig error :%s", err.Error())
- return err
- }
- //if len(c.Conditions) == 0 || len(c.Actions) == 0 {
- // server.Log.Errorf("timer scene-manager config error")
- // return errors.New("timer scene-manager config error")
- //}
- for _, v := range c.Conditions {
- err = t.addTask(v, c.Actions)
- if err != nil {
- server.Log.Errorf("add timerTask error:sceneId:%s, taskId: %s, error: %v", c.SceneId, v.TaskId, err)
- return err
- }
- }
- t.task.Set(c.SceneId, c)
- //server.Log.Debugf("AddTimeScene :%s", c.SceneId)
- return nil
- }
- func (t *TimerSceneService) addTask(c *TimerTaskCondition, actions []*service.Action) error {
- _, err := t.cron.AddTimes(context.Background(), c.Cron, c.Times, func(ctx context.Context) {
- if err := service.NewTaskExecutor(actions).Do(); err != nil {
- server.Log.Errorf("do task :%s error:%s", c.TaskId, err.Error())
- }
- }, c.TaskId)
- return err
- }
- func (t *TimerSceneService) Update(config string) error {
- var c TimerSceneConfig
- err := json.Unmarshal([]byte(config), &c)
- if err != nil {
- server.Log.Errorf("config to timerConfig error :%s", err.Error())
- }
- if t.task.Contains(c.SceneId) {
- oldTask := t.task.Get(c.SceneId).(TimerSceneConfig)
- for _, v := range oldTask.Conditions {
- t.cron.Remove(v.TaskId)
- }
- }
- for _, v := range c.Conditions {
- err = t.addTask(v, c.Actions)
- if err != nil {
- server.Log.Errorf("add timerTask error:sceneId:%s, taskId: %s, error: %v", c.SceneId, v.TaskId, err)
- return err
- }
- }
- t.task.Set(c.SceneId, c)
- server.Log.Debugf("UpdateTimeScene :%s", config)
- return nil
- }
- func (t *TimerSceneService) Remove(id string) error {
- if !t.task.Contains(id) {
- return errors.New("场景不存在")
- }
- scene := t.task.Get(id).(TimerSceneConfig)
- for _, v := range scene.Conditions {
- t.cron.Remove(v.TaskId)
- }
- server.Log.Debugf("RemoveTimeScene :%s", scene.SceneId)
- t.task.Remove(id)
- return nil
- }
- func (t *TimerSceneService) Start(id string) error {
- if !t.task.Contains(id) {
- return errors.New("场景不存在")
- }
- scene := t.task.Get(id).(TimerSceneConfig)
- for _, v := range scene.Conditions {
- t.cron.Start(v.TaskId)
- }
- server.Log.Debugf("StartTimeScene :%s", scene.SceneId)
- return nil
- }
- func (t *TimerSceneService) Stop(id string) error {
- if !t.task.Contains(id) {
- return errors.New("场景不存在")
- }
- scene := t.task.Get(id).(TimerSceneConfig)
- for _, v := range scene.Conditions {
- t.cron.Stop(v.TaskId)
- }
- server.Log.Debugf("StopTimeScene :%s", scene.SceneId)
- return nil
- }
|