123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- package msgQueue
- import (
- "errors"
- "fmt"
- "github.com/streadway/amqp"
- "sparrow/pkg/protocol"
- "sparrow/pkg/queue"
- "sparrow/pkg/server"
- "sync"
- "time"
- )
- // RabbitMessageQueueAdmin rabbit mq 管理器
- type RabbitMessageQueueAdmin struct {
- conn *amqp.Connection
- ch *amqp.Channel
- isReady bool
- done chan bool
- arguments map[string]interface{}
- notifyCloseChan chan *amqp.Error
- notifyChanClose chan *amqp.Error
- setting *RabbitMqSettings
- }
- func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
- return r.ch
- }
- func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
- return r.conn
- }
- // RabbitMqSettings 配置
- type RabbitMqSettings struct {
- Host string
- }
- func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
- rdm := &RabbitMessageQueueAdmin{
- done: make(chan bool),
- arguments: args,
- }
- rdm.setting = settings
- go rdm.handleReconnect()
- return rdm
- }
- func (r *RabbitMessageQueueAdmin) handleReconnect() {
- for {
- r.isReady = false
- conn, err := r.connect(r.setting.Host)
- fmt.Println("handleReconnect")
- if err != nil {
- server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
- select {
- case <-r.done:
- return
- case <-time.After(4 * time.Second):
- }
- continue
- }
- if done := r.handleReInit(conn); done {
- break
- }
- }
- }
- func (r *RabbitMessageQueueAdmin) handleReInit(conn *amqp.Connection) bool {
- for {
- r.isReady = false
- err := r.init(conn)
- fmt.Println("handleReInit")
- if err != nil {
- select {
- case <-r.done:
- return true
- case <-time.After(time.Second * 3):
- }
- continue
- }
- fmt.Println("handleReInit1")
- select {
- case <-r.done:
- return true
- case err :=<-r.notifyCloseChan:
- fmt.Println("Connection closed. Reconnecting..."+ err.Error())
- return false
- case err :=<-r.notifyChanClose:
- fmt.Println("Channel closed. Re-running init..." + err.Error())
- }
- }
- }
- func (r *RabbitMessageQueueAdmin) init(conn *amqp.Connection) error {
- fmt.Println("init")
- ch, err := conn.Channel()
- if err != nil {
- return err
- }
- //err = ch.Confirm(false)
- //if err != nil {
- // return err
- //}
- r.ch = ch
- r.notifyChanClose = make(chan *amqp.Error)
- r.ch.NotifyClose(r.notifyChanClose)
- r.isReady = true
- return nil
- }
- func (r *RabbitMessageQueueAdmin) connect(addr string) (*amqp.Connection, error) {
- conn, err := amqp.Dial(addr)
- if err != nil {
- return nil, err
- }
- r.conn = conn
- r.notifyCloseChan = make(chan *amqp.Error)
- r.conn.NotifyClose(r.notifyCloseChan)
- return conn, err
- }
- func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
- _, err := r.ch.QueueDeclare(topic,
- true, // durable
- false, // delete when unused
- false, // exclusive
- false, // no-wait
- nil)
- return err
- }
- func (r *RabbitMessageQueueAdmin) Destroy() error {
- if r.ch != nil {
- if err := r.ch.Close(); err != nil {
- return err
- }
- }
- if r.conn != nil {
- return r.conn.Close()
- }
- return nil
- }
- // RabbitMqProducer rabbit mq message producer
- type RabbitMqProducer struct {
- defaultTopic string
- admin *RabbitMessageQueueAdmin
- settings *RabbitMqSettings
- channel *amqp.Channel
- conn *amqp.Connection
- topics map[string]*queue.TopicPartitionInfo
- }
- func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
- result := new(RabbitMqProducer)
- result.admin = admin
- result.defaultTopic = defaultTopic
- result.topics = make(map[string]*queue.TopicPartitionInfo)
- return result
- }
- func (r *RabbitMqProducer) Init() error {
- return nil
- }
- func (r *RabbitMqProducer) GetDefaultTopic() string {
- return r.defaultTopic
- }
- func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
- r.createTopicIfNoExist(info)
- bytes, err := payload.Marshal()
- if err != nil {
- server.Log.Errorf("queue message marshal error:%s", err.Error())
- return err
- }
- server.Log.Debugf("publish message to %s", info.String())
- err = r.admin.ch.Publish("", info.String(), false, false,
- amqp.Publishing{
- DeliveryMode: amqp.Persistent,
- Body: bytes,
- })
- if err != nil {
- if callback != nil {
- callback.OnFailure(err)
- }
- return err
- }
- if callback != nil {
- callback.OnSuccess()
- }
- return nil
- }
- func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
- if _, ok := r.topics[tpi.String()]; !ok {
- _ = r.admin.CreateTopicIfNotExists(tpi.String())
- r.topics[tpi.String()] = tpi
- }
- }
- func (r *RabbitMqProducer) Stop() error {
- if r.admin != nil {
- return r.admin.Destroy()
- }
- return nil
- }
- type RabbitMqConsumer struct {
- admin *RabbitMessageQueueAdmin
- topics []string
- topic string
- partitions []*queue.TopicPartitionInfo
- subscribe bool
- mu sync.Mutex
- recvChan chan []byte
- }
- func (r *RabbitMqConsumer) GetTopic() string {
- return r.topic
- }
- func (r *RabbitMqConsumer) Subscribe() error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
- Topic: r.topic,
- TenantId: "*",
- Partition: 0,
- MyPartition: true,
- })
- r.subscribe = false
- return nil
- }
- func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.partitions = partitions
- r.subscribe = false
- return nil
- }
- func (r *RabbitMqConsumer) UnSubscribe() {
- _ = r.admin.Destroy()
- }
- func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
- result := make(chan queue.QueueMessage, 10)
- if !r.subscribe && len(r.partitions) == 0 {
- time.Sleep(duration)
- } else {
- r.mu.Lock()
- defer r.mu.Unlock()
- if !r.subscribe {
- for _, p := range r.partitions {
- r.topics = append(r.topics, p.String())
- }
- r.doSubscribe(r.topics)
- r.subscribe = true
- }
- go r.doPop(duration)
- go func() {
- for {
- select {
- case msg := <-r.recvChan:
- m := &queue.GobQueueMessage{}
- err := m.UnMarshal(msg)
- if err != nil {
- server.Log.Error(err)
- continue
- }
- result <- m
- }
- }
- }()
- }
- return result, nil
- }
- func (r *RabbitMqConsumer) doSubscribe(topics []string) {
- for _, item := range topics {
- _ = r.admin.CreateTopicIfNotExists(item)
- }
- }
- func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
- if r.admin.ch == nil || r.admin.conn == nil {
- return errors.New("ch and conn is not init")
- }
- for _, topic := range r.topics {
- go func(tpc string) {
- msgs, err := r.admin.ch.Consume(
- tpc,
- "", // consumer
- true, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- if err != nil {
- server.Log.Error(err)
- return
- }
- for d := range msgs {
- r.recvChan <- d.Body
- // d.Ack(true)
- }
- }(topic)
- }
- return nil
- }
- func (r *RabbitMqConsumer) Commit() error {
- r.mu.Lock()
- defer r.mu.Unlock()
- return r.admin.ch.Ack(0, true)
- }
- func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
- return &RabbitMqConsumer{
- admin: admin,
- topics: make([]string, 0),
- topic: topic,
- partitions: make([]*queue.TopicPartitionInfo, 0),
- recvChan: make(chan []byte, 10),
- }
- }
|