package service

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/streadway/amqp"

	"sparrow/pkg/rpcs"
	"sparrow/pkg/rule"
	"sparrow/pkg/server"
	"sparrow/services/scene-access/internal/service/manager"
)

const (
	// TimerTopic is the topic used for timer (scheduled) tasks.
	TimerTopic = "sparrow.task.timer"
	// OneKeyTopic is the topic used for one-key (run-now) tasks.
	OneKeyTopic = "sparrow.task.oneKey"
)

// errRabbitMQNotReady is returned by the RPC methods while the background
// goroutine has not (re-)established the RabbitMQ channel and producers.
var errRabbitMQNotReady = errors.New("scene service: rabbitmq connection is not ready")

// SceneService is the scene RPC service. It publishes submitted tasks and
// task lifecycle events to RabbitMQ and keeps the AMQP connection alive from
// a background goroutine started by NewSceneService.
type SceneService struct {
	rabbitMQAddress string
	// done is watched by the reconnect loops as a stop signal. Nothing in
	// this file closes it.
	done chan bool

	// mu guards isReady and both producers: they are written by the
	// reconnect goroutine and read by the RPC methods.
	mu               sync.RWMutex
	isReady          bool
	taskManager      manager.Producer
	lifecycleManager manager.Producer

	// The fields below are only touched by the reconnect goroutine.
	conn               *amqp.Connection
	notifyCloseChannel chan *amqp.Error // connection-level close notifications
	notifyChanClose    chan *amqp.Error // channel-level close notifications
	ch                 *amqp.Channel
}

// NewSceneService creates a SceneService for the RabbitMQ server at mqAddr and
// starts the goroutine that connects to it and reconnects on failure. The
// service rejects RPC calls with an error until the first connection is ready.
func NewSceneService(mqAddr string) *SceneService {
	srv := &SceneService{
		rabbitMQAddress: mqAddr,
		done:            make(chan bool),
	}
	go srv.handleReconnect()
	return srv
}

// SubmitTask is the RPC entry point that submits a task. args.Type selects the
// topic ("timer" or "oneKey") and args.Data is published as the message body.
// It returns an error for any other task type, or when RabbitMQ is not ready.
func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
	var topic string
	switch args.Type {
	case "timer":
		topic = TimerTopic
	case "oneKey":
		topic = OneKeyTopic
	default:
		// Previously an unknown type fell through and published to the
		// empty topic.
		return fmt.Errorf("unsupported task type %q", args.Type)
	}

	taskProducer, _, err := s.producers()
	if err != nil {
		return err
	}
	return taskProducer.Publish(topic, []byte(args.Data))
}

// SubmitTaskLifecycle is the RPC entry point that submits a task lifecycle
// event. The task id, action and data are wrapped in a
// rule.TaskLifecycleMessage, JSON-encoded and published to the
// "sparrow.task.timer.lifecycle" topic.
func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
	taskMsg := rule.TaskLifecycleMessage{
		TaskId: args.TaskId,
		Action: args.Action,
		Data:   args.Data,
	}
	data, err := json.Marshal(&taskMsg)
	if err != nil {
		return err
	}

	_, lifecycleProducer, err := s.producers()
	if err != nil {
		return err
	}
	return lifecycleProducer.Publish("sparrow.task.timer.lifecycle", data)
}

// producers returns the task and lifecycle producers, or errRabbitMQNotReady
// when the service is not currently connected. Without this check an RPC that
// arrives before the first successful init would call a nil interface.
func (s *SceneService) producers() (manager.Producer, manager.Producer, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if !s.isReady || s.taskManager == nil || s.lifecycleManager == nil {
		return nil, nil, errRabbitMQNotReady
	}
	return s.taskManager, s.lifecycleManager, nil
}

// setReady records whether the producers may currently be used.
func (s *SceneService) setReady(ready bool) {
	s.mu.Lock()
	s.isReady = ready
	s.mu.Unlock()
}

// init opens a channel on conn, initialises both producers on it, registers
// the channel close listener and marks the service ready. On failure the
// freshly opened channel is closed so retries do not accumulate channels.
func (s *SceneService) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()
	if err != nil {
		return err
	}

	taskManager := manager.NewTaskManager(ch)
	if err = taskManager.Init(); err != nil {
		// Best effort: the channel may already be closed by the broker.
		_ = ch.Close()
		return err
	}

	lifecycleManager := manager.NewTaskLifecycleManager(ch)
	if err = lifecycleManager.Init(); err != nil {
		_ = ch.Close()
		return err
	}

	s.ch = ch
	// Buffered so the amqp shutdown path can deliver the error even when
	// nobody is receiving at that moment.
	s.notifyChanClose = make(chan *amqp.Error, 1)
	s.ch.NotifyClose(s.notifyChanClose)

	s.mu.Lock()
	s.taskManager = taskManager
	s.lifecycleManager = lifecycleManager
	s.isReady = true
	s.mu.Unlock()
	return nil
}

// connect dials RabbitMQ, stores the connection and registers the
// connection close listener.
func (s *SceneService) connect() (*amqp.Connection, error) {
	conn, err := amqp.Dial(s.rabbitMQAddress)
	if err != nil {
		return nil, err
	}
	s.conn = conn
	// Buffered for the same reason as the channel listener in init.
	s.notifyCloseChannel = make(chan *amqp.Error, 1)
	s.conn.NotifyClose(s.notifyCloseChannel)
	return conn, nil
}

// handleReconnect runs in its own goroutine. It dials RabbitMQ, retrying every
// 4 seconds on failure, and hands each connection to handleReInit. It returns
// when done fires or handleReInit reports that the service is stopping.
func (s *SceneService) handleReconnect() {
	for {
		s.setReady(false)
		conn, err := s.connect()
		fmt.Println("handleReconnect")
		if err != nil {
			server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
			select {
			case <-s.done:
				return
			case <-time.After(4 * time.Second):
			}
			continue
		}
		if done := s.handleReInit(conn); done {
			break
		}
	}
}

// handleReInit initialises the channel and producers on conn and then waits
// for a close notification. A closed channel triggers another init on the same
// connection. It returns false when the connection itself was closed, so the
// caller reconnects, and true when done fired.
func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
	for {
		s.setReady(false)
		if err := s.init(conn); err != nil {
			server.Log.Errorf("init rabbitmq channel error:%s", err.Error())
			select {
			case <-s.done:
				return true
			case <-s.notifyCloseChannel:
				// The connection is gone; retrying init on it can never
				// succeed, so let the caller reconnect.
				return false
			case <-time.After(time.Second * 3):
			}
			continue
		}

		select {
		case <-s.done:
			return true
		case err := <-s.notifyCloseChannel:
			fmt.Println("Connection closed. Reconnecting..." + amqpCloseReason(err))
			return false
		case err := <-s.notifyChanClose:
			fmt.Println("Channel closed. Re-running init..." + amqpCloseReason(err))
		}
	}
}

// amqpCloseReason returns the text of a close notification. On a normal
// shutdown amqp closes the notification channel, so the received error is nil
// and there is no reason to report.
func amqpCloseReason(err *amqp.Error) string {
	if err == nil {
		return ""
	}
	return err.Error()
}