gqueue.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. // Package gqueue provides dynamic/static concurrent-safe queue.
  7. //
  8. // Features:
  9. //
  10. // 1. FIFO queue(data -> list -> chan);
  11. //
  12. // 2. Fast creation and initialization;
  13. //
  14. // 3. Support dynamic queue size(unlimited queue size);
  15. //
  16. // 4. Blocking when reading data from queue;
  17. package gqueue
  18. import (
  19. "math"
  20. "github.com/gogf/gf/v2/container/glist"
  21. "github.com/gogf/gf/v2/container/gtype"
  22. )
  23. // Queue is a concurrent-safe queue built on doubly linked list and channel.
  24. type Queue struct {
  25. limit int // Limit for queue size.
  26. list *glist.List // Underlying list structure for data maintaining.
  27. closed *gtype.Bool // Whether queue is closed.
  28. events chan struct{} // Events for data writing.
  29. C chan interface{} // Underlying channel for data reading.
  30. }
  31. const (
  32. defaultQueueSize = 10000 // Size for queue buffer.
  33. defaultBatchSize = 10 // Max batch size per-fetching from list.
  34. )
  35. // New returns an empty queue object.
  36. // Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default.
  37. // When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel.
  38. func New(limit ...int) *Queue {
  39. q := &Queue{
  40. closed: gtype.NewBool(),
  41. }
  42. if len(limit) > 0 && limit[0] > 0 {
  43. q.limit = limit[0]
  44. q.C = make(chan interface{}, limit[0])
  45. } else {
  46. q.list = glist.New(true)
  47. q.events = make(chan struct{}, math.MaxInt32)
  48. q.C = make(chan interface{}, defaultQueueSize)
  49. go q.asyncLoopFromListToChannel()
  50. }
  51. return q
  52. }
  53. // Push pushes the data `v` into the queue.
  54. // Note that it would panic if Push is called after the queue is closed.
  55. func (q *Queue) Push(v interface{}) {
  56. if q.limit > 0 {
  57. q.C <- v
  58. } else {
  59. q.list.PushBack(v)
  60. if len(q.events) < defaultQueueSize {
  61. q.events <- struct{}{}
  62. }
  63. }
  64. }
  65. // Pop pops an item from the queue in FIFO way.
  66. // Note that it would return nil immediately if Pop is called after the queue is closed.
  67. func (q *Queue) Pop() interface{} {
  68. return <-q.C
  69. }
  70. // Close closes the queue.
  71. // Notice: It would notify all goroutines return immediately,
  72. // which are being blocked reading using Pop method.
  73. func (q *Queue) Close() {
  74. if !q.closed.Cas(false, true) {
  75. return
  76. }
  77. if q.events != nil {
  78. close(q.events)
  79. }
  80. if q.limit > 0 {
  81. close(q.C)
  82. } else {
  83. for i := 0; i < defaultBatchSize; i++ {
  84. q.Pop()
  85. }
  86. }
  87. }
  88. // Len returns the length of the queue.
  89. // Note that the result might not be accurate if using unlimited queue size as there's an
  90. // asynchronous channel reading the list constantly.
  91. func (q *Queue) Len() (length int64) {
  92. bufferedSize := int64(len(q.C))
  93. if q.limit > 0 {
  94. return bufferedSize
  95. }
  96. return int64(q.list.Size()) + bufferedSize
  97. }
  98. // Size is alias of Len.
  99. // Deprecated: use Len instead.
  100. func (q *Queue) Size() int64 {
  101. return q.Len()
  102. }
  103. // asyncLoopFromListToChannel starts an asynchronous goroutine,
  104. // which handles the data synchronization from list `q.list` to channel `q.C`.
  105. func (q *Queue) asyncLoopFromListToChannel() {
  106. defer func() {
  107. if q.closed.Val() {
  108. _ = recover()
  109. }
  110. }()
  111. for !q.closed.Val() {
  112. <-q.events
  113. for !q.closed.Val() {
  114. if bufferLength := q.list.Len(); bufferLength > 0 {
  115. // When q.C is closed, it will panic here, especially q.C is being blocked for writing.
  116. // If any error occurs here, it will be caught by recover and be ignored.
  117. for i := 0; i < bufferLength; i++ {
  118. q.C <- q.list.PopFront()
  119. }
  120. } else {
  121. break
  122. }
  123. }
  124. // Clear q.events to remain just one event to do the next synchronization check.
  125. for i := 0; i < len(q.events)-1; i++ {
  126. <-q.events
  127. }
  128. }
  129. // It should be here to close `q.C` if `q` is unlimited size.
  130. // It's the sender's responsibility to close channel when it should be closed.
  131. close(q.C)
  132. }