task_lifecycle_manager.go 782 B

12345678910111213141516171819202122232425262728293031
  1. package manager
  2. import (
  3. "github.com/streadway/amqp"
  4. "sparrow/pkg/rule"
  5. )
  6. // TaskLifecycleManager 任务生命周期发布管理器
  7. type TaskLifecycleManager struct {
  8. ch *amqp.Channel
  9. }
  10. func NewTaskLifecycleManager(ch *amqp.Channel) Producer {
  11. return &TaskLifecycleManager{ch: ch}
  12. }
  13. func (t *TaskLifecycleManager) Init() error {
  14. return t.ch.ExchangeDeclare(rule.TaskLifecycleExchange, "fanout", true, false, false, false, nil)
  15. }
  16. func (t *TaskLifecycleManager) Publish(topic string, msg []byte) error {
  17. return t.ch.Publish(rule.TaskLifecycleExchange, topic, false, false, amqp.Publishing{
  18. Body: msg,
  19. DeliveryMode: amqp.Persistent,
  20. })
  21. }
  22. func (t *TaskLifecycleManager) CreateTopicIfNotExists(topic string) error {
  23. //TODO implement me
  24. panic("implement me")
  25. }