rabbitmq.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package msgQueue
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/server"
  9. "sync"
  10. "time"
  11. )
  12. // RabbitMessageQueueAdmin rabbit mq 管理器
  13. type RabbitMessageQueueAdmin struct {
  14. conn *amqp.Connection
  15. ch *amqp.Channel
  16. isReady bool
  17. done chan bool
  18. arguments map[string]interface{}
  19. notifyCloseChan chan *amqp.Error
  20. notifyChanClose chan *amqp.Error
  21. setting *RabbitMqSettings
  22. }
  23. func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
  24. return r.ch
  25. }
  26. func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
  27. return r.conn
  28. }
  29. // RabbitMqSettings 配置
  30. type RabbitMqSettings struct {
  31. Host string
  32. }
  33. func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
  34. rdm := &RabbitMessageQueueAdmin{
  35. done: make(chan bool),
  36. arguments: args,
  37. }
  38. rdm.setting = settings
  39. go rdm.handleReconnect()
  40. return rdm
  41. }
  42. func (r *RabbitMessageQueueAdmin) handleReconnect() {
  43. for {
  44. r.isReady = false
  45. conn, err := r.connect(r.setting.Host)
  46. fmt.Println("handleReconnect")
  47. if err != nil {
  48. server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
  49. select {
  50. case <-r.done:
  51. return
  52. case <-time.After(4 * time.Second):
  53. }
  54. continue
  55. }
  56. if done := r.handleReInit(conn); done {
  57. break
  58. }
  59. }
  60. }
  61. func (r *RabbitMessageQueueAdmin) handleReInit(conn *amqp.Connection) bool {
  62. for {
  63. r.isReady = false
  64. err := r.init(conn)
  65. fmt.Println("handleReInit")
  66. if err != nil {
  67. select {
  68. case <-r.done:
  69. return true
  70. case <-time.After(time.Second * 3):
  71. }
  72. continue
  73. }
  74. fmt.Println("handleReInit1")
  75. select {
  76. case <-r.done:
  77. return true
  78. case err :=<-r.notifyCloseChan:
  79. fmt.Println("Connection closed. Reconnecting..."+ err.Error())
  80. return false
  81. case err :=<-r.notifyChanClose:
  82. fmt.Println("Channel closed. Re-running init..." + err.Error())
  83. }
  84. }
  85. }
  86. func (r *RabbitMessageQueueAdmin) init(conn *amqp.Connection) error {
  87. fmt.Println("init")
  88. ch, err := conn.Channel()
  89. if err != nil {
  90. return err
  91. }
  92. //err = ch.Confirm(false)
  93. //if err != nil {
  94. // return err
  95. //}
  96. r.ch = ch
  97. r.notifyChanClose = make(chan *amqp.Error)
  98. r.ch.NotifyClose(r.notifyChanClose)
  99. r.isReady = true
  100. return nil
  101. }
  102. func (r *RabbitMessageQueueAdmin) connect(addr string) (*amqp.Connection, error) {
  103. conn, err := amqp.Dial(addr)
  104. if err != nil {
  105. return nil, err
  106. }
  107. r.conn = conn
  108. r.notifyCloseChan = make(chan *amqp.Error)
  109. r.conn.NotifyClose(r.notifyCloseChan)
  110. return conn, err
  111. }
  112. func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
  113. _, err := r.ch.QueueDeclare(topic,
  114. true, // durable
  115. false, // delete when unused
  116. false, // exclusive
  117. false, // no-wait
  118. nil)
  119. return err
  120. }
  121. func (r *RabbitMessageQueueAdmin) Destroy() error {
  122. if r.ch != nil {
  123. if err := r.ch.Close(); err != nil {
  124. return err
  125. }
  126. }
  127. if r.conn != nil {
  128. return r.conn.Close()
  129. }
  130. return nil
  131. }
  132. // RabbitMqProducer rabbit mq message producer
  133. type RabbitMqProducer struct {
  134. defaultTopic string
  135. admin *RabbitMessageQueueAdmin
  136. settings *RabbitMqSettings
  137. channel *amqp.Channel
  138. conn *amqp.Connection
  139. topics map[string]*queue.TopicPartitionInfo
  140. }
  141. func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
  142. result := new(RabbitMqProducer)
  143. result.admin = admin
  144. result.defaultTopic = defaultTopic
  145. result.topics = make(map[string]*queue.TopicPartitionInfo)
  146. return result
  147. }
  148. func (r *RabbitMqProducer) Init() error {
  149. return nil
  150. }
  151. func (r *RabbitMqProducer) GetDefaultTopic() string {
  152. return r.defaultTopic
  153. }
  154. func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
  155. r.createTopicIfNoExist(info)
  156. bytes, err := payload.Marshal()
  157. if err != nil {
  158. server.Log.Errorf("queue message marshal error:%s", err.Error())
  159. return err
  160. }
  161. server.Log.Debugf("publish message to %s", info.String())
  162. err = r.admin.ch.Publish("", info.String(), false, false,
  163. amqp.Publishing{
  164. DeliveryMode: amqp.Persistent,
  165. Body: bytes,
  166. })
  167. if err != nil {
  168. if callback != nil {
  169. callback.OnFailure(err)
  170. }
  171. return err
  172. }
  173. if callback != nil {
  174. callback.OnSuccess()
  175. }
  176. return nil
  177. }
  178. func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
  179. if _, ok := r.topics[tpi.String()]; !ok {
  180. _ = r.admin.CreateTopicIfNotExists(tpi.String())
  181. r.topics[tpi.String()] = tpi
  182. }
  183. }
  184. func (r *RabbitMqProducer) Stop() error {
  185. if r.admin != nil {
  186. return r.admin.Destroy()
  187. }
  188. return nil
  189. }
  190. type RabbitMqConsumer struct {
  191. admin *RabbitMessageQueueAdmin
  192. topics []string
  193. topic string
  194. partitions []*queue.TopicPartitionInfo
  195. subscribe bool
  196. mu sync.Mutex
  197. recvChan chan []byte
  198. }
  199. func (r *RabbitMqConsumer) GetTopic() string {
  200. return r.topic
  201. }
  202. func (r *RabbitMqConsumer) Subscribe() error {
  203. r.mu.Lock()
  204. defer r.mu.Unlock()
  205. r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
  206. Topic: r.topic,
  207. TenantId: "*",
  208. Partition: 0,
  209. MyPartition: true,
  210. })
  211. r.subscribe = false
  212. return nil
  213. }
  214. func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
  215. r.mu.Lock()
  216. defer r.mu.Unlock()
  217. r.partitions = partitions
  218. r.subscribe = false
  219. return nil
  220. }
  221. func (r *RabbitMqConsumer) UnSubscribe() {
  222. _ = r.admin.Destroy()
  223. }
  224. func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
  225. result := make(chan queue.QueueMessage, 10)
  226. if !r.subscribe && len(r.partitions) == 0 {
  227. time.Sleep(duration)
  228. } else {
  229. r.mu.Lock()
  230. defer r.mu.Unlock()
  231. if !r.subscribe {
  232. for _, p := range r.partitions {
  233. r.topics = append(r.topics, p.String())
  234. }
  235. r.doSubscribe(r.topics)
  236. r.subscribe = true
  237. }
  238. go r.doPop(duration)
  239. go func() {
  240. for {
  241. select {
  242. case msg := <-r.recvChan:
  243. m := &queue.GobQueueMessage{}
  244. err := m.UnMarshal(msg)
  245. if err != nil {
  246. server.Log.Error(err)
  247. continue
  248. }
  249. result <- m
  250. }
  251. }
  252. }()
  253. }
  254. return result, nil
  255. }
  256. func (r *RabbitMqConsumer) doSubscribe(topics []string) {
  257. for _, item := range topics {
  258. _ = r.admin.CreateTopicIfNotExists(item)
  259. }
  260. }
  261. func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
  262. if r.admin.ch == nil || r.admin.conn == nil {
  263. return errors.New("ch and conn is not init")
  264. }
  265. for _, topic := range r.topics {
  266. go func(tpc string) {
  267. msgs, err := r.admin.ch.Consume(
  268. tpc,
  269. "", // consumer
  270. true, // auto-ack
  271. false, // exclusive
  272. false, // no-local
  273. false, // no-wait
  274. nil, // args
  275. )
  276. if err != nil {
  277. server.Log.Error(err)
  278. return
  279. }
  280. for d := range msgs {
  281. r.recvChan <- d.Body
  282. // d.Ack(true)
  283. }
  284. }(topic)
  285. }
  286. return nil
  287. }
  288. func (r *RabbitMqConsumer) Commit() error {
  289. r.mu.Lock()
  290. defer r.mu.Unlock()
  291. return r.admin.ch.Ack(0, true)
  292. }
  293. func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
  294. return &RabbitMqConsumer{
  295. admin: admin,
  296. topics: make([]string, 0),
  297. topic: topic,
  298. partitions: make([]*queue.TopicPartitionInfo, 0),
  299. recvChan: make(chan []byte, 10),
  300. }
  301. }