浏览代码

新增定时任务服务

lijian 1 年之前
父节点
当前提交
fe21d6d5c7

+ 2 - 2
pkg/productconfig/productconfig.go

@@ -14,7 +14,7 @@ type CommandOrEventParam struct {
 	Name      string
 }
 
-// ProductCommandOrEvent ``
+// ProductCommandOrEvent 
 type ProductCommandOrEvent struct {
 	No       int
 	Part     int
@@ -91,7 +91,7 @@ func (config *ProductConfig) ValidateCommandOrEvent(name string, params []interf
 	} else if typ == "event" {
 		target = config.Events
 	} else {
-		return nil, []interface{}{}, errors.New("wrong target type.")
+		return nil, []interface{}{}, errors.New("wrong target type")
 	}
 
 	// search for name

+ 4 - 8
pkg/queue/msgQueue/rabbitmq.go

@@ -84,10 +84,10 @@ func (r *RabbitMessageQueueAdmin) handleReInit(conn *amqp.Connection) bool {
 		select {
 		case <-r.done:
 			return true
-		case err :=<-r.notifyCloseChan:
-			fmt.Println("Connection closed. Reconnecting..."+ err.Error())
+		case err := <-r.notifyCloseChan:
+			fmt.Println("Connection closed. Reconnecting..." + err.Error())
 			return false
-		case err :=<-r.notifyChanClose:
+		case err := <-r.notifyChanClose:
 			fmt.Println("Channel closed. Re-running init..." + err.Error())
 		}
 	}
@@ -99,10 +99,6 @@ func (r *RabbitMessageQueueAdmin) init(conn *amqp.Connection) error {
 	if err != nil {
 		return err
 	}
-	//err = ch.Confirm(false)
-	//if err != nil {
-	//	return err
-	//}
 	r.ch = ch
 	r.notifyChanClose = make(chan *amqp.Error)
 	r.ch.NotifyClose(r.notifyChanClose)
@@ -296,7 +292,7 @@ func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
 			msgs, err := r.admin.ch.Consume(
 				tpc,
 				"",    // consumer
-				true, // auto-ack
+				true,  // auto-ack
 				false, // exclusive
 				false, // no-local
 				false, // no-wait

+ 130 - 0
pkg/rule/task_lifecycle_consumer.go

@@ -0,0 +1,130 @@
+package rule
+
+import (
+	"encoding/json"
+	"github.com/streadway/amqp"
+	"sparrow/pkg/server"
+)
+
+type ExternalConsumer interface {
+	AddMessageHandle(msg *TaskLifecycleMessage) error
+	RemoveMessageHandle(msg *TaskLifecycleMessage) error
+	UpdateMessageHandle(msg *TaskLifecycleMessage) error
+	SnapMessageHandle(msg *TaskLifecycleMessage) error
+}
+
+const TaskLifecycleExchange = "task_lifecycle_exchange"
+const TaskExchange = "task_exchange"
+
+type TaskLifecycleConsumer struct {
+	ch          *amqp.Channel
+	ec          ExternalConsumer
+	stopChan    chan struct{}
+	messageChan chan []byte
+}
+
+func NewTaskLifecycleConsumer(ch *amqp.Channel) *TaskLifecycleConsumer {
+	return &TaskLifecycleConsumer{
+		ch:          ch,
+		stopChan:    make(chan struct{}),
+		messageChan: make(chan []byte, 10),
+	}
+}
+
+func (a *TaskLifecycleConsumer) SetExternalConsumer(ec ExternalConsumer) {
+	a.ec = ec
+}
+
+func (a *TaskLifecycleConsumer) Stop() {
+	close(a.stopChan)
+}
+
+func (a *TaskLifecycleConsumer) Start() error {
+	go func() {
+		for {
+			select {
+			case <-a.stopChan:
+				return
+			case msg := <-a.messageChan:
+				a.handleMessage(msg)
+			}
+		}
+	}()
+	return nil
+}
+
+func (a *TaskLifecycleConsumer) handleMessage(msg []byte) {
+	if a.ec == nil {
+		server.Log.Errorf("TaskLifecycleConsumer is unset")
+		return
+	}
+	var tm TaskLifecycleMessage
+	err := json.Unmarshal(msg, &tm)
+	if err != nil {
+		server.Log.Errorf("handle lifecycle message error :%v", err)
+		return
+	}
+	switch tm.Action {
+	case "add":
+		err = a.ec.AddMessageHandle(&tm)
+	case "remove":
+		err = a.ec.RemoveMessageHandle(&tm)
+	case "update":
+		err = a.ec.UpdateMessageHandle(&tm)
+	case "snap":
+		err = a.ec.SnapMessageHandle(&tm)
+	}
+}
+
+func (a *TaskLifecycleConsumer) Init() error {
+	err := a.ch.ExchangeDeclare(
+		TaskLifecycleExchange, // name
+		"fanout",              // type
+		true,                  // durable
+		false,                 // auto-deleted
+		false,                 // internal
+		false,                 // no-wait
+		nil,                   // arguments
+	)
+	if err != nil {
+		return err
+	}
+	//绑定queue到交换机
+	q, err := a.ch.QueueDeclare(
+		"",    // name
+		false, // durable
+		false, // delete when unused
+		true,  // exclusive
+		false, // no-wait
+		nil,   // arguments
+	)
+	if err != nil {
+		return err
+	}
+	err = a.ch.QueueBind(
+		q.Name, // queue name
+		"",     // routing key
+		TaskLifecycleExchange,
+		false,
+		nil,
+	)
+	if err != nil {
+		return err
+	}
+	// creat consumer
+	msg, err := a.ch.Consume(q.Name, "", true, false, false, false, nil)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case <-a.stopChan:
+				return
+			case d := <-msg:
+				a.messageChan <- d.Body
+			}
+		}
+	}()
+	return nil
+}

