pool.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705
  1. package radix
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "errors"
  9. "github.com/mediocregopher/radix/v3/resp"
  10. "github.com/mediocregopher/radix/v3/trace"
  11. )
  12. // ErrPoolEmpty is used by Pools created using the PoolOnEmptyErrAfter option.
  13. var ErrPoolEmpty = errors.New("connection pool is empty")
  14. var errPoolFull = errors.New("connection pool is full")
  15. // ioErrConn is a Conn which tracks the last net.Error which was seen either
  16. // during an Encode call or a Decode call.
  17. type ioErrConn struct {
  18. Conn
  19. // The most recent network error which occurred when either reading
  20. // or writing. A critical network error is basically any non-application
  21. // level error, e.g. a timeout, disconnect, etc... Close is automatically
  22. // called on the client when it encounters a critical network error
  23. lastIOErr error
  24. // conn create time
  25. createdAt time.Time
  26. }
  27. func newIOErrConn(c Conn) *ioErrConn {
  28. return &ioErrConn{Conn: c, createdAt: time.Now()}
  29. }
  30. func (ioc *ioErrConn) Encode(m resp.Marshaler) error {
  31. if ioc.lastIOErr != nil {
  32. return ioc.lastIOErr
  33. }
  34. err := ioc.Conn.Encode(m)
  35. if nerr := net.Error(nil); errors.As(err, &nerr) {
  36. ioc.lastIOErr = err
  37. }
  38. return err
  39. }
  40. func (ioc *ioErrConn) Decode(m resp.Unmarshaler) error {
  41. if ioc.lastIOErr != nil {
  42. return ioc.lastIOErr
  43. }
  44. err := ioc.Conn.Decode(m)
  45. if nerr := net.Error(nil); errors.As(err, &nerr) {
  46. ioc.lastIOErr = err
  47. } else if err != nil && !errors.As(err, new(resp.ErrDiscarded)) {
  48. ioc.lastIOErr = err
  49. }
  50. return err
  51. }
  52. func (ioc *ioErrConn) Do(a Action) error {
  53. return a.Run(ioc)
  54. }
  55. func (ioc *ioErrConn) Close() error {
  56. ioc.lastIOErr = io.EOF
  57. return ioc.Conn.Close()
  58. }
  59. func (ioc *ioErrConn) expired(timeout time.Duration) bool {
  60. if timeout <= 0 {
  61. return false
  62. }
  63. return time.Since(ioc.createdAt) >= timeout
  64. }
  65. ////////////////////////////////////////////////////////////////////////////////
  66. type poolOpts struct {
  67. cf ConnFunc
  68. pingInterval time.Duration
  69. refillInterval time.Duration
  70. overflowDrainInterval time.Duration
  71. overflowSize int
  72. onEmptyWait time.Duration
  73. errOnEmpty error
  74. pipelineConcurrency int
  75. pipelineLimit int
  76. pipelineWindow time.Duration
  77. pt trace.PoolTrace
  78. maxLifetime time.Duration // maximum amount of time a connection may be reused
  79. }
  80. // PoolOpt is an optional behavior which can be applied to the NewPool function
  81. // to effect a Pool's behavior.
  82. type PoolOpt func(*poolOpts)
  83. // PoolMaxLifetime sets the maximum amount of time a connection may be reused.
  84. // Expired connections may be closed lazily before reuse.
  85. //
  86. // If d <= 0, connections are not closed due to a connection's age.
  87. func PoolMaxLifetime(d time.Duration) PoolOpt {
  88. return func(po *poolOpts) {
  89. po.maxLifetime = d
  90. }
  91. }
  92. // PoolConnFunc tells the Pool to use the given ConnFunc when creating new
  93. // Conns to its redis instance. The ConnFunc can be used to set timeouts,
  94. // perform AUTH, or even use custom Conn implementations.
  95. func PoolConnFunc(cf ConnFunc) PoolOpt {
  96. return func(po *poolOpts) {
  97. po.cf = cf
  98. }
  99. }
  100. // PoolPingInterval specifies the interval at which a ping event happens. On
  101. // each ping event the Pool calls the PING redis command over one of it's
  102. // available connections.
  103. //
  104. // Since connections are used in LIFO order, the ping interval * pool size is
  105. // the duration of time it takes to ping every connection once when the pool is
  106. // idle.
  107. //
  108. // A shorter interval means connections are pinged more frequently, but also
  109. // means more traffic with the server.
  110. func PoolPingInterval(d time.Duration) PoolOpt {
  111. return func(po *poolOpts) {
  112. po.pingInterval = d
  113. }
  114. }
  115. // PoolRefillInterval specifies the interval at which a refill event happens. On
  116. // each refill event the Pool checks to see if it is full, and if it's not a
  117. // single connection is created and added to it.
  118. func PoolRefillInterval(d time.Duration) PoolOpt {
  119. return func(po *poolOpts) {
  120. po.refillInterval = d
  121. }
  122. }
  123. // PoolOnEmptyWait effects the Pool's behavior when there are no available
  124. // connections in the Pool. The effect is to cause actions to block as long as
  125. // it takes until a connection becomes available.
  126. func PoolOnEmptyWait() PoolOpt {
  127. return func(po *poolOpts) {
  128. po.onEmptyWait = -1
  129. }
  130. }
  131. // PoolOnEmptyCreateAfter effects the Pool's behavior when there are no
  132. // available connections in the Pool. The effect is to cause actions to block
  133. // until a connection becomes available or until the duration has passed. If the
  134. // duration is passed a new connection is created and used.
  135. //
  136. // If wait is 0 then a new connection is created immediately upon an empty Pool.
  137. func PoolOnEmptyCreateAfter(wait time.Duration) PoolOpt {
  138. return func(po *poolOpts) {
  139. po.onEmptyWait = wait
  140. po.errOnEmpty = nil
  141. }
  142. }
  143. // PoolOnEmptyErrAfter effects the Pool's behavior when there are no
  144. // available connections in the Pool. The effect is to cause actions to block
  145. // until a connection becomes available or until the duration has passed. If the
  146. // duration is passed then ErrEmptyPool is returned.
  147. //
  148. // If wait is 0 then ErrEmptyPool is returned immediately upon an empty Pool.
  149. func PoolOnEmptyErrAfter(wait time.Duration) PoolOpt {
  150. return func(po *poolOpts) {
  151. po.onEmptyWait = wait
  152. po.errOnEmpty = ErrPoolEmpty
  153. }
  154. }
  155. // PoolOnFullClose effects the Pool's behavior when it is full. The effect is to
  156. // cause any connection which is being put back into a full pool to be closed
  157. // and discarded.
  158. func PoolOnFullClose() PoolOpt {
  159. return func(po *poolOpts) {
  160. po.overflowSize = 0
  161. po.overflowDrainInterval = 0
  162. }
  163. }
  164. // PoolOnFullBuffer effects the Pool's behavior when it is full. The effect is
  165. // to give the pool an additional buffer for connections, called the overflow.
  166. // If a connection is being put back into a full pool it will be put into the
  167. // overflow. If the overflow is also full then the connection will be closed and
  168. // discarded.
  169. //
  170. // drainInterval specifies the interval at which a drain event happens. On each
  171. // drain event a connection will be removed from the overflow buffer (if any are
  172. // present in it), closed, and discarded.
  173. //
  174. // If drainInterval is zero then drain events will never occur.
  175. //
  176. // NOTE that if used with PoolOnEmptyWait or PoolOnEmptyErrAfter this won't have
  177. // any effect, because there won't be any occasion where more connections than
  178. // the pool size will be created.
  179. func PoolOnFullBuffer(size int, drainInterval time.Duration) PoolOpt {
  180. return func(po *poolOpts) {
  181. po.overflowSize = size
  182. po.overflowDrainInterval = drainInterval
  183. }
  184. }
  185. // PoolPipelineConcurrency sets the maximum number of pipelines that can be
  186. // executed concurrently.
  187. //
  188. // If limit is greater than the pool size or less than 1, the limit will be
  189. // set to the pool size.
  190. func PoolPipelineConcurrency(limit int) PoolOpt {
  191. return func(po *poolOpts) {
  192. po.pipelineConcurrency = limit
  193. }
  194. }
  195. // PoolPipelineWindow sets the duration after which internal pipelines will be
  196. // flushed and the maximum number of commands that can be pipelined before
  197. // flushing.
  198. //
  199. // If window is zero then implicit pipelining will be disabled.
  200. // If limit is zero then no limit will be used and pipelines will only be limited
  201. // by the specified time window.
  202. func PoolPipelineWindow(window time.Duration, limit int) PoolOpt {
  203. return func(po *poolOpts) {
  204. po.pipelineLimit = limit
  205. po.pipelineWindow = window
  206. }
  207. }
  208. // PoolWithTrace tells the Pool to trace itself with the given PoolTrace
  209. // Note that PoolTrace will block every point that you set to trace.
  210. func PoolWithTrace(pt trace.PoolTrace) PoolOpt {
  211. return func(po *poolOpts) {
  212. po.pt = pt
  213. }
  214. }
  215. ////////////////////////////////////////////////////////////////////////////////
  216. // Pool is a dynamic connection pool which implements the Client interface. It
  217. // takes in a number of options which can effect its specific behavior; see the
  218. // NewPool method.
  219. //
  220. // Pool is dynamic in that it can create more connections on-the-fly to handle
  221. // increased load. The maximum number of extra connections (if any) can be
  222. // configured, along with how long they are kept after load has returned to
  223. // normal.
  224. //
  225. // Pool also takes advantage of implicit pipelining. If multiple commands are
  226. // being performed simultaneously, then Pool will write them all to a single
  227. // connection using a single system call, and read all their responses together
  228. // using another single system call. Implicit pipelining significantly improves
  229. // performance during high-concurrency usage, at the expense of slightly worse
  230. // performance during low-concurrency usage. It can be disabled using
  231. // PoolPipelineWindow(0, 0).
  232. type Pool struct {
  233. // Atomic fields must be at the beginning of the struct since they must be
  234. // correctly aligned or else access may cause panics on 32-bit architectures
  235. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  236. totalConns int64 // atomic, must only be access using functions from sync/atomic
  237. opts poolOpts
  238. network, addr string
  239. size int
  240. l sync.RWMutex
  241. // pool is read-protected by l, and should not be written to or read from
  242. // when closed is true (closed is also protected by l)
  243. pool chan *ioErrConn
  244. closed bool
  245. pipeliner *pipeliner
  246. wg sync.WaitGroup
  247. closeCh chan bool
  248. initDone chan struct{} // used for tests
  249. // Any errors encountered internally will be written to this channel. If
  250. // nothing is reading the channel the errors will be dropped. The channel
  251. // will be closed when Close is called.
  252. ErrCh chan error
  253. }
  254. // NewPool creates a *Pool which will keep open at least the given number of
  255. // connections to the redis instance at the given address.
  256. //
  257. // NewPool takes in a number of options which can overwrite its default
  258. // behavior. The default options NewPool uses are:
  259. //
  260. // PoolConnFunc(DefaultConnFunc)
  261. // PoolOnEmptyCreateAfter(1 * time.Second)
  262. // PoolRefillInterval(1 * time.Second)
  263. // PoolOnFullBuffer((size / 3)+1, 1 * time.Second)
  264. // PoolPingInterval(5 * time.Second / (size+1))
  265. // PoolPipelineConcurrency(size)
  266. // PoolPipelineWindow(150 * time.Microsecond, 0)
  267. //
  268. // The recommended size of the pool depends on the number of concurrent
  269. // goroutines that will use the pool and whether implicit pipelining is
  270. // enabled or not.
  271. //
  272. // As a general rule, when implicit pipelining is enabled (the default)
  273. // the size of the pool can be kept low without problems to reduce resource
  274. // and file descriptor usage.
  275. //
  276. func NewPool(network, addr string, size int, opts ...PoolOpt) (*Pool, error) {
  277. p := &Pool{
  278. network: network,
  279. addr: addr,
  280. size: size,
  281. closeCh: make(chan bool),
  282. initDone: make(chan struct{}),
  283. ErrCh: make(chan error, 1),
  284. }
  285. defaultPoolOpts := []PoolOpt{
  286. PoolConnFunc(DefaultConnFunc),
  287. PoolOnEmptyCreateAfter(1 * time.Second),
  288. PoolRefillInterval(1 * time.Second),
  289. PoolOnFullBuffer((size/3)+1, 1*time.Second),
  290. PoolPingInterval(5 * time.Second / time.Duration(size+1)),
  291. PoolPipelineConcurrency(size),
  292. // NOTE if 150us is changed the benchmarks need to be updated too
  293. PoolPipelineWindow(150*time.Microsecond, 0),
  294. }
  295. for _, opt := range append(defaultPoolOpts, opts...) {
  296. // the other args to NewPool used to be a ConnFunc, which someone might
  297. // have left as nil, in which case this now gives a weird panic. Just
  298. // handle it
  299. if opt != nil {
  300. opt(&(p.opts))
  301. }
  302. }
  303. totalSize := size + p.opts.overflowSize
  304. p.pool = make(chan *ioErrConn, totalSize)
  305. // make one Conn synchronously to ensure there's actually a redis instance
  306. // present. The rest will be created asynchronously.
  307. ioc, err := p.newConn(trace.PoolConnCreatedReasonInitialization)
  308. if err != nil {
  309. return nil, err
  310. }
  311. p.put(ioc)
  312. p.wg.Add(1)
  313. go func() {
  314. startTime := time.Now()
  315. defer p.wg.Done()
  316. for i := 0; i < size-1; i++ {
  317. ioc, err := p.newConn(trace.PoolConnCreatedReasonInitialization)
  318. if err != nil {
  319. p.err(err)
  320. // if there was an error connecting to the instance than it
  321. // might need a little breathing room, redis can sometimes get
  322. // sad if too many connections are created simultaneously.
  323. time.Sleep(100 * time.Millisecond)
  324. continue
  325. } else if !p.put(ioc) {
  326. // if the connection wasn't put in it could be for two reasons:
  327. // - the Pool has already started being used and is full.
  328. // - Close was called.
  329. // in any case, bail
  330. break
  331. }
  332. }
  333. close(p.initDone)
  334. p.traceInitCompleted(time.Since(startTime))
  335. }()
  336. // needs to be created before starting any background goroutines to avoid
  337. // races on p.pipeliner access
  338. if p.opts.pipelineWindow > 0 {
  339. if p.opts.pipelineConcurrency < 1 || p.opts.pipelineConcurrency > size {
  340. p.opts.pipelineConcurrency = size
  341. }
  342. p.pipeliner = newPipeliner(
  343. p,
  344. p.opts.pipelineConcurrency,
  345. p.opts.pipelineLimit,
  346. p.opts.pipelineWindow,
  347. )
  348. }
  349. if p.opts.pingInterval > 0 && size > 0 {
  350. p.atIntervalDo(p.opts.pingInterval, func() {
  351. // don't worry about the return value, the whole point is to find
  352. // erroring connections
  353. _ = p.Do(Cmd(nil, "PING"))
  354. })
  355. }
  356. if p.opts.refillInterval > 0 && size > 0 {
  357. p.atIntervalDo(p.opts.refillInterval, p.doRefill)
  358. }
  359. if p.opts.overflowSize > 0 && p.opts.overflowDrainInterval > 0 {
  360. p.atIntervalDo(p.opts.overflowDrainInterval, p.doOverflowDrain)
  361. }
  362. return p, nil
  363. }
  364. func (p *Pool) traceInitCompleted(elapsedTime time.Duration) {
  365. if p.opts.pt.InitCompleted != nil {
  366. p.opts.pt.InitCompleted(trace.PoolInitCompleted{
  367. PoolCommon: p.traceCommon(),
  368. AvailCount: len(p.pool),
  369. ElapsedTime: elapsedTime,
  370. })
  371. }
  372. }
  373. func (p *Pool) err(err error) {
  374. select {
  375. case p.ErrCh <- err:
  376. default:
  377. }
  378. }
  379. func (p *Pool) traceCommon() trace.PoolCommon {
  380. return trace.PoolCommon{
  381. Network: p.network, Addr: p.addr,
  382. PoolSize: p.size, BufferSize: p.opts.overflowSize,
  383. }
  384. }
  385. func (p *Pool) traceConnCreated(connectTime time.Duration, reason trace.PoolConnCreatedReason, err error) {
  386. if p.opts.pt.ConnCreated != nil {
  387. p.opts.pt.ConnCreated(trace.PoolConnCreated{
  388. PoolCommon: p.traceCommon(),
  389. Reason: reason,
  390. ConnectTime: connectTime,
  391. Err: err,
  392. })
  393. }
  394. }
  395. func (p *Pool) traceConnClosed(reason trace.PoolConnClosedReason) {
  396. if p.opts.pt.ConnClosed != nil {
  397. p.opts.pt.ConnClosed(trace.PoolConnClosed{
  398. PoolCommon: p.traceCommon(),
  399. AvailCount: len(p.pool),
  400. Reason: reason,
  401. })
  402. }
  403. }
  404. func (p *Pool) newConn(reason trace.PoolConnCreatedReason) (*ioErrConn, error) {
  405. start := time.Now()
  406. c, err := p.opts.cf(p.network, p.addr)
  407. elapsed := time.Since(start)
  408. p.traceConnCreated(elapsed, reason, err)
  409. if err != nil {
  410. return nil, err
  411. }
  412. ioc := newIOErrConn(c)
  413. atomic.AddInt64(&p.totalConns, 1)
  414. return ioc, nil
  415. }
  416. func (p *Pool) atIntervalDo(d time.Duration, do func()) {
  417. p.wg.Add(1)
  418. go func() {
  419. defer p.wg.Done()
  420. t := time.NewTicker(d)
  421. defer t.Stop()
  422. for {
  423. select {
  424. case <-t.C:
  425. do()
  426. case <-p.closeCh:
  427. return
  428. }
  429. }
  430. }()
  431. }
  432. func (p *Pool) doRefill() {
  433. if atomic.LoadInt64(&p.totalConns) >= int64(p.size) {
  434. return
  435. }
  436. ioc, err := p.newConn(trace.PoolConnCreatedReasonRefill)
  437. if err == nil {
  438. p.put(ioc)
  439. } else if errors.Is(err, errPoolFull) {
  440. p.err(err)
  441. }
  442. }
  443. func (p *Pool) doOverflowDrain() {
  444. // the other do* processes inherently handle this case, this one needs to do
  445. // it manually
  446. p.l.RLock()
  447. if p.closed || len(p.pool) <= p.size {
  448. p.l.RUnlock()
  449. return
  450. }
  451. // pop a connection off and close it, if there's any to pop off
  452. var ioc *ioErrConn
  453. select {
  454. case ioc = <-p.pool:
  455. default:
  456. // pool is empty, nothing to drain
  457. }
  458. p.l.RUnlock()
  459. if ioc == nil {
  460. return
  461. }
  462. ioc.Close()
  463. p.traceConnClosed(trace.PoolConnClosedReasonBufferDrain)
  464. atomic.AddInt64(&p.totalConns, -1)
  465. }
  466. func (p *Pool) getExisting() (*ioErrConn, error) {
  467. // Fast-path if the pool is not empty. Return error if pool has been closed.
  468. for {
  469. select {
  470. case ioc, ok := <-p.pool:
  471. if !ok {
  472. return nil, errClientClosed
  473. }
  474. if ioc.expired(p.opts.maxLifetime) {
  475. ioc.Close()
  476. p.traceConnClosed(trace.PoolConnClosedReasonConnExpired)
  477. atomic.AddInt64(&p.totalConns, -1)
  478. continue
  479. }
  480. return ioc, nil
  481. default:
  482. }
  483. break // Failed to get from pool, so jump out to conduct for the next move.
  484. }
  485. if p.opts.onEmptyWait == 0 {
  486. // If we should not wait we return without allocating a timer.
  487. return nil, p.opts.errOnEmpty
  488. }
  489. // only set when we have a timeout, since a nil channel always blocks which
  490. // is what we want
  491. var tc <-chan time.Time
  492. if p.opts.onEmptyWait > 0 {
  493. t := getTimer(p.opts.onEmptyWait)
  494. defer putTimer(t)
  495. tc = t.C
  496. }
  497. for {
  498. select {
  499. case ioc, ok := <-p.pool:
  500. if !ok {
  501. return nil, errClientClosed
  502. }
  503. if ioc.expired(p.opts.maxLifetime) {
  504. ioc.Close()
  505. p.traceConnClosed(trace.PoolConnClosedReasonConnExpired)
  506. atomic.AddInt64(&p.totalConns, -1)
  507. continue
  508. }
  509. return ioc, nil
  510. case <-tc:
  511. return nil, p.opts.errOnEmpty
  512. }
  513. }
  514. }
  515. func (p *Pool) get() (*ioErrConn, error) {
  516. ioc, err := p.getExisting()
  517. if err != nil {
  518. return nil, err
  519. } else if ioc != nil {
  520. return ioc, nil
  521. }
  522. return p.newConn(trace.PoolConnCreatedReasonPoolEmpty)
  523. }
  524. // returns true if the connection was put back, false if it was closed and
  525. // discarded.
  526. func (p *Pool) put(ioc *ioErrConn) bool {
  527. p.l.RLock()
  528. var expired bool
  529. if ioc.lastIOErr == nil && !p.closed {
  530. if expired = ioc.expired(p.opts.maxLifetime); !expired {
  531. select {
  532. case p.pool <- ioc:
  533. p.l.RUnlock()
  534. return true
  535. default:
  536. }
  537. }
  538. }
  539. p.l.RUnlock()
  540. // the pool might close here, but that's fine, because all that's happening
  541. // at this point is that the connection is being closed
  542. ioc.Close()
  543. if expired {
  544. p.traceConnClosed(trace.PoolConnClosedReasonConnExpired)
  545. } else {
  546. p.traceConnClosed(trace.PoolConnClosedReasonPoolFull)
  547. }
  548. atomic.AddInt64(&p.totalConns, -1)
  549. return false
  550. }
  551. // Do implements the Do method of the Client interface by retrieving a Conn out
  552. // of the pool, calling Run on the given Action with it, and returning the Conn
  553. // to the pool.
  554. //
  555. // If the given Action is a CmdAction, it will be pipelined with other concurrent
  556. // calls to Do, which can improve the performance and resource usage of the Redis
  557. // server, but will increase the latency for some of the Actions. To avoid the
  558. // implicit pipelining you can either set PoolPipelineWindow(0, 0) when creating the
  559. // Pool or use WithConn. Pipelines created manually (via Pipeline) are also excluded
  560. // from this and will be executed as if using WithConn.
  561. //
  562. // Due to a limitation in the implementation, custom CmdAction implementations
  563. // are currently not automatically pipelined.
  564. func (p *Pool) Do(a Action) error {
  565. startTime := time.Now()
  566. if p.pipeliner != nil && p.pipeliner.CanDo(a) {
  567. err := p.pipeliner.Do(a)
  568. p.traceDoCompleted(time.Since(startTime), err)
  569. return err
  570. }
  571. c, err := p.get()
  572. if err != nil {
  573. return err
  574. }
  575. err = c.Do(a)
  576. p.put(c)
  577. p.traceDoCompleted(time.Since(startTime), err)
  578. return err
  579. }
  580. func (p *Pool) traceDoCompleted(elapsedTime time.Duration, err error) {
  581. if p.opts.pt.DoCompleted != nil {
  582. p.opts.pt.DoCompleted(trace.PoolDoCompleted{
  583. PoolCommon: p.traceCommon(),
  584. AvailCount: len(p.pool),
  585. ElapsedTime: elapsedTime,
  586. Err: err,
  587. })
  588. }
  589. }
  590. // NumAvailConns returns the number of connections currently available in the
  591. // pool, as well as in the overflow buffer if that option is enabled.
  592. func (p *Pool) NumAvailConns() int {
  593. return len(p.pool)
  594. }
  595. // Close implements the Close method of the Client.
  596. func (p *Pool) Close() error {
  597. p.l.Lock()
  598. if p.closed {
  599. p.l.Unlock()
  600. return errClientClosed
  601. }
  602. p.closed = true
  603. close(p.closeCh)
  604. // at this point get and put won't work anymore, so it's safe to empty and
  605. // close the pool channel
  606. emptyLoop:
  607. for {
  608. select {
  609. case ioc := <-p.pool:
  610. ioc.Close()
  611. atomic.AddInt64(&p.totalConns, -1)
  612. p.traceConnClosed(trace.PoolConnClosedReasonPoolClosed)
  613. default:
  614. close(p.pool)
  615. break emptyLoop
  616. }
  617. }
  618. p.l.Unlock()
  619. if p.pipeliner != nil {
  620. if err := p.pipeliner.Close(); err != nil {
  621. return err
  622. }
  623. }
  624. // by now the pool's go-routines should have bailed, wait to make sure they
  625. // do
  626. p.wg.Wait()
  627. close(p.ErrCh)
  628. return nil
  629. }