gtimer_queue.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gtimer
  7. import (
  8. "container/heap"
  9. "github.com/gogf/gf/container/gtype"
  10. "math"
  11. "sync"
  12. )
  13. // priorityQueue is an abstract data type similar to a regular queue or stack data structure in which
  14. // each element additionally has a "priority" associated with it. In a priority queue, an element with
  15. // high priority is served before an element with low priority.
  16. // priorityQueue is based on heap structure.
  17. type priorityQueue struct {
  18. mu sync.Mutex
  19. heap *priorityQueueHeap // the underlying queue items manager using heap.
  20. nextPriority *gtype.Int64 // nextPriority stores the next priority value of the heap, which is used to check if necessary to call the Pop of heap by Timer.
  21. }
  22. // priorityQueueHeap is a heap manager, of which the underlying `array` is a array implementing a heap structure.
  23. type priorityQueueHeap struct {
  24. array []priorityQueueItem
  25. }
  26. // priorityQueueItem stores the queue item which has a `priority` attribute to sort itself in heap.
  27. type priorityQueueItem struct {
  28. value interface{}
  29. priority int64
  30. }
  31. // newPriorityQueue creates and returns a priority queue.
  32. func newPriorityQueue() *priorityQueue {
  33. queue := &priorityQueue{
  34. heap: &priorityQueueHeap{array: make([]priorityQueueItem, 0)},
  35. nextPriority: gtype.NewInt64(math.MaxInt64),
  36. }
  37. heap.Init(queue.heap)
  38. return queue
  39. }
  40. // NextPriority retrieves and returns the minimum and the most priority value of the queue.
  41. func (q *priorityQueue) NextPriority() int64 {
  42. return q.nextPriority.Val()
  43. }
  44. // Push pushes a value to the queue.
  45. // The `priority` specifies the priority of the value.
  46. // The lesser the `priority` value the higher priority of the `value`.
  47. func (q *priorityQueue) Push(value interface{}, priority int64) {
  48. q.mu.Lock()
  49. defer q.mu.Unlock()
  50. heap.Push(q.heap, priorityQueueItem{
  51. value: value,
  52. priority: priority,
  53. })
  54. // Update the minimum priority using atomic operation.
  55. nextPriority := q.nextPriority.Val()
  56. if priority >= nextPriority {
  57. return
  58. }
  59. q.nextPriority.Set(priority)
  60. }
  61. // Pop retrieves, removes and returns the most high priority value from the queue.
  62. func (q *priorityQueue) Pop() interface{} {
  63. q.mu.Lock()
  64. defer q.mu.Unlock()
  65. if v := heap.Pop(q.heap); v != nil {
  66. var nextPriority int64 = math.MaxInt64
  67. if len(q.heap.array) > 0 {
  68. nextPriority = q.heap.array[0].priority
  69. }
  70. q.nextPriority.Set(nextPriority)
  71. return v.(priorityQueueItem).value
  72. }
  73. return nil
  74. }