queue.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Package queue implements a message queue API on top of RabbitMQ.
  2. package queue
  import (
  	"errors"
  	"fmt"

  	"github.com/streadway/amqp"
  	"sparrow/pkg/serializer"
  )
  // defaultRecvChanLen is the buffer capacity of the channel that carries
  // received message bodies from the consumer goroutine to Receive.
  const (
  	defaultRecvChanLen = 8
  )
  9. type Queue struct {
  10. rabbithost string
  11. conn *amqp.Connection
  12. ch *amqp.Channel
  13. queue amqp.Queue
  14. recvChan chan ([]byte)
  15. beginReceive bool
  16. }
  17. func New(rabbithost string, name string) (*Queue, error) {
  18. conn, err := amqp.Dial(rabbithost)
  19. if err != nil {
  20. return nil, err
  21. }
  22. ch, err := conn.Channel()
  23. if err != nil {
  24. return nil, err
  25. }
  26. queue, err := ch.QueueDeclare(
  27. name, // name
  28. true, // durable
  29. false, // delete when unused
  30. false, // exclusive
  31. false, // no-wait
  32. nil, // arguments
  33. )
  34. if err != nil {
  35. return nil, errors.New("Failed to declare a queue")
  36. }
  37. err = ch.Qos(
  38. 1, // prefetch count
  39. 0, // prefetch size
  40. false, // global
  41. )
  42. if err != nil {
  43. return nil, errors.New("Failed to set QoS")
  44. }
  45. q := &Queue{rabbithost, conn, ch, queue, nil, false}
  46. return q, nil
  47. }
  48. func (q *Queue) keepReceivingFromQueue() {
  49. if q.ch == nil || q.recvChan == nil {
  50. //Message Queue Not Initialzed.
  51. return
  52. }
  53. defer func() {
  54. if q.recvChan != nil {
  55. close(q.recvChan)
  56. }
  57. }()
  58. msgs, err := q.ch.Consume(
  59. q.queue.Name, // queue
  60. "", // consumer
  61. false, // auto-ack
  62. false, // exclusive
  63. false, // no-local
  64. false, // no-wait
  65. nil, // args
  66. )
  67. if err != nil {
  68. return
  69. }
  70. for d := range msgs {
  71. q.recvChan <- d.Body
  72. d.Ack(false)
  73. }
  74. }
  75. // Send will send a message to the queue.
  76. func (q *Queue) Send(msg interface{}) error {
  77. if q.ch == nil {
  78. return errors.New("Message Queue Not Initialzed.")
  79. }
  80. msgStr, err := serializer.Struct2String(msg)
  81. if err != nil {
  82. return err
  83. }
  84. err = q.ch.Publish(
  85. "", // exchange
  86. q.queue.Name, // routing key
  87. false, // mandatory
  88. false,
  89. amqp.Publishing{
  90. DeliveryMode: amqp.Persistent,
  91. ContentType: "text/plain",
  92. Body: []byte(msgStr),
  93. })
  94. return nil
  95. }
  96. // Receive will reveive a message from the queue. may be blocked if there is no message in queue.
  97. func (q *Queue) Receive(target interface{}) error {
  98. if !q.beginReceive {
  99. q.recvChan = make(chan ([]byte), defaultRecvChanLen)
  100. go q.keepReceivingFromQueue()
  101. q.beginReceive = true
  102. }
  103. if q.recvChan == nil {
  104. return errors.New("Message Queue Has Not Been Initialized.")
  105. }
  106. msg, ok := <-q.recvChan
  107. if !ok {
  108. return errors.New("Message Queue Has Been Closed.")
  109. }
  110. strMsg := string(msg)
  111. err := serializer.String2Struct(strMsg, target)
  112. if err != nil {
  113. return err
  114. }
  115. return nil
  116. }