task_manager.go 751 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. package manager
  2. import (
  3. "github.com/streadway/amqp"
  4. "sparrow/pkg/rule"
  5. )
  6. // TaskManager 任务发布管理器
  7. // 发布任务消息到
  8. type TaskManager struct {
  9. ch *amqp.Channel
  10. }
  11. func NewTaskManager(ch *amqp.Channel) Producer {
  12. return &TaskManager{
  13. ch: ch,
  14. }
  15. }
  16. func (a *TaskManager) Init() error {
  17. // 定义exchange
  18. return a.ch.ExchangeDeclare(rule.TaskExchange, "topic", true, false, false, false, nil)
  19. }
  20. func (a *TaskManager) Publish(topic string, msg []byte) error {
  21. err := a.ch.Publish(rule.TaskExchange, topic, false, false, amqp.Publishing{
  22. DeliveryMode: amqp.Persistent,
  23. Body: msg,
  24. })
  25. if err != nil {
  26. return err
  27. }
  28. return nil
  29. }
  30. func (a *TaskManager) CreateTopicIfNotExists(topic string) error {
  31. return nil
  32. }