ring.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. package redis
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/go-redis/redis/internal"
  12. "github.com/go-redis/redis/internal/consistenthash"
  13. "github.com/go-redis/redis/internal/hashtag"
  14. "github.com/go-redis/redis/internal/pool"
  15. )
// Hash is type of hash function used in consistent hash.
type Hash consistenthash.Hash

// errRingShardsDown is returned when no shard in the ring is available
// to serve a key (the consistent hash has no live nodes).
var errRingShardsDown = errors.New("redis: all ring shards are down")
// RingOptions are used to configure a ring client and should be
// passed to NewRing.
type RingOptions struct {
	// Map of name => host:port addresses of ring shards.
	Addrs map[string]string

	// Frequency of PING commands sent to check shards availability.
	// Shard is considered down after 3 subsequent failed checks.
	HeartbeatFrequency time.Duration

	// Hash function used in consistent hash.
	// Default is crc32.ChecksumIEEE.
	Hash Hash

	// Number of replicas in consistent hash.
	// Default is 100 replicas.
	//
	// Higher number of replicas will provide less deviation, that is keys will be
	// distributed to nodes more evenly.
	//
	// Following is deviation for common nreplicas:
	//  --------------------------------------------------------
	//  | nreplicas | standard error | 99% confidence interval |
	//  |    10     |     0.3152     |      (0.37, 1.98)       |
	//  |    100    |     0.0997     |      (0.76, 1.28)       |
	//  |   1000    |     0.0316     |      (0.92, 1.09)       |
	//  --------------------------------------------------------
	//
	// See https://arxiv.org/abs/1406.2294 for reference
	HashReplicas int

	// Following options are copied from Options struct.
	// Note: MaxRetries is consumed by the ring itself (see
	// defaultProcess), not forwarded to shard clients.

	OnConnect func(*Conn) error

	DB       int
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
}
  63. func (opt *RingOptions) init() {
  64. if opt.HeartbeatFrequency == 0 {
  65. opt.HeartbeatFrequency = 500 * time.Millisecond
  66. }
  67. if opt.HashReplicas == 0 {
  68. opt.HashReplicas = 100
  69. }
  70. switch opt.MinRetryBackoff {
  71. case -1:
  72. opt.MinRetryBackoff = 0
  73. case 0:
  74. opt.MinRetryBackoff = 8 * time.Millisecond
  75. }
  76. switch opt.MaxRetryBackoff {
  77. case -1:
  78. opt.MaxRetryBackoff = 0
  79. case 0:
  80. opt.MaxRetryBackoff = 512 * time.Millisecond
  81. }
  82. }
  83. func (opt *RingOptions) clientOptions() *Options {
  84. return &Options{
  85. OnConnect: opt.OnConnect,
  86. DB: opt.DB,
  87. Password: opt.Password,
  88. DialTimeout: opt.DialTimeout,
  89. ReadTimeout: opt.ReadTimeout,
  90. WriteTimeout: opt.WriteTimeout,
  91. PoolSize: opt.PoolSize,
  92. MinIdleConns: opt.MinIdleConns,
  93. MaxConnAge: opt.MaxConnAge,
  94. PoolTimeout: opt.PoolTimeout,
  95. IdleTimeout: opt.IdleTimeout,
  96. IdleCheckFrequency: opt.IdleCheckFrequency,
  97. }
  98. }
  99. //------------------------------------------------------------------------------
// ringShard pairs a shard client with its health state.
type ringShard struct {
	Client *Client
	// down counts consecutive failed heartbeat checks; accessed only
	// via sync/atomic. The shard is treated as down once it reaches 3
	// (see IsDown).
	down int32
}
  104. func (shard *ringShard) String() string {
  105. var state string
  106. if shard.IsUp() {
  107. state = "up"
  108. } else {
  109. state = "down"
  110. }
  111. return fmt.Sprintf("%s is %s", shard.Client, state)
  112. }
  113. func (shard *ringShard) IsDown() bool {
  114. const threshold = 3
  115. return atomic.LoadInt32(&shard.down) >= threshold
  116. }
  117. func (shard *ringShard) IsUp() bool {
  118. return !shard.IsDown()
  119. }
