rabbitmq.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package msgQueue
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/server"
  9. "sync"
  10. "time"
  11. )
  12. // RabbitMessageQueueAdmin rabbit mq 管理器
  13. type RabbitMessageQueueAdmin struct {
  14. conn *amqp.Connection
  15. ch *amqp.Channel
  16. isReady bool
  17. done chan bool
  18. arguments map[string]interface{}
  19. notifyCloseChan chan *amqp.Error
  20. notifyChanClose chan *amqp.Error
  21. setting *RabbitMqSettings
  22. }
  23. func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
  24. return r.ch
  25. }
  26. func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
  27. return r.conn
  28. }
  29. // RabbitMqSettings 配置
  30. type RabbitMqSettings struct {
  31. Host string
  32. }
  33. func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
  34. rdm := &RabbitMessageQueueAdmin{
  35. done: make(chan bool),
  36. arguments: args,
  37. }
  38. rdm.setting = settings
  39. go rdm.handleReconnect()
  40. return rdm
  41. }
  42. func (r *RabbitMessageQueueAdmin) handleReconnect() {
  43. for {
  44. r.isReady = false
  45. conn, err := r.connect(r.setting.Host)
  46. fmt.Println("handleReconnect")
  47. if err != nil {
  48. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  49. select {
  50. case <-r.done:
  51. return
  52. case <-time.After(4 * time.Second):
  53. }
  54. continue
  55. }
  56. if done := r.handleReInit(conn); done {
  57. break
  58. }
  59. }
  60. }
  61. func (r *RabbitMessageQueueAdmin) handleReInit(conn *amqp.Connection) bool {
  62. for {
  63. r.isReady = false
  64. err := r.init(conn)
  65. fmt.Println("handleReInit")
  66. if err != nil {
  67. select {
  68. case <-r.done:
  69. return true
  70. case <-time.After(time.Second * 3):
  71. }
  72. continue
  73. }
  74. fmt.Println("handleReInit1")
  75. select {
  76. case <-r.done:
  77. return true
  78. case err := <-r.notifyCloseChan:
  79. fmt.Println("Connection closed. Reconnecting..." + err.Error())
  80. return false
  81. case err := <-r.notifyChanClose:
  82. fmt.Println("Channel closed. Re-running init..." + err.Error())
  83. }
  84. }
  85. }
  86. func (r *RabbitMessageQueueAdmin) init(conn *amqp.Connection) error {
  87. fmt.Println("init")
  88. ch, err := conn.Channel()
  89. if err != nil {
  90. return err
  91. }
  92. r.ch = ch
  93. r.notifyChanClose = make(chan *amqp.Error)
  94. r.ch.NotifyClose(r.notifyChanClose)
  95. r.isReady = true
  96. return nil
  97. }
  98. func (r *RabbitMessageQueueAdmin) connect(addr string) (*amqp.Connection, error) {
  99. conn, err := amqp.Dial(addr)
  100. if err != nil {
  101. return nil, err
  102. }
  103. r.conn = conn
  104. r.notifyCloseChan = make(chan *amqp.Error)
  105. r.conn.NotifyClose(r.notifyCloseChan)
  106. return conn, err
  107. }
  108. func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
  109. _, err := r.ch.QueueDeclare(topic,
  110. true, // durable
  111. false, // delete when unused
  112. false, // exclusive
  113. false, // no-wait
  114. nil)
  115. return err
  116. }
  117. func (r *RabbitMessageQueueAdmin) Destroy() error {
  118. if r.ch != nil {
  119. if err := r.ch.Close(); err != nil {
  120. return err
  121. }
  122. }
  123. if r.conn != nil {
  124. return r.conn.Close()
  125. }
  126. return nil
  127. }
  128. // RabbitMqProducer rabbit mq message producer
  129. type RabbitMqProducer struct {
  130. defaultTopic string
  131. admin *RabbitMessageQueueAdmin
  132. settings *RabbitMqSettings
  133. channel *amqp.Channel
  134. conn *amqp.Connection
  135. topics map[string]*queue.TopicPartitionInfo
  136. }
  137. func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
  138. result := new(RabbitMqProducer)
  139. result.admin = admin
  140. result.defaultTopic = defaultTopic
  141. result.topics = make(map[string]*queue.TopicPartitionInfo)
  142. return result
  143. }
  144. func (r *RabbitMqProducer) Init() error {
  145. return nil
  146. }
  147. func (r *RabbitMqProducer) GetDefaultTopic() string {
  148. return r.defaultTopic
  149. }
  150. func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
  151. r.createTopicIfNoExist(info)
  152. bytes, err := payload.Marshal()
  153. if err != nil {
  154. server.Log.Errorf("queue message marshal error:%s", err.Error())
  155. return err
  156. }
  157. // server.Log.Debugf("publish message to %s", info.String())
  158. err = r.admin.ch.Publish("", info.String(), false, false,
  159. amqp.Publishing{
  160. DeliveryMode: amqp.Persistent,
  161. Body: bytes,
  162. })
  163. if err != nil {
  164. if callback != nil {
  165. callback.OnFailure(err)
  166. }
  167. return err
  168. }
  169. if callback != nil {
  170. callback.OnSuccess()
  171. }
  172. return nil
  173. }
  174. func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
  175. if _, ok := r.topics[tpi.String()]; !ok {
  176. _ = r.admin.CreateTopicIfNotExists(tpi.String())
  177. r.topics[tpi.String()] = tpi
  178. }
  179. }
  180. func (r *RabbitMqProducer) Stop() error {
  181. if r.admin != nil {
  182. return r.admin.Destroy()
  183. }
  184. return nil
  185. }
  186. type RabbitMqConsumer struct {
  187. admin *RabbitMessageQueueAdmin
  188. topics []string
  189. topic string
  190. partitions []*queue.TopicPartitionInfo
  191. subscribe bool
  192. mu sync.Mutex
  193. recvChan chan []byte
  194. }
  195. func (r *RabbitMqConsumer) GetTopic() string {
  196. return r.topic
  197. }
  198. func (r *RabbitMqConsumer) Subscribe() error {
  199. r.mu.Lock()
  200. defer r.mu.Unlock()
  201. r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
  202. Topic: r.topic,
  203. TenantId: "*",
  204. Partition: 0,
  205. MyPartition: true,
  206. })
  207. r.subscribe = false
  208. return nil
  209. }
  210. func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
  211. r.mu.Lock()
  212. defer r.mu.Unlock()
  213. r.partitions = partitions
  214. r.subscribe = false
  215. return nil
  216. }
  217. func (r *RabbitMqConsumer) UnSubscribe() {
  218. _ = r.admin.Destroy()
  219. }
  220. func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
  221. result := make(chan queue.QueueMessage, 10)
  222. if !r.subscribe && len(r.partitions) == 0 {
  223. time.Sleep(duration)
  224. } else {
  225. r.mu.Lock()
  226. defer r.mu.Unlock()
  227. if !r.subscribe {
  228. for _, p := range r.partitions {
  229. r.topics = append(r.topics, p.String())
  230. }
  231. r.doSubscribe(r.topics)
  232. r.subscribe = true
  233. }
  234. go r.doPop(duration)
  235. go func() {
  236. for {
  237. select {
  238. case msg := <-r.recvChan:
  239. m := &queue.GobQueueMessage{}
  240. err := m.UnMarshal(msg)
  241. if err != nil {
  242. server.Log.Error(err)
  243. continue
  244. }
  245. result <- m
  246. }
  247. }
  248. }()
  249. }
  250. return result, nil
  251. }
  252. func (r *RabbitMqConsumer) doSubscribe(topics []string) {
  253. for _, item := range topics {
  254. _ = r.admin.CreateTopicIfNotExists(item)
  255. }
  256. }
  257. func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
  258. if r.admin.ch == nil || r.admin.conn == nil {
  259. return errors.New("ch and conn is not init")
  260. }
  261. for _, topic := range r.topics {
  262. go func(tpc string) {
  263. msgs, err := r.admin.ch.Consume(
  264. tpc,
  265. "", // consumer
  266. true, // auto-ack
  267. false, // exclusive
  268. false, // no-local
  269. false, // no-wait
  270. nil, // args
  271. )
  272. if err != nil {
  273. server.Log.Error(err)
  274. return
  275. }
  276. for d := range msgs {
  277. r.recvChan <- d.Body
  278. // d.Ack(true)
  279. }
  280. }(topic)
  281. }
  282. return nil
  283. }
  284. func (r *RabbitMqConsumer) Commit() error {
  285. r.mu.Lock()
  286. defer r.mu.Unlock()
  287. return r.admin.ch.Ack(0, true)
  288. }
  289. func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
  290. return &RabbitMqConsumer{
  291. admin: admin,
  292. topics: make([]string, 0),
  293. topic: topic,
  294. partitions: make([]*queue.TopicPartitionInfo, 0),
  295. recvChan: make(chan []byte, 10),
  296. }
  297. }