package msgQueue import ( "errors" "github.com/streadway/amqp" "sparrow/pkg/protocol" "sparrow/pkg/queue" "sparrow/pkg/server" "sync" "time" ) // RabbitMessageQueueAdmin rabbit mq 管理器 type RabbitMessageQueueAdmin struct { conn *amqp.Connection ch *amqp.Channel arguments map[string]interface{} } func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel { return r.ch } func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection { return r.conn } // RabbitMqSettings 配置 type RabbitMqSettings struct { Host string } func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin { conn, err := amqp.Dial(settings.Host) if err != nil { panic(err) } ch, err := conn.Channel() if err != nil { panic(err) } return &RabbitMessageQueueAdmin{ conn: conn, ch: ch, arguments: args, } } func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error { _, err := r.ch.QueueDeclare(topic, true, // durable false, // delete when unused false, // exclusive false, // no-wait nil) return err } func (r *RabbitMessageQueueAdmin) Destroy() error { if r.ch != nil { if err := r.ch.Close(); err != nil { return err } } if r.conn != nil { return r.conn.Close() } return nil } // RabbitMqProducer rabbit mq message producer type RabbitMqProducer struct { defaultTopic string admin *RabbitMessageQueueAdmin settings *RabbitMqSettings channel *amqp.Channel conn *amqp.Connection topics map[string]*queue.TopicPartitionInfo } func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer { result := new(RabbitMqProducer) result.admin = admin result.defaultTopic = defaultTopic result.conn = admin.conn result.channel = admin.ch result.topics = make(map[string]*queue.TopicPartitionInfo) return result } func (r *RabbitMqProducer) Init() error { return nil } func (r *RabbitMqProducer) GetDefaultTopic() string { return r.defaultTopic } func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error { r.createTopicIfNoExist(info) if r.channel == nil { return errors.New("rabbit mq channel is not initialized") } bytes, err := payload.Marshal() if err != nil { server.Log.Errorf("queue message marshal error:%s", err.Error()) return err } server.Log.Debugf("publish message to %s", info.String()) err = r.channel.Publish("", info.String(), false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, Body: bytes, }) if err != nil { if callback != nil { callback.OnFailure(err) } server.Log.Errorf("rabbit mq message publish error:%s", err.Error()) return err } if callback != nil { callback.OnSuccess() } return nil } func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) { if _, ok := r.topics[tpi.String()]; !ok { _ = r.admin.CreateTopicIfNotExists(tpi.String()) r.topics[tpi.String()] = tpi } } func (r *RabbitMqProducer) Stop() error { if r.admin != nil { return r.admin.Destroy() } return nil } type RabbitMqConsumer struct { admin *RabbitMessageQueueAdmin topics []string topic string partitions []*queue.TopicPartitionInfo subscribe bool mu sync.Mutex recvChan chan []byte } func (r *RabbitMqConsumer) GetTopic() string { return r.topic } func (r *RabbitMqConsumer) Subscribe() error { r.mu.Lock() defer r.mu.Unlock() r.partitions = append(r.partitions, &queue.TopicPartitionInfo{ Topic: r.topic, TenantId: "1ps9djpswi0cds7cofynkso300eql4iu", Partition: 0, MyPartition: true, }) r.subscribe = false return nil } func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error { r.mu.Lock() defer r.mu.Unlock() r.partitions = partitions r.subscribe = false return nil } func (r *RabbitMqConsumer) UnSubscribe() { _ = r.admin.Destroy() } func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) { result := make(chan queue.QueueMessage, 10) if !r.subscribe && len(r.partitions) == 0 { time.Sleep(duration) } else { r.mu.Lock() defer r.mu.Unlock() if !r.subscribe { for _, p := range r.partitions { r.topics = append(r.topics, p.String()) } r.doSubscribe(r.topics) r.subscribe = true } go r.doPop(duration) go func() { for { select { case msg := <-r.recvChan: m := &queue.GobQueueMessage{} err := m.UnMarshal(msg) if err != nil { server.Log.Error(err) continue } result <- m } } }() } return result, nil } func (r *RabbitMqConsumer) doSubscribe(topics []string) { for _, item := range topics { _ = r.admin.CreateTopicIfNotExists(item) } } func (r *RabbitMqConsumer) doPop(duration time.Duration) error { if r.admin.ch == nil || r.admin.conn == nil { return errors.New("ch and conn is not init") } for _, topic := range r.topics { go func() { msgs, err := r.admin.ch.Consume( topic, "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { server.Log.Error(err) return } for d := range msgs { r.recvChan <- d.Body d.Ack(true) } }() } return nil } func (r *RabbitMqConsumer) Commit() error { r.mu.Lock() defer r.mu.Unlock() return r.admin.ch.Ack(0, true) } func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer { return &RabbitMqConsumer{ admin: admin, topics: make([]string, 0), topic: topic, partitions: make([]*queue.TopicPartitionInfo, 0), recvChan: make(chan []byte, 10), } }