liuxiulin 1 gadu atpakaļ
vecāks
revīzija
7e53398ade
1 mainītis faili ar 15 papildinājumiem un 9 dzēšanām
  1. 15 9
      services/timer-service/internal/scheduler.go

+ 15 - 9
services/timer-service/internal/scheduler.go

@@ -3,6 +3,7 @@ package internal
 import (
 import (
 	"context"
 	"context"
 	"encoding/json"
 	"encoding/json"
+	"fmt"
 	"github.com/gogf/gf/container/gmap"
 	"github.com/gogf/gf/container/gmap"
 	"github.com/gogf/gf/encoding/gjson"
 	"github.com/gogf/gf/encoding/gjson"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gcron"
@@ -13,11 +14,13 @@ import (
 // TaskSchedule task schedule 任务调度
 // TaskSchedule task schedule 任务调度
 type TaskSchedule struct {
 type TaskSchedule struct {
 	tasks *gmap.HashMap // 保存任务名称与任务实体的映射
 	tasks *gmap.HashMap // 保存任务名称与任务实体的映射
+	cron  *gcron.Cron
 }
 }
 
 
 func NewTaskSchedule() *TaskSchedule {
 func NewTaskSchedule() *TaskSchedule {
 	return &TaskSchedule{
 	return &TaskSchedule{
 		tasks: gmap.NewHashMap(true),
 		tasks: gmap.NewHashMap(true),
+		cron:  gcron.New(),
 	}
 	}
 }
 }
 
 
@@ -34,7 +37,7 @@ func (t *TaskSchedule) AddTask(msg []byte) error {
 	newJson := gjson.New(task.Actions)
 	newJson := gjson.New(task.Actions)
 	server.Log.Debugf("add task: taskId:%s,corn:%s,actions:%s", task.TaskId, task.Cron, newJson.MustToJsonString())
 	server.Log.Debugf("add task: taskId:%s,corn:%s,actions:%s", task.TaskId, task.Cron, newJson.MustToJsonString())
 	// 创建任务
 	// 创建任务
-	entity, err := gcron.Add(context.Background(), task.Cron, func(ctx context.Context) {
+	entity, err := t.cron.Add(context.Background(), task.Cron, func(ctx context.Context) {
 		if err = NewTaskExecutor(task.Actions).Do(); err != nil {
 		if err = NewTaskExecutor(task.Actions).Do(); err != nil {
 			server.Log.Errorf("do taskid :%s error:%s", task.TaskId, err.Error())
 			server.Log.Errorf("do taskid :%s error:%s", task.TaskId, err.Error())
 		}
 		}
@@ -52,21 +55,21 @@ func (t *TaskSchedule) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
 }
 }
 
 
 func (t *TaskSchedule) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
 func (t *TaskSchedule) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
-
-	gcron.Remove(msg.TaskId)
+	fmt.Printf("before任务列表:%v", t.cron.Entries())
+	t.cron.Remove(msg.TaskId)
 	_ = t.tasks.Remove(msg.TaskId)
 	_ = t.tasks.Remove(msg.TaskId)
 
 
 	server.Log.Debugf("RemoveMessageHandle :%s", msg.TaskId)
 	server.Log.Debugf("RemoveMessageHandle :%s", msg.TaskId)
+	fmt.Printf("after任务列表:%v", t.cron.Entries())
 	return nil
 	return nil
 }
 }
 
 
 func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
 func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
 
 
-	gcron.Remove(msg.TaskId)
-
+	t.cron.Remove(msg.TaskId)
 	var task rule.TimerTaskMessage
 	var task rule.TimerTaskMessage
 	_ = json.Unmarshal([]byte(msg.Data), &task)
 	_ = json.Unmarshal([]byte(msg.Data), &task)
-	entity, err := gcron.Add(context.Background(), task.Cron, func(ctx context.Context) {
+	entity, err := t.cron.Add(context.Background(), task.Cron, func(ctx context.Context) {
 		if err := NewTaskExecutor(task.Actions).Do(); err != nil {
 		if err := NewTaskExecutor(task.Actions).Do(); err != nil {
 			server.Log.Errorf("do taskid :%s error:%s", task.TaskId, err.Error())
 			server.Log.Errorf("do taskid :%s error:%s", task.TaskId, err.Error())
 		}
 		}
@@ -81,20 +84,23 @@ func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error
 }
 }
 
 
 func (t *TaskSchedule) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
 func (t *TaskSchedule) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
-	gcron.Stop(msg.TaskId)
+	t.cron.Stop(msg.TaskId)
 
 
 	server.Log.Debugf("SnapMessageHandle :%s", msg.TaskId)
 	server.Log.Debugf("SnapMessageHandle :%s", msg.TaskId)
 	return nil
 	return nil
 }
 }
 
 
 func (t *TaskSchedule) StartMessageHandle(msg *rule.TaskLifecycleMessage) error {
 func (t *TaskSchedule) StartMessageHandle(msg *rule.TaskLifecycleMessage) error {
-	gcron.Start(msg.TaskId)
+	t.cron.Start(msg.TaskId)
 	server.Log.Debugf("StartMessageHandle :%s", msg.TaskId)
 	server.Log.Debugf("StartMessageHandle :%s", msg.TaskId)
+	server.Log.Debugf("taskId:%s,status:%d", msg.TaskId, t.cron.Search(msg.TaskId).Status())
 	return nil
 	return nil
 }
 }
 
 
 func (t *TaskSchedule) StopMessageHandle(msg *rule.TaskLifecycleMessage) error {
 func (t *TaskSchedule) StopMessageHandle(msg *rule.TaskLifecycleMessage) error {
-	gcron.Stop(msg.TaskId)
+	t.cron.Stop(msg.TaskId)
+	t.cron.Entries()
 	server.Log.Debugf("StopMessageHandle :%s", msg.TaskId)
 	server.Log.Debugf("StopMessageHandle :%s", msg.TaskId)
+	server.Log.Debugf("taskId:%s,status:%d", msg.TaskId, t.cron.Search(msg.TaskId).Status())
 	return nil
 	return nil
 }
 }