123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- // package queue implement a message queque api with rabbitmq
- package queue
- import (
- "errors"
- "sparrow/pkg/serializer"
- "github.com/streadway/amqp"
- )
- const defaultRecvChanLen = 8
- type Queue struct {
- rabbithost string
- conn *amqp.Connection
- ch *amqp.Channel
- queue amqp.Queue
- recvChan chan ([]byte)
- beginReceive bool
- }
- func New(rabbithost string, name string) (*Queue, error) {
- conn, err := amqp.Dial(rabbithost)
- if err != nil {
- return nil, err
- }
- ch, err := conn.Channel()
- if err != nil {
- return nil, err
- }
- queue, err := ch.QueueDeclare(
- name, // name
- true, // durable
- false, // delete when unused
- false, // exclusive
- false, // no-wait
- nil, // arguments
- )
- if err != nil {
- return nil, errors.New("Failed to declare a queue")
- }
- err = ch.Qos(
- 1, // prefetch count
- 0, // prefetch size
- false, // global
- )
- if err != nil {
- return nil, errors.New("Failed to set QoS")
- }
- q := &Queue{rabbithost, conn, ch, queue, nil, false}
- return q, nil
- }
- func (q *Queue) keepReceivingFromQueue() {
- if q.ch == nil || q.recvChan == nil {
- //Message Queue Not Initialzed.
- return
- }
- defer func() {
- if q.recvChan != nil {
- close(q.recvChan)
- }
- }()
- msgs, err := q.ch.Consume(
- q.queue.Name, // queue
- "", // consumer
- false, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- if err != nil {
- return
- }
- for d := range msgs {
- q.recvChan <- d.Body
- d.Ack(false)
- }
- }
- // Send will send a message to the queue.
- func (q *Queue) Send(msg interface{}) error {
- if q.ch == nil {
- return errors.New("Message Queue Not Initialzed.")
- }
- msgStr, err := serializer.Struct2String(msg)
- if err != nil {
- return err
- }
- err = q.ch.Publish(
- "", // exchange
- q.queue.Name, // routing key
- false, // mandatory
- false,
- amqp.Publishing{
- DeliveryMode: amqp.Persistent,
- ContentType: "text/plain",
- Body: []byte(msgStr),
- })
- return nil
- }
- // Receive will reveive a message from the queue. may be blocked if there is no message in queue.
- func (q *Queue) Receive(target interface{}) error {
- if !q.beginReceive {
- q.recvChan = make(chan ([]byte), defaultRecvChanLen)
- go q.keepReceivingFromQueue()
- q.beginReceive = true
- }
- if q.recvChan == nil {
- return errors.New("Message Queue Has Not Been Initialized.")
- }
- msg, ok := <-q.recvChan
- if !ok {
- return errors.New("Message Queue Has Been Closed.")
- }
- strMsg := string(msg)
- err := serializer.String2Struct(strMsg, target)
- if err != nil {
- return err
- }
- return nil
- }
|