package radix

import (
	"errors"
	"fmt"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/mediocregopher/radix/v3/resp"
	"github.com/mediocregopher/radix/v3/resp/resp2"
	"github.com/mediocregopher/radix/v3/trace"
)

// dedupe is used to deduplicate a function invocation, so if multiple
// go-routines call it at the same time only the first will actually run it, and
// the others will block until that one is done.
type dedupe struct {
	l sync.Mutex
	s *sync.Once
}

func newDedupe() *dedupe {
	return &dedupe{s: new(sync.Once)}
}

func (d *dedupe) do(fn func()) {
	d.l.Lock()
	s := d.s
	d.l.Unlock()

	s.Do(func() {
		fn()
		d.l.Lock()
		d.s = new(sync.Once)
		d.l.Unlock()
	})
}
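
// As an illustrative sketch (not part of the package API): if several
// goroutines call do concurrently, fn runs once and the rest block until it
// returns; a later, non-overlapping call will run its fn again because do
// swaps in a fresh sync.Once after fn completes.
//
//	d := newDedupe()
//	for i := 0; i < 3; i++ {
//		go d.do(func() {
//			// runs once for this wave of concurrent callers
//		})
//	}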

////////////////////////////////////////////////////////////////////////////////

// ClusterCanRetryAction is an Action which is aware of Cluster's retry behavior
// in the event of a slot migration. If an Action receives an error from a
// Cluster node which is either MOVED or ASK, and that Action implements
// ClusterCanRetryAction, and the ClusterCanRetry method returns true, then the
// Action will be retried on the correct node.
//
// NOTE that the Actions which are returned by Cmd, FlatCmd, and EvalScript.Cmd
// all implicitly implement this interface.
type ClusterCanRetryAction interface {
	Action
	ClusterCanRetry() bool
}
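
// A hedged sketch of opting a custom Action in to the retry behavior (the
// wrapper type below is hypothetical, not part of this package):
//
//	type retryableAction struct {
//		Action
//	}
//
//	func (ra retryableAction) ClusterCanRetry() bool { return true }
//
// Wrapping an Action this way lets Cluster retry it on MOVED/ASK errors.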

////////////////////////////////////////////////////////////////////////////////

type clusterOpts struct {
	pf                   ClientFunc
	clusterDownWait      time.Duration
	syncEvery            time.Duration
	ct                   trace.ClusterTrace
	initAllowUnavailable bool
}

// ClusterOpt is an optional behavior which can be applied to the NewCluster
// function to affect a Cluster's behavior.
type ClusterOpt func(*clusterOpts)

// ClusterPoolFunc tells the Cluster to use the given ClientFunc when creating
// pools of connections to cluster members.
//
// This can be used to allow for secondary reads via the Cluster.DoSecondary
// method by specifying a ClientFunc that internally creates connections using
// DefaultClusterConnFunc or a custom ConnFunc that enables READONLY mode on each
// connection.
func ClusterPoolFunc(pf ClientFunc) ClusterOpt {
	return func(co *clusterOpts) {
		co.pf = pf
	}
}
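
// For example, a hedged sketch of a read-only pool setup using the package's
// Pool API (pool size is illustrative):
//
//	pf := func(network, addr string) (Client, error) {
//		return NewPool(network, addr, 10, PoolConnFunc(DefaultClusterConnFunc))
//	}
//	c, err := NewCluster(addrs, ClusterPoolFunc(pf))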

// ClusterSyncEvery tells the Cluster to synchronize itself with the cluster's
// topology at the given interval. On every synchronization Cluster will ask the
// cluster for its topology and make/destroy its connections as necessary.
func ClusterSyncEvery(d time.Duration) ClusterOpt {
	return func(co *clusterOpts) {
		co.syncEvery = d
	}
}

// ClusterOnDownDelayActionsBy tells the Cluster to delay all commands by the given
// duration while the cluster is seen to be in the CLUSTERDOWN state. This
// allows fewer actions to be affected by brief outages, e.g. during a failover.
//
// If the given duration is 0 then Cluster will not delay actions during the
// CLUSTERDOWN state. Note that calls to Sync will not be delayed regardless
// of this option.
func ClusterOnDownDelayActionsBy(d time.Duration) ClusterOpt {
	return func(co *clusterOpts) {
		co.clusterDownWait = d
	}
}

// ClusterWithTrace tells the Cluster to trace itself with the given
// ClusterTrace. Note that the Cluster will block on every trace callback that
// is set, since callbacks are invoked synchronously.
func ClusterWithTrace(ct trace.ClusterTrace) ClusterOpt {
	return func(co *clusterOpts) {
		co.ct = ct
	}
}

// ClusterOnInitAllowUnavailable tells NewCluster to succeed and not return an
// error as long as at least one redis instance in the cluster can be
// successfully connected to.
func ClusterOnInitAllowUnavailable(initAllowUnavailable bool) ClusterOpt {
	return func(co *clusterOpts) {
		co.initAllowUnavailable = initAllowUnavailable
	}
}

// Cluster contains all information about a redis cluster needed to interact
// with it, including a set of pools to each of its instances. All methods on
// Cluster are thread-safe.
type Cluster struct {
	// Atomic fields must be at the beginning of the struct since they must be
	// correctly aligned or else access may cause panics on 32-bit architectures
	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	lastClusterdown int64 // unix timestamp in milliseconds, atomic

	co clusterOpts

	// used to deduplicate calls to sync
	syncDedupe *dedupe

	l              sync.RWMutex
	pools          map[string]Client
	primTopo, topo ClusterTopo
	secondaries    map[string]map[string]ClusterNode

	closeCh   chan struct{}
	closeWG   sync.WaitGroup
	closeOnce sync.Once

	// Any errors encountered internally will be written to this channel. If
	// nothing is reading the channel the errors will be dropped. The channel
	// will be closed when the Close method is called.
	ErrCh chan error
}
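
// A hedged sketch of draining ErrCh so internal errors aren't silently
// dropped (the logging shown is illustrative only):
//
//	c, err := NewCluster(addrs)
//	if err != nil {
//		// handle err
//	}
//	go func() {
//		for err := range c.ErrCh {
//			log.Printf("cluster error: %v", err)
//		}
//	}()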

// DefaultClusterConnFunc is a ConnFunc which will return a Conn for a node in a
// redis cluster using sane defaults and which has READONLY mode enabled, allowing
// read-only commands on the connection even if the connected instance is currently
// a replica, either by explicitly sending commands on the connection or by using
// the DoSecondary method on the Cluster that owns the connection.
var DefaultClusterConnFunc = func(network, addr string) (Conn, error) {
	c, err := DefaultConnFunc(network, addr)
	if err != nil {
		return nil, err
	} else if err := c.Do(Cmd(nil, "READONLY")); err != nil {
		c.Close()
		return nil, err
	}
	return c, nil
}

// NewCluster initializes and returns a Cluster instance. It will try every
// address given until it finds a usable one. From there it uses CLUSTER SLOTS
// to discover the cluster topology and make all the necessary connections.
//
// NewCluster takes in a number of options which can overwrite its default
// behavior. The default options NewCluster uses are:
//
//	ClusterPoolFunc(DefaultClientFunc)
//	ClusterSyncEvery(5 * time.Second)
//	ClusterOnDownDelayActionsBy(100 * time.Millisecond)
func NewCluster(clusterAddrs []string, opts ...ClusterOpt) (*Cluster, error) {
	c := &Cluster{
		syncDedupe: newDedupe(),
		pools:      map[string]Client{},
		closeCh:    make(chan struct{}),
		ErrCh:      make(chan error, 1),
	}

	defaultClusterOpts := []ClusterOpt{
		ClusterPoolFunc(DefaultClientFunc),
		ClusterSyncEvery(5 * time.Second),
		ClusterOnDownDelayActionsBy(100 * time.Millisecond),
	}

	for _, opt := range append(defaultClusterOpts, opts...) {
		// the other args to NewCluster used to be a ClientFunc, which someone
		// might have left as nil, in which case this now gives a weird panic.
		// Just handle it.
		if opt != nil {
			opt(&(c.co))
		}
	}

	var err error
	// make a pool to base the cluster on
	for _, addr := range clusterAddrs {
		var p Client
		if p, err = c.co.pf("tcp", addr); err != nil {
			continue
		}
		c.pools[addr] = p
		break
	}

	if len(c.pools) == 0 {
		return nil, fmt.Errorf("could not connect to any redis instances, last error was: %w", err)
	}

	p, err := c.pool("")
	if err != nil {
		for _, p := range c.pools {
			p.Close()
		}
		return nil, err
	}

	if err := c.sync(p, c.co.initAllowUnavailable); err != nil {
		for _, p := range c.pools {
			p.Close()
		}
		return nil, err
	}

	c.syncEvery(c.co.syncEvery)

	return c, nil
}
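
// A hedged usage sketch (addresses are illustrative):
//
//	c, err := NewCluster(
//		[]string{"127.0.0.1:7000", "127.0.0.1:7001"},
//		ClusterSyncEvery(10*time.Second),
//	)
//	if err != nil {
//		// handle err
//	}
//	defer c.Close()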

func (c *Cluster) err(err error) {
	select {
	case c.ErrCh <- err:
	default:
	}
}

func assertKeysSlot(keys []string) error {
	var ok bool
	var prevKey string
	var slot uint16
	for _, key := range keys {
		thisSlot := ClusterSlot([]byte(key))
		if !ok {
			ok = true
		} else if slot != thisSlot {
			return fmt.Errorf("keys %q and %q do not belong to the same slot", prevKey, key)
		}
		prevKey = key
		slot = thisSlot
	}
	return nil
}
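
// As context for assertKeysSlot: redis cluster only routes a multi-key command
// to a single node when all keys hash to the same slot. Hash tags (the "{...}"
// portion of a key) make this predictable, e.g.:
//
//	// same slot, since only "user1" is hashed for both keys
//	keys := []string{"{user1}.name", "{user1}.email"}
//	err := assertKeysSlot(keys) // nil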

// may return nil, nil if no pool for the addr.
func (c *Cluster) rpool(addr string) (Client, error) {
	c.l.RLock()
	defer c.l.RUnlock()
	if addr == "" {
		for _, p := range c.pools {
			return p, nil
		}
		return nil, errors.New("no pools available")
	} else if p, ok := c.pools[addr]; ok {
		return p, nil
	}
	return nil, nil
}

var errUnknownAddress = errors.New("unknown address")

// Client returns a Client for the given address, which could be either the
// primary or one of the secondaries (see Topo method for retrieving known
// addresses).
//
// NOTE that if there is a failover while a Client returned by this method is
// being used the Client may or may not continue to work as expected, depending
// on the nature of the failover.
//
// NOTE the Client should _not_ be closed.
func (c *Cluster) Client(addr string) (Client, error) {
	// rpool allows the address to be "", handle that case manually
	if addr == "" {
		return nil, errUnknownAddress
	}
	cl, err := c.rpool(addr)
	if err != nil {
		return nil, err
	} else if cl == nil {
		return nil, errUnknownAddress
	}
	return cl, nil
}

// if addr is "" returns a random pool. If addr is given but there's no pool for
// it one will be created on-the-fly.
func (c *Cluster) pool(addr string) (Client, error) {
	p, err := c.rpool(addr)
	if p != nil || err != nil {
		return p, err
	}

	// if the pool isn't available make it on-the-fly. This behavior isn't
	// _great_, but theoretically the syncEvery process should clean up any
	// extraneous pools which aren't really needed

	// it's important that the cluster pool set isn't locked while this is
	// happening, because this could block for a while
	if p, err = c.co.pf("tcp", addr); err != nil {
		return nil, err
	}

	// we've made a new pool, but we need to double-check someone else didn't
	// make one at the same time and add it in first. If they did, close this
	// one and return that one
	c.l.Lock()
	if p2, ok := c.pools[addr]; ok {
		c.l.Unlock()
		p.Close()
		return p2, nil
	}
	c.pools[addr] = p
	c.l.Unlock()
	return p, nil
}

// Topo returns the Cluster's topology as it currently knows it. See
// ClusterTopo's docs for more on its default order.
func (c *Cluster) Topo() ClusterTopo {
	c.l.RLock()
	defer c.l.RUnlock()
	return c.topo
}

func (c *Cluster) getTopo(p Client) (ClusterTopo, error) {
	var tt ClusterTopo
	err := p.Do(Cmd(&tt, "CLUSTER", "SLOTS"))
	if len(tt) == 0 && err == nil {
		// This can happen while nodes are coming back up after the cluster
		// went down, before the cluster has assigned slots to them.
		err = errors.New("no cluster slots assigned")
	}
	return tt, err
}

// Sync will synchronize the Cluster with the actual cluster, making new pools
// to new instances and removing ones from instances no longer in the cluster.
// This will be called periodically automatically, but you can manually call it
// at any time as well.
func (c *Cluster) Sync() error {
	p, err := c.pool("")
	if err != nil {
		return err
	}
	c.syncDedupe.do(func() {
		err = c.sync(p, false)
	})
	return err
}
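
// A hedged sketch of triggering a manual sync, e.g. right after an
// operator-driven resharding, instead of waiting for the next periodic sync:
//
//	if err := c.Sync(); err != nil {
//		// handle err
//	}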

func nodeInfoFromNode(node ClusterNode) trace.ClusterNodeInfo {
	return trace.ClusterNodeInfo{
		Addr:      node.Addr,
		Slots:     node.Slots,
		IsPrimary: node.SecondaryOfAddr == "",
	}
}

func (c *Cluster) traceTopoChanged(prevTopo ClusterTopo, newTopo ClusterTopo) {
	if c.co.ct.TopoChanged != nil {
		var addedNodes []trace.ClusterNodeInfo
		var removedNodes []trace.ClusterNodeInfo
		var changedNodes []trace.ClusterNodeInfo

		prevTopoMap := prevTopo.Map()
		newTopoMap := newTopo.Map()

		for addr, newNode := range newTopoMap {
			if prevNode, ok := prevTopoMap[addr]; ok {
				// Check whether the node at this address has changed
				if !reflect.DeepEqual(prevNode, newNode) {
					changedNodes = append(changedNodes, nodeInfoFromNode(newNode))
				}
				// No need to consider this address when finding removed nodes
				delete(prevTopoMap, addr)
			} else {
				// An address not present in prevTopo belongs to a newly added node
				addedNodes = append(addedNodes, nodeInfoFromNode(newNode))
			}
		}

		// Any addresses remaining in prevTopoMap belong to removed nodes
		for addr, prevNode := range prevTopoMap {
			if _, ok := newTopoMap[addr]; !ok {
				removedNodes = append(removedNodes, nodeInfoFromNode(prevNode))
			}
		}

		// Callback when any changes are detected
		if len(addedNodes) != 0 || len(removedNodes) != 0 || len(changedNodes) != 0 {
			c.co.ct.TopoChanged(trace.ClusterTopoChanged{
				Added:   addedNodes,
				Removed: removedNodes,
				Changed: changedNodes,
			})
		}
	}
}

// while this method is normally deduplicated by the Sync method's use of
// dedupe, it is perfectly thread-safe on its own and can be used whenever.
func (c *Cluster) sync(p Client, silenceFlag bool) error {
	tt, err := c.getTopo(p)
	if err != nil {
		return err
	}

	for _, t := range tt {
		// call pool just to ensure one exists for this addr
		if _, err := c.pool(t.Addr); err != nil {
			if silenceFlag {
				continue
			} else {
				return fmt.Errorf("error connecting to %s: %w", t.Addr, err)
			}
		}
	}

	c.traceTopoChanged(c.topo, tt)

	var toclose []Client
	func() {
		c.l.Lock()
		defer c.l.Unlock()
		c.topo = tt
		c.primTopo = tt.Primaries()

		c.secondaries = make(map[string]map[string]ClusterNode, len(c.primTopo))
		for _, node := range c.topo {
			if node.SecondaryOfAddr != "" {
				m := c.secondaries[node.SecondaryOfAddr]
				if m == nil {
					m = make(map[string]ClusterNode, len(c.topo)/len(c.primTopo))
					c.secondaries[node.SecondaryOfAddr] = m
				}
				m[node.Addr] = node
			}
		}

		tm := tt.Map()
		for addr, p := range c.pools {
			if _, ok := tm[addr]; !ok {
				toclose = append(toclose, p)
				delete(c.pools, addr)
			}
		}
	}()

	for _, p := range toclose {
		p.Close()
	}

	return nil
}

func (c *Cluster) syncEvery(d time.Duration) {
	c.closeWG.Add(1)
	go func() {
		defer c.closeWG.Done()
		t := time.NewTicker(d)
		defer t.Stop()

		for {
			select {
			case <-t.C:
				if err := c.Sync(); err != nil {
					c.err(err)
				}
			case <-c.closeCh:
				return
			}
		}
	}()
}

// v3.8.5 added this lock-free primary-node lookup to fix a deadlock.
func (c *Cluster) addrForKeyWithNoLock(key string) string {
	s := ClusterSlot([]byte(key))
	for _, t := range c.primTopo {
		for _, slot := range t.Slots {
			if s >= slot[0] && s < slot[1] {
				return t.Addr
			}
		}
	}
	return ""
}

func (c *Cluster) addrForKey(key string) string {
	c.l.RLock()
	defer c.l.RUnlock()
	return c.addrForKeyWithNoLock(key)
}

func (c *Cluster) secondaryAddrForKey(key string) string {
	c.l.RLock()
	defer c.l.RUnlock()
	primAddr := c.addrForKeyWithNoLock(key)
	for addr := range c.secondaries[primAddr] {
		return addr
	}
	return primAddr
}

type askConn struct {
	Conn
}

func (ac askConn) Encode(m resp.Marshaler) error {
	if err := ac.Conn.Encode(Cmd(nil, "ASKING")); err != nil {
		return err
	}
	return ac.Conn.Encode(m)
}

func (ac askConn) Decode(um resp.Unmarshaler) error {
	if err := ac.Conn.Decode(resp2.Any{}); err != nil {
		return err
	}
	return ac.Conn.Decode(um)
}

func (ac askConn) Do(a Action) error {
	return a.Run(ac)
}
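
// Per the redis cluster spec, a connection following an ASK redirect must send
// ASKING immediately before the redirected command; askConn pipelines the two
// and discards ASKING's "+OK" reply. Roughly, the wire exchange looks like:
//
//	> ASKING
//	> GET foo
//	< +OK   (consumed and discarded by askConn.Decode)
//	< ...   (the actual reply, decoded into um)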

const doAttempts = 5

// Do performs an Action on a redis instance in the cluster, with the instance
// being determined by the keys returned from the Action's Keys() method.
//
// This method handles MOVED and ASK errors automatically in most cases, see
// ClusterCanRetryAction's docs for more.
func (c *Cluster) Do(a Action) error {
	var addr, key string
	keys := a.Keys()
	if len(keys) == 0 {
		// that's ok, key will then just be ""
	} else if err := assertKeysSlot(keys); err != nil {
		return err
	} else {
		key = keys[0]
		addr = c.addrForKey(key)
	}

	return c.doInner(a, addr, key, false, doAttempts)
}
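
// A hedged usage sketch:
//
//	var fooVal string
//	if err := c.Do(Cmd(&fooVal, "GET", "foo")); err != nil {
//		// handle err
//	}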

// DoSecondary is like Do but executes the Action on a random secondary for the affected keys.
//
// For DoSecondary to work, all connections must be created in read-only mode, by using a
// custom ClusterPoolFunc that executes the READONLY command on each new connection.
//
// See ClusterPoolFunc for an example using the global DefaultClusterConnFunc.
//
// If the Action can not be handled by a secondary the Action will be sent to the primary instead.
func (c *Cluster) DoSecondary(a Action) error {
	var addr, key string
	keys := a.Keys()
	if len(keys) == 0 {
		// that's ok, key will then just be ""
	} else if err := assertKeysSlot(keys); err != nil {
		return err
	} else {
		key = keys[0]
		addr = c.secondaryAddrForKey(key)
	}

	return c.doInner(a, addr, key, false, doAttempts)
}
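
// A hedged usage sketch, assuming the Cluster was built with read-only
// connections as described above:
//
//	var fooVal string
//	if err := c.DoSecondary(Cmd(&fooVal, "GET", "foo")); err != nil {
//		// handle err
//	}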

func (c *Cluster) getClusterDownSince() int64 {
	return atomic.LoadInt64(&c.lastClusterdown)
}

func (c *Cluster) setClusterDown(down bool) (changed bool) {
	// There is a race when calling this method concurrently when the cluster
	// healed after being down.
	//
	// If we have 2 goroutines, one that sends a command before the cluster
	// heals and one that sends a command after the cluster healed, both
	// goroutines will call this method, but with different values
	// (down == true and down == false).
	//
	// Since there is no ordering between the two goroutines, it can happen
	// that the call to setClusterDown in the second goroutine runs before
	// the call in the first goroutine. In that case the state would be
	// changed from down to up by the second goroutine, as it should, only
	// for the first goroutine to set it back to down a few microseconds later.
	//
	// If this happens other commands will be needlessly delayed until
	// another goroutine sets the state to up again, and we will trace two
	// unnecessary state transitions.
	//
	// We can not reliably avoid this race without tracking previous states,
	// which would be rather complex and possibly expensive. Swapping values
	// is expensive (on amd64, an uncontended swap can be 10x slower than a
	// load) and can easily become quite contended when we have many
	// goroutines trying to update the value concurrently, which would slow
	// it down even more.
	//
	// We avoid the overhead of swapping when not necessary by loading the
	// value first and checking if the value is already what we want it to be.
	//
	// Since atomic loads are fast (on amd64 an atomic load can be as fast as
	// a non-atomic load, and is perfectly scalable as long as there are no
	// writes to the same cache line), we can safely do this without adding
	// unnecessary extra latency.
	prevVal := atomic.LoadInt64(&c.lastClusterdown)

	var newVal int64
	if down {
		newVal = time.Now().UnixNano() / 1000 / 1000
		// Since the exact value is only used for delaying commands, small
		// differences don't matter much and we can avoid many updates by
		// ignoring small differences (<5ms).
		if prevVal != 0 && newVal-prevVal < 5 {
			return false
		}
	} else {
		if prevVal == 0 {
			return false
		}
	}

	prevVal = atomic.SwapInt64(&c.lastClusterdown, newVal)

	changed = (prevVal == 0 && newVal != 0) || (prevVal != 0 && newVal == 0)

	if changed && c.co.ct.StateChange != nil {
		c.co.ct.StateChange(trace.ClusterStateChange{IsDown: down})
	}
	return changed
}

func (c *Cluster) traceRedirected(addr, key string, moved, ask bool, count int, final bool) {
	if c.co.ct.Redirected != nil {
		c.co.ct.Redirected(trace.ClusterRedirected{
			Addr:          addr,
			Key:           key,
			Moved:         moved,
			Ask:           ask,
			RedirectCount: count,
			Final:         final,
		})
	}
}

func (c *Cluster) doInner(a Action, addr, key string, ask bool, attempts int) error {
	if downSince := c.getClusterDownSince(); downSince > 0 && c.co.clusterDownWait > 0 {
		// only wait if the cluster was marked down recently, since otherwise
		// the chance is high that the cluster has already healed
		elapsed := (time.Now().UnixNano() / 1000 / 1000) - downSince
		if elapsed < int64(c.co.clusterDownWait/time.Millisecond) {
			time.Sleep(c.co.clusterDownWait)
		}
	}

	p, err := c.pool(addr)
	if err != nil {
		return err
	}

	// We only need to use WithConn if we want to send an ASKING command before
	// our Action a. If ask is false we can thus skip the WithConn call, which
	// avoids a few allocations, and execute our Action directly on p. This
	// helps with most calls since ask will only be true when a key gets
	// migrated between nodes.
	thisA := a
	if ask {
		thisA = WithConn(key, func(conn Conn) error {
			return askConn{conn}.Do(a)
		})
	}

	err = p.Do(thisA)
	if err == nil {
		c.setClusterDown(false)
		return nil
	}

	var respErr resp2.Error
	if !errors.As(err, &respErr) {
		return err
	}

	msg := respErr.Error()

	clusterDown := strings.HasPrefix(msg, "CLUSTERDOWN ")
	clusterDownChanged := c.setClusterDown(clusterDown)
	if clusterDown && c.co.clusterDownWait > 0 && clusterDownChanged {
		return c.doInner(a, addr, key, ask, 1)
	}

	// if the error was a MOVED or ASK we can potentially retry
	moved := strings.HasPrefix(msg, "MOVED ")
	ask = strings.HasPrefix(msg, "ASK ")
	if !moved && !ask {
		return err
	}

	// if we get an ASK there's no need to do a sync quite yet, we can continue
	// normally. But MOVED always prompts a sync. In the section after this one
	// we figure out what address to use based on the returned error so the sync
	// isn't used _immediately_, but it still needs to happen.
	//
	// Also, even if the Action isn't a ClusterCanRetryAction we want a MOVED to
	// prompt a Sync
	if moved {
		if serr := c.Sync(); serr != nil {
			return serr
		}
	}

	if ccra, ok := a.(ClusterCanRetryAction); !ok || !ccra.ClusterCanRetry() {
		return err
	}

	msgParts := strings.Split(msg, " ")
	if len(msgParts) < 3 {
		return fmt.Errorf("malformed MOVED/ASK error %q", msg)
	}

	ogAddr, addr := addr, msgParts[2]
	c.traceRedirected(ogAddr, key, moved, ask, doAttempts-attempts+1, attempts <= 1)
	if attempts--; attempts <= 0 {
		return errors.New("cluster action redirected too many times")
	}

	return c.doInner(a, addr, key, ask, attempts)
}

// Close cleans up all goroutines spawned by Cluster and closes all of its
// Pools.
func (c *Cluster) Close() error {
	closeErr := errClientClosed
	c.closeOnce.Do(func() {
		close(c.closeCh)
		c.closeWG.Wait()
		close(c.ErrCh)

		c.l.Lock()
		defer c.l.Unlock()
		var pErr error
		for _, p := range c.pools {
			if err := p.Close(); pErr == nil && err != nil {
				pErr = err
			}
		}
		closeErr = pErr
	})
	return closeErr
}