grpool.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // Copyright 2017-2019 gf Author(https://github.com/gogf/gf). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  // Package grpool implements a reusable goroutine pool.
  7. package grpool
  8. import (
  9. "errors"
  10. "fmt"
  11. "github.com/gogf/gf/container/glist"
  12. "github.com/gogf/gf/container/gtype"
  13. )
  14. // Goroutine Pool
  15. type Pool struct {
  16. limit int // Max goroutine count limit.
  17. count *gtype.Int // Current running goroutine count.
  18. list *glist.List // Job list for asynchronous job adding purpose.
  19. closed *gtype.Bool // Is pool closed or not.
  20. }
  21. // Default goroutine pool.
  22. var pool = New()
  23. // New creates and returns a new goroutine pool object.
  24. // The parameter <limit> is used to limit the max goroutine count,
  25. // which is not limited in default.
  26. func New(limit ...int) *Pool {
  27. p := &Pool{
  28. limit: -1,
  29. count: gtype.NewInt(),
  30. list: glist.New(true),
  31. closed: gtype.NewBool(),
  32. }
  33. if len(limit) > 0 && limit[0] > 0 {
  34. p.limit = limit[0]
  35. }
  36. return p
  37. }
  38. // Add pushes a new job to the pool using default goroutine pool.
  39. // The job will be executed asynchronously.
  40. func Add(f func()) error {
  41. return pool.Add(f)
  42. }
  43. // AddWithRecover pushes a new job to the pool with specified recover function.
  44. // The optional <recoverFunc> is called when any panic during executing of <userFunc>.
  45. // If <recoverFunc> is not passed or given nil, it ignores the panic from <userFunc>.
  46. // The job will be executed asynchronously.
  47. func AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
  48. return pool.AddWithRecover(userFunc, recoverFunc...)
  49. }
  50. // Size returns current goroutine count of default goroutine pool.
  51. func Size() int {
  52. return pool.Size()
  53. }
  54. // Jobs returns current job count of default goroutine pool.
  55. func Jobs() int {
  56. return pool.Jobs()
  57. }
  58. // Add pushes a new job to the pool.
  59. // The job will be executed asynchronously.
  60. func (p *Pool) Add(f func()) error {
  61. for p.closed.Val() {
  62. return errors.New("pool closed")
  63. }
  64. p.list.PushFront(f)
  65. // Check whether fork new goroutine or not.
  66. var n int
  67. for {
  68. n = p.count.Val()
  69. if p.limit != -1 && n >= p.limit {
  70. // No need fork new goroutine.
  71. return nil
  72. }
  73. if p.count.Cas(n, n+1) {
  74. // Use CAS to guarantee atomicity.
  75. break
  76. }
  77. }
  78. p.fork()
  79. return nil
  80. }
  81. // AddWithRecover pushes a new job to the pool with specified recover function.
  82. // The optional <recoverFunc> is called when any panic during executing of <userFunc>.
  83. // If <recoverFunc> is not passed or given nil, it ignores the panic from <userFunc>.
  84. // The job will be executed asynchronously.
  85. func (p *Pool) AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
  86. return p.Add(func() {
  87. defer func() {
  88. if err := recover(); err != nil {
  89. if len(recoverFunc) > 0 && recoverFunc[0] != nil {
  90. recoverFunc[0](errors.New(fmt.Sprintf(`%v`, err)))
  91. }
  92. }
  93. }()
  94. userFunc()
  95. })
  96. }
  97. // Cap returns the capacity of the pool.
  98. // This capacity is defined when pool is created.
  99. // It returns -1 if there's no limit.
  100. func (p *Pool) Cap() int {
  101. return p.limit
  102. }
  103. // Size returns current goroutine count of the pool.
  104. func (p *Pool) Size() int {
  105. return p.count.Val()
  106. }
  107. // Jobs returns current job count of the pool.
  108. // Note that, it does not return worker/goroutine count but the job/task count.
  109. func (p *Pool) Jobs() int {
  110. return p.list.Size()
  111. }
  112. // fork creates a new goroutine worker.
  113. // Note that the worker dies if the job function panics.
  114. func (p *Pool) fork() {
  115. go func() {
  116. defer p.count.Add(-1)
  117. var job interface{}
  118. for !p.closed.Val() {
  119. if job = p.list.PopBack(); job != nil {
  120. job.(func())()
  121. } else {
  122. return
  123. }
  124. }
  125. }()
  126. }
  127. // IsClosed returns if pool is closed.
  128. func (p *Pool) IsClosed() bool {
  129. return p.closed.Val()
  130. }
  131. // Close closes the goroutine pool, which makes all goroutines exit.
  132. func (p *Pool) Close() {
  133. p.closed.Set(true)
  134. }