scene.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. "sparrow/services/scene-access/internal/service/manager"
  10. "time"
  11. )
  12. const TimerTopic = "sparrow.task.timer" // 定时任务主题
  13. const OneKeyTopic = "sparrow.task.oneKey" // 一键执行任务主题
  14. // SceneService 场景服务
  15. type SceneService struct {
  16. rabbitMQAddress string
  17. taskManager manager.Producer
  18. lifecycleManager manager.Producer
  19. done chan bool
  20. isReady bool
  21. conn *amqp.Connection
  22. notifyCloseChannel chan *amqp.Error
  23. notifyChanClose chan *amqp.Error
  24. ch *amqp.Channel
  25. }
  26. func NewSceneService(mqAddr string) *SceneService {
  27. srv := &SceneService{
  28. rabbitMQAddress: mqAddr,
  29. }
  30. go srv.handleReconnect()
  31. return srv
  32. }
  33. // SubmitTask rpc 提交一个任务
  34. func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
  35. var (
  36. topic string
  37. )
  38. switch args.Type {
  39. case "timer":
  40. topic = TimerTopic
  41. case "oneKey":
  42. topic = OneKeyTopic
  43. }
  44. return s.taskManager.Publish(topic, []byte(args.Data))
  45. }
  46. // SubmitTaskLifecycle rpc 提交一个任务生命周期
  47. func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
  48. taskMsg := rule.TaskLifecycleMessage{
  49. TaskId: args.TaskId,
  50. Action: args.Action,
  51. Data: args.Data,
  52. }
  53. data, err := json.Marshal(&taskMsg)
  54. if err != nil {
  55. return err
  56. }
  57. return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
  58. }
  59. func (s *SceneService) init(conn *amqp.Connection) error {
  60. ch, err := conn.Channel()
  61. if err != nil {
  62. return err
  63. }
  64. s.ch = ch
  65. taskManager := manager.NewTaskManager(ch)
  66. err = taskManager.Init()
  67. if err != nil {
  68. return err
  69. }
  70. s.taskManager = taskManager
  71. lifecycleManager := manager.NewTaskLifecycleManager(ch)
  72. err = lifecycleManager.Init()
  73. if err != nil {
  74. return err
  75. }
  76. s.lifecycleManager = lifecycleManager
  77. s.notifyChanClose = make(chan *amqp.Error)
  78. s.ch.NotifyClose(s.notifyChanClose)
  79. s.isReady = true
  80. return nil
  81. }
  82. func (s *SceneService) connect() (*amqp.Connection, error) {
  83. conn, err := amqp.Dial(s.rabbitMQAddress)
  84. if err != nil {
  85. return nil, err
  86. }
  87. s.conn = conn
  88. s.notifyCloseChannel = make(chan *amqp.Error)
  89. s.conn.NotifyClose(s.notifyCloseChannel)
  90. return conn, err
  91. }
  92. func (s *SceneService) handleReconnect() {
  93. for {
  94. s.isReady = false
  95. conn, err := s.connect()
  96. fmt.Println("handleReconnect")
  97. if err != nil {
  98. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  99. select {
  100. case <-s.done:
  101. return
  102. case <-time.After(4 * time.Second):
  103. }
  104. continue
  105. }
  106. if done := s.handleReInit(conn); done {
  107. break
  108. }
  109. }
  110. }
  111. func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
  112. for {
  113. s.isReady = false
  114. err := s.init(conn)
  115. if err != nil {
  116. select {
  117. case <-s.done:
  118. return true
  119. case <-time.After(time.Second * 3):
  120. }
  121. continue
  122. }
  123. select {
  124. case <-s.done:
  125. return true
  126. case err := <-s.notifyCloseChannel:
  127. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  128. return false
  129. case err := <-s.notifyChanClose:
  130. fmt.Println("Channel closed. Re-running init..." + err.Error())
  131. }
  132. }
  133. }