123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package rule
- import (
- "encoding/json"
- "github.com/streadway/amqp"
- "sparrow/pkg/server"
- )
- type ExternalConsumer interface {
- AddMessageHandle(msg *TaskLifecycleMessage) error
- RemoveMessageHandle(msg *TaskLifecycleMessage) error
- UpdateMessageHandle(msg *TaskLifecycleMessage) error
- SnapMessageHandle(msg *TaskLifecycleMessage) error
- StartMessageHandle(msg *TaskLifecycleMessage) error
- StopMessageHandle(msg *TaskLifecycleMessage) error
- }
- const TaskLifecycleExchange = "task_lifecycle_exchange"
- const TaskExchange = "task_exchange"
- type TaskLifecycleConsumer struct {
- ch *amqp.Channel
- ec ExternalConsumer
- stopChan chan struct{}
- messageChan chan []byte
- }
- func NewTaskLifecycleConsumer(ch *amqp.Channel) *TaskLifecycleConsumer {
- return &TaskLifecycleConsumer{
- ch: ch,
- stopChan: make(chan struct{}),
- messageChan: make(chan []byte, 10),
- }
- }
- func (a *TaskLifecycleConsumer) SetExternalConsumer(ec ExternalConsumer) {
- a.ec = ec
- }
- func (a *TaskLifecycleConsumer) Stop() {
- close(a.stopChan)
- }
- func (a *TaskLifecycleConsumer) Start() error {
- go func() {
- for {
- select {
- case <-a.stopChan:
- return
- case msg := <-a.messageChan:
- a.handleMessage(msg)
- }
- }
- }()
- return nil
- }
- func (a *TaskLifecycleConsumer) handleMessage(msg []byte) {
- if a.ec == nil {
- server.Log.Errorf("TaskLifecycleConsumer is unset")
- return
- }
- var tm TaskLifecycleMessage
- err := json.Unmarshal(msg, &tm)
- if err != nil {
- server.Log.Errorf("handle lifecycle message error :%v", err)
- return
- }
- switch tm.Action {
- case "add":
- err = a.ec.AddMessageHandle(&tm)
- case "remove":
- err = a.ec.RemoveMessageHandle(&tm)
- case "update":
- err = a.ec.UpdateMessageHandle(&tm)
- case "snap":
- err = a.ec.SnapMessageHandle(&tm)
- case "start":
- err = a.ec.StartMessageHandle(&tm)
- case "stop":
- err = a.ec.StopMessageHandle(&tm)
- }
- }
- func (a *TaskLifecycleConsumer) Init() error {
- err := a.ch.ExchangeDeclare(
- TaskLifecycleExchange, // name
- "fanout", // type
- true, // durable
- false, // auto-deleted
- false, // internal
- false, // no-wait
- nil, // arguments
- )
- if err != nil {
- return err
- }
- //绑定queue到交换机
- q, err := a.ch.QueueDeclare(
- "", // name
- false, // durable
- false, // delete when unused
- true, // exclusive
- false, // no-wait
- nil, // arguments
- )
- if err != nil {
- return err
- }
- err = a.ch.QueueBind(
- q.Name, // queue name
- "", // routing key
- TaskLifecycleExchange,
- false,
- nil,
- )
- if err != nil {
- return err
- }
- // creat consumer
- msg, err := a.ch.Consume(q.Name, "", true, false, false, false, nil)
- if err != nil {
- return err
- }
- go func() {
- for {
- select {
- case <-a.stopChan:
- return
- case d := <-msg:
- a.messageChan <- d.Body
- }
- }
- }()
- return nil
- }
|