sentinel.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. package radix
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. )
  9. type sentinelOpts struct {
  10. cf ConnFunc
  11. pf ClientFunc
  12. }
  13. // SentinelOpt is an optional behavior which can be applied to the NewSentinel
  14. // function to effect a Sentinel's behavior.
  15. type SentinelOpt func(*sentinelOpts)
  16. // SentinelConnFunc tells the Sentinel to use the given ConnFunc when connecting
  17. // to sentinel instances.
  18. //
  19. // NOTE that if SentinelConnFunc is not used then Sentinel will attempt to
  20. // retrieve AUTH and SELECT information from the address provided to
  21. // NewSentinel, and use that for dialing all Sentinels. If SentinelConnFunc is
  22. // provided, however, those options must be given through
  23. // DialAuthPass/DialSelectDB within the ConnFunc.
  24. func SentinelConnFunc(cf ConnFunc) SentinelOpt {
  25. return func(so *sentinelOpts) {
  26. so.cf = cf
  27. }
  28. }
  29. // SentinelPoolFunc tells the Sentinel to use the given ClientFunc when creating
  30. // a pool of connections to the sentinel's primary.
  31. func SentinelPoolFunc(pf ClientFunc) SentinelOpt {
  32. return func(so *sentinelOpts) {
  33. so.pf = pf
  34. }
  35. }
  36. // Sentinel is a Client which, in the background, connects to an available
  37. // sentinel node and handles all of the following:
  38. //
  39. // * Creates a pool to the current primary instance, as advertised by the
  40. // sentinel
  41. //
  42. // * Listens for events indicating the primary has changed, and automatically
  43. // creates a new Client to the new primary
  44. //
  45. // * Keeps track of other sentinels in the cluster, and uses them if the
  46. // currently connected one becomes unreachable.
  47. //
  48. type Sentinel struct {
  49. so sentinelOpts
  50. initAddrs []string
  51. name string
  52. // we read lock when calling methods on prim, and normal lock when swapping
  53. // the value of prim, primAddr, and sentAddrs
  54. l sync.RWMutex
  55. primAddr string
  56. clients map[string]Client
  57. sentinelAddrs map[string]bool // the known sentinel addresses
  58. // We use a persistent PubSubConn here, so we don't need to do much after
  59. // initialization. The pconn is only really kept around for closing
  60. pconn PubSubConn
  61. pconnCh chan PubSubMessage
  62. // Any errors encountered internally will be written to this channel. If
  63. // nothing is reading the channel the errors will be dropped. The channel
  64. // will be closed when the Close is called.
  65. ErrCh chan error
  66. closeCh chan bool
  67. closeWG sync.WaitGroup
  68. closeOnce sync.Once
  69. // only used by tests to ensure certain actions have happened before
  70. // continuing on during the test
  71. testEventCh chan string
  72. // only used by tests to delay updates after event on pconnCh
  73. // contains time in milliseconds
  74. testSleepBeforeSwitch uint32
  75. }
  76. // NewSentinel creates and returns a *Sentinel instance. NewSentinel takes in a
  77. // number of options which can overwrite its default behavior. The default
  78. // options NewSentinel uses are:
  79. //
  80. // SentinelConnFunc(DefaultConnFunc)
  81. // SentinelPoolFunc(DefaultClientFunc)
  82. //
  83. func NewSentinel(primaryName string, sentinelAddrs []string, opts ...SentinelOpt) (*Sentinel, error) {
  84. addrs := map[string]bool{}
  85. for _, addr := range sentinelAddrs {
  86. addrs[addr] = true
  87. }
  88. sc := &Sentinel{
  89. initAddrs: sentinelAddrs,
  90. name: primaryName,
  91. sentinelAddrs: addrs,
  92. pconnCh: make(chan PubSubMessage, 1),
  93. ErrCh: make(chan error, 1),
  94. closeCh: make(chan bool),
  95. testEventCh: make(chan string, 1),
  96. }
  97. // If the given sentinelAddrs have AUTH/SELECT info encoded into them then
  98. // use that for all sentinel connections going forward (unless overwritten
  99. // by a SentinelConnFunc in opts).
  100. sc.so.cf = wrapDefaultConnFunc(sentinelAddrs[0])
  101. defaultSentinelOpts := []SentinelOpt{
  102. SentinelPoolFunc(DefaultClientFunc),
  103. }
  104. for _, opt := range append(defaultSentinelOpts, opts...) {
  105. // the other args to NewSentinel used to be a ConnFunc and a ClientFunc,
  106. // which someone might have left as nil, in which case this now gives a
  107. // weird panic. Just handle it
  108. if opt != nil {
  109. opt(&(sc.so))
  110. }
  111. }
  112. // first thing is to retrieve the state and create a pool using the first
  113. // connectable connection. This connection is only used during
  114. // initialization, it gets closed right after
  115. {
  116. conn, err := sc.dialSentinel()
  117. if err != nil {
  118. return nil, err
  119. }
  120. defer conn.Close()
  121. if err := sc.ensureSentinelAddrs(conn); err != nil {
  122. return nil, err
  123. } else if err := sc.ensureClients(conn); err != nil {
  124. return nil, err
  125. }
  126. }
  127. // because we're using persistent these can't _really_ fail
  128. sc.pconn = PersistentPubSub("", "", func(_, _ string) (Conn, error) {
  129. return sc.dialSentinel()
  130. })
  131. // persistent pubsub doesn't return errors
  132. _ = sc.pconn.Subscribe(sc.pconnCh, "switch-master")
  133. sc.closeWG.Add(1)
  134. go sc.spin()
  135. return sc, nil
  136. }
  137. func (sc *Sentinel) err(err error) {
  138. select {
  139. case sc.ErrCh <- err:
  140. default:
  141. }
  142. }
  143. func (sc *Sentinel) testEvent(event string) {
  144. select {
  145. case sc.testEventCh <- event:
  146. default:
  147. }
  148. }
  149. func (sc *Sentinel) dialSentinel() (Conn, error) {
  150. sc.l.RLock()
  151. defer sc.l.RUnlock()
  152. var conn Conn
  153. var err error
  154. for addr := range sc.sentinelAddrs {
  155. conn, err = sc.so.cf("tcp", addr)
  156. if err == nil {
  157. return conn, nil
  158. }
  159. }
  160. // try the initAddrs as a last ditch, but don't return their error if this
  161. // doesn't work
  162. for _, addr := range sc.initAddrs {
  163. if conn, err := sc.so.cf("tcp", addr); err == nil {
  164. return conn, nil
  165. }
  166. }
  167. return nil, err
  168. }
  169. // Do implements the method for the Client interface. It will pass the given
  170. // action on to the current primary.
  171. //
  172. // NOTE it's possible that in between Do being called and the Action being
  173. // actually carried out that there could be a failover event. In that case, the
  174. // Action will likely fail and return an error.
  175. func (sc *Sentinel) Do(a Action) error {
  176. sc.l.RLock()
  177. client := sc.clients[sc.primAddr]
  178. sc.l.RUnlock()
  179. return client.Do(a)
  180. }
  181. // DoSecondary is like Do but executes the Action on a random replica if possible.
  182. //
  183. // For DoSecondary to work, replicas must be configured with replica-read-only
  184. // enabled, otherwise calls to DoSecondary may by rejected by the replica.
  185. //
  186. // NOTE it's possible that in between DoSecondary being called and the Action being
  187. // actually carried out that there could be a failover event. In that case, the
  188. // Action will likely fail and return an error.
  189. func (sc *Sentinel) DoSecondary(a Action) error {
  190. c, err := sc.clientInner("")
  191. if err != nil {
  192. return err
  193. }
  194. return c.Do(a)
  195. }
  196. // Addrs returns the currently known network address of the current primary
  197. // instance and the addresses of the secondaries.
  198. func (sc *Sentinel) Addrs() (string, []string) {
  199. sc.l.RLock()
  200. defer sc.l.RUnlock()
  201. secAddrs := make([]string, 0, len(sc.clients))
  202. for addr := range sc.clients {
  203. if addr == sc.primAddr {
  204. continue
  205. }
  206. secAddrs = append(secAddrs, addr)
  207. }
  208. return sc.primAddr, secAddrs
  209. }
  210. // SentinelAddrs returns the addresses of all known sentinels.
  211. func (sc *Sentinel) SentinelAddrs() []string {
  212. sc.l.RLock()
  213. defer sc.l.RUnlock()
  214. sentAddrs := make([]string, 0, len(sc.sentinelAddrs))
  215. for addr := range sc.sentinelAddrs {
  216. sentAddrs = append(sentAddrs, addr)
  217. }
  218. return sentAddrs
  219. }
  220. // Client returns a Client for the given address, which could be either the
  221. // primary or one of the secondaries (see Addrs method for retrieving known
  222. // addresses).
  223. //
  224. // NOTE that if there is a failover while a Client returned by this method is
  225. // being used the Client may or may not continue to work as expected, depending
  226. // on the nature of the failover.
  227. //
  228. // NOTE the Client should _not_ be closed.
  229. func (sc *Sentinel) Client(addr string) (Client, error) {
  230. if addr == "" {
  231. return nil, errUnknownAddress
  232. }
  233. return sc.clientInner(addr)
  234. }
  235. func (sc *Sentinel) clientInner(addr string) (Client, error) {
  236. var client Client
  237. sc.l.RLock()
  238. if addr == "" {
  239. for addr, client = range sc.clients {
  240. if addr != sc.primAddr {
  241. break
  242. }
  243. }
  244. } else {
  245. var ok bool
  246. if client, ok = sc.clients[addr]; !ok {
  247. return nil, errUnknownAddress
  248. }
  249. }
  250. sc.l.RUnlock()
  251. if client != nil {
  252. return client, nil
  253. }
  254. // if client was nil but ok was true it means the address is a secondary but
  255. // a Client for it has never been created. Create one now and store it into
  256. // clients.
  257. newClient, err := sc.so.pf("tcp", addr)
  258. if err != nil {
  259. return nil, err
  260. }
  261. // two routines might be requesting the same addr at the same time, and
  262. // both create the client. The second one needs to make sure it closes its
  263. // own pool when it sees the other got there first.
  264. sc.l.Lock()
  265. if client = sc.clients[addr]; client == nil {
  266. sc.clients[addr] = newClient
  267. }
  268. sc.l.Unlock()
  269. if client != nil {
  270. newClient.Close()
  271. return client, nil
  272. }
  273. return newClient, nil
  274. }
  275. // Close implements the method for the Client interface.
  276. func (sc *Sentinel) Close() error {
  277. closeErr := errClientClosed
  278. sc.closeOnce.Do(func() {
  279. close(sc.closeCh)
  280. sc.closeWG.Wait()
  281. closeErr = nil
  282. sc.l.Lock()
  283. defer sc.l.Unlock()
  284. for _, client := range sc.clients {
  285. if client != nil {
  286. client.Close()
  287. }
  288. }
  289. })
  290. return closeErr
  291. }
  // sentinelMtoAddr builds a "host:port" address from the "ip" and "port" fields
  // of a single sentinel reply entry. cmd should be the command called which
  // generated m; it is only used in the error returned when either field is
  // missing or empty.
  func sentinelMtoAddr(m map[string]string, cmd string) (string, error) {
  	ip, port := m["ip"], m["port"]
  	if ip != "" && port != "" {
  		return net.JoinHostPort(ip, port), nil
  	}
  	return "", fmt.Errorf("malformed %q response: %#v", cmd, m)
  }
  299. // given a connection to a sentinel, ensures that the Clients currently being
  300. // held agrees with what the sentinel thinks they should be.
  301. func (sc *Sentinel) ensureClients(conn Conn) error {
  302. var primM map[string]string
  303. var secMM []map[string]string
  304. if err := conn.Do(Pipeline(
  305. Cmd(&primM, "SENTINEL", "MASTER", sc.name),
  306. Cmd(&secMM, "SENTINEL", "SLAVES", sc.name),
  307. )); err != nil {
  308. return err
  309. }
  310. newPrimAddr, err := sentinelMtoAddr(primM, "SENTINEL MASTER")
  311. if err != nil {
  312. return err
  313. }
  314. newClients := map[string]Client{newPrimAddr: nil}
  315. for _, secM := range secMM {
  316. newSecAddr, err := sentinelMtoAddr(secM, "SENTINEL SLAVES")
  317. if err != nil {
  318. return err
  319. }
  320. newClients[newSecAddr] = nil
  321. }
  322. return sc.setClients(newPrimAddr, newClients)
  323. }
  324. // all values of newClients should be nil.
  325. func (sc *Sentinel) setClients(newPrimAddr string, newClients map[string]Client) error {
  326. newClients[newPrimAddr] = nil
  327. var toClose []Client
  328. sc.l.RLock()
  329. // stateChanged may be set to true in other ways later in the method
  330. stateChanged := sc.primAddr != newPrimAddr
  331. // for each actual Client instance in sc.client, either move it over to
  332. // newClients (if the address is shared) or make sure it is closed
  333. for addr, client := range sc.clients {
  334. if client == nil {
  335. // do nothing
  336. } else if _, ok := newClients[addr]; ok {
  337. newClients[addr] = client
  338. } else {
  339. toClose = append(toClose, client)
  340. }
  341. // separately, if the newClients doesn't have address it means the state
  342. // has changed
  343. if _, ok := newClients[addr]; !ok {
  344. stateChanged = true
  345. }
  346. }
  347. // this is only checks if a client was added so we know the replica set
  348. // state has changed later in the method.
  349. for addr := range newClients {
  350. if _, ok := sc.clients[addr]; !ok {
  351. stateChanged = true
  352. }
  353. }
  354. sc.l.RUnlock()
  355. if !stateChanged {
  356. return nil
  357. }
  358. // if the primary doesn't have a client created, create it here outside the
  359. // lock where it won't block everything else
  360. if newClients[newPrimAddr] == nil {
  361. var err error
  362. if newClients[newPrimAddr], err = sc.so.pf("tcp", newPrimAddr); err != nil {
  363. return err
  364. }
  365. }
  366. sc.l.Lock()
  367. sc.primAddr = newPrimAddr
  368. sc.clients = newClients
  369. sc.l.Unlock()
  370. for _, client := range toClose {
  371. client.Close()
  372. }
  373. return nil
  374. }
  375. // annoyingly the SENTINEL SENTINELS <name> command doesn't return _this_
  376. // sentinel instance, only the others it knows about for that primary.
  377. func (sc *Sentinel) ensureSentinelAddrs(conn Conn) error {
  378. var mm []map[string]string
  379. err := conn.Do(Cmd(&mm, "SENTINEL", "SENTINELS", sc.name))
  380. if err != nil {
  381. return err
  382. }
  383. addrs := map[string]bool{conn.NetConn().RemoteAddr().String(): true}
  384. for _, m := range mm {
  385. addrs[net.JoinHostPort(m["ip"], m["port"])] = true
  386. }
  387. sc.l.Lock()
  388. sc.sentinelAddrs = addrs
  389. sc.l.Unlock()
  390. return nil
  391. }
  392. func (sc *Sentinel) spin() {
  393. defer sc.closeWG.Done()
  394. defer sc.pconn.Close()
  395. for {
  396. if err := sc.innerSpin(); err != nil {
  397. sc.err(err)
  398. // sleep a second so we don't end up in a tight loop
  399. time.Sleep(1 * time.Second)
  400. }
  401. // This also gets checked within innerSpin to short-circuit that, but
  402. // we also must check in here to short-circuit this
  403. select {
  404. case <-sc.closeCh:
  405. return
  406. default:
  407. }
  408. }
  409. }
  410. // makes connection to an address in sc.addrs and handles
  411. // the sentinel until that connection goes bad.
  412. //
  413. // Things this handles:
  414. // * Listening for switch-master events (from pconn, which has reconnect logic
  415. // external to this package)
  416. // * Periodically re-ensuring that the list of sentinel addresses is up-to-date
  417. // * Periodically re-checking the current primary, in case the switch-master was
  418. // missed somehow
  419. func (sc *Sentinel) innerSpin() error {
  420. conn, err := sc.dialSentinel()
  421. if err != nil {
  422. return err
  423. }
  424. defer conn.Close()
  425. tick := time.NewTicker(5 * time.Second)
  426. defer tick.Stop()
  427. var switchMaster bool
  428. for {
  429. if err := sc.ensureSentinelAddrs(conn); err != nil {
  430. return err
  431. } else if err := sc.ensureClients(conn); err != nil {
  432. return err
  433. }
  434. // persistent pubsub methods don't return an error
  435. _ = sc.pconn.Ping()
  436. // the tests want to know when the client state has been updated due to
  437. // a switch-master event
  438. if switchMaster {
  439. sc.testEvent("switch-master completed")
  440. switchMaster = false
  441. }
  442. select {
  443. case <-tick.C:
  444. // loop
  445. case <-sc.pconnCh:
  446. switchMaster = true
  447. if waitFor := atomic.SwapUint32(&sc.testSleepBeforeSwitch, 0); waitFor > 0 {
  448. time.Sleep(time.Duration(waitFor) * time.Millisecond)
  449. }
  450. // loop
  451. case <-sc.closeCh:
  452. return nil
  453. }
  454. }
  455. }
  456. func (sc *Sentinel) forceMasterSwitch(waitFor time.Duration) {
  457. // can not use waitFor.Milliseconds() here since it was only introduced in Go 1.13 and we still support 1.12
  458. atomic.StoreUint32(&sc.testSleepBeforeSwitch, uint32(waitFor.Nanoseconds()/1e6))
  459. sc.pconnCh <- PubSubMessage{}
  460. }