liuxiulin 1 年之前
父節點
當前提交
e53f57b756

+ 1 - 0
services/apiprovider/actions.go

@@ -431,6 +431,7 @@ func SubmitSceneTask(req *http.Request, r render.Render) {
 		return
 	}
 	reply := rpcs.ReplySubmitTask{}
+	server.Log.Debugf("submit sceneTask %v", ruleReq)
 	err = server.RPCCallByName(nil, rpcs.SceneAccessServiceName, "SceneService.SubmitTask", ruleReq, &reply)
 	if err != nil {
 		server.Log.Errorf("submit sceneTask error: %v", err)

+ 6 - 1
services/onekey-service/internal/executer.go

@@ -46,7 +46,12 @@ func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *r
 	}
 	reply := &rpcs.ReplyEmptyResult{}
 	server.Log.Debugf("do Device Issue task args:%v", args)
-	return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
+	err = server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
+	if err != nil {
+		return err
+	}
+	time.Sleep(100 * time.Millisecond)
+	return nil
 }
 
 // 执行延时任务

+ 11 - 1
services/onekey-service/internal/onekey_service.go

@@ -43,7 +43,17 @@ func (s *OneKeyService) init(conn *amqp.Connection) error {
 		return err
 	}
 	s.ch = ch
-
+	tc := rule.NewTaskLifecycleConsumer(ch)
+	err = tc.Init()
+	if err != nil {
+		return err
+	}
+	err = tc.Start()
+	if err != nil {
+		return err
+	}
+	tc.SetExternalConsumer(s.taskSchedule)
+	s.tc = tc
 	s.notifyChanClose = make(chan *amqp.Error)
 	s.ch.NotifyClose(s.notifyChanClose)
 	s.isReady = true

+ 39 - 0
services/onekey-service/internal/scheduler.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"github.com/gogf/gf/container/gmap"
 	"github.com/gogf/gf/encoding/gjson"
+	"github.com/gogf/gf/v2/os/gcron"
 	"sparrow/pkg/rule"
 	"sparrow/pkg/server"
 )
@@ -13,6 +14,10 @@ type TaskSchedule struct {
 	tasks *gmap.HashMap // 保存任务名称与任务实体的映射
 }
 
+func (t *TaskSchedule) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	return nil
+}
+
 func NewTaskSchedule() *TaskSchedule {
 	return &TaskSchedule{
 		tasks: gmap.NewHashMap(true),
@@ -36,3 +41,37 @@ func (t *TaskSchedule) DoTask(msg []byte) error {
 
 	return nil
 }
+
+func (t *TaskSchedule) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
+
+	gcron.Remove(msg.TaskId)
+	_ = t.tasks.Remove(msg.TaskId)
+
+	server.Log.Debugf("RemoveMessageHandle :%s", msg.TaskId)
+	return nil
+}
+
+func (t *TaskSchedule) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
+
+	server.Log.Debugf("UpdateMessageHandle :%s", msg.TaskId)
+	return nil
+}
+
+func (t *TaskSchedule) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	gcron.Stop(msg.TaskId)
+
+	server.Log.Debugf("SnapMessageHandle :%s", msg.TaskId)
+	return nil
+}
+
+func (t *TaskSchedule) StartMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	gcron.Start(msg.TaskId)
+	server.Log.Debugf("StartMessageHandle :%s", msg.TaskId)
+	return nil
+}
+
+func (t *TaskSchedule) StopMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	gcron.Stop(msg.TaskId)
+	server.Log.Debugf("StopMessageHandle :%s", msg.TaskId)
+	return nil
+}