sentinel.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. package redis
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/go-redis/redis/internal"
  10. "github.com/go-redis/redis/internal/pool"
  11. )
  12. //------------------------------------------------------------------------------
  13. // FailoverOptions are used to configure a failover client and should
  14. // be passed to NewFailoverClient.
  15. type FailoverOptions struct {
  16. // The master name.
  17. MasterName string
  18. // A seed list of host:port addresses of sentinel nodes.
  19. SentinelAddrs []string
  20. // Following options are copied from Options struct.
  21. OnConnect func(*Conn) error
  22. Password string
  23. DB int
  24. MaxRetries int
  25. MinRetryBackoff time.Duration
  26. MaxRetryBackoff time.Duration
  27. DialTimeout time.Duration
  28. ReadTimeout time.Duration
  29. WriteTimeout time.Duration
  30. PoolSize int
  31. MinIdleConns int
  32. MaxConnAge time.Duration
  33. PoolTimeout time.Duration
  34. IdleTimeout time.Duration
  35. IdleCheckFrequency time.Duration
  36. TLSConfig *tls.Config
  37. }
  38. func (opt *FailoverOptions) options() *Options {
  39. return &Options{
  40. Addr: "FailoverClient",
  41. OnConnect: opt.OnConnect,
  42. DB: opt.DB,
  43. Password: opt.Password,
  44. MaxRetries: opt.MaxRetries,
  45. DialTimeout: opt.DialTimeout,
  46. ReadTimeout: opt.ReadTimeout,
  47. WriteTimeout: opt.WriteTimeout,
  48. PoolSize: opt.PoolSize,
  49. PoolTimeout: opt.PoolTimeout,
  50. IdleTimeout: opt.IdleTimeout,
  51. IdleCheckFrequency: opt.IdleCheckFrequency,
  52. TLSConfig: opt.TLSConfig,
  53. }
  54. }
  55. // NewFailoverClient returns a Redis client that uses Redis Sentinel
  56. // for automatic failover. It's safe for concurrent use by multiple
  57. // goroutines.
  58. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  59. opt := failoverOpt.options()
  60. opt.init()
  61. failover := &sentinelFailover{
  62. masterName: failoverOpt.MasterName,
  63. sentinelAddrs: failoverOpt.SentinelAddrs,
  64. opt: opt,
  65. }
  66. c := Client{
  67. baseClient: baseClient{
  68. opt: opt,
  69. connPool: failover.Pool(),
  70. onClose: failover.Close,
  71. },
  72. }
  73. c.baseClient.init()
  74. c.cmdable.setProcessor(c.Process)
  75. return &c
  76. }
  77. //------------------------------------------------------------------------------
  78. type SentinelClient struct {
  79. baseClient
  80. }
  81. func NewSentinelClient(opt *Options) *SentinelClient {
  82. opt.init()
  83. c := &SentinelClient{
  84. baseClient: baseClient{
  85. opt: opt,
  86. connPool: newConnPool(opt),
  87. },
  88. }
  89. c.baseClient.init()
  90. return c
  91. }
  92. func (c *SentinelClient) pubSub() *PubSub {
  93. pubsub := &PubSub{
  94. opt: c.opt,
  95. newConn: func(channels []string) (*pool.Conn, error) {
  96. return c.newConn()
  97. },
  98. closeConn: c.connPool.CloseConn,
  99. }
  100. pubsub.init()
  101. return pubsub
  102. }
  103. // Subscribe subscribes the client to the specified channels.
  104. // Channels can be omitted to create empty subscription.
  105. func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
  106. pubsub := c.pubSub()
  107. if len(channels) > 0 {
  108. _ = pubsub.Subscribe(channels...)
  109. }
  110. return pubsub
  111. }
  112. // PSubscribe subscribes the client to the given patterns.
  113. // Patterns can be omitted to create empty subscription.
  114. func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
  115. pubsub := c.pubSub()
  116. if len(channels) > 0 {
  117. _ = pubsub.PSubscribe(channels...)
  118. }
  119. return pubsub
  120. }
  121. func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  122. cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
  123. c.Process(cmd)
  124. return cmd
  125. }
  126. func (c *SentinelClient) Sentinels(name string) *SliceCmd {
  127. cmd := NewSliceCmd("sentinel", "sentinels", name)
  128. c.Process(cmd)
  129. return cmd
  130. }
  131. // Failover forces a failover as if the master was not reachable, and without
  132. // asking for agreement to other Sentinels.
  133. func (c *SentinelClient) Failover(name string) *StatusCmd {
  134. cmd := NewStatusCmd("sentinel", "failover", name)
  135. c.Process(cmd)
  136. return cmd
  137. }
  138. // Reset resets all the masters with matching name. The pattern argument is a
  139. // glob-style pattern. The reset process clears any previous state in a master
  140. // (including a failover in progress), and removes every slave and sentinel
  141. // already discovered and associated with the master.
  142. func (c *SentinelClient) Reset(pattern string) *IntCmd {
  143. cmd := NewIntCmd("sentinel", "reset", pattern)
  144. c.Process(cmd)
  145. return cmd
  146. }
  147. // FlushConfig forces Sentinel to rewrite its configuration on disk, including
  148. // the current Sentinel state.
  149. func (c *SentinelClient) FlushConfig() *StatusCmd {
  150. cmd := NewStatusCmd("sentinel", "flushconfig")
  151. c.Process(cmd)
  152. return cmd
  153. }
  154. // Master shows the state and info of the specified master.
  155. func (c *SentinelClient) Master(name string) *StringStringMapCmd {
  156. cmd := NewStringStringMapCmd("sentinel", "master", name)
  157. c.Process(cmd)
  158. return cmd
  159. }
  160. type sentinelFailover struct {
  161. sentinelAddrs []string
  162. opt *Options
  163. pool *pool.ConnPool
  164. poolOnce sync.Once
  165. mu sync.RWMutex
  166. masterName string
  167. _masterAddr string
  168. sentinel *SentinelClient
  169. pubsub *PubSub
  170. }
  171. func (c *sentinelFailover) Close() error {
  172. c.mu.Lock()
  173. defer c.mu.Unlock()
  174. if c.sentinel != nil {
  175. return c.closeSentinel()
  176. }
  177. return nil
  178. }
  179. func (c *sentinelFailover) Pool() *pool.ConnPool {
  180. c.poolOnce.Do(func() {
  181. c.opt.Dialer = c.dial
  182. c.pool = newConnPool(c.opt)
  183. })
  184. return c.pool
  185. }
  186. func (c *sentinelFailover) dial() (net.Conn, error) {
  187. addr, err := c.MasterAddr()
  188. if err != nil {
  189. return nil, err
  190. }
  191. return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
  192. }
  193. func (c *sentinelFailover) MasterAddr() (string, error) {
  194. addr, err := c.masterAddr()
  195. if err != nil {
  196. return "", err
  197. }
  198. c.switchMaster(addr)
  199. return addr, nil
  200. }
  201. func (c *sentinelFailover) masterAddr() (string, error) {
  202. c.mu.RLock()
  203. addr := c.getMasterAddr()
  204. c.mu.RUnlock()
  205. if addr != "" {
  206. return addr, nil
  207. }
  208. c.mu.Lock()
  209. defer c.mu.Unlock()
  210. addr = c.getMasterAddr()
  211. if addr != "" {
  212. return addr, nil
  213. }
  214. if c.sentinel != nil {
  215. c.closeSentinel()
  216. }
  217. for i, sentinelAddr := range c.sentinelAddrs {
  218. sentinel := NewSentinelClient(&Options{
  219. Addr: sentinelAddr,
  220. MaxRetries: c.opt.MaxRetries,
  221. DialTimeout: c.opt.DialTimeout,
  222. ReadTimeout: c.opt.ReadTimeout,
  223. WriteTimeout: c.opt.WriteTimeout,
  224. PoolSize: c.opt.PoolSize,
  225. PoolTimeout: c.opt.PoolTimeout,
  226. IdleTimeout: c.opt.IdleTimeout,
  227. IdleCheckFrequency: c.opt.IdleCheckFrequency,
  228. TLSConfig: c.opt.TLSConfig,
  229. })
  230. masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
  231. if err != nil {
  232. internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
  233. c.masterName, err)
  234. _ = sentinel.Close()
  235. continue
  236. }
  237. // Push working sentinel to the top.
  238. c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
  239. c.setSentinel(sentinel)
  240. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  241. return addr, nil
  242. }
  243. return "", errors.New("redis: all sentinels are unreachable")
  244. }
  245. func (c *sentinelFailover) getMasterAddr() string {
  246. sentinel := c.sentinel
  247. if sentinel == nil {
  248. return ""
  249. }
  250. addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
  251. if err != nil {
  252. internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
  253. c.masterName, err)
  254. return ""
  255. }
  256. return net.JoinHostPort(addr[0], addr[1])
  257. }
  258. func (c *sentinelFailover) switchMaster(addr string) {
  259. c.mu.RLock()
  260. masterAddr := c._masterAddr
  261. c.mu.RUnlock()
  262. if masterAddr == addr {
  263. return
  264. }
  265. c.mu.Lock()
  266. defer c.mu.Unlock()
  267. internal.Logf("sentinel: new master=%q addr=%q",
  268. c.masterName, addr)
  269. _ = c.Pool().Filter(func(cn *pool.Conn) bool {
  270. return cn.RemoteAddr().String() != addr
  271. })
  272. c._masterAddr = addr
  273. }
  274. func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
  275. c.discoverSentinels(sentinel)
  276. c.sentinel = sentinel
  277. c.pubsub = sentinel.Subscribe("+switch-master")
  278. go c.listen(c.pubsub)
  279. }
  280. func (c *sentinelFailover) closeSentinel() error {
  281. var firstErr error
  282. err := c.pubsub.Close()
  283. if err != nil && firstErr == err {
  284. firstErr = err
  285. }
  286. c.pubsub = nil
  287. err = c.sentinel.Close()
  288. if err != nil && firstErr == err {
  289. firstErr = err
  290. }
  291. c.sentinel = nil
  292. return firstErr
  293. }
  294. func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
  295. sentinels, err := sentinel.Sentinels(c.masterName).Result()
  296. if err != nil {
  297. internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
  298. return
  299. }
  300. for _, sentinel := range sentinels {
  301. vals := sentinel.([]interface{})
  302. for i := 0; i < len(vals); i += 2 {
  303. key := vals[i].(string)
  304. if key == "name" {
  305. sentinelAddr := vals[i+1].(string)
  306. if !contains(c.sentinelAddrs, sentinelAddr) {
  307. internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
  308. sentinelAddr, c.masterName)
  309. c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
  310. }
  311. }
  312. }
  313. }
  314. }
  315. func (c *sentinelFailover) listen(pubsub *PubSub) {
  316. ch := pubsub.Channel()
  317. for {
  318. msg, ok := <-ch
  319. if !ok {
  320. break
  321. }
  322. if msg.Channel == "+switch-master" {
  323. parts := strings.Split(msg.Payload, " ")
  324. if parts[0] != c.masterName {
  325. internal.Logf("sentinel: ignore addr for master=%q", parts[0])
  326. continue
  327. }
  328. addr := net.JoinHostPort(parts[3], parts[4])
  329. c.switchMaster(addr)
  330. }
  331. }
  332. }
  // contains reports whether str is an element of slice. A nil or empty slice
// contains nothing.
func contains(slice []string, str string) bool {
	for i := range slice {
		if slice[i] != str {
			continue
		}
		return true
	}
	return false
}