timer_service.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package internal
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/frame/g"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. "time"
  10. )
  11. type TimerService struct {
  12. tc *rule.TaskLifecycleConsumer
  13. host string
  14. done chan bool
  15. isReady bool
  16. conn *amqp.Connection
  17. notifyCloseChannel chan *amqp.Error
  18. notifyChanClose chan *amqp.Error
  19. ch *amqp.Channel
  20. reconnectChan chan struct{}
  21. taskSchedule *TaskSchedule
  22. }
  23. func NewTimerService(host string) *TimerService {
  24. ts := &TimerService{
  25. host: host,
  26. done: make(chan bool),
  27. taskSchedule: NewTaskSchedule(),
  28. reconnectChan: make(chan struct{}),
  29. }
  30. go ts.handleReconnect()
  31. go ts.restartAllTask()
  32. return ts
  33. }
  34. func (s *TimerService) Ping(args string, result rpcs.ReplyEmptyResult) error {
  35. return nil
  36. }
  37. func (s *TimerService) init(conn *amqp.Connection) error {
  38. ch, err := conn.Channel()
  39. if err != nil {
  40. return err
  41. }
  42. s.ch = ch
  43. tc := rule.NewTaskLifecycleConsumer(ch)
  44. err = tc.Init()
  45. if err != nil {
  46. return err
  47. }
  48. err = tc.Start()
  49. if err != nil {
  50. return err
  51. }
  52. tc.SetExternalConsumer(s.taskSchedule)
  53. s.tc = tc
  54. s.notifyChanClose = make(chan *amqp.Error)
  55. s.ch.NotifyClose(s.notifyChanClose)
  56. s.isReady = true
  57. return s.initTaskConsumer()
  58. }
  59. // 初始化任务消费者
  60. func (s *TimerService) initTaskConsumer() error {
  61. q, err := s.ch.QueueDeclare("", false, true, false, false, nil)
  62. if err != nil {
  63. return err
  64. }
  65. err = s.ch.QueueBind(q.Name, "*.*.timer", rule.TaskExchange, false, nil)
  66. if err != nil {
  67. return err
  68. }
  69. msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil)
  70. if err != nil {
  71. return err
  72. }
  73. go func() {
  74. for {
  75. select {
  76. case <-s.reconnectChan:
  77. return
  78. case msg := <-msgChan:
  79. s.handleTimerTask(msg.Body)
  80. }
  81. }
  82. }()
  83. return nil
  84. }
  85. func (s *TimerService) handleTimerTask(msg []byte) {
  86. err := s.taskSchedule.AddTask(msg)
  87. if err != nil {
  88. server.Log.Errorf("create task error :%s", err.Error())
  89. }
  90. }
  91. func (s *TimerService) connect() (*amqp.Connection, error) {
  92. conn, err := amqp.Dial(s.host)
  93. if err != nil {
  94. return nil, err
  95. }
  96. s.conn = conn
  97. s.notifyCloseChannel = make(chan *amqp.Error)
  98. s.conn.NotifyClose(s.notifyCloseChannel)
  99. return conn, err
  100. }
  101. func (s *TimerService) handleReconnect() {
  102. for {
  103. s.isReady = false
  104. conn, err := s.connect()
  105. fmt.Println("handleReconnect")
  106. if err != nil {
  107. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  108. select {
  109. case <-s.done:
  110. return
  111. case <-time.After(4 * time.Second):
  112. }
  113. continue
  114. }
  115. if done := s.handleReInit(conn); done {
  116. break
  117. }
  118. }
  119. }
  120. func (s *TimerService) handleReInit(conn *amqp.Connection) bool {
  121. for {
  122. s.isReady = false
  123. err := s.init(conn)
  124. if err != nil {
  125. select {
  126. case <-s.done:
  127. return true
  128. case <-time.After(time.Second * 3):
  129. }
  130. continue
  131. }
  132. select {
  133. case <-s.done:
  134. return true
  135. case err := <-s.notifyCloseChannel:
  136. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  137. close(s.reconnectChan)
  138. return false
  139. case err := <-s.notifyChanClose:
  140. fmt.Println("Channel closed. Re-running init..." + err.Error())
  141. }
  142. }
  143. }
  144. func (s *TimerService) restartAllTask() {
  145. _, err := g.Client().Post(server.GetRPCHost(), "http://192.168.0.62:8199/web/v1/scenes/restart")
  146. if err != nil {
  147. server.Log.Errorf("restart all task error:%s", err.Error())
  148. }
  149. }