// Vote votes to set shard state and returns true if state was changed.
//
// An up vote (successful check) resets the failure counter; a down vote
// increments it. The state "changes" either when a down shard recovers,
// or when the counter first crosses the down threshold.
func (shard *ringShard) Vote(up bool) bool {
	if up {
		// Recovery: state changed only if the shard was previously down.
		changed := shard.IsDown()
		atomic.StoreInt32(&shard.down, 0)
		return changed
	}

	// Already down: further failures don't change state.
	if shard.IsDown() {
		return false
	}

	// NOTE(review): IsDown + AddInt32 is a check-then-act pair, not one
	// atomic step. In this file Vote is only called from the single
	// Heartbeat goroutine, so there is no concurrent voter — confirm
	// before calling Vote from elsewhere.
	atomic.AddInt32(&shard.down, 1)
	return shard.IsDown()
}
  133. //------------------------------------------------------------------------------
// ringShards owns the consistent-hash ring and the shard set.
// hash is replaced wholesale under mu (see rebalance); shards and list
// are treated as read-only snapshots once published, so readers may
// copy the reference under the read lock and use it afterwards.
type ringShards struct {
	opt *RingOptions

	mu     sync.RWMutex
	hash   *consistenthash.Map
	shards map[string]*ringShard // read only
	list   []*ringShard          // read only
	len    int // number of live shards, recomputed by rebalance
	closed bool
}
  143. func newRingShards(opt *RingOptions) *ringShards {
  144. return &ringShards{
  145. opt: opt,
  146. hash: newConsistentHash(opt),
  147. shards: make(map[string]*ringShard),
  148. }
  149. }
  150. func (c *ringShards) Add(name string, cl *Client) {
  151. shard := &ringShard{Client: cl}
  152. c.hash.Add(name)
  153. c.shards[name] = shard
  154. c.list = append(c.list, shard)
  155. }
  156. func (c *ringShards) List() []*ringShard {
  157. c.mu.RLock()
  158. list := c.list
  159. c.mu.RUnlock()
  160. return list
  161. }
  162. func (c *ringShards) Hash(key string) string {
  163. c.mu.RLock()
  164. hash := c.hash.Get(key)
  165. c.mu.RUnlock()
  166. return hash
  167. }
  168. func (c *ringShards) GetByKey(key string) (*ringShard, error) {
  169. key = hashtag.Key(key)
  170. c.mu.RLock()
  171. if c.closed {
  172. c.mu.RUnlock()
  173. return nil, pool.ErrClosed
  174. }
  175. hash := c.hash.Get(key)
  176. if hash == "" {
  177. c.mu.RUnlock()
  178. return nil, errRingShardsDown
  179. }
  180. shard := c.shards[hash]
  181. c.mu.RUnlock()
  182. return shard, nil
  183. }
  184. func (c *ringShards) GetByHash(name string) (*ringShard, error) {
  185. if name == "" {
  186. return c.Random()
  187. }
  188. c.mu.RLock()
  189. shard := c.shards[name]
  190. c.mu.RUnlock()
  191. return shard, nil
  192. }
  193. func (c *ringShards) Random() (*ringShard, error) {
  194. return c.GetByKey(strconv.Itoa(rand.Int()))
  195. }
