123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- package radix
- import (
- "fmt"
- "net"
- "sync"
- "sync/atomic"
- "time"
- )
- type sentinelOpts struct {
- cf ConnFunc
- pf ClientFunc
- }
- // SentinelOpt is an optional behavior which can be applied to the NewSentinel
- // function to effect a Sentinel's behavior.
- type SentinelOpt func(*sentinelOpts)
- // SentinelConnFunc tells the Sentinel to use the given ConnFunc when connecting
- // to sentinel instances.
- //
- // NOTE that if SentinelConnFunc is not used then Sentinel will attempt to
- // retrieve AUTH and SELECT information from the address provided to
- // NewSentinel, and use that for dialing all Sentinels. If SentinelConnFunc is
- // provided, however, those options must be given through
- // DialAuthPass/DialSelectDB within the ConnFunc.
- func SentinelConnFunc(cf ConnFunc) SentinelOpt {
- return func(so *sentinelOpts) {
- so.cf = cf
- }
- }
- // SentinelPoolFunc tells the Sentinel to use the given ClientFunc when creating
- // a pool of connections to the sentinel's primary.
- func SentinelPoolFunc(pf ClientFunc) SentinelOpt {
- return func(so *sentinelOpts) {
- so.pf = pf
- }
- }
- // Sentinel is a Client which, in the background, connects to an available
- // sentinel node and handles all of the following:
- //
- // * Creates a pool to the current primary instance, as advertised by the
- // sentinel
- //
- // * Listens for events indicating the primary has changed, and automatically
- // creates a new Client to the new primary
- //
- // * Keeps track of other sentinels in the cluster, and uses them if the
- // currently connected one becomes unreachable.
- //
- type Sentinel struct {
- so sentinelOpts
- initAddrs []string
- name string
- // we read lock when calling methods on prim, and normal lock when swapping
- // the value of prim, primAddr, and sentAddrs
- l sync.RWMutex
- primAddr string
- clients map[string]Client
- sentinelAddrs map[string]bool // the known sentinel addresses
- // We use a persistent PubSubConn here, so we don't need to do much after
- // initialization. The pconn is only really kept around for closing
- pconn PubSubConn
- pconnCh chan PubSubMessage
- // Any errors encountered internally will be written to this channel. If
- // nothing is reading the channel the errors will be dropped. The channel
- // will be closed when the Close is called.
- ErrCh chan error
- closeCh chan bool
- closeWG sync.WaitGroup
- closeOnce sync.Once
- // only used by tests to ensure certain actions have happened before
- // continuing on during the test
- testEventCh chan string
- // only used by tests to delay updates after event on pconnCh
- // contains time in milliseconds
- testSleepBeforeSwitch uint32
- }
- // NewSentinel creates and returns a *Sentinel instance. NewSentinel takes in a
- // number of options which can overwrite its default behavior. The default
- // options NewSentinel uses are:
- //
- // SentinelConnFunc(DefaultConnFunc)
- // SentinelPoolFunc(DefaultClientFunc)
- //
- func NewSentinel(primaryName string, sentinelAddrs []string, opts ...SentinelOpt) (*Sentinel, error) {
- addrs := map[string]bool{}
- for _, addr := range sentinelAddrs {
- addrs[addr] = true
- }
- sc := &Sentinel{
- initAddrs: sentinelAddrs,
- name: primaryName,
- sentinelAddrs: addrs,
- pconnCh: make(chan PubSubMessage, 1),
- ErrCh: make(chan error, 1),
- closeCh: make(chan bool),
- testEventCh: make(chan string, 1),
- }
- // If the given sentinelAddrs have AUTH/SELECT info encoded into them then
- // use that for all sentinel connections going forward (unless overwritten
- // by a SentinelConnFunc in opts).
- sc.so.cf = wrapDefaultConnFunc(sentinelAddrs[0])
- defaultSentinelOpts := []SentinelOpt{
- SentinelPoolFunc(DefaultClientFunc),
- }
- for _, opt := range append(defaultSentinelOpts, opts...) {
- // the other args to NewSentinel used to be a ConnFunc and a ClientFunc,
- // which someone might have left as nil, in which case this now gives a
- // weird panic. Just handle it
- if opt != nil {
- opt(&(sc.so))
- }
- }
- // first thing is to retrieve the state and create a pool using the first
- // connectable connection. This connection is only used during
- // initialization, it gets closed right after
- {
- conn, err := sc.dialSentinel()
- if err != nil {
- return nil, err
- }
- defer conn.Close()
- if err := sc.ensureSentinelAddrs(conn); err != nil {
- return nil, err
- } else if err := sc.ensureClients(conn); err != nil {
- return nil, err
- }
- }
- // because we're using persistent these can't _really_ fail
- sc.pconn = PersistentPubSub("", "", func(_, _ string) (Conn, error) {
- return sc.dialSentinel()
- })
- // persistent pubsub doesn't return errors
- _ = sc.pconn.Subscribe(sc.pconnCh, "switch-master")
- sc.closeWG.Add(1)
- go sc.spin()
- return sc, nil
- }
- func (sc *Sentinel) err(err error) {
- select {
- case sc.ErrCh <- err:
- default:
- }
- }
- func (sc *Sentinel) testEvent(event string) {
- select {
- case sc.testEventCh <- event:
- default:
- }
- }
- func (sc *Sentinel) dialSentinel() (Conn, error) {
- sc.l.RLock()
- defer sc.l.RUnlock()
- var conn Conn
- var err error
- for addr := range sc.sentinelAddrs {
- conn, err = sc.so.cf("tcp", addr)
- if err == nil {
- return conn, nil
- }
- }
- // try the initAddrs as a last ditch, but don't return their error if this
- // doesn't work
- for _, addr := range sc.initAddrs {
- if conn, err := sc.so.cf("tcp", addr); err == nil {
- return conn, nil
- }
- }
- return nil, err
- }
- // Do implements the method for the Client interface. It will pass the given
- // action on to the current primary.
- //
- // NOTE it's possible that in between Do being called and the Action being
- // actually carried out that there could be a failover event. In that case, the
- // Action will likely fail and return an error.
- func (sc *Sentinel) Do(a Action) error {
- sc.l.RLock()
- client := sc.clients[sc.primAddr]
- sc.l.RUnlock()
- return client.Do(a)
- }
- // DoSecondary is like Do but executes the Action on a random replica if possible.
- //
- // For DoSecondary to work, replicas must be configured with replica-read-only
- // enabled, otherwise calls to DoSecondary may by rejected by the replica.
- //
- // NOTE it's possible that in between DoSecondary being called and the Action being
- // actually carried out that there could be a failover event. In that case, the
- // Action will likely fail and return an error.
- func (sc *Sentinel) DoSecondary(a Action) error {
- c, err := sc.clientInner("")
- if err != nil {
- return err
- }
- return c.Do(a)
- }
- // Addrs returns the currently known network address of the current primary
- // instance and the addresses of the secondaries.
- func (sc *Sentinel) Addrs() (string, []string) {
- sc.l.RLock()
- defer sc.l.RUnlock()
- secAddrs := make([]string, 0, len(sc.clients))
- for addr := range sc.clients {
- if addr == sc.primAddr {
- continue
- }
- secAddrs = append(secAddrs, addr)
- }
- return sc.primAddr, secAddrs
- }
- // SentinelAddrs returns the addresses of all known sentinels.
- func (sc *Sentinel) SentinelAddrs() []string {
- sc.l.RLock()
- defer sc.l.RUnlock()
- sentAddrs := make([]string, 0, len(sc.sentinelAddrs))
- for addr := range sc.sentinelAddrs {
- sentAddrs = append(sentAddrs, addr)
- }
- return sentAddrs
- }
- // Client returns a Client for the given address, which could be either the
- // primary or one of the secondaries (see Addrs method for retrieving known
- // addresses).
- //
- // NOTE that if there is a failover while a Client returned by this method is
- // being used the Client may or may not continue to work as expected, depending
- // on the nature of the failover.
- //
- // NOTE the Client should _not_ be closed.
- func (sc *Sentinel) Client(addr string) (Client, error) {
- if addr == "" {
- return nil, errUnknownAddress
- }
- return sc.clientInner(addr)
- }
- func (sc *Sentinel) clientInner(addr string) (Client, error) {
- var client Client
- sc.l.RLock()
- if addr == "" {
- for addr, client = range sc.clients {
- if addr != sc.primAddr {
- break
- }
- }
- } else {
- var ok bool
- if client, ok = sc.clients[addr]; !ok {
- return nil, errUnknownAddress
- }
- }
- sc.l.RUnlock()
- if client != nil {
- return client, nil
- }
- // if client was nil but ok was true it means the address is a secondary but
- // a Client for it has never been created. Create one now and store it into
- // clients.
- newClient, err := sc.so.pf("tcp", addr)
- if err != nil {
- return nil, err
- }
- // two routines might be requesting the same addr at the same time, and
- // both create the client. The second one needs to make sure it closes its
- // own pool when it sees the other got there first.
- sc.l.Lock()
- if client = sc.clients[addr]; client == nil {
- sc.clients[addr] = newClient
- }
- sc.l.Unlock()
- if client != nil {
- newClient.Close()
- return client, nil
- }
- return newClient, nil
- }
- // Close implements the method for the Client interface.
- func (sc *Sentinel) Close() error {
- closeErr := errClientClosed
- sc.closeOnce.Do(func() {
- close(sc.closeCh)
- sc.closeWG.Wait()
- closeErr = nil
- sc.l.Lock()
- defer sc.l.Unlock()
- for _, client := range sc.clients {
- if client != nil {
- client.Close()
- }
- }
- })
- return closeErr
- }
// sentinelMtoAddr builds a "host:port" address out of the "ip" and "port"
// fields of a sentinel reply m. cmd names the command which produced m and is
// only used in the error returned when either field is missing or empty.
func sentinelMtoAddr(m map[string]string, cmd string) (string, error) {
	ip, port := m["ip"], m["port"]
	if ip != "" && port != "" {
		return net.JoinHostPort(ip, port), nil
	}
	return "", fmt.Errorf("malformed %q response: %#v", cmd, m)
}
- // given a connection to a sentinel, ensures that the Clients currently being
- // held agrees with what the sentinel thinks they should be.
- func (sc *Sentinel) ensureClients(conn Conn) error {
- var primM map[string]string
- var secMM []map[string]string
- if err := conn.Do(Pipeline(
- Cmd(&primM, "SENTINEL", "MASTER", sc.name),
- Cmd(&secMM, "SENTINEL", "SLAVES", sc.name),
- )); err != nil {
- return err
- }
- newPrimAddr, err := sentinelMtoAddr(primM, "SENTINEL MASTER")
- if err != nil {
- return err
- }
- newClients := map[string]Client{newPrimAddr: nil}
- for _, secM := range secMM {
- newSecAddr, err := sentinelMtoAddr(secM, "SENTINEL SLAVES")
- if err != nil {
- return err
- }
- newClients[newSecAddr] = nil
- }
- return sc.setClients(newPrimAddr, newClients)
- }
- // all values of newClients should be nil.
- func (sc *Sentinel) setClients(newPrimAddr string, newClients map[string]Client) error {
- newClients[newPrimAddr] = nil
- var toClose []Client
- sc.l.RLock()
- // stateChanged may be set to true in other ways later in the method
- stateChanged := sc.primAddr != newPrimAddr
- // for each actual Client instance in sc.client, either move it over to
- // newClients (if the address is shared) or make sure it is closed
- for addr, client := range sc.clients {
- if client == nil {
- // do nothing
- } else if _, ok := newClients[addr]; ok {
- newClients[addr] = client
- } else {
- toClose = append(toClose, client)
- }
- // separately, if the newClients doesn't have address it means the state
- // has changed
- if _, ok := newClients[addr]; !ok {
- stateChanged = true
- }
- }
- // this is only checks if a client was added so we know the replica set
- // state has changed later in the method.
- for addr := range newClients {
- if _, ok := sc.clients[addr]; !ok {
- stateChanged = true
- }
- }
- sc.l.RUnlock()
- if !stateChanged {
- return nil
- }
- // if the primary doesn't have a client created, create it here outside the
- // lock where it won't block everything else
- if newClients[newPrimAddr] == nil {
- var err error
- if newClients[newPrimAddr], err = sc.so.pf("tcp", newPrimAddr); err != nil {
- return err
- }
- }
- sc.l.Lock()
- sc.primAddr = newPrimAddr
- sc.clients = newClients
- sc.l.Unlock()
- for _, client := range toClose {
- client.Close()
- }
- return nil
- }
- // annoyingly the SENTINEL SENTINELS <name> command doesn't return _this_
- // sentinel instance, only the others it knows about for that primary.
- func (sc *Sentinel) ensureSentinelAddrs(conn Conn) error {
- var mm []map[string]string
- err := conn.Do(Cmd(&mm, "SENTINEL", "SENTINELS", sc.name))
- if err != nil {
- return err
- }
- addrs := map[string]bool{conn.NetConn().RemoteAddr().String(): true}
- for _, m := range mm {
- addrs[net.JoinHostPort(m["ip"], m["port"])] = true
- }
- sc.l.Lock()
- sc.sentinelAddrs = addrs
- sc.l.Unlock()
- return nil
- }
- func (sc *Sentinel) spin() {
- defer sc.closeWG.Done()
- defer sc.pconn.Close()
- for {
- if err := sc.innerSpin(); err != nil {
- sc.err(err)
- // sleep a second so we don't end up in a tight loop
- time.Sleep(1 * time.Second)
- }
- // This also gets checked within innerSpin to short-circuit that, but
- // we also must check in here to short-circuit this
- select {
- case <-sc.closeCh:
- return
- default:
- }
- }
- }
- // makes connection to an address in sc.addrs and handles
- // the sentinel until that connection goes bad.
- //
- // Things this handles:
- // * Listening for switch-master events (from pconn, which has reconnect logic
- // external to this package)
- // * Periodically re-ensuring that the list of sentinel addresses is up-to-date
- // * Periodically re-checking the current primary, in case the switch-master was
- // missed somehow
- func (sc *Sentinel) innerSpin() error {
- conn, err := sc.dialSentinel()
- if err != nil {
- return err
- }
- defer conn.Close()
- tick := time.NewTicker(5 * time.Second)
- defer tick.Stop()
- var switchMaster bool
- for {
- if err := sc.ensureSentinelAddrs(conn); err != nil {
- return err
- } else if err := sc.ensureClients(conn); err != nil {
- return err
- }
- // persistent pubsub methods don't return an error
- _ = sc.pconn.Ping()
- // the tests want to know when the client state has been updated due to
- // a switch-master event
- if switchMaster {
- sc.testEvent("switch-master completed")
- switchMaster = false
- }
- select {
- case <-tick.C:
- // loop
- case <-sc.pconnCh:
- switchMaster = true
- if waitFor := atomic.SwapUint32(&sc.testSleepBeforeSwitch, 0); waitFor > 0 {
- time.Sleep(time.Duration(waitFor) * time.Millisecond)
- }
- // loop
- case <-sc.closeCh:
- return nil
- }
- }
- }
- func (sc *Sentinel) forceMasterSwitch(waitFor time.Duration) {
- // can not use waitFor.Milliseconds() here since it was only introduced in Go 1.13 and we still support 1.12
- atomic.StoreUint32(&sc.testSleepBeforeSwitch, uint32(waitFor.Nanoseconds()/1e6))
- sc.pconnCh <- PubSubMessage{}
- }
|