123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776 |
- package radix
- import (
- "fmt"
- "reflect"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "errors"
- "github.com/mediocregopher/radix/v3/resp"
- "github.com/mediocregopher/radix/v3/resp/resp2"
- "github.com/mediocregopher/radix/v3/trace"
- )
- // dedupe is used to deduplicate a function invocation, so if multiple
- // go-routines call it at the same time only the first will actually run it, and
- // the others will block until that one is done.
- type dedupe struct {
- l sync.Mutex
- s *sync.Once
- }
- func newDedupe() *dedupe {
- return &dedupe{s: new(sync.Once)}
- }
- func (d *dedupe) do(fn func()) {
- d.l.Lock()
- s := d.s
- d.l.Unlock()
- s.Do(func() {
- fn()
- d.l.Lock()
- d.s = new(sync.Once)
- d.l.Unlock()
- })
- }
- ////////////////////////////////////////////////////////////////////////////////
- // ClusterCanRetryAction is an Action which is aware of Cluster's retry behavior
- // in the event of a slot migration. If an Action receives an error from a
- // Cluster node which is either MOVED or ASK, and that Action implements
- // ClusterCanRetryAction, and the ClusterCanRetry method returns true, then the
- // Action will be retried on the correct node.
- //
- // NOTE that the Actions which are returned by Cmd, FlatCmd, and EvalScript.Cmd
- // all implicitly implement this interface.
- type ClusterCanRetryAction interface {
- Action
- ClusterCanRetry() bool
- }
- ////////////////////////////////////////////////////////////////////////////////
- type clusterOpts struct {
- pf ClientFunc
- clusterDownWait time.Duration
- syncEvery time.Duration
- ct trace.ClusterTrace
- initAllowUnavailable bool
- }
- // ClusterOpt is an optional behavior which can be applied to the NewCluster
- // function to effect a Cluster's behavior.
- type ClusterOpt func(*clusterOpts)
- // ClusterPoolFunc tells the Cluster to use the given ClientFunc when creating
- // pools of connections to cluster members.
- //
- // This can be used to allow for secondary reads via the Cluster.DoSecondary
- // method by specifying a ClientFunc that internally creates connections using
- // DefaultClusterConnFunc or a custom ConnFunc that enables READONLY mode on each
- // connection.
- func ClusterPoolFunc(pf ClientFunc) ClusterOpt {
- return func(co *clusterOpts) {
- co.pf = pf
- }
- }
- // ClusterSyncEvery tells the Cluster to synchronize itself with the cluster's
- // topology at the given interval. On every synchronization Cluster will ask the
- // cluster for its topology and make/destroy its connections as necessary.
- func ClusterSyncEvery(d time.Duration) ClusterOpt {
- return func(co *clusterOpts) {
- co.syncEvery = d
- }
- }
- // ClusterOnDownDelayActionsBy tells the Cluster to delay all commands by the given
- // duration while the cluster is seen to be in the CLUSTERDOWN state. This
- // allows fewer actions to be affected by brief outages, e.g. during a failover.
- //
- // If the given duration is 0 then Cluster will not delay actions during the
- // CLUSTERDOWN state. Note that calls to Sync will not be delayed regardless
- // of this option.
- func ClusterOnDownDelayActionsBy(d time.Duration) ClusterOpt {
- return func(co *clusterOpts) {
- co.clusterDownWait = d
- }
- }
- // ClusterWithTrace tells the Cluster to trace itself with the given
- // ClusterTrace. Note that ClusterTrace will block every point that you set to
- // trace.
- func ClusterWithTrace(ct trace.ClusterTrace) ClusterOpt {
- return func(co *clusterOpts) {
- co.ct = ct
- }
- }
- // ClusterOnInitAllowUnavailable tells NewCluster to succeed
- // and not return an error as long as at least one redis instance
- // in the cluster can be successfully connected to.
- func ClusterOnInitAllowUnavailable(initAllowUnavailable bool) ClusterOpt {
- return func(co *clusterOpts) {
- co.initAllowUnavailable = initAllowUnavailable
- }
- }
- // Cluster contains all information about a redis cluster needed to interact
- // with it, including a set of pools to each of its instances. All methods on
- // Cluster are thread-safe.
- type Cluster struct {
- // Atomic fields must be at the beginning of the struct since they must be
- // correctly aligned or else access may cause panics on 32-bit architectures
- // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
- lastClusterdown int64 // unix timestamp in milliseconds, atomic
- co clusterOpts
- // used to deduplicate calls to sync
- syncDedupe *dedupe
- l sync.RWMutex
- pools map[string]Client
- primTopo, topo ClusterTopo
- secondaries map[string]map[string]ClusterNode
- closeCh chan struct{}
- closeWG sync.WaitGroup
- closeOnce sync.Once
- // Any errors encountered internally will be written to this channel. If
- // nothing is reading the channel the errors will be dropped. The channel
- // will be closed when the Close method is called.
- ErrCh chan error
- }
- // DefaultClusterConnFunc is a ConnFunc which will return a Conn for a node in a
- // redis cluster using sane defaults and which has READONLY mode enabled, allowing
- // read-only commands on the connection even if the connected instance is currently
- // a replica, either by explicitly sending commands on the connection or by using
- // the DoSecondary method on the Cluster that owns the connection.
- var DefaultClusterConnFunc = func(network, addr string) (Conn, error) {
- c, err := DefaultConnFunc(network, addr)
- if err != nil {
- return nil, err
- } else if err := c.Do(Cmd(nil, "READONLY")); err != nil {
- c.Close()
- return nil, err
- }
- return c, nil
- }
- // NewCluster initializes and returns a Cluster instance. It will try every
- // address given until it finds a usable one. From there it uses CLUSTER SLOTS
- // to discover the cluster topology and make all the necessary connections.
- //
- // NewCluster takes in a number of options which can overwrite its default
- // behavior. The default options NewCluster uses are:
- //
- // ClusterPoolFunc(DefaultClientFunc)
- // ClusterSyncEvery(5 * time.Second)
- // ClusterOnDownDelayActionsBy(100 * time.Millisecond)
- //
- func NewCluster(clusterAddrs []string, opts ...ClusterOpt) (*Cluster, error) {
- c := &Cluster{
- syncDedupe: newDedupe(),
- pools: map[string]Client{},
- closeCh: make(chan struct{}),
- ErrCh: make(chan error, 1),
- }
- defaultClusterOpts := []ClusterOpt{
- ClusterPoolFunc(DefaultClientFunc),
- ClusterSyncEvery(5 * time.Second),
- ClusterOnDownDelayActionsBy(100 * time.Millisecond),
- }
- for _, opt := range append(defaultClusterOpts, opts...) {
- // the other args to NewCluster used to be a ClientFunc, which someone
- // might have left as nil, in which case this now gives a weird panic.
- // Just handle it
- if opt != nil {
- opt(&(c.co))
- }
- }
- var err error
- // make a pool to base the cluster on
- for _, addr := range clusterAddrs {
- var p Client
- if p, err = c.co.pf("tcp", addr); err != nil {
- continue
- }
- c.pools[addr] = p
- break
- }
- if len(c.pools) == 0 {
- return nil, fmt.Errorf("could not connect to any redis instances, last error was: %w", err)
- }
- p, err := c.pool("")
- if err != nil {
- for _, p := range c.pools {
- p.Close()
- }
- return nil, err
- }
- if err := c.sync(p, c.co.initAllowUnavailable); err != nil {
- for _, p := range c.pools {
- p.Close()
- }
- return nil, err
- }
- c.syncEvery(c.co.syncEvery)
- return c, nil
- }
- func (c *Cluster) err(err error) {
- select {
- case c.ErrCh <- err:
- default:
- }
- }
- func assertKeysSlot(keys []string) error {
- var ok bool
- var prevKey string
- var slot uint16
- for _, key := range keys {
- thisSlot := ClusterSlot([]byte(key))
- if !ok {
- ok = true
- } else if slot != thisSlot {
- return fmt.Errorf("keys %q and %q do not belong to the same slot", prevKey, key)
- }
- prevKey = key
- slot = thisSlot
- }
- return nil
- }
- // may return nil, nil if no pool for the addr.
- func (c *Cluster) rpool(addr string) (Client, error) {
- c.l.RLock()
- defer c.l.RUnlock()
- if addr == "" {
- for _, p := range c.pools {
- return p, nil
- }
- return nil, errors.New("no pools available")
- } else if p, ok := c.pools[addr]; ok {
- return p, nil
- }
- return nil, nil
- }
- var errUnknownAddress = errors.New("unknown address")
- // Client returns a Client for the given address, which could be either the
- // primary or one of the secondaries (see Topo method for retrieving known
- // addresses).
- //
- // NOTE that if there is a failover while a Client returned by this method is
- // being used the Client may or may not continue to work as expected, depending
- // on the nature of the failover.
- //
- // NOTE the Client should _not_ be closed.
- func (c *Cluster) Client(addr string) (Client, error) {
- // rpool allows the address to be "", handle that case manually
- if addr == "" {
- return nil, errUnknownAddress
- }
- cl, err := c.rpool(addr)
- if err != nil {
- return nil, err
- } else if cl == nil {
- return nil, errUnknownAddress
- }
- return cl, nil
- }
- // if addr is "" returns a random pool. If addr is given but there's no pool for
- // it one will be created on-the-fly.
- func (c *Cluster) pool(addr string) (Client, error) {
- p, err := c.rpool(addr)
- if p != nil || err != nil {
- return p, err
- }
- // if the pool isn't available make it on-the-fly. This behavior isn't
- // _great_, but theoretically the syncEvery process should clean up any
- // extraneous pools which aren't really needed
- // it's important that the cluster pool set isn't locked while this is
- // happening, because this could block for a while
- if p, err = c.co.pf("tcp", addr); err != nil {
- return nil, err
- }
- // we've made a new pool, but we need to double-check someone else didn't
- // make one at the same time and add it in first. If they did, close this
- // one and return that one
- c.l.Lock()
- if p2, ok := c.pools[addr]; ok {
- c.l.Unlock()
- p.Close()
- return p2, nil
- }
- c.pools[addr] = p
- c.l.Unlock()
- return p, nil
- }
- // Topo returns the Cluster's topology as it currently knows it. See
- // ClusterTopo's docs for more on its default order.
- func (c *Cluster) Topo() ClusterTopo {
- c.l.RLock()
- defer c.l.RUnlock()
- return c.topo
- }
- func (c *Cluster) getTopo(p Client) (ClusterTopo, error) {
- var tt ClusterTopo
- err := p.Do(Cmd(&tt, "CLUSTER", "SLOTS"))
- if len(tt) == 0 && err == nil {
- //This will happen between when nodes starts coming up after cluster goes down and
- //Cluster swarm yet not ready using those nodes.
- err = errors.New("no cluster slots assigned")
- }
- return tt, err
- }
- // Sync will synchronize the Cluster with the actual cluster, making new pools
- // to new instances and removing ones from instances no longer in the cluster.
- // This will be called periodically automatically, but you can manually call it
- // at any time as well.
- func (c *Cluster) Sync() error {
- p, err := c.pool("")
- if err != nil {
- return err
- }
- c.syncDedupe.do(func() {
- err = c.sync(p, false)
- })
- return err
- }
- func nodeInfoFromNode(node ClusterNode) trace.ClusterNodeInfo {
- return trace.ClusterNodeInfo{
- Addr: node.Addr,
- Slots: node.Slots,
- IsPrimary: node.SecondaryOfAddr == "",
- }
- }
- func (c *Cluster) traceTopoChanged(prevTopo ClusterTopo, newTopo ClusterTopo) {
- if c.co.ct.TopoChanged != nil {
- var addedNodes []trace.ClusterNodeInfo
- var removedNodes []trace.ClusterNodeInfo
- var changedNodes []trace.ClusterNodeInfo
- prevTopoMap := prevTopo.Map()
- newTopoMap := newTopo.Map()
- for addr, newNode := range newTopoMap {
- if prevNode, ok := prevTopoMap[addr]; ok {
- // Check whether two nodes which have the same address changed its value or not
- if !reflect.DeepEqual(prevNode, newNode) {
- changedNodes = append(changedNodes, nodeInfoFromNode(newNode))
- }
- // No need to handle this address for finding removed nodes
- delete(prevTopoMap, addr)
- } else {
- // The node's address not found from prevTopo is newly added node
- addedNodes = append(addedNodes, nodeInfoFromNode(newNode))
- }
- }
- // Find removed nodes, prevTopoMap has reduced
- for addr, prevNode := range prevTopoMap {
- if _, ok := newTopoMap[addr]; !ok {
- removedNodes = append(removedNodes, nodeInfoFromNode(prevNode))
- }
- }
- // Callback when any changes detected
- if len(addedNodes) != 0 || len(removedNodes) != 0 || len(changedNodes) != 0 {
- c.co.ct.TopoChanged(trace.ClusterTopoChanged{
- Added: addedNodes,
- Removed: removedNodes,
- Changed: changedNodes,
- })
- }
- }
- }
- // while this method is normally deduplicated by the Sync method's use of
- // dedupe it is perfectly thread-safe on its own and can be used whenever.
- func (c *Cluster) sync(p Client, silenceFlag bool) error {
- tt, err := c.getTopo(p)
- if err != nil {
- return err
- }
- for _, t := range tt {
- // call pool just to ensure one exists for this addr
- if _, err := c.pool(t.Addr); err != nil {
- if silenceFlag {
- continue
- } else {
- return fmt.Errorf("error connecting to %s: %w", t.Addr, err)
- }
- }
- }
- c.traceTopoChanged(c.topo, tt)
- var toclose []Client
- func() {
- c.l.Lock()
- defer c.l.Unlock()
- c.topo = tt
- c.primTopo = tt.Primaries()
- c.secondaries = make(map[string]map[string]ClusterNode, len(c.primTopo))
- for _, node := range c.topo {
- if node.SecondaryOfAddr != "" {
- m := c.secondaries[node.SecondaryOfAddr]
- if m == nil {
- m = make(map[string]ClusterNode, len(c.topo)/len(c.primTopo))
- c.secondaries[node.SecondaryOfAddr] = m
- }
- m[node.Addr] = node
- }
- }
- tm := tt.Map()
- for addr, p := range c.pools {
- if _, ok := tm[addr]; !ok {
- toclose = append(toclose, p)
- delete(c.pools, addr)
- }
- }
- }()
- for _, p := range toclose {
- p.Close()
- }
- return nil
- }
- func (c *Cluster) syncEvery(d time.Duration) {
- c.closeWG.Add(1)
- go func() {
- defer c.closeWG.Done()
- t := time.NewTicker(d)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- if err := c.Sync(); err != nil {
- c.err(err)
- }
- case <-c.closeCh:
- return
- }
- }
- }()
- }
- // v3.8.5 add the getting master node without lock to fix the fix deadlock.
- func (c *Cluster) addrForKeyWithNoLock(key string) string {
- s := ClusterSlot([]byte(key))
- for _, t := range c.primTopo {
- for _, slot := range t.Slots {
- if s >= slot[0] && s < slot[1] {
- return t.Addr
- }
- }
- }
- return ""
- }
- func (c *Cluster) addrForKey(key string) string {
- c.l.RLock()
- defer c.l.RUnlock()
- return c.addrForKeyWithNoLock(key)
- }
- func (c *Cluster) secondaryAddrForKey(key string) string {
- c.l.RLock()
- defer c.l.RUnlock()
- primAddr := c.addrForKeyWithNoLock(key)
- for addr := range c.secondaries[primAddr] {
- return addr
- }
- return primAddr
- }
- type askConn struct {
- Conn
- }
- func (ac askConn) Encode(m resp.Marshaler) error {
- if err := ac.Conn.Encode(Cmd(nil, "ASKING")); err != nil {
- return err
- }
- return ac.Conn.Encode(m)
- }
- func (ac askConn) Decode(um resp.Unmarshaler) error {
- if err := ac.Conn.Decode(resp2.Any{}); err != nil {
- return err
- }
- return ac.Conn.Decode(um)
- }
- func (ac askConn) Do(a Action) error {
- return a.Run(ac)
- }
- const doAttempts = 5
- // Do performs an Action on a redis instance in the cluster, with the instance
- // being determeined by the key returned from the Action's Key() method.
- //
- // This method handles MOVED and ASK errors automatically in most cases, see
- // ClusterCanRetryAction's docs for more.
- func (c *Cluster) Do(a Action) error {
- var addr, key string
- keys := a.Keys()
- if len(keys) == 0 {
- // that's ok, key will then just be ""
- } else if err := assertKeysSlot(keys); err != nil {
- return err
- } else {
- key = keys[0]
- addr = c.addrForKey(key)
- }
- return c.doInner(a, addr, key, false, doAttempts)
- }
- // DoSecondary is like Do but executes the Action on a random secondary for the affected keys.
- //
- // For DoSecondary to work, all connections must be created in read-only mode, by using a
- // custom ClusterPoolFunc that executes the READONLY command on each new connection.
- //
- // See ClusterPoolFunc for an example using the global DefaultClusterConnFunc.
- //
- // If the Action can not be handled by a secondary the Action will be send to the primary instead.
- func (c *Cluster) DoSecondary(a Action) error {
- var addr, key string
- keys := a.Keys()
- if len(keys) == 0 {
- // that's ok, key will then just be ""
- } else if err := assertKeysSlot(keys); err != nil {
- return err
- } else {
- key = keys[0]
- addr = c.secondaryAddrForKey(key)
- }
- return c.doInner(a, addr, key, false, doAttempts)
- }
- func (c *Cluster) getClusterDownSince() int64 {
- return atomic.LoadInt64(&c.lastClusterdown)
- }
- func (c *Cluster) setClusterDown(down bool) (changed bool) {
- // There is a race when calling this method concurrently when the cluster
- // healed after being down.
- //
- // If we have 2 goroutines, one that sends a command before the cluster
- // heals and once that sends a command after the cluster healed, both
- // goroutines will call this method, but with different values
- // (down == true and down == false).
- //
- // Since there is bi ordering between the two goroutines, it can happen
- // that the call to setClusterDown in the second goroutine runs before
- // the call in the first goroutine. In that case the state would be
- // changed from down to up by the second goroutine, as it should, only
- // for the first goroutine to set it back to down a few microseconds later.
- //
- // If this happens other commands will be needlessly delayed until
- // another goroutine sets the state to up again and we will trace two
- // unnecessary state transitions.
- //
- // We can not reliably avoid this race without more complex tracking of
- // previous states, which would be rather complex and possibly expensive.
- // Swapping values is expensive (on amd64, an uncontended swap can be 10x
- // slower than a load) and can easily become quite contended when we have
- // many goroutines trying to update the value concurrently, which would
- // slow it down even more.
- //
- // We avoid the overhead of swapping when not necessary by loading the
- // value first and checking if the value is already what we want it to be.
- //
- // Since atomic loads are fast (on amd64 an atomic load can be as fast as
- // a non-atomic load, and is perfectly scalable as long as there are no
- // writes to the same cache line), we can safely do this without adding
- // unnecessary extra latency.
- prevVal := atomic.LoadInt64(&c.lastClusterdown)
- var newVal int64
- if down {
- newVal = time.Now().UnixNano() / 1000 / 1000
- // Since the exact value is only used for delaying commands small
- // differences don't matter much and we can avoid many updates by
- // ignoring small differences (<5ms).
- if prevVal != 0 && newVal-prevVal < 5 {
- return false
- }
- } else {
- if prevVal == 0 {
- return false
- }
- }
- prevVal = atomic.SwapInt64(&c.lastClusterdown, newVal)
- changed = (prevVal == 0 && newVal != 0) || (prevVal != 0 && newVal == 0)
- if changed && c.co.ct.StateChange != nil {
- c.co.ct.StateChange(trace.ClusterStateChange{IsDown: down})
- }
- return changed
- }
- func (c *Cluster) traceRedirected(addr, key string, moved, ask bool, count int, final bool) {
- if c.co.ct.Redirected != nil {
- c.co.ct.Redirected(trace.ClusterRedirected{
- Addr: addr,
- Key: key,
- Moved: moved,
- Ask: ask,
- RedirectCount: count,
- Final: final,
- })
- }
- }
- func (c *Cluster) doInner(a Action, addr, key string, ask bool, attempts int) error {
- if downSince := c.getClusterDownSince(); downSince > 0 && c.co.clusterDownWait > 0 {
- // only wait when the last command was not too long, because
- // otherwise the chance it high that the cluster already healed
- elapsed := (time.Now().UnixNano() / 1000 / 1000) - downSince
- if elapsed < int64(c.co.clusterDownWait/time.Millisecond) {
- time.Sleep(c.co.clusterDownWait)
- }
- }
- p, err := c.pool(addr)
- if err != nil {
- return err
- }
- // We only need to use WithConn if we want to send an ASKING command before
- // our Action a. If ask is false we can thus skip the WithConn call, which
- // avoids a few allocations, and execute our Action directly on p. This
- // helps with most calls since ask will only be true when a key gets
- // migrated between nodes.
- thisA := a
- if ask {
- thisA = WithConn(key, func(conn Conn) error {
- return askConn{conn}.Do(a)
- })
- }
- err = p.Do(thisA)
- if err == nil {
- c.setClusterDown(false)
- return nil
- }
- var respErr resp2.Error
- if !errors.As(err, &respErr) {
- return err
- }
- msg := respErr.Error()
- clusterDown := strings.HasPrefix(msg, "CLUSTERDOWN ")
- clusterDownChanged := c.setClusterDown(clusterDown)
- if clusterDown && c.co.clusterDownWait > 0 && clusterDownChanged {
- return c.doInner(a, addr, key, ask, 1)
- }
- // if the error was a MOVED or ASK we can potentially retry
- moved := strings.HasPrefix(msg, "MOVED ")
- ask = strings.HasPrefix(msg, "ASK ")
- if !moved && !ask {
- return err
- }
- // if we get an ASK there's no need to do a sync quite yet, we can continue
- // normally. But MOVED always prompts a sync. In the section after this one
- // we figure out what address to use based on the returned error so the sync
- // isn't used _immediately_, but it still needs to happen.
- //
- // Also, even if the Action isn't a ClusterCanRetryAction we want a MOVED to
- // prompt a Sync
- if moved {
- if serr := c.Sync(); serr != nil {
- return serr
- }
- }
- if ccra, ok := a.(ClusterCanRetryAction); !ok || !ccra.ClusterCanRetry() {
- return err
- }
- msgParts := strings.Split(msg, " ")
- if len(msgParts) < 3 {
- return fmt.Errorf("malformed MOVED/ASK error %q", msg)
- }
- ogAddr, addr := addr, msgParts[2]
- c.traceRedirected(ogAddr, key, moved, ask, doAttempts-attempts+1, attempts <= 1)
- if attempts--; attempts <= 0 {
- return errors.New("cluster action redirected too many times")
- }
- return c.doInner(a, addr, key, ask, attempts)
- }
- // Close cleans up all goroutines spawned by Cluster and closes all of its
- // Pools.
- func (c *Cluster) Close() error {
- closeErr := errClientClosed
- c.closeOnce.Do(func() {
- close(c.closeCh)
- c.closeWG.Wait()
- close(c.ErrCh)
- c.l.Lock()
- defer c.l.Unlock()
- var pErr error
- for _, p := range c.pools {
- if err := p.Close(); pErr == nil && err != nil {
- pErr = err
- }
- }
- closeErr = pErr
- })
- return closeErr
- }
|