Browse Source

add oneKey-service

liuxiulin 1 year ago
parent
commit
7321553790

+ 1 - 0
pkg/rpcs/common.go

@@ -17,4 +17,5 @@ const (
 	EmqxAgentServiceName   = "EmqxAgentServer"
 	EmqxAgentServiceName   = "EmqxAgentServer"
 	SceneAccessServiceName = "SceneAccess"
 	SceneAccessServiceName = "SceneAccess"
 	TimerServiceName       = "TimerService"
 	TimerServiceName       = "TimerService"
+	OneKeyServiceName      = "OneKeyService"
 )
 )

+ 68 - 0
services/onekey-service/internal/executer.go

@@ -0,0 +1,68 @@
+package internal
+
+import (
+	"fmt"
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/rule"
+	"sparrow/pkg/server"
+	"time"
+)
+
+// TaskExecutor 任务执行器,用来执行具体的任务动作
+type TaskExecutor struct {
+	Actions []*rule.TaskAction
+}
+
+func NewTaskExecutor(actions []*rule.TaskAction) *TaskExecutor {
+	return &TaskExecutor{
+		Actions: actions,
+	}
+}
+
+func (a *TaskExecutor) Do() error {
+	for _, action := range a.Actions {
+		switch action.ActionExecutor {
+		case "delay":
+			return a.doDelayTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
+		case "device_issue":
+			return a.doDeviceIssueTask(action.EntityId, action.SubEntityId, action.ExecutorProperty)
+		}
+	}
+	return nil
+}
+
+func (a *TaskExecutor) doDeviceIssueTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
+	// 调用设备接入服务
+	rpchost, err := getAccessRPCHost(entityId)
+	if err != nil {
+		return err
+	}
+	fmt.Printf("rpchost:%s", rpchost)
+	args := rpcs.ArgsSendCommand{
+		DeviceId:  entityId,
+		SubDevice: subEntityId,
+		Cmd:       action.FunctionCode,
+		Params:    action.FunctionValue,
+	}
+	reply := &rpcs.ReplyEmptyResult{}
+	server.Log.Debugf("do Device Issue task args:%v", args)
+	return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
+}
+
+// 执行延时任务
+func (a *TaskExecutor) doDelayTask(entityId, subEntityId string, action *rule.TaskExecutorProperty) error {
+	time.Sleep(time.Duration(action.DelaySeconds) * time.Second)
+	return a.doDeviceIssueTask(entityId, subEntityId, action)
+}
+
+func getAccessRPCHost(deviceid string) (string, error) {
+	args := rpcs.ArgsGetDeviceOnlineStatus{
+		Id: deviceid,
+	}
+	reply := &rpcs.ReplyGetDeviceOnlineStatus{}
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
+	if err != nil {
+		return "", err
+	}
+	return reply.AccessRPCHost, nil
+}

+ 139 - 0
services/onekey-service/internal/onekey_service.go

@@ -0,0 +1,139 @@
+package internal
+
+import (
+	"fmt"
+	"github.com/streadway/amqp"
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/rule"
+	"sparrow/pkg/server"
+	"time"
+)
+
+type OneKeyService struct {
+	tc                 *rule.TaskLifecycleConsumer
+	host               string
+	done               chan bool
+	isReady            bool
+	conn               *amqp.Connection
+	notifyCloseChannel chan *amqp.Error
+	notifyChanClose    chan *amqp.Error
+	ch                 *amqp.Channel
+	reconnectChan      chan struct{}
+	taskSchedule       *TaskSchedule
+}
+
+func NewOneKeyService(host string) *OneKeyService {
+	ts := &OneKeyService{
+		host:          host,
+		done:          make(chan bool),
+		taskSchedule:  NewTaskSchedule(),
+		reconnectChan: make(chan struct{}),
+	}
+	go ts.handleReconnect()
+	return ts
+}
+
+func (s *OneKeyService) Ping(args string, result rpcs.ReplyEmptyResult) error {
+	return nil
+}
+
+func (s *OneKeyService) init(conn *amqp.Connection) error {
+	ch, err := conn.Channel()
+	if err != nil {
+		return err
+	}
+	s.ch = ch
+
+	s.notifyChanClose = make(chan *amqp.Error)
+	s.ch.NotifyClose(s.notifyChanClose)
+	s.isReady = true
+	return s.initTaskConsumer()
+}
+
+// 初始化任务消费者
+func (s *OneKeyService) initTaskConsumer() error {
+	q, err := s.ch.QueueDeclare("", false, true, false, false, nil)
+	if err != nil {
+		return err
+	}
+	err = s.ch.QueueBind(q.Name, "*.*.oneKey", rule.TaskExchange, false, nil)
+	if err != nil {
+		return err
+	}
+	msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case <-s.reconnectChan:
+				return
+			case msg := <-msgChan:
+				s.handleOneKeyTask(msg.Body)
+			}
+		}
+	}()
+	return nil
+}
+
+func (s *OneKeyService) handleOneKeyTask(msg []byte) {
+	err := s.taskSchedule.DoTask(msg)
+	if err != nil {
+		server.Log.Errorf("do task error :%s", err.Error())
+	}
+}
+
+func (s *OneKeyService) connect() (*amqp.Connection, error) {
+	conn, err := amqp.Dial(s.host)
+	if err != nil {
+		return nil, err
+	}
+	s.conn = conn
+	s.notifyCloseChannel = make(chan *amqp.Error)
+	s.conn.NotifyClose(s.notifyCloseChannel)
+	return conn, err
+}
+func (s *OneKeyService) handleReconnect() {
+	for {
+		s.isReady = false
+		conn, err := s.connect()
+		fmt.Println("handleReconnect")
+		if err != nil {
+			server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
+			select {
+			case <-s.done:
+				return
+			case <-time.After(4 * time.Second):
+			}
+			continue
+		}
+		if done := s.handleReInit(conn); done {
+			break
+		}
+	}
+}
+func (s *OneKeyService) handleReInit(conn *amqp.Connection) bool {
+	for {
+		s.isReady = false
+		err := s.init(conn)
+		if err != nil {
+			select {
+			case <-s.done:
+				return true
+			case <-time.After(time.Second * 3):
+			}
+			continue
+		}
+		select {
+		case <-s.done:
+			return true
+		case err := <-s.notifyCloseChannel:
+			fmt.Println("Connection closed. Reconnecting..." + err.Error())
+			close(s.reconnectChan)
+			return false
+		case err := <-s.notifyChanClose:
+			fmt.Println("Channel closed. Re-running init..." + err.Error())
+		}
+	}
+}

