grpool.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. // Package grpool implements a goroutine reusable pool.
  7. package grpool
  8. import (
  9. "context"
  10. "github.com/gogf/gf/v2/container/glist"
  11. "github.com/gogf/gf/v2/container/gtype"
  12. "github.com/gogf/gf/v2/errors/gcode"
  13. "github.com/gogf/gf/v2/errors/gerror"
  14. )
  15. // Func is the pool function which contains context parameter.
  16. type Func func(ctx context.Context)
  17. // RecoverFunc is the pool runtime panic recover function which contains context parameter.
  18. type RecoverFunc func(ctx context.Context, err error)
  19. // Pool manages the goroutines using pool.
  20. type Pool struct {
  21. limit int // Max goroutine count limit.
  22. count *gtype.Int // Current running goroutine count.
  23. list *glist.List // Job list for asynchronous job adding purpose.
  24. closed *gtype.Bool // Is pool closed or not.
  25. }
  26. type internalPoolItem struct {
  27. Ctx context.Context
  28. Func Func
  29. }
  30. // Default goroutine pool.
  31. var (
  32. pool = New()
  33. )
  34. // New creates and returns a new goroutine pool object.
  35. // The parameter `limit` is used to limit the max goroutine count,
  36. // which is not limited in default.
  37. func New(limit ...int) *Pool {
  38. p := &Pool{
  39. limit: -1,
  40. count: gtype.NewInt(),
  41. list: glist.New(true),
  42. closed: gtype.NewBool(),
  43. }
  44. if len(limit) > 0 && limit[0] > 0 {
  45. p.limit = limit[0]
  46. }
  47. return p
  48. }
  49. // Add pushes a new job to the pool using default goroutine pool.
  50. // The job will be executed asynchronously.
  51. func Add(ctx context.Context, f Func) error {
  52. return pool.Add(ctx, f)
  53. }
  54. // AddWithRecover pushes a new job to the pool with specified recover function.
  55. // The optional `recoverFunc` is called when any panic during executing of `userFunc`.
  56. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
  57. // The job will be executed asynchronously.
  58. func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...RecoverFunc) error {
  59. return pool.AddWithRecover(ctx, userFunc, recoverFunc...)
  60. }
  61. // Size returns current goroutine count of default goroutine pool.
  62. func Size() int {
  63. return pool.Size()
  64. }
  65. // Jobs returns current job count of default goroutine pool.
  66. func Jobs() int {
  67. return pool.Jobs()
  68. }
  69. // Add pushes a new job to the pool.
  70. // The job will be executed asynchronously.
  71. func (p *Pool) Add(ctx context.Context, f Func) error {
  72. for p.closed.Val() {
  73. return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed")
  74. }
  75. p.list.PushFront(&internalPoolItem{
  76. Ctx: ctx,
  77. Func: f,
  78. })
  79. // Check whether fork new goroutine or not.
  80. var n int
  81. for {
  82. n = p.count.Val()
  83. if p.limit != -1 && n >= p.limit {
  84. // No need fork new goroutine.
  85. return nil
  86. }
  87. if p.count.Cas(n, n+1) {
  88. // Use CAS to guarantee atomicity.
  89. break
  90. }
  91. }
  92. p.fork()
  93. return nil
  94. }
  95. // AddWithRecover pushes a new job to the pool with specified recover function.
  96. // The optional `recoverFunc` is called when any panic during executing of `userFunc`.
  97. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
  98. // The job will be executed asynchronously.
  99. func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...RecoverFunc) error {
  100. return p.Add(ctx, func(ctx context.Context) {
  101. defer func() {
  102. if exception := recover(); exception != nil {
  103. if len(recoverFunc) > 0 && recoverFunc[0] != nil {
  104. if v, ok := exception.(error); ok && gerror.HasStack(v) {
  105. recoverFunc[0](ctx, v)
  106. } else {
  107. recoverFunc[0](ctx, gerror.Newf(`%+v`, exception))
  108. }
  109. }
  110. }
  111. }()
  112. userFunc(ctx)
  113. })
  114. }
  115. // Cap returns the capacity of the pool.
  116. // This capacity is defined when pool is created.
  117. // It returns -1 if there's no limit.
  118. func (p *Pool) Cap() int {
  119. return p.limit
  120. }
  121. // Size returns current goroutine count of the pool.
  122. func (p *Pool) Size() int {
  123. return p.count.Val()
  124. }
  125. // Jobs returns current job count of the pool.
  126. // Note that, it does not return worker/goroutine count but the job/task count.
  127. func (p *Pool) Jobs() int {
  128. return p.list.Size()
  129. }
  130. // fork creates a new goroutine worker.
  131. // Note that the worker dies if the job function panics.
  132. func (p *Pool) fork() {
  133. go func() {
  134. defer p.count.Add(-1)
  135. var (
  136. listItem interface{}
  137. poolItem *internalPoolItem
  138. )
  139. for !p.closed.Val() {
  140. if listItem = p.list.PopBack(); listItem != nil {
  141. poolItem = listItem.(*internalPoolItem)
  142. poolItem.Func(poolItem.Ctx)
  143. } else {
  144. return
  145. }
  146. }
  147. }()
  148. }
  149. // IsClosed returns if pool is closed.
  150. func (p *Pool) IsClosed() bool {
  151. return p.closed.Val()
  152. }
  153. // Close closes the goroutine pool, which makes all goroutines exit.
  154. func (p *Pool) Close() {
  155. p.closed.Set(true)
  156. }