|
@@ -2,6 +2,7 @@ package msgQueue
|
|
|
|
|
|
import (
|
|
|
"errors"
|
|
|
+ "fmt"
|
|
|
"github.com/streadway/amqp"
|
|
|
"sparrow/pkg/protocol"
|
|
|
"sparrow/pkg/queue"
|
|
@@ -12,9 +13,14 @@ import (
|
|
|
|
|
|
// RabbitMessageQueueAdmin rabbit mq 管理器
|
|
|
type RabbitMessageQueueAdmin struct {
|
|
|
- conn *amqp.Connection
|
|
|
- ch *amqp.Channel
|
|
|
- arguments map[string]interface{}
|
|
|
+ conn *amqp.Connection
|
|
|
+ ch *amqp.Channel
|
|
|
+ isReady bool
|
|
|
+ done chan bool
|
|
|
+ arguments map[string]interface{}
|
|
|
+ notifyCloseChan chan *amqp.Error
|
|
|
+ notifyChanClose chan *amqp.Error
|
|
|
+ setting *RabbitMqSettings
|
|
|
}
|
|
|
|
|
|
func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
|
|
@@ -31,20 +37,88 @@ type RabbitMqSettings struct {
|
|
|
}
|
|
|
|
|
|
func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
|
|
|
- conn, err := amqp.Dial(settings.Host)
|
|
|
- if err != nil {
|
|
|
- panic(err)
|
|
|
+ rdm := &RabbitMessageQueueAdmin{
|
|
|
+ done: make(chan bool),
|
|
|
+ arguments: args,
|
|
|
+ }
|
|
|
+ rdm.setting = settings
|
|
|
+ go rdm.handleReconnect()
|
|
|
+ return rdm
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RabbitMessageQueueAdmin) handleReconnect() {
|
|
|
+ for {
|
|
|
+ r.isReady = false
|
|
|
+ conn, err := r.connect(r.setting.Host)
|
|
|
+ fmt.Println("handleReconnect")
|
|
|
+ if err != nil {
|
|
|
+ server.Log.Errorf("connect to rabbitmq error:%s", err.Error())
|
|
|
+ select {
|
|
|
+ case <-r.done:
|
|
|
+ return
|
|
|
+ case <-time.After(4 * time.Second):
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if done := r.handleReInit(conn); done {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RabbitMessageQueueAdmin) handleReInit(conn *amqp.Connection) bool {
|
|
|
+ for {
|
|
|
+ r.isReady = false
|
|
|
+ err := r.init(conn)
|
|
|
+ fmt.Println("handleReInit")
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ select {
|
|
|
+ case <-r.done:
|
|
|
+ return true
|
|
|
+ case <-time.After(time.Second * 3):
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ fmt.Println("handleReInit1")
|
|
|
+ select {
|
|
|
+ case <-r.done:
|
|
|
+ return true
|
|
|
+ case <-r.notifyCloseChan:
|
|
|
+ fmt.Println("Connection closed. Reconnecting...")
|
|
|
+ return false
|
|
|
+ case <-r.notifyChanClose:
|
|
|
+ fmt.Println("Channel closed. Re-running init...")
|
|
|
+ }
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
+func (r *RabbitMessageQueueAdmin) init(conn *amqp.Connection) error {
|
|
|
+ fmt.Println("init")
|
|
|
ch, err := conn.Channel()
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ return err
|
|
|
}
|
|
|
- return &RabbitMessageQueueAdmin{
|
|
|
- conn: conn,
|
|
|
- ch: ch,
|
|
|
- arguments: args,
|
|
|
+ //err = ch.Confirm(false)
|
|
|
+ //if err != nil {
|
|
|
+ // return err
|
|
|
+ //}
|
|
|
+ r.ch = ch
|
|
|
+ r.notifyChanClose = make(chan *amqp.Error)
|
|
|
+ r.ch.NotifyClose(r.notifyChanClose)
|
|
|
+ r.isReady = true
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RabbitMessageQueueAdmin) connect(addr string) (*amqp.Connection, error) {
|
|
|
+ conn, err := amqp.Dial(addr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
+ r.conn = conn
|
|
|
+ r.notifyCloseChan = make(chan *amqp.Error)
|
|
|
+ r.conn.NotifyClose(r.notifyCloseChan)
|
|
|
+ return conn, err
|
|
|
}
|
|
|
|
|
|
func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
|
|
@@ -83,8 +157,6 @@ func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *R
|
|
|
result := new(RabbitMqProducer)
|
|
|
result.admin = admin
|
|
|
result.defaultTopic = defaultTopic
|
|
|
- result.conn = admin.conn
|
|
|
- result.channel = admin.ch
|
|
|
result.topics = make(map[string]*queue.TopicPartitionInfo)
|
|
|
return result
|
|
|
}
|
|
@@ -99,16 +171,13 @@ func (r *RabbitMqProducer) GetDefaultTopic() string {
|
|
|
|
|
|
func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
|
|
|
r.createTopicIfNoExist(info)
|
|
|
- if r.channel == nil {
|
|
|
- return errors.New("rabbit mq channel is not initialized")
|
|
|
- }
|
|
|
bytes, err := payload.Marshal()
|
|
|
if err != nil {
|
|
|
server.Log.Errorf("queue message marshal error:%s", err.Error())
|
|
|
return err
|
|
|
}
|
|
|
server.Log.Debugf("publish message to %s", info.String())
|
|
|
- err = r.channel.Publish("", info.String(), false, false,
|
|
|
+ err = r.admin.ch.Publish("", info.String(), false, false,
|
|
|
amqp.Publishing{
|
|
|
DeliveryMode: amqp.Persistent,
|
|
|
Body: bytes,
|
|
@@ -117,7 +186,6 @@ func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol
|
|
|
if callback != nil {
|
|
|
callback.OnFailure(err)
|
|
|
}
|
|
|
- server.Log.Errorf("rabbit mq message publish error:%s", err.Error())
|
|
|
return err
|
|
|
}
|
|
|
if callback != nil {
|