+ 7 - 0
pkg/rule/task_lifecycle_message.go

@@ -0,0 +1,7 @@
+package rule
+
+type TaskLifecycleMessage struct {
+	TaskId string `json:"task_id"`
+	Action string `json:"action"`
+	Data   string `json:"data"` // 具体的任务操作配置字符串
+}

+ 21 - 0
services/scene-access/README.md

@@ -0,0 +1,21 @@
+## 场景服务
+
+### 核心功能
+
+* 实现场景服务,提供场景管理、场景联动、场景联动规则、场景联动日志, 设备定时服务等功能
+* 将外部提交的任务,根据类型,发布到rabbitMQ中的不同的Queue中,由不同的消费者(任务执行器)进行消费
+
+
+
+### 实现思路
+
+任务类型: 定时类任务,设备上报状态触发类任务,场景联动任务等
+
+使用rabbitmq的exchange的Routing类型,将任务类型作为routing key,将任务内容作为消息体,将消息投递到对应的Queue中
+
+
+### 任务生命周期管理
+
+消息类型:删除,更新,停止,启动。
+
+所有任务的生命周期消息都投递到一个交换机上,交换机的类型为fanout,所有任务执行器都可以订阅该交换机。保证每个执行器都可以管理目前正在执行的任务的生命周期

+ 6 - 0
services/scene-access/internal/service/config.go

@@ -0,0 +1,6 @@
+package service
+
+// Config 服务配置
+type Config struct {
+	RabbitMQ string // rabbit mq broker 连接字符串
+}

+ 7 - 0
services/scene-access/internal/service/manager/producer.go

@@ -0,0 +1,7 @@
+package manager
+
+type Producer interface {
+	Init() error
+	Publish(topic string, msg []byte) error
+	CreateTopicIfNotExists(topic string) error
+}

+ 31 - 0
services/scene-access/internal/service/manager/task_lifecycle_manager.go

@@ -0,0 +1,31 @@
+package manager
+
+import (
+	"github.com/streadway/amqp"
+	"sparrow/pkg/rule"
+)
+
+// TaskLifecycleManager 任务生命周期发布管理器
+type TaskLifecycleManager struct {
+	ch *amqp.Channel
+}
+
+func NewTaskLifecycleManager(ch *amqp.Channel) Producer {
+	return &TaskLifecycleManager{ch: ch}
+}
+
+func (t *TaskLifecycleManager) Init() error {
+	return t.ch.ExchangeDeclare(rule.TaskLifecycleExchange, "fanout", true, false, false, false, nil)
+}
+
+func (t *TaskLifecycleManager) Publish(topic string, msg []byte) error {
+	return t.ch.Publish(rule.TaskLifecycleExchange, topic, false, false, amqp.Publishing{
+		Body:         msg,
+		DeliveryMode: amqp.Persistent,
+	})
+}
+
+func (t *TaskLifecycleManager) CreateTopicIfNotExists(topic string) error {
+	//TODO implement me
+	panic("implement me")
+}

+ 38 - 0
services/scene-access/internal/service/manager/task_manager.go

