scene.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/server"
  9. "sparrow/services/scene-access/internal/service/manager"
  10. "time"
  11. )
  12. const TimerTopic = "sparrow.task.timer" // 定时任务主题
  13. const OneKeyTopic = "sparrow.task.oneKey" // 一键执行任务主题
  14. // SceneService 场景服务
  15. type SceneService struct {
  16. rabbitMQAddress string
  17. taskManager manager.Producer
  18. lifecycleManager manager.Producer
  19. done chan bool
  20. isReady bool
  21. conn *amqp.Connection
  22. notifyCloseChannel chan *amqp.Error
  23. notifyChanClose chan *amqp.Error
  24. ch *amqp.Channel
  25. }
  26. func NewSceneService(mqAddr string) *SceneService {
  27. srv := &SceneService{
  28. rabbitMQAddress: mqAddr,
  29. }
  30. go srv.handleReconnect()
  31. return srv
  32. }
  33. // SubmitTask rpc 提交一个任务
  34. func (s *SceneService) SubmitTask(args rpcs.ArgsSubmitTask, reply *rpcs.ReplyEmptyResult) error {
  35. var (
  36. topic string
  37. )
  38. switch args.Type {
  39. case "timer":
  40. topic = TimerTopic
  41. case "oneKey":
  42. topic = OneKeyTopic
  43. }
  44. server.Log.Debugf("收到类型为%s的任务: %s", args.Type, args.Data)
  45. return s.taskManager.Publish(topic, []byte(args.Data))
  46. }
  47. // SubmitTaskLifecycle rpc 提交一个任务生命周期
  48. func (s *SceneService) SubmitTaskLifecycle(args rpcs.ArgsSubmitTaskLifecycle, reply *rpcs.ReplyEmptyResult) error {
  49. taskMsg := rule.TaskLifecycleMessage{
  50. TaskId: args.TaskId,
  51. Action: args.Action,
  52. Data: args.Data,
  53. }
  54. data, err := json.Marshal(&taskMsg)
  55. if err != nil {
  56. return err
  57. }
  58. return s.lifecycleManager.Publish("sparrow.task.timer.lifecycle", data)
  59. }
  60. func (s *SceneService) init(conn *amqp.Connection) error {
  61. ch, err := conn.Channel()
  62. if err != nil {
  63. return err
  64. }
  65. s.ch = ch
  66. taskManager := manager.NewTaskManager(ch)
  67. err = taskManager.Init()
  68. if err != nil {
  69. return err
  70. }
  71. s.taskManager = taskManager
  72. lifecycleManager := manager.NewTaskLifecycleManager(ch)
  73. err = lifecycleManager.Init()
  74. if err != nil {
  75. return err
  76. }
  77. s.lifecycleManager = lifecycleManager
  78. s.notifyChanClose = make(chan *amqp.Error)
  79. s.ch.NotifyClose(s.notifyChanClose)
  80. s.isReady = true
  81. return nil
  82. }
  83. func (s *SceneService) connect() (*amqp.Connection, error) {
  84. conn, err := amqp.Dial(s.rabbitMQAddress)
  85. if err != nil {
  86. return nil, err
  87. }
  88. s.conn = conn
  89. s.notifyCloseChannel = make(chan *amqp.Error)
  90. s.conn.NotifyClose(s.notifyCloseChannel)
  91. return conn, err
  92. }
  93. func (s *SceneService) handleReconnect() {
  94. for {
  95. s.isReady = false
  96. conn, err := s.connect()
  97. fmt.Println("handleReconnect")
  98. if err != nil {
  99. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  100. select {
  101. case <-s.done:
  102. return
  103. case <-time.After(4 * time.Second):
  104. }
  105. continue
  106. }
  107. if done := s.handleReInit(conn); done {
  108. break
  109. }
  110. }
  111. }
  112. func (s *SceneService) handleReInit(conn *amqp.Connection) bool {
  113. for {
  114. s.isReady = false
  115. err := s.init(conn)
  116. if err != nil {
  117. select {
  118. case <-s.done:
  119. return true
  120. case <-time.After(time.Second * 3):
  121. }
  122. continue
  123. }
  124. select {
  125. case <-s.done:
  126. return true
  127. case err := <-s.notifyCloseChannel:
  128. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  129. return false
  130. case err := <-s.notifyChanClose:
  131. fmt.Println("Channel closed. Re-running init..." + err.Error())
  132. }
  133. }
  134. }