package internal import ( "fmt" "github.com/streadway/amqp" "sparrow/pkg/rpcs" "sparrow/pkg/rule" "sparrow/pkg/server" "time" ) type TimerService struct { tc *rule.TaskLifecycleConsumer host string done chan bool isReady bool conn *amqp.Connection notifyCloseChannel chan *amqp.Error notifyChanClose chan *amqp.Error ch *amqp.Channel reconnectChan chan struct{} taskSchedule *TaskSchedule } func NewTimerService(host string) *TimerService { ts := &TimerService{ host: host, done: make(chan bool), taskSchedule: NewTaskSchedule(), reconnectChan: make(chan struct{}), } go ts.handleReconnect() return ts } func (s *TimerService) Ping(args string, result rpcs.ReplyEmptyResult) error { return nil } func (s *TimerService) init(conn *amqp.Connection) error { ch, err := conn.Channel() if err != nil { return err } s.ch = ch tc := rule.NewTaskLifecycleConsumer(ch) err = tc.Init() if err != nil { return err } err = tc.Start() if err != nil { return err } tc.SetExternalConsumer(s.taskSchedule) s.tc = tc s.notifyChanClose = make(chan *amqp.Error) s.ch.NotifyClose(s.notifyChanClose) s.isReady = true return s.initTaskConsumer() } // 初始化任务消费者 func (s *TimerService) initTaskConsumer() error { q, err := s.ch.QueueDeclare("", false, true, false, false, nil) if err != nil { return err } err = s.ch.QueueBind(q.Name, "*.*.timer", rule.TaskExchange, false, nil) if err != nil { return err } msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil) if err != nil { return err } go func() { for { select { case <-s.reconnectChan: return case msg := <-msgChan: s.handleTimerTask(msg.Body) } } }() return nil } func (s *TimerService) handleTimerTask(msg []byte) { err := s.taskSchedule.AddTask(msg) if err != nil { server.Log.Errorf("create task error :%s", err.Error()) } } func (s *TimerService) connect() (*amqp.Connection, error) { conn, err := amqp.Dial(s.host) if err != nil { return nil, err } s.conn = conn s.notifyCloseChannel = make(chan *amqp.Error) s.conn.NotifyClose(s.notifyCloseChannel) return conn, err } func (s *TimerService) handleReconnect() { for { s.isReady = false conn, err := s.connect() fmt.Println("handleReconnect") if err != nil { server.Log.Errorf("connect to rabbitmq error:%s", err.Error()) select { case <-s.done: return case <-time.After(4 * time.Second): } continue } if done := s.handleReInit(conn); done { break } } } func (s *TimerService) handleReInit(conn *amqp.Connection) bool { for { s.isReady = false err := s.init(conn) if err != nil { select { case <-s.done: return true case <-time.After(time.Second * 3): } continue } select { case <-s.done: return true case err := <-s.notifyCloseChannel: fmt.Println("Connection closed. Reconnecting..." + err.Error()) close(s.reconnectChan) return false case err := <-s.notifyChanClose: fmt.Println("Channel closed. Re-running init..." + err.Error()) } } }