scene.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. "sparrow/services/scene-access/internal/service/manager"
  10. "time"
  11. )
  12. const TimerTopic = "sparrow.task.timer" // 定时任务主题
  13. // SceneService 场景服务
  14. type SceneService struct {
  15. rabbitMQAddress string
  16. taskManager manager.Producer
  17. lifecycleManager manager.Producer
  18. done chan bool
  19. isReady bool
  20. conn *amqp.Connection
  21. notifyCloseChannel chan *amqp.Error
  22. notifyChanClose chan *amqp.Error
  23. ch *amqp.Channel
  24. }
  25. func NewSceneService(mqAddr string) *SceneService {
  26. srv := &SceneService{
  27. rabbitMQAddress: mqAddr,
  28. }
  29. go srv.handleReconnect()
  30. return srv
  31. }
  32. // SubmitTask rpc 提交一个任务
  33. func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
  34. var (
  35. topic string
  36. )
  37. switch args.Type {
  38. case "timer":
  39. topic = TimerTopic
  40. }
  41. return s.taskManager.Publish(topic, []byte(args.Data))
  42. }
  43. // SubmitTaskLifecycle rpc 提交一个任务生命周期
  44. func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
  45. taskMsg := rule.TaskLifecycleMessage{
  46. TaskId: args.TaskId,
  47. Action: args.Action,
  48. Data: args.Data,
  49. }
  50. data, err := json.Marshal(&taskMsg)
  51. if err != nil {
  52. return err
  53. }
  54. fmt.Printf("task_id:%s,action:%s,data:%v", args.TaskId, args.Action, args.Data)
  55. return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
  56. }
  57. func (s *SceneService) init(conn *amqp.Connection) error {
  58. ch, err := conn.Channel()
  59. if err != nil {
  60. return err
  61. }
  62. s.ch = ch
  63. taskManager := manager.NewTaskManager(ch)
  64. err = taskManager.Init()
  65. if err != nil {
  66. return err
  67. }
  68. s.taskManager = taskManager
  69. lifecycleManager := manager.NewTaskLifecycleManager(ch)
  70. err = lifecycleManager.Init()
  71. if err != nil {
  72. return err
  73. }
  74. s.lifecycleManager = lifecycleManager
  75. s.notifyChanClose = make(chan *amqp.Error)
  76. s.ch.NotifyClose(s.notifyChanClose)
  77. s.isReady = true
  78. return nil
  79. }
  80. func (s *SceneService) connect() (*amqp.Connection, error) {
  81. conn, err := amqp.Dial(s.rabbitMQAddress)
  82. if err != nil {
  83. return nil, err
  84. }
  85. s.conn = conn
  86. s.notifyCloseChannel = make(chan *amqp.Error)
  87. s.conn.NotifyClose(s.notifyCloseChannel)
  88. return conn, err
  89. }
  90. func (s *SceneService) handleReconnect() {
  91. for {
  92. s.isReady = false
  93. conn, err := s.connect()
  94. fmt.Println("handleReconnect")
  95. if err != nil {
  96. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  97. select {
  98. case <-s.done:
  99. return
  100. case <-time.After(4 * time.Second):
  101. }
  102. continue
  103. }
  104. if done := s.handleReInit(conn); done {
  105. break
  106. }
  107. }
  108. }
  109. func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
  110. for {
  111. s.isReady = false
  112. err := s.init(conn)
  113. if err != nil {
  114. select {
  115. case <-s.done:
  116. return true
  117. case <-time.After(time.Second * 3):
  118. }
  119. continue
  120. }
  121. select {
  122. case <-s.done:
  123. return true
  124. case err := <-s.notifyCloseChannel:
  125. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  126. return false
  127. case err := <-s.notifyChanClose:
  128. fmt.Println("Channel closed. Re-running init..." + err.Error())
  129. }
  130. }
  131. }