rabbitmq.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package msgQueue
  2. import (
  3. "errors"
  4. "github.com/streadway/amqp"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/queue"
  7. "sparrow/pkg/server"
  8. "sync"
  9. "time"
  10. )
  11. // RabbitMessageQueueAdmin rabbit mq 管理器
  12. type RabbitMessageQueueAdmin struct {
  13. conn *amqp.Connection
  14. ch *amqp.Channel
  15. arguments map[string]interface{}
  16. }
  17. func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
  18. return r.ch
  19. }
  20. func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
  21. return r.conn
  22. }
  23. // RabbitMqSettings 配置
  24. type RabbitMqSettings struct {
  25. Host string
  26. }
  27. func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
  28. conn, err := amqp.Dial(settings.Host)
  29. if err != nil {
  30. panic(err)
  31. }
  32. ch, err := conn.Channel()
  33. if err != nil {
  34. panic(err)
  35. }
  36. return &RabbitMessageQueueAdmin{
  37. conn: conn,
  38. ch: ch,
  39. arguments: args,
  40. }
  41. }
  42. func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
  43. _, err := r.ch.QueueDeclare(topic,
  44. true, // durable
  45. false, // delete when unused
  46. false, // exclusive
  47. false, // no-wait
  48. nil)
  49. return err
  50. }
  51. func (r *RabbitMessageQueueAdmin) Destroy() error {
  52. if r.ch != nil {
  53. if err := r.ch.Close(); err != nil {
  54. return err
  55. }
  56. }
  57. if r.conn != nil {
  58. return r.conn.Close()
  59. }
  60. return nil
  61. }
  62. // RabbitMqProducer rabbit mq message producer
  63. type RabbitMqProducer struct {
  64. defaultTopic string
  65. admin *RabbitMessageQueueAdmin
  66. settings *RabbitMqSettings
  67. channel *amqp.Channel
  68. conn *amqp.Connection
  69. topics map[string]*queue.TopicPartitionInfo
  70. }
  71. func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
  72. result := new(RabbitMqProducer)
  73. result.admin = admin
  74. result.defaultTopic = defaultTopic
  75. result.conn = admin.conn
  76. result.channel = admin.ch
  77. result.topics = make(map[string]*queue.TopicPartitionInfo)
  78. return result
  79. }
  80. func (r *RabbitMqProducer) Init() error {
  81. return nil
  82. }
  83. func (r *RabbitMqProducer) GetDefaultTopic() string {
  84. return r.defaultTopic
  85. }
  86. func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
  87. r.createTopicIfNoExist(info)
  88. if r.channel == nil {
  89. return errors.New("rabbit mq channel is not initialized")
  90. }
  91. bytes, err := payload.Marshal()
  92. if err != nil {
  93. server.Log.Errorf("queue message marshal error:%s", err.Error())
  94. return err
  95. }
  96. err = r.channel.Publish("", info.String(), false, false,
  97. amqp.Publishing{
  98. DeliveryMode: amqp.Persistent,
  99. Body: bytes,
  100. })
  101. if err != nil {
  102. if callback != nil {
  103. callback.OnFailure(err)
  104. }
  105. server.Log.Errorf("rabbit mq message publish error:%s", err.Error())
  106. return err
  107. }
  108. if callback != nil {
  109. callback.OnSuccess()
  110. }
  111. return nil
  112. }
  113. func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
  114. if _, ok := r.topics[tpi.String()]; !ok {
  115. _ = r.admin.CreateTopicIfNotExists(tpi.String())
  116. r.topics[tpi.String()] = tpi
  117. }
  118. }
  119. func (r *RabbitMqProducer) Stop() error {
  120. if r.admin != nil {
  121. return r.admin.Destroy()
  122. }
  123. return nil
  124. }
  125. type RabbitMqConsumer struct {
  126. admin *RabbitMessageQueueAdmin
  127. topics []string
  128. topic string
  129. partitions []*queue.TopicPartitionInfo
  130. subscribe bool
  131. mu sync.Mutex
  132. recvChan chan []byte
  133. }
  134. func (r *RabbitMqConsumer) GetTopic() string {
  135. return r.topic
  136. }
  137. func (r *RabbitMqConsumer) Subscribe() error {
  138. r.mu.Lock()
  139. defer r.mu.Unlock()
  140. r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
  141. Topic: r.topic,
  142. TenantId: "",
  143. Partition: 0,
  144. MyPartition: true,
  145. })
  146. r.subscribe = false
  147. return nil
  148. }
  149. func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
  150. r.mu.Lock()
  151. defer r.mu.Unlock()
  152. r.partitions = partitions
  153. r.subscribe = false
  154. return nil
  155. }
  156. func (r *RabbitMqConsumer) UnSubscribe() {
  157. _ = r.admin.Destroy()
  158. }
  159. func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
  160. result := make(chan queue.QueueMessage, 10)
  161. if !r.subscribe && len(r.partitions) == 0 {
  162. time.Sleep(duration)
  163. } else {
  164. r.mu.Lock()
  165. defer r.mu.Unlock()
  166. if !r.subscribe {
  167. for _, p := range r.partitions {
  168. r.topics = append(r.topics, p.String())
  169. }
  170. r.doSubscribe(r.topics)
  171. r.subscribe = true
  172. }
  173. go r.doPop(duration)
  174. go func() {
  175. for {
  176. select {
  177. case msg := <-r.recvChan:
  178. m := &queue.GobQueueMessage{}
  179. err := m.UnMarshal(msg)
  180. if err != nil {
  181. server.Log.Error(err)
  182. continue
  183. }
  184. result <- m
  185. }
  186. }
  187. }()
  188. }
  189. return result, nil
  190. }
  191. func (r *RabbitMqConsumer) doSubscribe(topics []string) {
  192. for _, item := range topics {
  193. _ = r.admin.CreateTopicIfNotExists(item)
  194. }
  195. }
  196. func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
  197. if r.admin.ch == nil || r.admin.conn == nil {
  198. return errors.New("ch and conn is not init")
  199. }
  200. for _, topic := range r.topics {
  201. go func() {
  202. msgs, err := r.admin.ch.Consume(
  203. topic,
  204. "", // consumer
  205. false, // auto-ack
  206. false, // exclusive
  207. false, // no-local
  208. false, // no-wait
  209. nil, // args
  210. )
  211. if err != nil {
  212. server.Log.Error(err)
  213. return
  214. }
  215. for d := range msgs {
  216. r.recvChan <- d.Body
  217. d.Ack(true)
  218. }
  219. }()
  220. }
  221. return nil
  222. }
  223. func (r *RabbitMqConsumer) Commit() error {
  224. r.mu.Lock()
  225. defer r.mu.Unlock()
  226. return r.admin.ch.Ack(0, true)
  227. }
  228. func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
  229. return &RabbitMqConsumer{
  230. admin: admin,
  231. topics: make([]string, 0),
  232. topic: topic,
  233. partitions: make([]*queue.TopicPartitionInfo, 0),
  234. recvChan: make(chan []byte, 10),
  235. }
  236. }