scene.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rule"
  7. "sparrow/pkg/server"
  8. "sparrow/services/scene-access/internal/service/manager"
  9. "time"
  10. )
  11. const TimerTopic = "sparrow.task.timer" // 定时任务主题
  12. // SceneService 场景服务
  13. type SceneService struct {
  14. rabbitMQAddress string
  15. taskManager manager.Producer
  16. lifecycleManager manager.Producer
  17. done chan bool
  18. isReady bool
  19. conn *amqp.Connection
  20. notifyCloseChannel chan *amqp.Error
  21. notifyChanClose chan *amqp.Error
  22. ch *amqp.Channel
  23. }
  24. func NewSceneService(mqAddr string) *SceneService {
  25. srv := &SceneService{
  26. rabbitMQAddress: mqAddr,
  27. }
  28. go srv.handleReconnect()
  29. return srv
  30. }
  31. // SubmitTask 提交一个任务
  32. func (s *SceneService) SubmitTask() error {
  33. return s.taskManager.Publish(TimerTopic, []byte("this is a task"))
  34. }
  35. func (s *SceneService) SubmitTaskLifecycle() error {
  36. taskMsg := rule.TaskLifecycleMessage{
  37. TaskId: "xxxxxx",
  38. Action: "add",
  39. Data: "test",
  40. }
  41. data, err := json.Marshal(&taskMsg)
  42. if err != nil {
  43. return err
  44. }
  45. return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
  46. }
  47. func (s *SceneService) init(conn *amqp.Connection) error {
  48. ch, err := conn.Channel()
  49. if err != nil {
  50. return err
  51. }
  52. s.ch = ch
  53. taskManager := manager.NewTaskManager(ch)
  54. err = taskManager.Init()
  55. if err != nil {
  56. return err
  57. }
  58. s.taskManager = taskManager
  59. lifecycleManager := manager.NewTaskLifecycleManager(ch)
  60. err = lifecycleManager.Init()
  61. if err != nil {
  62. return err
  63. }
  64. s.lifecycleManager = lifecycleManager
  65. s.notifyChanClose = make(chan *amqp.Error)
  66. s.ch.NotifyClose(s.notifyChanClose)
  67. s.isReady = true
  68. return nil
  69. }
  70. func (s *SceneService) connect() (*amqp.Connection, error) {
  71. conn, err := amqp.Dial(s.rabbitMQAddress)
  72. if err != nil {
  73. return nil, err
  74. }
  75. s.conn = conn
  76. s.notifyCloseChannel = make(chan *amqp.Error)
  77. s.conn.NotifyClose(s.notifyCloseChannel)
  78. return conn, err
  79. }
  80. func (s *SceneService) handleReconnect() {
  81. for {
  82. s.isReady = false
  83. conn, err := s.connect()
  84. fmt.Println("handleReconnect")
  85. if err != nil {
  86. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  87. select {
  88. case <-s.done:
  89. return
  90. case <-time.After(4 * time.Second):
  91. }
  92. continue
  93. }
  94. if done := s.handleReInit(conn); done {
  95. break
  96. }
  97. }
  98. }
  99. func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
  100. for {
  101. s.isReady = false
  102. err := s.init(conn)
  103. if err != nil {
  104. select {
  105. case <-s.done:
  106. return true
  107. case <-time.After(time.Second * 3):
  108. }
  109. continue
  110. }
  111. select {
  112. case <-s.done:
  113. return true
  114. case err := <-s.notifyCloseChannel:
  115. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  116. return false
  117. case err := <-s.notifyChanClose:
  118. fmt.Println("Channel closed. Re-running init..." + err.Error())
  119. }
  120. }
  121. }