123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package service
- import (
- "encoding/json"
- "fmt"
- "github.com/streadway/amqp"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/rule"
- "sparrow/pkg/server"
- "sparrow/services/scene-access/internal/service/manager"
- "time"
- )
- const TimerTopic = "sparrow.task.timer" // 定时任务主题
- const OneKeyTopic = "sparrow.task.oneKey" // 一键执行任务主题
- // SceneService 场景服务
- type SceneService struct {
- rabbitMQAddress string
- taskManager manager.Producer
- lifecycleManager manager.Producer
- done chan bool
- isReady bool
- conn *amqp.Connection
- notifyCloseChannel chan *amqp.Error
- notifyChanClose chan *amqp.Error
- ch *amqp.Channel
- }
- func NewSceneService(mqAddr string) *SceneService {
- srv := &SceneService{
- rabbitMQAddress: mqAddr,
- }
- go srv.handleReconnect()
- return srv
- }
- // SubmitTask rpc 提交一个任务
- func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
- var (
- topic string
- )
- switch args.Type {
- case "timer":
- topic = TimerTopic
- case "oneKey":
- topic = OneKeyTopic
- }
- return s.taskManager.Publish(topic, []byte(args.Data))
- }
- // SubmitTaskLifecycle rpc 提交一个任务生命周期
- func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
- taskMsg := rule.TaskLifecycleMessage{
- TaskId: args.TaskId,
- Action: args.Action,
- Data: args.Data,
- }
- data, err := json.Marshal(&taskMsg)
- if err != nil {
- return err
- }
- return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
- }
- func (s *SceneService) init(conn *amqp.Connection) error {
- ch, err := conn.Channel()
- if err != nil {
- return err
- }
- s.ch = ch
- taskManager := manager.NewTaskManager(ch)
- err = taskManager.Init()
- if err != nil {
- return err
- }
- s.taskManager = taskManager
- lifecycleManager := manager.NewTaskLifecycleManager(ch)
- err = lifecycleManager.Init()
- if err != nil {
- return err
- }
- s.lifecycleManager = lifecycleManager
- s.notifyChanClose = make(chan *amqp.Error)
- s.ch.NotifyClose(s.notifyChanClose)
- s.isReady = true
- return nil
- }
- func (s *SceneService) connect() (*amqp.Connection, error) {
- conn, err := amqp.Dial(s.rabbitMQAddress)
- if err != nil {
- return nil, err
- }
- s.conn = conn
- s.notifyCloseChannel = make(chan *amqp.Error)
- s.conn.NotifyClose(s.notifyCloseChannel)
- return conn, err
- }
- func (s *SceneService) handleReconnect() {
- for {
- s.isReady = false
- conn, err := s.connect()
- fmt.Println("handleReconnect")
- if err != nil {
- server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
- select {
- case <-s.done:
- return
- case <-time.After(4 * time.Second):
- }
- continue
- }
- if done := s.handleReInit(conn); done {
- break
- }
- }
- }
- func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
- for {
- s.isReady = false
- err := s.init(conn)
- if err != nil {
- select {
- case <-s.done:
- return true
- case <-time.After(time.Second * 3):
- }
- continue
- }
- select {
- case <-s.done:
- return true
- case err := <-s.notifyCloseChannel:
- fmt.Println("Connection closed. Reconnecting..." + err.Error())
- return false
- case err := <-s.notifyChanClose:
- fmt.Println("Channel closed. Re-running init..." + err.Error())
- }
- }
- }
|