rabbitmq.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package msgQueue
  2. import (
  3. "errors"
  4. "github.com/streadway/amqp"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/queue"
  7. "sparrow/pkg/server"
  8. "sync"
  9. "time"
  10. )
  11. // RabbitMessageQueueAdmin rabbit mq 管理器
  12. type RabbitMessageQueueAdmin struct {
  13. conn *amqp.Connection
  14. ch *amqp.Channel
  15. arguments map[string]interface{}
  16. }
  17. func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
  18. return r.ch
  19. }
  20. func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
  21. return r.conn
  22. }
  23. // RabbitMqSettings 配置
  24. type RabbitMqSettings struct {
  25. Host string
  26. }
  27. func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
  28. conn, err := amqp.Dial(settings.Host)
  29. if err != nil {
  30. panic(err)
  31. }
  32. ch, err := conn.Channel()
  33. if err != nil {
  34. panic(err)
  35. }
  36. return &RabbitMessageQueueAdmin{
  37. conn: conn,
  38. ch: ch,
  39. arguments: args,
  40. }
  41. }
  42. func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
  43. _, err := r.ch.QueueDeclare(topic,
  44. true, // durable
  45. false, // delete when unused
  46. false, // exclusive
  47. false, // no-wait
  48. nil)
  49. return err
  50. }
  51. func (r *RabbitMessageQueueAdmin) Destroy() error {
  52. if r.ch != nil {
  53. if err := r.ch.Close(); err != nil {
  54. return err
  55. }
  56. }
  57. if r.conn != nil {
  58. return r.conn.Close()
  59. }
  60. return nil
  61. }
  62. // RabbitMqProducer rabbit mq message producer
  63. type RabbitMqProducer struct {
  64. defaultTopic string
  65. admin *RabbitMessageQueueAdmin
  66. settings *RabbitMqSettings
  67. channel *amqp.Channel
  68. conn *amqp.Connection
  69. topics map[string]*queue.TopicPartitionInfo
  70. }
  71. func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
  72. result := new(RabbitMqProducer)
  73. result.admin = admin
  74. result.defaultTopic = defaultTopic
  75. result.conn = admin.conn
  76. result.channel = admin.ch
  77. result.topics = make(map[string]*queue.TopicPartitionInfo)
  78. return result
  79. }
  80. func (r *RabbitMqProducer) Init() error {
  81. return nil
  82. }
  83. func (r *RabbitMqProducer) GetDefaultTopic() string {
  84. return r.defaultTopic
  85. }
  86. func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
  87. r.createTopicIfNoExist(info)
  88. if r.channel == nil {
  89. return errors.New("rabbit mq channel is not initialized")
  90. }
  91. bytes, err := payload.Marshal()
  92. if err != nil {
  93. server.Log.Errorf("queue message marshal error:%s", err.Error())
  94. return err
  95. }
  96. server.Log.Debugf("publish message to %s", info.String())
  97. err = r.channel.Publish("", info.String(), false, false,
  98. amqp.Publishing{
  99. DeliveryMode: amqp.Persistent,
  100. Body: bytes,
  101. })
  102. if err != nil {
  103. if callback != nil {
  104. callback.OnFailure(err)
  105. }
  106. server.Log.Errorf("rabbit mq message publish error:%s", err.Error())
  107. return err
  108. }
  109. if callback != nil {
  110. callback.OnSuccess()
  111. }
  112. return nil
  113. }
  114. func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
  115. if _, ok := r.topics[tpi.String()]; !ok {
  116. _ = r.admin.CreateTopicIfNotExists(tpi.String())
  117. r.topics[tpi.String()] = tpi
  118. }
  119. }
  120. func (r *RabbitMqProducer) Stop() error {
  121. if r.admin != nil {
  122. return r.admin.Destroy()
  123. }
  124. return nil
  125. }
  126. type RabbitMqConsumer struct {
  127. admin *RabbitMessageQueueAdmin
  128. topics []string
  129. topic string
  130. partitions []*queue.TopicPartitionInfo
  131. subscribe bool
  132. mu sync.Mutex
  133. recvChan chan []byte
  134. }
  135. func (r *RabbitMqConsumer) GetTopic() string {
  136. return r.topic
  137. }
  138. func (r *RabbitMqConsumer) Subscribe() error {
  139. r.mu.Lock()
  140. defer r.mu.Unlock()
  141. r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
  142. Topic: r.topic,
  143. TenantId: "1ps9djpswi0cds7cofynkso300eql4iu",
  144. Partition: 0,
  145. MyPartition: true,
  146. })
  147. r.subscribe = false
  148. return nil
  149. }
  150. func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
  151. r.mu.Lock()
  152. defer r.mu.Unlock()
  153. r.partitions = partitions
  154. r.subscribe = false
  155. return nil
  156. }
  157. func (r *RabbitMqConsumer) UnSubscribe() {
  158. _ = r.admin.Destroy()
  159. }
  160. func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
  161. result := make(chan queue.QueueMessage, 10)
  162. if !r.subscribe && len(r.partitions) == 0 {
  163. time.Sleep(duration)
  164. } else {
  165. r.mu.Lock()
  166. defer r.mu.Unlock()
  167. if !r.subscribe {
  168. for _, p := range r.partitions {
  169. r.topics = append(r.topics, p.String())
  170. }
  171. r.doSubscribe(r.topics)
  172. r.subscribe = true
  173. }
  174. go r.doPop(duration)
  175. go func() {
  176. for {
  177. select {
  178. case msg := <-r.recvChan:
  179. m := &queue.GobQueueMessage{}
  180. err := m.UnMarshal(msg)
  181. if err != nil {
  182. server.Log.Error(err)
  183. continue
  184. }
  185. result <- m
  186. }
  187. }
  188. }()
  189. }
  190. return result, nil
  191. }
  192. func (r *RabbitMqConsumer) doSubscribe(topics []string) {
  193. for _, item := range topics {
  194. _ = r.admin.CreateTopicIfNotExists(item)
  195. }
  196. }
  197. func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
  198. if r.admin.ch == nil || r.admin.conn == nil {
  199. return errors.New("ch and conn is not init")
  200. }
  201. for _, topic := range r.topics {
  202. go func() {
  203. msgs, err := r.admin.ch.Consume(
  204. topic,
  205. "", // consumer
  206. false, // auto-ack
  207. false, // exclusive
  208. false, // no-local
  209. false, // no-wait
  210. nil, // args
  211. )
  212. if err != nil {
  213. server.Log.Error(err)
  214. return
  215. }
  216. for d := range msgs {
  217. r.recvChan <- d.Body
  218. d.Ack(true)
  219. }
  220. }()
  221. }
  222. return nil
  223. }
  224. func (r *RabbitMqConsumer) Commit() error {
  225. r.mu.Lock()
  226. defer r.mu.Unlock()
  227. return r.admin.ch.Ack(0, true)
  228. }
  229. func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
  230. return &RabbitMqConsumer{
  231. admin: admin,
  232. topics: make([]string, 0),
  233. topic: topic,
  234. partitions: make([]*queue.TopicPartitionInfo, 0),
  235. recvChan: make(chan []byte, 10),
  236. }
  237. }