package manager import ( "fmt" "github.com/streadway/amqp" "sparrow/pkg/rule" ) // TaskManager 任务发布管理器 // 发布任务消息到 type TaskManager struct { ch *amqp.Channel } func NewTaskManager(ch *amqp.Channel) Producer { return &TaskManager{ ch: ch, } } func (a *TaskManager) Init() error { // 定义exchange return a.ch.ExchangeDeclare(rule.TaskExchange, "topic", true, false, false, false, nil) } func (a *TaskManager) Publish(topic string, msg []byte) error { fmt.Println("==========================") err := a.ch.Publish(rule.TaskExchange, topic, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, Body: msg, }) if err != nil { return err } return nil } func (a *TaskManager) CreateTopicIfNotExists(topic string) error { return nil }