consumers.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  // Use of this source code is governed by a BSD-style
  // license that can be found in the LICENSE file.
  // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "fmt"
  8. "os"
  9. "sync"
  10. "sync/atomic"
  11. )
  // consumerSeq is the process-wide counter behind generated consumer tags.
  // It is only advanced through atomic.AddUint64 in uniqueConsumerTag.
  var consumerSeq uint64

  // uniqueConsumerTag returns a consumer tag of the form "ctag-<command>-<n>",
  // where <command> is os.Args[0] and <n> is the next value of consumerSeq, so
  // every call within this process yields a distinct tag.
  //
  // AMQP 0-9-1 transmits the consumer tag as a short string, whose length
  // prefix is a single octet, so a tag may not exceed 255 bytes. When a long
  // executable path would push the tag over that limit, or when os.Args is
  // empty, a fixed placeholder is used in place of the command name. Tags for
  // ordinary command names are unchanged.
  func uniqueConsumerTag() string {
  	const (
  		maxTagLen   = 0xFF             // largest length a short string can encode
  		fallbackCmd = "streadway/amqp" // stands in for an unusable os.Args[0]
  	)

  	cmd := fallbackCmd
  	if len(os.Args) > 0 {
  		cmd = os.Args[0]
  	}

  	// Take the sequence number exactly once so the fallback path does not
  	// skip a value.
  	seq := atomic.AddUint64(&consumerSeq, 1)

  	tag := fmt.Sprintf("ctag-%s-%d", cmd, seq)
  	if len(tag) > maxTagLen {
  		tag = fmt.Sprintf("ctag-%s-%d", fallbackCmd, seq)
  	}
  	return tag
  }
  16. type consumerBuffers map[string]chan *Delivery
  17. // Concurrent type that manages the consumerTag ->
  18. // ingress consumerBuffer mapping
  19. type consumers struct {
  20. sync.Mutex
  21. chans consumerBuffers
  22. }
  23. func makeConsumers() *consumers {
  24. return &consumers{chans: make(consumerBuffers)}
  25. }
  26. func bufferDeliveries(in chan *Delivery, out chan Delivery) {
  27. var queue []*Delivery
  28. var queueIn = in
  29. for delivery := range in {
  30. select {
  31. case out <- *delivery:
  32. // delivered immediately while the consumer chan can receive
  33. default:
  34. queue = append(queue, delivery)
  35. }
  36. for len(queue) > 0 {
  37. select {
  38. case out <- *queue[0]:
  39. queue = queue[1:]
  40. case delivery, open := <-queueIn:
  41. if open {
  42. queue = append(queue, delivery)
  43. } else {
  44. // stop receiving to drain the queue
  45. queueIn = nil
  46. }
  47. }
  48. }
  49. }
  50. close(out)
  51. }
  52. // On key conflict, close the previous channel.
  53. func (me *consumers) add(tag string, consumer chan Delivery) {
  54. me.Lock()
  55. defer me.Unlock()
  56. if prev, found := me.chans[tag]; found {
  57. close(prev)
  58. }
  59. in := make(chan *Delivery)
  60. go bufferDeliveries(in, consumer)
  61. me.chans[tag] = in
  62. }
  63. func (me *consumers) close(tag string) (found bool) {
  64. me.Lock()
  65. defer me.Unlock()
  66. ch, found := me.chans[tag]
  67. if found {
  68. delete(me.chans, tag)
  69. close(ch)
  70. }
  71. return found
  72. }
  73. func (me *consumers) closeAll() {
  74. me.Lock()
  75. defer me.Unlock()
  76. for _, ch := range me.chans {
  77. close(ch)
  78. }
  79. me.chans = make(consumerBuffers)
  80. }
  81. // Sends a delivery to a the consumer identified by `tag`.
  82. // If unbuffered channels are used for Consume this method
  83. // could block all deliveries until the consumer
  84. // receives on the other end of the channel.
  85. func (me *consumers) send(tag string, msg *Delivery) bool {
  86. me.Lock()
  87. defer me.Unlock()
  88. buffer, found := me.chans[tag]
  89. if found {
  90. buffer <- msg
  91. }
  92. return found
  93. }