123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package internal
- import (
- "fmt"
- "github.com/streadway/amqp"
- "sparrow/pkg/rule"
- "sparrow/pkg/server"
- "time"
- )
- type TimerService struct {
- tc *rule.TaskLifecycleConsumer
- host string
- done chan bool
- isReady bool
- conn *amqp.Connection
- notifyCloseChannel chan *amqp.Error
- notifyChanClose chan *amqp.Error
- ch *amqp.Channel
- reconnectChan chan struct{}
- }
- func NewTimerService(host string) *TimerService {
- ts := &TimerService{
- host: host,
- done: make(chan bool),
- reconnectChan: make(chan struct{}),
- }
- go ts.handleReconnect()
- return ts
- }
- func (s *TimerService) init(conn *amqp.Connection) error {
- ch, err := conn.Channel()
- if err != nil {
- return err
- }
- s.ch = ch
- tc := rule.NewTaskLifecycleConsumer(ch)
- err = tc.Init()
- if err != nil {
- return err
- }
- err = tc.Start()
- if err != nil {
- return err
- }
- tc.SetExternalConsumer(s)
- s.tc = tc
- s.notifyChanClose = make(chan *amqp.Error)
- s.ch.NotifyClose(s.notifyChanClose)
- s.isReady = true
- return s.initTaskConsumer()
- }
- // 初始化任务消费者
- func (s *TimerService) initTaskConsumer() error {
- q, err := s.ch.QueueDeclare("", false, true, false, false, nil)
- if err != nil {
- return err
- }
- err = s.ch.QueueBind(q.Name, "*.*.timer", rule.TaskExchange, false, nil)
- if err != nil {
- return err
- }
- msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil)
- if err != nil {
- return err
- }
- go func() {
- for {
- select {
- case <-s.reconnectChan:
- return
- case msg := <-msgChan:
- s.handleTimerTask(msg.Body)
- }
- }
- }()
- return nil
- }
- func (s *TimerService) handleTimerTask(msg []byte) {
- fmt.Printf("%s", msg)
- }
- func (s *TimerService) connect() (*amqp.Connection, error) {
- conn, err := amqp.Dial(s.host)
- if err != nil {
- return nil, err
- }
- s.conn = conn
- s.notifyCloseChannel = make(chan *amqp.Error)
- s.conn.NotifyClose(s.notifyCloseChannel)
- return conn, err
- }
- func (s *TimerService) handleReconnect() {
- for {
- s.isReady = false
- conn, err := s.connect()
- fmt.Println("handleReconnect")
- if err != nil {
- server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
- select {
- case <-s.done:
- return
- case <-time.After(4 * time.Second):
- }
- continue
- }
- if done := s.handleReInit(conn); done {
- break
- }
- }
- }
- func (s *TimerService) handleReInit(conn *amqp.Connection) bool {
- for {
- s.isReady = false
- err := s.init(conn)
- if err != nil {
- select {
- case <-s.done:
- return true
- case <-time.After(time.Second * 3):
- }
- continue
- }
- select {
- case <-s.done:
- return true
- case err := <-s.notifyCloseChannel:
- fmt.Println("Connection closed. Reconnecting..." + err.Error())
- close(s.reconnectChan)
- return false
- case err := <-s.notifyChanClose:
- fmt.Println("Channel closed. Re-running init..." + err.Error())
- }
- }
- }
/* Implementation of the task lifecycle message handlers. */
- // AddMessageHandle 新增任务
- func (s *TimerService) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
- fmt.Printf("%v\r\n", msg)
- return nil
- }
- // RemoveMessageHandle 删除任务
- func (s *TimerService) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
- fmt.Printf("%v\r\n", msg)
- return nil
- }
- // UpdateMessageHandle 更新任务
- func (s *TimerService) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
- fmt.Printf("%v\r\n", msg)
- return nil
- }
- // SnapMessageHandle 快照任务
- func (s *TimerService) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
- fmt.Printf("%v\r\n", msg)
- return nil
- }
|