confirms.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package amqp
  2. import "sync"
  3. // confirms resequences and notifies one or multiple publisher confirmation listeners
  4. type confirms struct {
  5. m sync.Mutex
  6. listeners []chan Confirmation
  7. sequencer map[uint64]Confirmation
  8. published uint64
  9. expecting uint64
  10. }
  11. // newConfirms allocates a confirms
  12. func newConfirms() *confirms {
  13. return &confirms{
  14. sequencer: map[uint64]Confirmation{},
  15. published: 0,
  16. expecting: 1,
  17. }
  18. }
  19. func (c *confirms) Listen(l chan Confirmation) {
  20. c.m.Lock()
  21. defer c.m.Unlock()
  22. c.listeners = append(c.listeners, l)
  23. }
  24. // publish increments the publishing counter
  25. func (c *confirms) Publish() uint64 {
  26. c.m.Lock()
  27. defer c.m.Unlock()
  28. c.published++
  29. return c.published
  30. }
  31. // confirm confirms one publishing, increments the expecting delivery tag, and
  32. // removes bookkeeping for that delivery tag.
  33. func (c *confirms) confirm(confirmation Confirmation) {
  34. delete(c.sequencer, c.expecting)
  35. c.expecting++
  36. for _, l := range c.listeners {
  37. l <- confirmation
  38. }
  39. }
  40. // resequence confirms any out of order delivered confirmations
  41. func (c *confirms) resequence() {
  42. for c.expecting <= c.published {
  43. sequenced, found := c.sequencer[c.expecting]
  44. if !found {
  45. return
  46. }
  47. c.confirm(sequenced)
  48. }
  49. }
  50. // one confirms one publishing and all following in the publishing sequence
  51. func (c *confirms) One(confirmed Confirmation) {
  52. c.m.Lock()
  53. defer c.m.Unlock()
  54. if c.expecting == confirmed.DeliveryTag {
  55. c.confirm(confirmed)
  56. } else {
  57. c.sequencer[confirmed.DeliveryTag] = confirmed
  58. }
  59. c.resequence()
  60. }
  61. // multiple confirms all publishings up until the delivery tag
  62. func (c *confirms) Multiple(confirmed Confirmation) {
  63. c.m.Lock()
  64. defer c.m.Unlock()
  65. for c.expecting <= confirmed.DeliveryTag {
  66. c.confirm(Confirmation{c.expecting, confirmed.Ack})
  67. }
  68. }
  69. // Close closes all listeners, discarding any out of sequence confirmations
  70. func (c *confirms) Close() error {
  71. c.m.Lock()
  72. defer c.m.Unlock()
  73. for _, l := range c.listeners {
  74. close(l)
  75. }
  76. c.listeners = nil
  77. return nil
  78. }