// Heartbeat monitors state of each shard in the ring.
//
// Once per frequency it pings every shard, votes on each shard's
// up/down state, and rebuilds the consistent-hash ring whenever any
// shard's state changed. The loop exits once the ring is closed.
func (c *ringShards) Heartbeat(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()
	for range ticker.C {
		var rebalance bool

		c.mu.RLock()

		if c.closed {
			c.mu.RUnlock()
			break
		}

		// Snapshot the (read-only) shard list; safe to iterate after
		// releasing the lock.
		shards := c.list
		c.mu.RUnlock()

		for _, shard := range shards {
			err := shard.Client.Ping().Err()
			// A pool timeout means the shard is busy, not dead, so it
			// counts as a successful check.
			if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
				internal.Logf("ring shard state changed: %s", shard)
				rebalance = true
			}
		}

		if rebalance {
			c.rebalance()
		}
	}
}
// rebalance removes dead shards from the Ring.
//
// It rebuilds the consistent hash from the currently-up shards and
// swaps the new ring in under the write lock, also refreshing the live
// shard count. The shards map reference is used outside the read lock;
// this appears safe because the map is only populated during NewRing
// and set to nil on Close — confirm if Add is ever called later.
func (c *ringShards) rebalance() {
	c.mu.RLock()
	shards := c.shards
	c.mu.RUnlock()

	hash := newConsistentHash(c.opt)
	var shardsNum int
	for name, shard := range shards {
		if shard.IsUp() {
			hash.Add(name)
			shardsNum++
		}
	}

	c.mu.Lock()
	c.hash = hash
	c.len = shardsNum
	c.mu.Unlock()
}
  239. func (c *ringShards) Len() int {
  240. c.mu.RLock()
  241. l := c.len
  242. c.mu.RUnlock()
  243. return l
  244. }
  245. func (c *ringShards) Close() error {
  246. c.mu.Lock()
  247. defer c.mu.Unlock()
  248. if c.closed {
  249. return nil
  250. }
  251. c.closed = true
  252. var firstErr error
  253. for _, shard := range c.shards {
  254. if err := shard.Client.Close(); err != nil && firstErr == nil {
  255. firstErr = err
  256. }
  257. }
  258. c.hash = nil
  259. c.shards = nil
  260. c.list = nil
  261. return firstErr
  262. }
  263. //------------------------------------------------------------------------------
// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
// coordination when shard state is changed.
//
// Ring should be used when you need multiple Redis servers for caching
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
type Ring struct {
	cmdable // generic command methods, wired to Process via init()

	ctx context.Context // optional; nil means context.Background()

	opt           *RingOptions
	shards        *ringShards
	cmdsInfoCache *cmdsInfoCache // lazy COMMAND metadata, filled by cmdsInfo

	// Hook points: WrapProcess / WrapProcessPipeline replace these.
	process         func(Cmder) error
	processPipeline func([]Cmder) error
}
// NewRing creates a ring client with one shard client per entry in
// opt.Addrs and starts the background heartbeat that tracks shard
// health.
func NewRing(opt *RingOptions) *Ring {
	opt.init()

	ring := &Ring{
		opt:    opt,
		shards: newRingShards(opt),
	}
	// Lazy: cmdsInfo is only invoked when key-position metadata is
	// first needed.
	ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)

	ring.process = ring.defaultProcess
	ring.processPipeline = ring.defaultProcessPipeline
	ring.init()

	// Shards are added before the heartbeat goroutine is started, so
	// these Add calls run single-threaded.
	for name, addr := range opt.Addrs {
		clopt := opt.clientOptions()
		clopt.Addr = addr
		ring.shards.Add(name, NewClient(clopt))
	}

	go ring.shards.Heartbeat(opt.HeartbeatFrequency)

	return ring
}
// init wires the embedded cmdable's command methods to this ring's
// Process, so e.g. ring.Get(...) routes through c.process.
func (c *Ring) init() {
	c.cmdable.setProcessor(c.Process)
}
  308. func (c *Ring) Context() context.Context {
  309. if c.ctx != nil {
  310. return c.ctx
  311. }
  312. return context.Background()
  313. }
  314. func (c *Ring) WithContext(ctx context.Context) *Ring {
  315. if ctx == nil {
  316. panic("nil context")
  317. }
  318. c2 := c.clone()
  319. c2.ctx = ctx
  320. return c2
  321. }
// clone returns a shallow copy of the ring: the copy shares shards,
// options and the cmd-info cache with the original, and init re-binds
// the embedded cmdable to the copy's Process.
func (c *Ring) clone() *Ring {
	cp := *c
	cp.init()
	return &cp
}
// Options returns read-only Options that were used to create the client.
// Callers must not mutate the returned struct.
func (c *Ring) Options() *RingOptions {
	return c.opt
}
// retryBackoff returns the sleep duration before retry number attempt,
// bounded by the configured min/max retry backoff.
func (c *Ring) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
  334. // PoolStats returns accumulated connection pool stats.
  335. func (c *Ring) PoolStats() *PoolStats {
  336. shards := c.shards.List()
  337. var acc PoolStats
  338. for _, shard := range shards {
  339. s := shard.Client.connPool.Stats()
  340. acc.Hits += s.Hits
  341. acc.Misses += s.Misses
  342. acc.Timeouts += s.Timeouts
  343. acc.TotalConns += s.TotalConns
  344. acc.IdleConns += s.IdleConns
  345. }
  346. return &acc
  347. }
