consumers.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "os"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. )
  // consumerTagLengthMax is the upper bound, in bytes, that
  // commandNameBasedUniqueConsumerTag enforces on the tags it generates.
  // NOTE(review): presumably the limit of the one-byte length prefix written
  // by writeShortstr, which is defined outside this file - confirm there.
  const consumerTagLengthMax = 0xFF // see writeShortstr

  // consumerSeq is the process-wide counter that is atomically incremented
  // for every generated consumer tag and appended to the tag as its suffix.
  var consumerSeq uint64
  14. func uniqueConsumerTag() string {
  15. return commandNameBasedUniqueConsumerTag(os.Args[0])
  16. }
  17. func commandNameBasedUniqueConsumerTag(commandName string) string {
  18. tagPrefix := "ctag-"
  19. tagInfix := commandName
  20. tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10)
  21. if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
  22. tagInfix = "streadway/amqp"
  23. }
  24. return tagPrefix + tagInfix + tagSuffix
  25. }
  26. type consumerBuffers map[string]chan *Delivery
  27. // Concurrent type that manages the consumerTag ->
  28. // ingress consumerBuffer mapping
  29. type consumers struct {
  30. sync.WaitGroup // one for buffer
  31. closed chan struct{} // signal buffer
  32. sync.Mutex // protects below
  33. chans consumerBuffers
  34. }
  35. func makeConsumers() *consumers {
  36. return &consumers{
  37. closed: make(chan struct{}),
  38. chans: make(consumerBuffers),
  39. }
  40. }
  41. func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
  42. defer close(out)
  43. defer subs.Done()
  44. var inflight = in
  45. var queue []*Delivery
  46. for delivery := range in {
  47. queue = append(queue, delivery)
  48. for len(queue) > 0 {
  49. select {
  50. case <-subs.closed:
  51. // closed before drained, drop in-flight
  52. return
  53. case delivery, consuming := <-inflight:
  54. if consuming {
  55. queue = append(queue, delivery)
  56. } else {
  57. inflight = nil
  58. }
  59. case out <- *queue[0]:
  60. queue = queue[1:]
  61. }
  62. }
  63. }
  64. }
  65. // On key conflict, close the previous channel.
  66. func (subs *consumers) add(tag string, consumer chan Delivery) {
  67. subs.Lock()
  68. defer subs.Unlock()
  69. if prev, found := subs.chans[tag]; found {
  70. close(prev)
  71. }
  72. in := make(chan *Delivery)
  73. subs.chans[tag] = in
  74. subs.Add(1)
  75. go subs.buffer(in, consumer)
  76. }
  77. func (subs *consumers) cancel(tag string) (found bool) {
  78. subs.Lock()
  79. defer subs.Unlock()
  80. ch, found := subs.chans[tag]
  81. if found {
  82. delete(subs.chans, tag)
  83. close(ch)
  84. }
  85. return found
  86. }
  87. func (subs *consumers) close() {
  88. subs.Lock()
  89. defer subs.Unlock()
  90. close(subs.closed)
  91. for tag, ch := range subs.chans {
  92. delete(subs.chans, tag)
  93. close(ch)
  94. }
  95. subs.Wait()
  96. }
  97. // Sends a delivery to a the consumer identified by `tag`.
  98. // If unbuffered channels are used for Consume this method
  99. // could block all deliveries until the consumer
  100. // receives on the other end of the channel.
  101. func (subs *consumers) send(tag string, msg *Delivery) bool {
  102. subs.Lock()
  103. defer subs.Unlock()
  104. buffer, found := subs.chans[tag]
  105. if found {
  106. buffer <- msg
  107. }
  108. return found
  109. }