scene.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. "sparrow/services/scene-access/internal/service/manager"
  10. "time"
  11. )
  // TimerTopic is the topic that timer (scheduled) tasks are published to.
  const (
  	TimerTopic = "sparrow.task.timer"
  )
  13. // SceneService 场景服务
  14. type SceneService struct {
  15. rabbitMQAddress string
  16. taskManager manager.Producer
  17. lifecycleManager manager.Producer
  18. done chan bool
  19. isReady bool
  20. conn *amqp.Connection
  21. notifyCloseChannel chan *amqp.Error
  22. notifyChanClose chan *amqp.Error
  23. ch *amqp.Channel
  24. }
  25. func NewSceneService(mqAddr string) *SceneService {
  26. srv := &SceneService{
  27. rabbitMQAddress: mqAddr,
  28. }
  29. go srv.handleReconnect()
  30. return srv
  31. }
  32. // SubmitTask rpc 提交一个任务
  33. func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
  34. fmt.Println("-------")
  35. var (
  36. topic string
  37. )
  38. switch args.Type {
  39. case "timer":
  40. topic = TimerTopic
  41. }
  42. fmt.Println("-----------------------")
  43. return s.taskManager.Publish(topic, []byte(args.Data))
  44. }
  45. // SubmitTaskLifecycle rpc 提交一个任务生命周期
  46. func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
  47. taskMsg := rule.TaskLifecycleMessage{
  48. TaskId: args.TaskId,
  49. Action: args.Action,
  50. Data: args.Data,
  51. }
  52. data, err := json.Marshal(&taskMsg)
  53. if err != nil {
  54. return err
  55. }
  56. return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
  57. }
  58. func (s *SceneService) init(conn *amqp.Connection) error {
  59. ch, err := conn.Channel()
  60. if err != nil {
  61. return err
  62. }
  63. s.ch = ch
  64. taskManager := manager.NewTaskManager(ch)
  65. err = taskManager.Init()
  66. if err != nil {
  67. return err
  68. }
  69. s.taskManager = taskManager
  70. lifecycleManager := manager.NewTaskLifecycleManager(ch)
  71. err = lifecycleManager.Init()
  72. if err != nil {
  73. return err
  74. }
  75. s.lifecycleManager = lifecycleManager
  76. s.notifyChanClose = make(chan *amqp.Error)
  77. s.ch.NotifyClose(s.notifyChanClose)
  78. s.isReady = true
  79. return nil
  80. }
  81. func (s *SceneService) connect() (*amqp.Connection, error) {
  82. conn, err := amqp.Dial(s.rabbitMQAddress)
  83. if err != nil {
  84. return nil, err
  85. }
  86. s.conn = conn
  87. s.notifyCloseChannel = make(chan *amqp.Error)
  88. s.conn.NotifyClose(s.notifyCloseChannel)
  89. return conn, err
  90. }
  91. func (s *SceneService) handleReconnect() {
  92. for {
  93. s.isReady = false
  94. conn, err := s.connect()
  95. fmt.Println("handleReconnect")
  96. if err != nil {
  97. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  98. select {
  99. case <-s.done:
  100. return
  101. case <-time.After(4 * time.Second):
  102. }
  103. continue
  104. }
  105. if done := s.handleReInit(conn); done {
  106. break
  107. }
  108. }
  109. }
  110. func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
  111. for {
  112. s.isReady = false
  113. err := s.init(conn)
  114. if err != nil {
  115. select {
  116. case <-s.done:
  117. return true
  118. case <-time.After(time.Second * 3):
  119. }
  120. continue
  121. }
  122. select {
  123. case <-s.done:
  124. return true
  125. case err := <-s.notifyCloseChannel:
  126. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  127. return false
  128. case err := <-s.notifyChanClose:
  129. fmt.Println("Channel closed. Re-running init..." + err.Error())
  130. }
  131. }
  132. }