123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- package msgQueue
- import (
- "errors"
- "github.com/streadway/amqp"
- "sparrow/pkg/protocol"
- "sparrow/pkg/queue"
- "sparrow/pkg/server"
- "sync"
- "time"
- )
- // RabbitMessageQueueAdmin rabbit mq 管理器
- type RabbitMessageQueueAdmin struct {
- conn *amqp.Connection
- ch *amqp.Channel
- arguments map[string]interface{}
- }
- func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
- return r.ch
- }
- func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
- return r.conn
- }
// RabbitMqSettings is the RabbitMQ configuration.
type RabbitMqSettings struct {
	// Host is the address passed verbatim to amqp.Dial.
	Host string
}
- func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
- conn, err := amqp.Dial(settings.Host)
- if err != nil {
- panic(err)
- }
- ch, err := conn.Channel()
- if err != nil {
- panic(err)
- }
- return &RabbitMessageQueueAdmin{
- conn: conn,
- ch: ch,
- arguments: args,
- }
- }
- func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
- _, err := r.ch.QueueDeclare(topic,
- true, // durable
- false, // delete when unused
- false, // exclusive
- false, // no-wait
- nil)
- return err
- }
- func (r *RabbitMessageQueueAdmin) Destroy() error {
- if r.ch != nil {
- if err := r.ch.Close(); err != nil {
- return err
- }
- }
- if r.conn != nil {
- return r.conn.Close()
- }
- return nil
- }
- // RabbitMqProducer rabbit mq message producer
- type RabbitMqProducer struct {
- defaultTopic string
- admin *RabbitMessageQueueAdmin
- settings *RabbitMqSettings
- channel *amqp.Channel
- conn *amqp.Connection
- topics map[string]*queue.TopicPartitionInfo
- }
- func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
- result := new(RabbitMqProducer)
- result.admin = admin
- result.defaultTopic = defaultTopic
- result.conn = admin.conn
- result.channel = admin.ch
- result.topics = make(map[string]*queue.TopicPartitionInfo)
- return result
- }
- func (r *RabbitMqProducer) Init() error {
- return nil
- }
- func (r *RabbitMqProducer) GetDefaultTopic() string {
- return r.defaultTopic
- }
- func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
- r.createTopicIfNoExist(info)
- if r.channel == nil {
- return errors.New("rabbit mq channel is not initialized")
- }
- bytes, err := payload.Marshal()
- if err != nil {
- server.Log.Errorf("queue message marshal error:%s", err.Error())
- return err
- }
- err = r.channel.Publish("", info.String(), false, false,
- amqp.Publishing{
- DeliveryMode: amqp.Persistent,
- Body: bytes,
- })
- if err != nil {
- if callback != nil {
- callback.OnFailure(err)
- }
- server.Log.Errorf("rabbit mq message publish error:%s", err.Error())
- return err
- }
- if callback != nil {
- callback.OnSuccess()
- }
- return nil
- }
- func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
- if _, ok := r.topics[tpi.String()]; !ok {
- _ = r.admin.CreateTopicIfNotExists(tpi.String())
- r.topics[tpi.String()] = tpi
- }
- }
- func (r *RabbitMqProducer) Stop() error {
- if r.admin != nil {
- return r.admin.Destroy()
- }
- return nil
- }
- type RabbitMqConsumer struct {
- admin *RabbitMessageQueueAdmin
- topics []string
- topic string
- partitions []*queue.TopicPartitionInfo
- subscribe bool
- mu sync.Mutex
- recvChan chan []byte
- }
- func (r *RabbitMqConsumer) GetTopic() string {
- return r.topic
- }
- func (r *RabbitMqConsumer) Subscribe() error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
- Topic: r.topic,
- TenantId: "",
- Partition: 0,
- MyPartition: true,
- })
- r.subscribe = false
- return nil
- }
- func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.partitions = partitions
- r.subscribe = false
- return nil
- }
- func (r *RabbitMqConsumer) UnSubscribe() {
- _ = r.admin.Destroy()
- }
- func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
- result := make(chan queue.QueueMessage, 10)
- if !r.subscribe && len(r.partitions) == 0 {
- time.Sleep(duration)
- } else {
- r.mu.Lock()
- defer r.mu.Unlock()
- if !r.subscribe {
- for _, p := range r.partitions {
- r.topics = append(r.topics, p.String())
- }
- r.doSubscribe(r.topics)
- r.subscribe = true
- }
- go r.doPop(duration)
- go func() {
- for {
- select {
- case msg := <-r.recvChan:
- m := &queue.GobQueueMessage{}
- err := m.UnMarshal(msg)
- if err != nil {
- server.Log.Error(err)
- continue
- }
- result <- m
- }
- }
- }()
- }
- return result, nil
- }
- func (r *RabbitMqConsumer) doSubscribe(topics []string) {
- for _, item := range topics {
- _ = r.admin.CreateTopicIfNotExists(item)
- }
- }
- func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
- if r.admin.ch == nil || r.admin.conn == nil {
- return errors.New("ch and conn is not init")
- }
- for _, topic := range r.topics {
- go func() {
- msgs, err := r.admin.ch.Consume(
- topic,
- "", // consumer
- false, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- if err != nil {
- server.Log.Error(err)
- return
- }
- for d := range msgs {
- r.recvChan <- d.Body
- d.Ack(true)
- }
- }()
- }
- return nil
- }
- func (r *RabbitMqConsumer) Commit() error {
- r.mu.Lock()
- defer r.mu.Unlock()
- return r.admin.ch.Ack(0, true)
- }
- func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
- return &RabbitMqConsumer{
- admin: admin,
- topics: make([]string, 0),
- topic: topic,
- partitions: make([]*queue.TopicPartitionInfo, 0),
- recvChan: make(chan []byte, 10),
- }
- }
|