package rule

import (
	"encoding/json"
	"sync"

	"github.com/streadway/amqp"
	"sparrow/pkg/server"
)

// ExternalConsumer handles decoded task lifecycle messages. The consumer
// dispatches each message to exactly one method, selected by the message's
// Action field ("add", "remove", "update", "snap", "start", "stop").
type ExternalConsumer interface {
	AddMessageHandle(msg *TaskLifecycleMessage) error
	RemoveMessageHandle(msg *TaskLifecycleMessage) error
	UpdateMessageHandle(msg *TaskLifecycleMessage) error
	SnapMessageHandle(msg *TaskLifecycleMessage) error
	StartMessageHandle(msg *TaskLifecycleMessage) error
	StopMessageHandle(msg *TaskLifecycleMessage) error
}

// TaskLifecycleExchange is the name of the fanout exchange that Init declares
// and binds its queue to.
const TaskLifecycleExchange = "task_lifecycle_exchange"

// TaskExchange is the name of the task exchange. It is not referenced in this
// file.
const TaskExchange = "task_exchange"

// TaskLifecycleConsumer reads lifecycle messages from an AMQP channel and
// forwards them to an ExternalConsumer.
type TaskLifecycleConsumer struct {
	ch          *amqp.Channel
	ec          ExternalConsumer
	stopChan    chan struct{} // closed by Stop to end both goroutines
	stopOnce    sync.Once     // guards close(stopChan) so Stop may be called repeatedly
	messageChan chan []byte   // raw message bodies handed from Init's goroutine to Start's
}

// NewTaskLifecycleConsumer returns a consumer bound to ch. Call
// SetExternalConsumer, Init and Start before messages are handled.
func NewTaskLifecycleConsumer(ch *amqp.Channel) *TaskLifecycleConsumer {
	return &TaskLifecycleConsumer{
		ch:          ch,
		stopChan:    make(chan struct{}),
		messageChan: make(chan []byte, 10),
	}
}

// SetExternalConsumer sets the handler that receives decoded messages.
func (a *TaskLifecycleConsumer) SetExternalConsumer(ec ExternalConsumer) {
	a.ec = ec
}

// Stop signals the goroutines started by Start and Init to exit. It is safe
// to call more than once; only the first call closes the stop channel.
func (a *TaskLifecycleConsumer) Stop() {
	a.stopOnce.Do(func() {
		close(a.stopChan)
	})
}

// Start launches a goroutine that handles queued message bodies until Stop is
// called. It always returns nil.
func (a *TaskLifecycleConsumer) Start() error {
	go func() {
		for {
			select {
			case <-a.stopChan:
				return
			case msg := <-a.messageChan:
				a.handleMessage(msg)
			}
		}
	}()
	return nil
}

// handleMessage decodes msg as a TaskLifecycleMessage and dispatches it to the
// external consumer according to its Action. Decode failures and handler
// errors are logged; messages with an unrecognized action are ignored.
func (a *TaskLifecycleConsumer) handleMessage(msg []byte) {
	if a.ec == nil {
		server.Log.Errorf("TaskLifecycleConsumer is unset")
		return
	}

	var tm TaskLifecycleMessage
	if err := json.Unmarshal(msg, &tm); err != nil {
		server.Log.Errorf("handle lifecycle message error :%v", err)
		return
	}

	var err error
	switch tm.Action {
	case "add":
		err = a.ec.AddMessageHandle(&tm)
	case "remove":
		err = a.ec.RemoveMessageHandle(&tm)
	case "update":
		err = a.ec.UpdateMessageHandle(&tm)
	case "snap":
		err = a.ec.SnapMessageHandle(&tm)
	case "start":
		err = a.ec.StartMessageHandle(&tm)
	case "stop":
		err = a.ec.StopMessageHandle(&tm)
	}
	// Surface handler failures instead of silently discarding them.
	if err != nil {
		server.Log.Errorf("handle lifecycle action %v error: %v", tm.Action, err)
	}
}

// Init declares the lifecycle fanout exchange, binds a server-named exclusive
// queue to it, starts consuming, and launches a goroutine that forwards
// message bodies to the worker started by Start. It returns the first AMQP
// error encountered.
func (a *TaskLifecycleConsumer) Init() error {
	err := a.ch.ExchangeDeclare(
		TaskLifecycleExchange, // name
		"fanout",              // type
		true,                  // durable
		false,                 // auto-deleted
		false,                 // internal
		false,                 // no-wait
		nil,                   // arguments
	)
	if err != nil {
		return err
	}

	// Declare the queue that will be bound to the exchange.
	q, err := a.ch.QueueDeclare(
		"",    // name (empty: let the server pick one)
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return err
	}

	err = a.ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		TaskLifecycleExchange,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// Create the consumer.
	deliveries, err := a.ch.Consume(
		q.Name, // queue
		"",     // consumer tag
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // arguments
	)
	if err != nil {
		return err
	}

	go a.forward(deliveries)
	return nil
}

// forward copies delivery bodies into messageChan until Stop is called or the
// delivery channel is closed.
func (a *TaskLifecycleConsumer) forward(deliveries <-chan amqp.Delivery) {
	for {
		select {
		case <-a.stopChan:
			return
		case d, ok := <-deliveries:
			if !ok {
				// A closed delivery channel yields zero values forever;
				// exit rather than spin on empty bodies.
				server.Log.Errorf("lifecycle delivery channel closed")
				return
			}
			// Keep watching stopChan while sending so a full buffer cannot
			// strand this goroutine after the worker has stopped.
			select {
			case a.messageChan <- d.Body:
			case <-a.stopChan:
				return
			}
		}
	}
}