gqueue.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. // Package gqueue provides dynamic/static concurrent-safe queue.
  7. //
  8. // Features:
  9. //
  10. // 1. FIFO queue(data -> list -> chan);
  11. //
  12. // 2. Fast creation and initialization;
  13. //
  14. // 3. Support dynamic queue size(unlimited queue size);
  15. //
  16. // 4. Blocking when reading data from queue;
  17. package gqueue
  18. import (
  19. "math"
  20. "github.com/gogf/gf/v2/container/glist"
  21. "github.com/gogf/gf/v2/container/gtype"
  22. )
  23. // Queue is a concurrent-safe queue built on doubly linked list and channel.
  24. type Queue struct {
  25. limit int // Limit for queue size.
  26. list *glist.List // Underlying list structure for data maintaining.
  27. closed *gtype.Bool // Whether queue is closed.
  28. events chan struct{} // Events for data writing.
  29. C chan interface{} // Underlying channel for data reading.
  30. }
  31. const (
  32. defaultQueueSize = 10000 // Size for queue buffer.
  33. defaultBatchSize = 10 // Max batch size per-fetching from list.
  34. )
  35. // New returns an empty queue object.
  36. // Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default.
  37. // When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel.
  38. func New(limit ...int) *Queue {
  39. q := &Queue{
  40. closed: gtype.NewBool(),
  41. }
  42. if len(limit) > 0 && limit[0] > 0 {
  43. q.limit = limit[0]
  44. q.C = make(chan interface{}, limit[0])
  45. } else {
  46. q.list = glist.New(true)
  47. q.events = make(chan struct{}, math.MaxInt32)
  48. q.C = make(chan interface{}, defaultQueueSize)
  49. go q.asyncLoopFromListToChannel()
  50. }
  51. return q
  52. }
  53. // asyncLoopFromListToChannel starts an asynchronous goroutine,
  54. // which handles the data synchronization from list `q.list` to channel `q.C`.
  55. func (q *Queue) asyncLoopFromListToChannel() {
  56. defer func() {
  57. if q.closed.Val() {
  58. _ = recover()
  59. }
  60. }()
  61. for !q.closed.Val() {
  62. <-q.events
  63. for !q.closed.Val() {
  64. if length := q.list.Len(); length > 0 {
  65. if length > defaultBatchSize {
  66. length = defaultBatchSize
  67. }
  68. for _, v := range q.list.PopFronts(length) {
  69. // When q.C is closed, it will panic here, especially q.C is being blocked for writing.
  70. // If any error occurs here, it will be caught by recover and be ignored.
  71. q.C <- v
  72. }
  73. } else {
  74. break
  75. }
  76. }
  77. // Clear q.events to remain just one event to do the next synchronization check.
  78. for i := 0; i < len(q.events)-1; i++ {
  79. <-q.events
  80. }
  81. }
  82. // It should be here to close `q.C` if `q` is unlimited size.
  83. // It's the sender's responsibility to close channel when it should be closed.
  84. close(q.C)
  85. }
  86. // Push pushes the data `v` into the queue.
  87. // Note that it would panic if Push is called after the queue is closed.
  88. func (q *Queue) Push(v interface{}) {
  89. if q.limit > 0 {
  90. q.C <- v
  91. } else {
  92. q.list.PushBack(v)
  93. if len(q.events) < defaultQueueSize {
  94. q.events <- struct{}{}
  95. }
  96. }
  97. }
  98. // Pop pops an item from the queue in FIFO way.
  99. // Note that it would return nil immediately if Pop is called after the queue is closed.
  100. func (q *Queue) Pop() interface{} {
  101. return <-q.C
  102. }
  103. // Close closes the queue.
  104. // Notice: It would notify all goroutines return immediately,
  105. // which are being blocked reading using Pop method.
  106. func (q *Queue) Close() {
  107. q.closed.Set(true)
  108. if q.events != nil {
  109. close(q.events)
  110. }
  111. if q.limit > 0 {
  112. close(q.C)
  113. } else {
  114. for i := 0; i < defaultBatchSize; i++ {
  115. q.Pop()
  116. }
  117. }
  118. }
  119. // Len returns the length of the queue.
  120. // Note that the result might not be accurate as there's an
  121. // asynchronous channel reading the list constantly.
  122. func (q *Queue) Len() (length int) {
  123. if q.list != nil {
  124. length += q.list.Len()
  125. }
  126. length += len(q.C)
  127. return
  128. }
  129. // Size is alias of Len.
  130. func (q *Queue) Size() int {
  131. return q.Len()
  132. }