scene.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. "sparrow/services/scene-access/internal/service/manager"
  10. "time"
  11. )
// TimerTopic is the topic used for timer (scheduled) tasks.
const TimerTopic = "sparrow.task.timer"
// SceneService is the scene service. It holds the RabbitMQ connection and
// channel together with the producers that publish over that channel.
type SceneService struct {
	rabbitMQAddress    string           // address handed to amqp.Dial
	taskManager        manager.Producer // publishes submitted tasks
	lifecycleManager   manager.Producer // publishes task lifecycle messages
	done               chan bool        // receiving from it makes the reconnect loops return
	isReady            bool             // true after a successful init; reset at the start of every connect/init attempt
	conn               *amqp.Connection // current connection
	notifyCloseChannel chan *amqp.Error // registered with conn.NotifyClose, i.e. connection-level close events
	notifyChanClose    chan *amqp.Error // registered with ch.NotifyClose, i.e. channel-level close events
	ch                 *amqp.Channel    // current channel
}
  25. func NewSceneService(mqAddr string) *SceneService {
  26. srv := &SceneService{
  27. rabbitMQAddress: mqAddr,
  28. }
  29. go srv.handleReconnect()
  30. return srv
  31. }
  32. // SubmitTask rpc 提交一个任务
  33. func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
  34. var (
  35. topic string
  36. )
  37. switch args.Type {
  38. case "timer":
  39. topic = TimerTopic
  40. }
  41. return s.taskManager.Publish(topic, []byte(args.Data))
  42. }
  43. // SubmitTaskLifecycle rpc 提交一个任务生命周期
  44. func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
  45. taskMsg := rule.TaskLifecycleMessage{
  46. TaskId: args.TaskId,
  47. Action: args.Action,
  48. Data: args.Data,
  49. }
  50. data, err := json.Marshal(&taskMsg)
  51. if err != nil {
  52. return err
  53. }
  54. return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
  55. }
  56. func (s *SceneService) init(conn *amqp.Connection) error {
  57. ch, err := conn.Channel()
  58. if err != nil {
  59. return err
  60. }
  61. s.ch = ch
  62. taskManager := manager.NewTaskManager(ch)
  63. err = taskManager.Init()
  64. if err != nil {
  65. return err
  66. }
  67. s.taskManager = taskManager
  68. lifecycleManager := manager.NewTaskLifecycleManager(ch)
  69. err = lifecycleManager.Init()
  70. if err != nil {
  71. return err
  72. }
  73. s.lifecycleManager = lifecycleManager
  74. s.notifyChanClose = make(chan *amqp.Error)
  75. s.ch.NotifyClose(s.notifyChanClose)
  76. s.isReady = true
  77. return nil
  78. }
  79. func (s *SceneService) connect() (*amqp.Connection, error) {
  80. conn, err := amqp.Dial(s.rabbitMQAddress)
  81. if err != nil {
  82. return nil, err
  83. }
  84. s.conn = conn
  85. s.notifyCloseChannel = make(chan *amqp.Error)
  86. s.conn.NotifyClose(s.notifyCloseChannel)
  87. return conn, err
  88. }
  89. func (s *SceneService) handleReconnect() {
  90. for {
  91. s.isReady = false
  92. conn, err := s.connect()
  93. fmt.Println("handleReconnect")
  94. if err != nil {
  95. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  96. select {
  97. case <-s.done:
  98. return
  99. case <-time.After(4 * time.Second):
  100. }
  101. continue
  102. }
  103. if done := s.handleReInit(conn); done {
  104. break
  105. }
  106. }
  107. }
  108. func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
  109. for {
  110. s.isReady = false
  111. err := s.init(conn)
  112. if err != nil {
  113. select {
  114. case <-s.done:
  115. return true
  116. case <-time.After(time.Second * 3):
  117. }
  118. continue
  119. }
  120. select {
  121. case <-s.done:
  122. return true
  123. case err := <-s.notifyCloseChannel:
  124. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  125. return false
  126. case err := <-s.notifyChanClose:
  127. fmt.Println("Channel closed. Re-running init..." + err.Error())
  128. }
  129. }
  130. }