onekey_service.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package internal
  2. import (
  3. "fmt"
  4. "github.com/streadway/amqp"
  5. "sparrow/pkg/rpcs"
  6. "sparrow/pkg/rule"
  7. "sparrow/pkg/server"
  8. "time"
  9. )
  10. type OneKeyService struct {
  11. tc *rule.TaskLifecycleConsumer
  12. host string
  13. done chan bool
  14. isReady bool
  15. conn *amqp.Connection
  16. notifyCloseChannel chan *amqp.Error
  17. notifyChanClose chan *amqp.Error
  18. ch *amqp.Channel
  19. reconnectChan chan struct{}
  20. taskSchedule *TaskSchedule
  21. }
  22. func NewOneKeyService(host string) *OneKeyService {
  23. ts := &OneKeyService{
  24. host: host,
  25. done: make(chan bool),
  26. taskSchedule: NewTaskSchedule(),
  27. reconnectChan: make(chan struct{}),
  28. }
  29. go ts.handleReconnect()
  30. return ts
  31. }
  32. func (s *OneKeyService) Ping(args string, result rpcs.ReplyEmptyResult) error {
  33. return nil
  34. }
  35. func (s *OneKeyService) init(conn *amqp.Connection) error {
  36. ch, err := conn.Channel()
  37. if err != nil {
  38. return err
  39. }
  40. s.ch = ch
  41. tc := rule.NewTaskLifecycleConsumer(ch)
  42. err = tc.Init()
  43. if err != nil {
  44. return err
  45. }
  46. err = tc.Start()
  47. if err != nil {
  48. return err
  49. }
  50. tc.SetExternalConsumer(s.taskSchedule)
  51. s.tc = tc
  52. s.notifyChanClose = make(chan *amqp.Error)
  53. s.ch.NotifyClose(s.notifyChanClose)
  54. s.isReady = true
  55. return s.initTaskConsumer()
  56. }
  57. // 初始化任务消费者
  58. func (s *OneKeyService) initTaskConsumer() error {
  59. q, err := s.ch.QueueDeclare("", false, true, false, false, nil)
  60. if err != nil {
  61. return err
  62. }
  63. err = s.ch.QueueBind(q.Name, "*.*.oneKey", rule.TaskExchange, false, nil)
  64. if err != nil {
  65. return err
  66. }
  67. msgChan, err := s.ch.Consume(q.Name, "", false, false, false, false, nil)
  68. if err != nil {
  69. return err
  70. }
  71. go func() {
  72. for {
  73. select {
  74. case <-s.reconnectChan:
  75. return
  76. case msg := <-msgChan:
  77. s.handleOneKeyTask(msg.Body)
  78. }
  79. }
  80. }()
  81. return nil
  82. }
  83. func (s *OneKeyService) handleOneKeyTask(msg []byte) {
  84. server.Log.Debugf("do task %s", msg)
  85. err := s.taskSchedule.DoTask(msg)
  86. if err != nil {
  87. server.Log.Errorf("do task error :%s", err.Error())
  88. }
  89. }
  90. func (s *OneKeyService) connect() (*amqp.Connection, error) {
  91. conn, err := amqp.Dial(s.host)
  92. if err != nil {
  93. return nil, err
  94. }
  95. s.conn = conn
  96. s.notifyCloseChannel = make(chan *amqp.Error)
  97. s.conn.NotifyClose(s.notifyCloseChannel)
  98. return conn, err
  99. }
  100. func (s *OneKeyService) handleReconnect() {
  101. for {
  102. s.isReady = false
  103. conn, err := s.connect()
  104. fmt.Println("handleReconnect")
  105. if err != nil {
  106. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  107. select {
  108. case <-s.done:
  109. return
  110. case <-time.After(4 * time.Second):
  111. }
  112. continue
  113. }
  114. if done := s.handleReInit(conn); done {
  115. break
  116. }
  117. }
  118. }
  119. func (s *OneKeyService) handleReInit(conn *amqp.Connection) bool {
  120. for {
  121. s.isReady = false
  122. err := s.init(conn)
  123. if err != nil {
  124. select {
  125. case <-s.done:
  126. return true
  127. case <-time.After(time.Second * 3):
  128. }
  129. continue
  130. }
  131. select {
  132. case <-s.done:
  133. return true
  134. case err := <-s.notifyCloseChannel:
  135. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  136. close(s.reconnectChan)
  137. return false
  138. case err := <-s.notifyChanClose:
  139. fmt.Println("Channel closed. Re-running init..." + err.Error())
  140. }
  141. }
  142. }