gqueue.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. // Package gqueue provides dynamic/static concurrent-safe queue.
  7. //
  8. // Features:
  9. //
  10. // 1. FIFO queue(data -> list -> chan);
  11. //
  12. // 2. Fast creation and initialization;
  13. //
  14. // 3. Support dynamic queue size(unlimited queue size);
  15. //
  16. // 4. Blocking when reading data from queue;
  17. //
  18. package gqueue
  19. import (
  20. "math"
  21. "github.com/gogf/gf/container/glist"
  22. "github.com/gogf/gf/container/gtype"
  23. )
  24. // Queue is a concurrent-safe queue built on doubly linked list and channel.
  25. type Queue struct {
  26. limit int // Limit for queue size.
  27. list *glist.List // Underlying list structure for data maintaining.
  28. closed *gtype.Bool // Whether queue is closed.
  29. events chan struct{} // Events for data writing.
  30. C chan interface{} // Underlying channel for data reading.
  31. }
  32. const (
  33. // Size for queue buffer.
  34. gDEFAULT_QUEUE_SIZE = 10000
  35. // Max batch size per-fetching from list.
  36. gDEFAULT_MAX_BATCH_SIZE = 10
  37. )
  38. // New returns an empty queue object.
  39. // Optional parameter <limit> is used to limit the size of the queue, which is unlimited in default.
  40. // When <limit> is given, the queue will be static and high performance which is comparable with stdlib channel.
  41. func New(limit ...int) *Queue {
  42. q := &Queue{
  43. closed: gtype.NewBool(),
  44. }
  45. if len(limit) > 0 && limit[0] > 0 {
  46. q.limit = limit[0]
  47. q.C = make(chan interface{}, limit[0])
  48. } else {
  49. q.list = glist.New(true)
  50. q.events = make(chan struct{}, math.MaxInt32)
  51. q.C = make(chan interface{}, gDEFAULT_QUEUE_SIZE)
  52. go q.asyncLoopFromListToChannel()
  53. }
  54. return q
  55. }
  56. // asyncLoopFromListToChannel starts an asynchronous goroutine,
  57. // which handles the data synchronization from list <q.list> to channel <q.C>.
  58. func (q *Queue) asyncLoopFromListToChannel() {
  59. defer func() {
  60. if q.closed.Val() {
  61. _ = recover()
  62. }
  63. }()
  64. for !q.closed.Val() {
  65. <-q.events
  66. for !q.closed.Val() {
  67. if length := q.list.Len(); length > 0 {
  68. if length > gDEFAULT_MAX_BATCH_SIZE {
  69. length = gDEFAULT_MAX_BATCH_SIZE
  70. }
  71. for _, v := range q.list.PopFronts(length) {
  72. // When q.C is closed, it will panic here, especially q.C is being blocked for writing.
  73. // If any error occurs here, it will be caught by recover and be ignored.
  74. q.C <- v
  75. }
  76. } else {
  77. break
  78. }
  79. }
  80. // Clear q.events to remain just one event to do the next synchronization check.
  81. for i := 0; i < len(q.events)-1; i++ {
  82. <-q.events
  83. }
  84. }
  85. // It should be here to close q.C if <q> is unlimited size.
  86. // It's the sender's responsibility to close channel when it should be closed.
  87. close(q.C)
  88. }
  89. // Push pushes the data <v> into the queue.
  90. // Note that it would panics if Push is called after the queue is closed.
  91. func (q *Queue) Push(v interface{}) {
  92. if q.limit > 0 {
  93. q.C <- v
  94. } else {
  95. q.list.PushBack(v)
  96. if len(q.events) < gDEFAULT_QUEUE_SIZE {
  97. q.events <- struct{}{}
  98. }
  99. }
  100. }
  101. // Pop pops an item from the queue in FIFO way.
  102. // Note that it would return nil immediately if Pop is called after the queue is closed.
  103. func (q *Queue) Pop() interface{} {
  104. return <-q.C
  105. }
  106. // Close closes the queue.
  107. // Notice: It would notify all goroutines return immediately,
  108. // which are being blocked reading using Pop method.
  109. func (q *Queue) Close() {
  110. q.closed.Set(true)
  111. if q.events != nil {
  112. close(q.events)
  113. }
  114. if q.limit > 0 {
  115. close(q.C)
  116. }
  117. for i := 0; i < gDEFAULT_MAX_BATCH_SIZE; i++ {
  118. q.Pop()
  119. }
  120. }
  121. // Len returns the length of the queue.
  122. // Note that the result might not be accurate as there's a
  123. // asynchronize channel reading the list constantly.
  124. func (q *Queue) Len() (length int) {
  125. if q.list != nil {
  126. length += q.list.Len()
  127. }
  128. length += len(q.C)
  129. return
  130. }
  131. // Size is alias of Len.
  132. func (q *Queue) Size() int {
  133. return q.Len()
  134. }