// Len returns the current number of shards in the ring.
func (c *Ring) Len() int {
	return c.shards.Len()
}
  352. // Subscribe subscribes the client to the specified channels.
  353. func (c *Ring) Subscribe(channels ...string) *PubSub {
  354. if len(channels) == 0 {
  355. panic("at least one channel is required")
  356. }
  357. shard, err := c.shards.GetByKey(channels[0])
  358. if err != nil {
  359. // TODO: return PubSub with sticky error
  360. panic(err)
  361. }
  362. return shard.Client.Subscribe(channels...)
  363. }
  364. // PSubscribe subscribes the client to the given patterns.
  365. func (c *Ring) PSubscribe(channels ...string) *PubSub {
  366. if len(channels) == 0 {
  367. panic("at least one channel is required")
  368. }
  369. shard, err := c.shards.GetByKey(channels[0])
  370. if err != nil {
  371. // TODO: return PubSub with sticky error
  372. panic(err)
  373. }
  374. return shard.Client.PSubscribe(channels...)
  375. }
// ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
	shards := c.shards.List()
	var wg sync.WaitGroup
	// Capacity 1 plus the select/default below: the first error is
	// kept, later ones are dropped, and no goroutine ever blocks.
	errCh := make(chan error, 1)
	for _, shard := range shards {
		if shard.IsDown() {
			continue
		}

		wg.Add(1)
		// shard is passed as an argument to avoid loop-variable capture.
		go func(shard *ringShard) {
			defer wg.Done()
			err := fn(shard.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(shard)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
  406. func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
  407. shards := c.shards.List()
  408. firstErr := errRingShardsDown
  409. for _, shard := range shards {
  410. cmdsInfo, err := shard.Client.Command().Result()
  411. if err == nil {
  412. return cmdsInfo, nil
  413. }
  414. if firstErr == nil {
  415. firstErr = err
  416. }
  417. }
  418. return nil, firstErr
  419. }
  420. func (c *Ring) cmdInfo(name string) *CommandInfo {
  421. cmdsInfo, err := c.cmdsInfoCache.Get()
  422. if err != nil {
  423. return nil
  424. }
  425. info := cmdsInfo[name]
  426. if info == nil {
  427. internal.Logf("info for cmd=%s not found", name)
  428. }
  429. return info
  430. }
// cmdShard resolves the shard that should execute cmd based on the
// position of its first key. Position 0 means the command has no key,
// in which case an arbitrary shard is used.
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
	cmdInfo := c.cmdInfo(cmd.Name())
	pos := cmdFirstKeyPos(cmd, cmdInfo)
	if pos == 0 {
		return c.shards.Random()
	}
	firstKey := cmd.stringArg(pos)
	return c.shards.GetByKey(firstKey)
}
  440. // Do creates a Cmd from the args and processes the cmd.
  441. func (c *Ring) Do(args ...interface{}) *Cmd {
  442. cmd := NewCmd(args...)
  443. c.Process(cmd)
  444. return cmd
  445. }
// WrapProcess wraps the current command processor with fn, allowing
// middleware-style interception of every command. fn receives the
// previous processor and returns the replacement.
func (c *Ring) WrapProcess(
	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
) {
	c.process = fn(c.process)
}
// Process executes cmd through the (possibly wrapped) processor chain.
func (c *Ring) Process(cmd Cmder) error {
	return c.process(cmd)
}
// defaultProcess routes cmd to the shard owning its first key (or a
// random shard for keyless commands) and executes it, retrying up to
// MaxRetries times with backoff on retryable errors.
func (c *Ring) defaultProcess(cmd Cmder) error {
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		shard, err := c.cmdShard(cmd)
		if err != nil {
			cmd.setErr(err)
			return err
		}

		err = shard.Client.Process(cmd)
		if err == nil {
			return nil
		}
		// Presumably the second argument permits retrying timeouts only
		// when the command has no explicit read timeout — confirm
		// against internal.IsRetryableError.
		if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
			return err
		}
	}
	return cmd.Err()
}
// Pipeline returns a pipeline whose queued commands are distributed
// across shards by key when executed (see defaultProcessPipeline).
func (c *Ring) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.cmdable.setProcessor(pipe.Process)
	return &pipe
}
  481. func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  482. return c.Pipeline().Pipelined(fn)
  483. }
