timer_service.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package internal
  2. import (
  3. "fmt"
  4. "github.com/streadway/amqp"
  5. "sparrow/pkg/rule"
  6. "sparrow/pkg/server"
  7. "time"
  8. )
  9. type TimerService struct {
  10. tc *rule.TaskLifecycleConsumer
  11. host string
  12. done chan bool
  13. isReady bool
  14. conn *amqp.Connection
  15. notifyCloseChannel chan *amqp.Error
  16. notifyChanClose chan *amqp.Error
  17. ch *amqp.Channel
  18. reconnectChan chan struct{}
  19. }
  20. func NewTimerService(host string) *TimerService {
  21. ts := &TimerService{
  22. host: host,
  23. done: make(chan bool),
  24. reconnectChan: make(chan struct{}),
  25. }
  26. go ts.handleReconnect()
  27. return ts
  28. }
  29. func (s *TimerService) init(conn *amqp.Connection) error {
  30. ch, err := conn.Channel()
  31. if err != nil {
  32. return err
  33. }
  34. s.ch = ch
  35. tc := rule.NewTaskLifecycleConsumer(ch)
  36. err = tc.Init()
  37. if err != nil {
  38. return err
  39. }
  40. err = tc.Start()
  41. if err != nil {
  42. return err
  43. }
  44. tc.SetExternalConsumer(s)
  45. s.tc = tc
  46. s.notifyChanClose = make(chan *amqp.Error)
  47. s.ch.NotifyClose(s.notifyChanClose)
  48. s.isReady = true
  49. return s.initTaskConsumer()
  50. }
  51. // 初始化任务消费者
  52. func (s *TimerService) initTaskConsumer() error {
  53. q, err := s.ch.QueueDeclare("", false, true, false, false, nil)
  54. if err != nil {
  55. return err
  56. }
  57. err = s.ch.QueueBind(q.Name, "*.*.timer", rule.TaskExchange, false, nil)
  58. if err != nil {
  59. return err
  60. }
  61. msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil)
  62. if err != nil {
  63. return err
  64. }
  65. go func() {
  66. for {
  67. select {
  68. case <-s.reconnectChan:
  69. return
  70. case msg := <-msgChan:
  71. s.handleTimerTask(msg.Body)
  72. }
  73. }
  74. }()
  75. return nil
  76. }
  77. func (s *TimerService) handleTimerTask(msg []byte) {
  78. fmt.Printf("%s", msg)
  79. }
  80. func (s *TimerService) connect() (*amqp.Connection, error) {
  81. conn, err := amqp.Dial(s.host)
  82. if err != nil {
  83. return nil, err
  84. }
  85. s.conn = conn
  86. s.notifyCloseChannel = make(chan *amqp.Error)
  87. s.conn.NotifyClose(s.notifyCloseChannel)
  88. return conn, err
  89. }
  90. func (s *TimerService) handleReconnect() {
  91. for {
  92. s.isReady = false
  93. conn, err := s.connect()
  94. fmt.Println("handleReconnect")
  95. if err != nil {
  96. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  97. select {
  98. case <-s.done:
  99. return
  100. case <-time.After(4 * time.Second):
  101. }
  102. continue
  103. }
  104. if done := s.handleReInit(conn); done {
  105. break
  106. }
  107. }
  108. }
  109. func (s *TimerService) handleReInit(conn *amqp.Connection) bool {
  110. for {
  111. s.isReady = false
  112. err := s.init(conn)
  113. if err != nil {
  114. select {
  115. case <-s.done:
  116. return true
  117. case <-time.After(time.Second * 3):
  118. }
  119. continue
  120. }
  121. select {
  122. case <-s.done:
  123. return true
  124. case err := <-s.notifyCloseChannel:
  125. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  126. close(s.reconnectChan)
  127. return false
  128. case err := <-s.notifyChanClose:
  129. fmt.Println("Channel closed. Re-running init..." + err.Error())
  130. }
  131. }
  132. }
  133. /* 实现任务生命周期消息管理 */
  134. // AddMessageHandle 新增任务
  135. func (s *TimerService) AddMessageHandle(msg *rule.TaskLifecycleMessage) error {
  136. fmt.Printf("%v\r\n", msg)
  137. return nil
  138. }
  139. // RemoveMessageHandle 删除任务
  140. func (s *TimerService) RemoveMessageHandle(msg *rule.TaskLifecycleMessage) error {
  141. fmt.Printf("%v\r\n", msg)
  142. return nil
  143. }
  144. // UpdateMessageHandle 更新任务
  145. func (s *TimerService) UpdateMessageHandle(msg *rule.TaskLifecycleMessage) error {
  146. fmt.Printf("%v\r\n", msg)
  147. return nil
  148. }
  149. // SnapMessageHandle 快照任务
  150. func (s *TimerService) SnapMessageHandle(msg *rule.TaskLifecycleMessage) error {
  151. fmt.Printf("%v\r\n", msg)
  152. return nil
  153. }