task_manager.go 801 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package manager
  2. import (
  3. "fmt"
  4. "github.com/streadway/amqp"
  5. "sparrow/pkg/rule"
  6. )
  7. // TaskManager 任务发布管理器
  8. // 发布任务消息到
  9. type TaskManager struct {
  10. ch *amqp.Channel
  11. }
  12. func NewTaskManager(ch *amqp.Channel) Producer {
  13. return &TaskManager{
  14. ch: ch,
  15. }
  16. }
  17. func (a *TaskManager) Init() error {
  18. // 定义exchange
  19. return a.ch.ExchangeDeclare(rule.TaskExchange, "topic", true, false, false, false, nil)
  20. }
  21. func (a *TaskManager) Publish(topic string, msg []byte) error {
  22. fmt.Println("==========================")
  23. err := a.ch.Publish(rule.TaskExchange, topic, false, false, amqp.Publishing{
  24. DeliveryMode: amqp.Persistent,
  25. Body: msg,
  26. })
  27. if err != nil {
  28. return err
  29. }
  30. return nil
  31. }
  32. func (a *TaskManager) CreateTopicIfNotExists(topic string) error {
  33. return nil
  34. }