@@ -0,0 +1,38 @@
+package manager
+
+import (
+	"github.com/streadway/amqp"
+	"sparrow/pkg/rule"
+)
+
+// TaskManager 任务发布管理器
+// 发布任务消息到
+type TaskManager struct {
+	ch *amqp.Channel
+}
+
+func NewTaskManager(ch *amqp.Channel) Producer {
+	return &TaskManager{
+		ch: ch,
+	}
+}
+
+func (a *TaskManager) Init() error {
+	// 定义exchange
+	return a.ch.ExchangeDeclare(rule.TaskExchange, "topic", true, false, false, false, nil)
+}
+
+func (a *TaskManager) Publish(topic string, msg []byte) error {
+	err := a.ch.Publish(rule.TaskExchange, topic, false, false, amqp.Publishing{
+		DeliveryMode: amqp.Persistent,
+		Body:         msg,
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (a *TaskManager) CreateTopicIfNotExists(topic string) error {
+	return nil
+}

+ 129 - 0
services/scene-access/internal/service/scene.go

@@ -0,0 +1,129 @@
+package service
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/streadway/amqp"
+	"sparrow/pkg/rule"
+	"sparrow/pkg/server"
+	"sparrow/services/scene-access/internal/service/manager"
+	"time"
+)
+
+const TimerTopic = "sparrow.task.timer" // 定时任务主题
+
+// SceneService 场景服务
+type SceneService struct {
+	rabbitMQAddress    string
+	taskManager        manager.Producer
+	lifecycleManager   manager.Producer
+	done               chan bool
+	isReady            bool
+	conn               *amqp.Connection
+	notifyCloseChannel chan *amqp.Error
+	notifyChanClose    chan *amqp.Error
+	ch                 *amqp.Channel
+}
+
+func NewSceneService(mqAddr string) *SceneService {
+	srv := &SceneService{
+		rabbitMQAddress: mqAddr,
+	}
+	go srv.handleReconnect()
+	return srv
+}
+
+// SubmitTask 提交一个任务
+func (s *SceneService) SubmitTask() error {
+	return s.taskManager.Publish(TimerTopic, []byte("this is a task"))
+}
+
+func (s *SceneService) SubmitTaskLifecycle() error {
+	taskMsg := rule.TaskLifecycleMessage{
+		TaskId: "xxxxxx",
+		Action: "add",
+		Data:   "test",
+	}
+	data, err := json.Marshal(&taskMsg)
+	if err != nil {
+		return err
+	}
+	return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
+}
+
+func (s *SceneService) init(conn *amqp.Connection) error {
+	ch, err := conn.Channel()
+	if err != nil {
+		return err
+	}
+	s.ch = ch
+	taskManager := manager.NewTaskManager(ch)
+	err = taskManager.Init()
+	if err != nil {
+		return err
+	}
+	s.taskManager = taskManager
+	lifecycleManager := manager.NewTaskLifecycleManager(ch)
+	err = lifecycleManager.Init()
+	if err != nil {
+		return err
+	}
+	s.lifecycleManager = lifecycleManager
+	s.notifyChanClose = make(chan *amqp.Error)
+	s.ch.NotifyClose(s.notifyChanClose)
+	s.isReady = true
+	return nil
+}
+
+func (s *SceneService) connect() (*amqp.Connection, error) {
+	conn, err := amqp.Dial(s.rabbitMQAddress)
+	if err != nil {
+		return nil, err
+	}
+	s.conn = conn
+	s.notifyCloseChannel = make(chan *amqp.Error)
+	s.conn.NotifyClose(s.notifyCloseChannel)
+	return conn, err
+}
+func (s *SceneService) handleReconnect() {
+	for {
+		s.isReady = false
+		conn, err := s.connect()
+		fmt.Println("handleReconnect")
+		if err != nil {
+			server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
+			select {
+			case <-s.done:
+				return
+			case <-time.After(4 * time.Second):
+			}
+			continue
+		}
+		if done := s.handleReInit(conn); done {
+			break
+		}
+	}
+}
+func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
+	for {
+		s.isReady = false
+		err := s.init(conn)
+		if err != nil {
+			select {
+			case <-s.done:
+				return true
+			case <-time.After(time.Second * 3):
+			}
+			continue
+		}
+		select {
+		case <-s.done:
+			return true
+		case err := <-s.notifyCloseChannel:
+			fmt.Println("Connection closed. Reconnecting..." + err.Error())
+			return false
+		case err := <-s.notifyChanClose:
+			fmt.Println("Channel closed. Re-running init..." + err.Error())
+		}
+	}
+}

+ 16 - 0
services/scene-access/main.go

@@ -0,0 +1,16 @@
+package main
+
+import (
+	"sparrow/services/scene-access/internal/service"
+	"time"
+)
+
+func main() {
+	scene := service.NewSceneService("amqp://admin:YjT5tlNVKxDsbUQ5jqfPhXfae@127.0.0.1:5672")
+	time.Sleep(time.Second * 5)
+	for {
+		scene.SubmitTask()
+		scene.SubmitTaskLifecycle()
+		time.Sleep(time.Second * 5)
+	}
+}

+ 165 - 0
services/timer-service/internal/timer_service.go

@@ -0,0 +1,165 @@
+package internal
+
+import (
+	"fmt"
+	"github.com/streadway/amqp"
+	"sparrow/pkg/rule"
+	"sparrow/pkg/server"
+	"time"
+)
+
+type TimerService struct {
+	tc                 *rule.TaskLifecycleConsumer
+	host               string
+	done               chan bool
+	isReady            bool
+	conn               *amqp.Connection
+	notifyCloseChannel chan *amqp.Error
+	notifyChanClose    chan *amqp.Error
+	ch                 *amqp.Channel
+	reconnectChan      chan struct{}
+}
+
+func NewTimerService(host string) *TimerService {
+	ts := &TimerService{
+		host:          host,
+		done:          make(chan bool),
+		reconnectChan: make(chan struct{}),
+	}
+	go ts.handleReconnect()
+	return ts
+}
+
+func (s *TimerService) init(conn *amqp.Connection) error {
+	ch, err := conn.Channel()
+	if err != nil {
+		return err
+	}
+	s.ch = ch
+	tc := rule.NewTaskLifecycleConsumer(ch)
+	err = tc.Init()
+	if err != nil {
+		return err
+	}
+	err = tc.Start()
+	if err != nil {
+		return err
+	}
+	tc.SetExternalConsumer(s)
+	s.tc = tc
+	s.notifyChanClose = make(chan *amqp.Error)
+	s.ch.NotifyClose(s.notifyChanClose)
+	s.isReady = true
+	return s.initTaskConsumer()
+}
+
+// 初始化任务消费者
+func (s *TimerService) initTaskConsumer() error {
+	q, err := s.ch.QueueDeclare("", false, true, false, false, nil)
+	if err != nil {
+		return err
+	}
+	err = s.ch.QueueBind(q.Name, "*.*.timer", rule.TaskExchange, false, nil)
+	if err != nil {
+		return err
+	}
+	msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil)
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			select {
+			case <-s.reconnectChan:
+				return
+			case msg := <-msgChan:
+				s.handleTimerTask(msg.Body)
+			}
+		}
+	}()
+	return nil
+}
+
+func (s *TimerService) handleTimerTask(msg []byte) {
+	fmt.Printf("%s", msg)
+}
+
+func (s *TimerService) connect() (*amqp.Connection, error) {
+	conn, err := amqp.Dial(s.host)
+	if err != nil {
+		return nil, err
+	}
+	s.conn = conn
+	s.notifyCloseChannel = make(chan *amqp.Error)
+	s.conn.NotifyClose(s.notifyCloseChannel)
+	return conn, err
+}
+func (s *TimerService) handleReconnect() {
+	for {
+		s.isReady = false
+		conn, err := s.connect()
+		fmt.Println("handleReconnect")
+		if err != nil {
+			server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
+			select {
+			case <-s.done:
+				return
+			case <-time.After(4 * time.Second):
+			}
+			continue
+		}
+		if done := s.handleReInit(conn); done {
+			break
+		}
+	}
+}
+func (s *TimerService) handleReInit(conn *amqp.Connection) bool {
+	for {
+		s.isReady = false
+		err := s.init(conn)
+		if err != nil {
+			select {
+			case <-s.done:
+				return true
+			case <-time.After(time.Second * 3):
+			}
+			continue
+		}
+		select {
+		case <-s.done:
+			return true
+		case err := <-s.notifyCloseChannel:
+			fmt.Println("Connection closed. Reconnecting..." + err.Error())
+			close(s.reconnectChan)
+			return false
+		case err := <-s.notifyChanClose:
+			fmt.Println("Channel closed. Re-running init..." + err.Error())
+		}
+	}
+}
+
+/* 实现任务生命周期消息管理 */
+
+// AddMessageHandle 新增任务
+func (s *TimerService) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	fmt.Printf("%v\r\n", msg)
+	return nil
+}
+
+// RemoveMessageHandle 删除任务
+func (s *TimerService) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	fmt.Printf("%v\r\n", msg)
+	return nil
+}
+
+// UpdateMessageHandle 更新任务
+func (s *TimerService) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	fmt.Printf("%v\r\n", msg)
+	return nil
+}
+
+// SnapMessageHandle 快照任务
+func (s *TimerService) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
+	fmt.Printf("%v\r\n", msg)
+	return nil
+}

+ 17 - 0
services/timer-service/main.go

@@ -0,0 +1,17 @@
+package main
+
+import (
+	"flag"
+	"sparrow/services/timer-service/internal"
+	"time"
+)
+
+var (
+	uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
+)
+
+func main() {
+	internal.NewTimerService("amqp://admin:YjT5tlNVKxDsbUQ5jqfPhXfae@127.0.0.1:5672")
+	time.Sleep(time.Second * 5)
+	select {}
+}