// WrapProcessPipeline wraps the current pipeline processor with fn,
// mirroring WrapProcess for batched command execution.
func (c *Ring) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
	c.processPipeline = fn(c.processPipeline)
}
// defaultProcessPipeline executes cmds by grouping them per owning
// shard, sending each group concurrently, and retrying retryable
// group failures up to MaxRetries times. The first error recorded on
// any of cmds, if any, is returned.
func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
	// Group commands by the ring node that owns their first key.
	// Keyless commands map to "" and are routed to a random shard by
	// GetByHash.
	cmdsMap := make(map[string][]Cmder)
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
		if hash != "" {
			hash = c.shards.Hash(hashtag.Key(hash))
		}
		cmdsMap[hash] = append(cmdsMap[hash], cmd)
	}

	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		var mu sync.Mutex // guards failedCmdsMap
		var failedCmdsMap map[string][]Cmder
		var wg sync.WaitGroup

		for hash, cmds := range cmdsMap {
			wg.Add(1)
			// hash/cmds passed as arguments to avoid loop-variable capture.
			go func(hash string, cmds []Cmder) {
				defer wg.Done()

				shard, err := c.shards.GetByHash(hash)
				if err != nil {
					setCmdsErr(cmds, err)
					return
				}

				cn, err := shard.Client.getConn()
				if err != nil {
					setCmdsErr(cmds, err)
					return
				}

				canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
				shard.Client.releaseConnStrict(cn, err)

				// Collect retryable groups for the next attempt.
				if canRetry && internal.IsRetryableError(err, true) {
					mu.Lock()
					if failedCmdsMap == nil {
						failedCmdsMap = make(map[string][]Cmder)
					}
					failedCmdsMap[hash] = cmds
					mu.Unlock()
				}
			}(hash, cmds)
		}

		wg.Wait()
		if len(failedCmdsMap) == 0 {
			break
		}
		// Retry only the groups that failed retryably.
		cmdsMap = failedCmdsMap
	}

	return cmdsFirstErr(cmds)
}
// TxPipeline is not implemented for Ring.
func (c *Ring) TxPipeline() Pipeliner {
	panic("not implemented")
}
// TxPipelined is not implemented for Ring.
func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	panic("not implemented")
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
	// Marking the shard set closed also terminates the Heartbeat loop
	// on its next tick.
	return c.shards.Close()
}
// Watch runs fn in a transaction on the shard owning keys. All
// (non-empty) keys must resolve to the same shard client; otherwise an
// error is returned.
func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	var shards []*ringShard
	for _, key := range keys {
		if key != "" {
			// NOTE(review): GetByKey applies hashtag.Key internally, so
			// the key is hash-tagged twice here. Harmless for typical
			// keys, but confirm behavior for keys whose extracted tag
			// itself contains braces.
			shard, err := c.shards.GetByKey(hashtag.Key(key))
			if err != nil {
				return err
			}
			shards = append(shards, shard)
		}
	}

	// All keys were empty strings: nothing to watch against.
	if len(shards) == 0 {
		return fmt.Errorf("redis: Watch requires at least one shard")
	}

	if len(shards) > 1 {
		for _, shard := range shards[1:] {
			if shard.Client != shards[0].Client {
				err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
				return err
			}
		}
	}

	return shards[0].Client.Watch(fn, keys...)
}
// newConsistentHash builds a consistent-hash map using the configured
// replica count and hash function (per RingOptions docs, the default
// hash is crc32.ChecksumIEEE).
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
	return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
}