task_lifecycle_consumer.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package rule
  2. import (
  3. "encoding/json"
  4. "github.com/streadway/amqp"
  5. "sparrow/pkg/server"
  6. )
  7. type ExternalConsumer interface {
  8. AddMessageHandle(msg *TaskLifecycleMessage) error
  9. RemoveMessageHandle(msg *TaskLifecycleMessage) error
  10. UpdateMessageHandle(msg *TaskLifecycleMessage) error
  11. SnapMessageHandle(msg *TaskLifecycleMessage) error
  12. }
  13. const TaskLifecycleExchange = "task_lifecycle_exchange"
  14. const TaskExchange = "task_exchange"
  15. type TaskLifecycleConsumer struct {
  16. ch *amqp.Channel
  17. ec ExternalConsumer
  18. stopChan chan struct{}
  19. messageChan chan []byte
  20. }
  21. func NewTaskLifecycleConsumer(ch *amqp.Channel) *TaskLifecycleConsumer {
  22. return &TaskLifecycleConsumer{
  23. ch: ch,
  24. stopChan: make(chan struct{}),
  25. messageChan: make(chan []byte, 10),
  26. }
  27. }
  28. func (a *TaskLifecycleConsumer) SetExternalConsumer(ec ExternalConsumer) {
  29. a.ec = ec
  30. }
  31. func (a *TaskLifecycleConsumer) Stop() {
  32. close(a.stopChan)
  33. }
  34. func (a *TaskLifecycleConsumer) Start() error {
  35. go func() {
  36. for {
  37. select {
  38. case <-a.stopChan:
  39. return
  40. case msg := <-a.messageChan:
  41. a.handleMessage(msg)
  42. }
  43. }
  44. }()
  45. return nil
  46. }
  47. func (a *TaskLifecycleConsumer) handleMessage(msg []byte) {
  48. if a.ec == nil {
  49. server.Log.Errorf("TaskLifecycleConsumer is unset")
  50. return
  51. }
  52. var tm TaskLifecycleMessage
  53. err := json.Unmarshal(msg, &tm)
  54. if err != nil {
  55. server.Log.Errorf("handle lifecycle message error :%v", err)
  56. return
  57. }
  58. switch tm.Action {
  59. case "add":
  60. err = a.ec.AddMessageHandle(&tm)
  61. case "remove":
  62. err = a.ec.RemoveMessageHandle(&tm)
  63. case "update":
  64. err = a.ec.UpdateMessageHandle(&tm)
  65. case "snap":
  66. err = a.ec.SnapMessageHandle(&tm)
  67. }
  68. }
  69. func (a *TaskLifecycleConsumer) Init() error {
  70. err := a.ch.ExchangeDeclare(
  71. TaskLifecycleExchange, // name
  72. "fanout", // type
  73. true, // durable
  74. false, // auto-deleted
  75. false, // internal
  76. false, // no-wait
  77. nil, // arguments
  78. )
  79. if err != nil {
  80. return err
  81. }
  82. //绑定queue到交换机
  83. q, err := a.ch.QueueDeclare(
  84. "", // name
  85. false, // durable
  86. false, // delete when unused
  87. true, // exclusive
  88. false, // no-wait
  89. nil, // arguments
  90. )
  91. if err != nil {
  92. return err
  93. }
  94. err = a.ch.QueueBind(
  95. q.Name, // queue name
  96. "", // routing key
  97. TaskLifecycleExchange,
  98. false,
  99. nil,
  100. )
  101. if err != nil {
  102. return err
  103. }
  104. // creat consumer
  105. msg, err := a.ch.Consume(q.Name, "", true, false, false, false, nil)
  106. if err != nil {
  107. return err
  108. }
  109. go func() {
  110. for {
  111. select {
  112. case <-a.stopChan:
  113. return
  114. case d := <-msg:
  115. a.messageChan <- d.Body
  116. }
  117. }
  118. }()
  119. return nil
  120. }