Browse Source

增加定时任务rpc

liuxiulin 1 year ago
parent
commit
d9c7dd335e

+ 2 - 0
pkg/rpcs/task.go

@@ -11,3 +11,5 @@ type ArgsSubmitTaskLifecycle struct {
 	Action string
 	Data   string
 }
+
+type ReplySubmitTask ReplyEmptyResult

+ 43 - 0
services/apiprovider/actions.go

@@ -420,3 +420,46 @@ func CheckDeviceIsOnline(req *http.Request, r render.Render) {
 		Result: 1,
 	})
 }
+
+func SubmitSceneTask(req *http.Request, r render.Render) {
+
+	var ruleReq rpcs.ArgsSubmitTask
+	decoder := json.NewDecoder(req.Body)
+	err := decoder.Decode(&ruleReq)
+	if err != nil {
+		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
+		return
+	}
+
+	reply := rpcs.ReplySubmitTask{}
+	err = server.RPCCallByName(nil, rpcs.SceneAccessServiceName, "SceneAccess.SubmitTask", ruleReq, reply)
+	if err != nil {
+		server.Log.Errorf("submit sceneTask error: %v", err)
+		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
+		return
+	}
+	r.JSON(http.StatusOK, Common{})
+	return
+}
+
+func SubmitTaskLifecycle(req *http.Request, r render.Render) {
+
+	var ruleReq rpcs.ArgsSubmitTaskLifecycle
+	decoder := json.NewDecoder(req.Body)
+	err := decoder.Decode(&ruleReq)
+	if err != nil {
+		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
+		return
+	}
+
+	reply := rpcs.ReplySubmitTask{}
+	err = server.RPCCallByName(nil, rpcs.SceneAccessServiceName, "SceneAccess.SubmitTaskLifecycle", ruleReq, reply)
+	if err != nil {
+		r.JSON(http.StatusOK, renderError(ErrWrongSecret, errors.New("invalid secret key")))
+		server.Log.Errorf("submit taskLifecycle error: %v", err)
+		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
+		return
+	}
+	r.JSON(http.StatusOK, Common{})
+	return
+}

+ 4 - 0
services/apiprovider/router.go

@@ -66,6 +66,10 @@ func route(m *martini.ClassicMartini) {
 		r.Get("/devices/check_net_config", CheckDeviceNetConfig)
 
 		r.Get("/devices/online", CheckDeviceIsOnline)
+
+		m.Post("/scene_task", SubmitSceneTask)
+
+		m.Post("/task_lifecycle", SubmitTaskLifecycle)
 	})
 	m.Group("/application/v2", func(r martini.Router) {
 		// send a command to device

BIN
services/knowoapi/knowoapi


+ 1 - 1
services/timer-service/internal/executer.go

@@ -50,7 +50,7 @@ func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *r
 // 执行延时任务
 func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
 	time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
-	return nil
+	return a.doDeviceIssueTask(entityId, subEntityId, action)
 }
 
 func getAccessRPCHost(deviceid string) (string, error) {

+ 21 - 1
services/timer-service/internal/scheduler.go

@@ -49,16 +49,36 @@ func (t *TaskSchedule) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
 }
 
 func (t *TaskSchedule) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
+
+	gcron.Remove(msg.TaskId)
+	_ = t.tasks.Remove(msg.TaskId)
 	server.Log.Debugf("RemoveMessageHandle :%s", msg.TaskId)
 	return nil
 }
 
 func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
-	server.Log.Debugf("UpdateMessageHandle :%s", msg.TaskId)
+
+	gcron.Remove(msg.TaskId)
+
+	var task rule.TimerTaskMessage
+	_ = json.Unmarshal([]byte(msg.Data), &task)
+	entity, err := gcron.Add(context.Background(), task.Cron, func(ctx context.Context) {
+		if err := NewTaskExecutor(task.Actions).Do(); err != nil {
+			server.Log.Errorf("do taskid :%s error:%s", task.TaskId, err.Error())
+		}
+	}, task.TaskId)
+	if err != nil {
+		return err
+	}
+
+	t.tasks.Set(task.TaskId, entity)
+	server.Log.Debugf("UpdateMessageHandle :%s", task.TaskId)
 	return nil
 }
 
 func (t *TaskSchedule) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	gcron.Stop(msg.TaskId)
+
 	server.Log.Debugf("SnapMessageHandle :%s", msg.TaskId)
 	return nil
 }