grpool_pool.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package grpool
  7. import (
  8. "context"
  9. "github.com/gogf/gf/v2/errors/gcode"
  10. "github.com/gogf/gf/v2/errors/gerror"
  11. )
  12. // Add pushes a new job to the pool.
  13. // The job will be executed asynchronously.
  14. func (p *Pool) Add(ctx context.Context, f Func) error {
  15. for p.closed.Val() {
  16. return gerror.NewCode(
  17. gcode.CodeInvalidOperation,
  18. "goroutine defaultPool is already closed",
  19. )
  20. }
  21. p.list.PushFront(&localPoolItem{
  22. Ctx: ctx,
  23. Func: f,
  24. })
  25. // Check and fork new worker.
  26. p.checkAndForkNewGoroutineWorker()
  27. return nil
  28. }
  29. // AddWithRecover pushes a new job to the pool with specified recover function.
  30. //
  31. // The optional `recoverFunc` is called when any panic during executing of `userFunc`.
  32. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
  33. // The job will be executed asynchronously.
  34. func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error {
  35. return p.Add(ctx, func(ctx context.Context) {
  36. defer func() {
  37. if exception := recover(); exception != nil {
  38. if recoverFunc != nil {
  39. if v, ok := exception.(error); ok && gerror.HasStack(v) {
  40. recoverFunc(ctx, v)
  41. } else {
  42. recoverFunc(ctx, gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception))
  43. }
  44. }
  45. }
  46. }()
  47. userFunc(ctx)
  48. })
  49. }
  50. // Cap returns the capacity of the pool.
  51. // This capacity is defined when pool is created.
  52. // It returns -1 if there's no limit.
  53. func (p *Pool) Cap() int {
  54. return p.limit
  55. }
  56. // Size returns current goroutine count of the pool.
  57. func (p *Pool) Size() int {
  58. return p.count.Val()
  59. }
  60. // Jobs returns current job count of the pool.
  61. // Note that, it does not return worker/goroutine count but the job/task count.
  62. func (p *Pool) Jobs() int {
  63. return p.list.Size()
  64. }
  65. // IsClosed returns if pool is closed.
  66. func (p *Pool) IsClosed() bool {
  67. return p.closed.Val()
  68. }
  69. // Close closes the goroutine pool, which makes all goroutines exit.
  70. func (p *Pool) Close() {
  71. p.closed.Set(true)
  72. }
  73. // checkAndForkNewGoroutineWorker checks and creates a new goroutine worker.
  74. // Note that the worker dies if the job function panics and the job has no recover handling.
  75. func (p *Pool) checkAndForkNewGoroutineWorker() {
  76. // Check whether fork new goroutine or not.
  77. var n int
  78. for {
  79. n = p.count.Val()
  80. if p.limit != -1 && n >= p.limit {
  81. // No need fork new goroutine.
  82. return
  83. }
  84. if p.count.Cas(n, n+1) {
  85. // Use CAS to guarantee atomicity.
  86. break
  87. }
  88. }
  89. // Create job function in goroutine.
  90. go func() {
  91. defer p.count.Add(-1)
  92. var (
  93. listItem interface{}
  94. poolItem *localPoolItem
  95. )
  96. // Harding working, one by one, job never empty, worker never die.
  97. for !p.closed.Val() {
  98. listItem = p.list.PopBack()
  99. if listItem == nil {
  100. return
  101. }
  102. poolItem = listItem.(*localPoolItem)
  103. poolItem.Func(poolItem.Ctx)
  104. }
  105. }()
  106. }