+ 38 - 0
services/onekey-service/internal/scheduler.go

@@ -0,0 +1,38 @@
+package internal
+
+import (
+	"encoding/json"
+	"github.com/gogf/gf/container/gmap"
+	"github.com/gogf/gf/encoding/gjson"
+	"sparrow/pkg/rule"
+	"sparrow/pkg/server"
+)
+
+// TaskSchedule task schedule 任务调度
+type TaskSchedule struct {
+	tasks *gmap.HashMap // 保存任务名称与任务实体的映射
+}
+
+func NewTaskSchedule() *TaskSchedule {
+	return &TaskSchedule{
+		tasks: gmap.NewHashMap(true),
+	}
+}
+
+func (t *TaskSchedule) DoTask(msg []byte) error {
+	var task rule.TimerTaskMessage
+	err := json.Unmarshal([]byte(msg), &task)
+	if err != nil {
+		return err
+	}
+
+	newJson := gjson.New(task.Actions)
+	server.Log.Debugf("do task:actions:%s", newJson.MustToJsonString())
+	// 创建任务
+	err = NewTaskExecutor(task.Actions).Do()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

+ 31 - 0
services/onekey-service/main.go

@@ -0,0 +1,31 @@
+package main
+
+import (
+	"flag"
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/server"
+	"sparrow/services/onekey-service/internal"
+)
+
+const (
+	flagRabbitHost    = "rabbithost"
+	defaultRabbitHost = "amqp://guest:guest@localhost:5672/"
+)
+
+var (
+	confRabbitHost = flag.String(flagRabbitHost, defaultRabbitHost, "rabbitmq host address, amqp://user:password@ip:port/")
+)
+
+func main() {
+	// init server
+	err := server.Init(rpcs.OneKeyServiceName)
+	if err != nil {
+		server.Log.Fatal(err)
+		return
+	}
+	internal.NewOneKeyService(*confRabbitHost)
+	err = server.Run()
+	if err != nil {
+		server.Log.Fatal(err)
+	}
+}

+ 4 - 1
services/scene-access/internal/service/scene.go

@@ -11,7 +11,8 @@ import (
 	"time"
 	"time"
 )
 )
 
 
-const TimerTopic = "sparrow.task.timer" // 定时任务主题
+const TimerTopic = "sparrow.task.timer"   // 定时任务主题
+const OneKeyTopic = "sparrow.task.oneKey" // 一键执行任务主题
 
 
 // SceneService 场景服务
 // SceneService 场景服务
 type SceneService struct {
 type SceneService struct {
@@ -42,6 +43,8 @@ func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmp
 	switch args.Type {
 	switch args.Type {
 	case "timer":
 	case "timer":
 		topic = TimerTopic
 		topic = TimerTopic
+	case "oneKey":
+		topic = OneKeyTopic
 	}
 	}
 
 
 	return s.taskManager.Publish(topic, []byte(args.Data))
 	return s.taskManager.Publish(topic, []byte(args.Data))