1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- package internal
- import (
- "encoding/json"
- "github.com/gogf/gf/container/gmap"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/v2/os/gcron"
- "sparrow/pkg/rule"
- "sparrow/pkg/server"
- )
- // TaskSchedule task schedule 任务调度
- type TaskSchedule struct {
- tasks *gmap.HashMap // 保存任务名称与任务实体的映射
- }
- func (t *TaskSchedule) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
- return nil
- }
- func NewTaskSchedule() *TaskSchedule {
- return &TaskSchedule{
- tasks: gmap.NewHashMap(true),
- }
- }
- func (t *TaskSchedule) DoTask(msg []byte) error {
- var task rule.TimerTaskMessage
- err := json.Unmarshal([]byte(msg), &task)
- if err != nil {
- return err
- }
- newJson := gjson.New(task.Actions)
- server.Log.Debugf("do task:actions:%s", newJson.MustToJsonString())
- // 创建任务
- err = NewTaskExecutor(task.Actions).Do()
- if err != nil {
- return err
- }
- return nil
- }
- func (t *TaskSchedule) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
- gcron.Remove(msg.TaskId)
- _ = t.tasks.Remove(msg.TaskId)
- server.Log.Debugf("RemoveMessageHandle :%s", msg.TaskId)
- return nil
- }
- func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
- server.Log.Debugf("UpdateMessageHandle :%s", msg.TaskId)
- return nil
- }
- func (t *TaskSchedule) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
- gcron.Stop(msg.TaskId)
- server.Log.Debugf("SnapMessageHandle :%s", msg.TaskId)
- return nil
- }
- func (t *TaskSchedule) StartMessageHandle(msg *rule.TaskLifecycleMessage) error {
- gcron.Start(msg.TaskId)
- server.Log.Debugf("StartMessageHandle :%s", msg.TaskId)
- return nil
- }
- func (t *TaskSchedule) StopMessageHandle(msg *rule.TaskLifecycleMessage) error {
- gcron.Stop(msg.TaskId)
- server.Log.Debugf("StopMessageHandle :%s", msg.TaskId)
- return nil
- }
|