pool_single.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package pool
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. )
  6. const (
  7. stateDefault = 0
  8. stateInited = 1
  9. stateClosed = 2
  10. )
  11. type BadConnError struct {
  12. wrapped error
  13. }
  14. var _ error = (*BadConnError)(nil)
  15. func (e BadConnError) Error() string {
  16. return "pg: Conn is in a bad state"
  17. }
  18. func (e BadConnError) Unwrap() error {
  19. return e.wrapped
  20. }
  21. type SingleConnPool struct {
  22. pool Pooler
  23. level int32 // atomic
  24. state uint32 // atomic
  25. ch chan *Conn
  26. _badConnError atomic.Value
  27. }
  28. var _ Pooler = (*SingleConnPool)(nil)
  29. func NewSingleConnPool(pool Pooler) *SingleConnPool {
  30. p, ok := pool.(*SingleConnPool)
  31. if !ok {
  32. p = &SingleConnPool{
  33. pool: pool,
  34. ch: make(chan *Conn, 1),
  35. }
  36. }
  37. atomic.AddInt32(&p.level, 1)
  38. return p
  39. }
  40. func (p *SingleConnPool) SetConn(cn *Conn) {
  41. if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
  42. p.ch <- cn
  43. } else {
  44. panic("not reached")
  45. }
  46. }
  47. func (p *SingleConnPool) NewConn() (*Conn, error) {
  48. return p.pool.NewConn()
  49. }
  50. func (p *SingleConnPool) CloseConn(cn *Conn) error {
  51. return p.pool.CloseConn(cn)
  52. }
  53. func (p *SingleConnPool) Get() (*Conn, error) {
  54. // In worst case this races with Close which is not a very common operation.
  55. for i := 0; i < 1000; i++ {
  56. switch atomic.LoadUint32(&p.state) {
  57. case stateDefault:
  58. cn, err := p.pool.Get()
  59. if err != nil {
  60. return nil, err
  61. }
  62. if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
  63. return cn, nil
  64. }
  65. p.pool.Remove(cn, ErrClosed)
  66. case stateInited:
  67. if err := p.badConnError(); err != nil {
  68. return nil, err
  69. }
  70. cn, ok := <-p.ch
  71. if !ok {
  72. return nil, ErrClosed
  73. }
  74. return cn, nil
  75. case stateClosed:
  76. return nil, ErrClosed
  77. default:
  78. panic("not reached")
  79. }
  80. }
  81. return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
  82. }
  83. func (p *SingleConnPool) Put(cn *Conn) {
  84. defer func() {
  85. if recover() != nil {
  86. p.freeConn(cn)
  87. }
  88. }()
  89. p.ch <- cn
  90. }
  91. func (p *SingleConnPool) freeConn(cn *Conn) {
  92. if err := p.badConnError(); err != nil {
  93. p.pool.Remove(cn, err)
  94. } else {
  95. p.pool.Put(cn)
  96. }
  97. }
  98. func (p *SingleConnPool) Remove(cn *Conn, reason error) {
  99. defer func() {
  100. if recover() != nil {
  101. p.pool.Remove(cn, ErrClosed)
  102. }
  103. }()
  104. p._badConnError.Store(BadConnError{wrapped: reason})
  105. p.ch <- cn
  106. }
  107. func (p *SingleConnPool) Len() int {
  108. switch atomic.LoadUint32(&p.state) {
  109. case stateDefault:
  110. return 0
  111. case stateInited:
  112. return 1
  113. case stateClosed:
  114. return 0
  115. default:
  116. panic("not reached")
  117. }
  118. }
  119. func (p *SingleConnPool) IdleLen() int {
  120. return len(p.ch)
  121. }
  122. func (p *SingleConnPool) Stats() *Stats {
  123. return &Stats{}
  124. }
  125. func (p *SingleConnPool) Close() error {
  126. level := atomic.AddInt32(&p.level, -1)
  127. if level > 0 {
  128. return nil
  129. }
  130. for i := 0; i < 1000; i++ {
  131. state := atomic.LoadUint32(&p.state)
  132. if state == stateClosed {
  133. return ErrClosed
  134. }
  135. if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
  136. close(p.ch)
  137. cn, ok := <-p.ch
  138. if ok {
  139. p.freeConn(cn)
  140. }
  141. return nil
  142. }
  143. }
  144. return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
  145. }
  146. func (p *SingleConnPool) Reset() error {
  147. if p.badConnError() == nil {
  148. return nil
  149. }
  150. select {
  151. case cn, ok := <-p.ch:
  152. if !ok {
  153. return ErrClosed
  154. }
  155. p.pool.Remove(cn, ErrClosed)
  156. p._badConnError.Store(BadConnError{wrapped: nil})
  157. default:
  158. return fmt.Errorf("pg: SingleConnPool does not have a Conn")
  159. }
  160. if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
  161. state := atomic.LoadUint32(&p.state)
  162. return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
  163. }
  164. return nil
  165. }
  166. func (p *SingleConnPool) badConnError() error {
  167. if v := p._badConnError.Load(); v != nil {
  168. err := v.(BadConnError)
  169. if err.wrapped != nil {
  170. return err
  171. }
  172. }
  173. return nil
  174. }