123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- package radix
- import (
- "fmt"
- "sync"
- "time"
- )
- type persistentPubSubOpts struct {
- connFn ConnFunc
- abortAfter int
- errCh chan<- error
- }
- // PersistentPubSubOpt is an optional parameter which can be passed into
- // PersistentPubSub in order to affect its behavior.
- type PersistentPubSubOpt func(*persistentPubSubOpts)
- // PersistentPubSubConnFunc causes PersistentPubSub to use the given ConnFunc
- // when connecting to its destination.
- func PersistentPubSubConnFunc(connFn ConnFunc) PersistentPubSubOpt {
- return func(opts *persistentPubSubOpts) {
- opts.connFn = connFn
- }
- }
- // PersistentPubSubAbortAfter changes PersistentPubSub's reconnect behavior.
- // Usually PersistentPubSub will try to reconnect forever upon a disconnect,
- // blocking any methods which have been called until reconnect is successful.
- //
- // When PersistentPubSubAbortAfter is used, it will give up after that many
- // attempts and return the error to the method which has been blocked the
- // longest. Another method will need to be called in order for PersistentPubSub
- // to resume trying to reconnect.
- func PersistentPubSubAbortAfter(attempts int) PersistentPubSubOpt {
- return func(opts *persistentPubSubOpts) {
- opts.abortAfter = attempts
- }
- }
- // PersistentPubSubErrCh takes a channel which asynchronous errors
- // encountered by the PersistentPubSub can be read off of. If the channel blocks
- // the error will be dropped. The channel will be closed when PersistentPubSub
- // is closed.
- func PersistentPubSubErrCh(errCh chan<- error) PersistentPubSubOpt {
- return func(opts *persistentPubSubOpts) {
- opts.errCh = errCh
- }
- }
- type pubSubCmd struct {
- // msgCh can be set along with one of subscribe/unsubscribe/etc...
- msgCh chan<- PubSubMessage
- subscribe, unsubscribe, psubscribe, punsubscribe []string
- // ... or one of ping or close can be set
- ping, close bool
- // resCh is always set
- resCh chan error
- }
- type persistentPubSub struct {
- dial func() (Conn, error)
- opts persistentPubSubOpts
- subs, psubs chanSet
- curr PubSubConn
- currErrCh chan error
- cmdCh chan pubSubCmd
- closeErr error
- closeCh chan struct{}
- closeOnce sync.Once
- }
- // PersistentPubSubWithOpts is like PubSub, but instead of taking in an existing
- // Conn to wrap it will create one on the fly. If the connection is ever
- // terminated then a new one will be created and will be reset to the previous
- // connection's state.
- //
- // This is effectively a way to have a permanent PubSubConn established which
- // supports subscribing/unsubscribing but without the hassle of implementing
- // reconnect/re-subscribe logic.
- //
- // With default options, neither this function nor any of the methods on the
- // returned PubSubConn will ever return an error, they will instead block until
- // a connection can be successfully reinstated.
- //
- // PersistentPubSubWithOpts takes in a number of options which can overwrite its
- // default behavior. The default options PersistentPubSubWithOpts uses are:
- //
- // PersistentPubSubConnFunc(DefaultConnFunc)
- //
- func PersistentPubSubWithOpts(
- network, addr string, options ...PersistentPubSubOpt,
- ) (
- PubSubConn, error,
- ) {
- opts := persistentPubSubOpts{
- connFn: DefaultConnFunc,
- }
- for _, opt := range options {
- opt(&opts)
- }
- p := &persistentPubSub{
- dial: func() (Conn, error) { return opts.connFn(network, addr) },
- opts: opts,
- subs: chanSet{},
- psubs: chanSet{},
- cmdCh: make(chan pubSubCmd),
- closeCh: make(chan struct{}),
- }
- if err := p.refresh(); err != nil {
- return nil, err
- }
- go p.spin()
- return p, nil
- }
- // PersistentPubSub is deprecated in favor of PersistentPubSubWithOpts instead.
- func PersistentPubSub(network, addr string, connFn ConnFunc) PubSubConn {
- var opts []PersistentPubSubOpt
- if connFn != nil {
- opts = append(opts, PersistentPubSubConnFunc(connFn))
- }
- // since PersistentPubSubAbortAfter isn't used, this will never return an
- // error, panic if it does
- p, err := PersistentPubSubWithOpts(network, addr, opts...)
- if err != nil {
- panic(fmt.Sprintf("PersistentPubSubWithOpts impossibly returned an error: %v", err))
- }
- return p
- }
- // refresh only returns an error if the connection could not be made.
- func (p *persistentPubSub) refresh() error {
- if p.curr != nil {
- p.curr.Close()
- <-p.currErrCh
- p.curr = nil
- p.currErrCh = nil
- }
- attempt := func() (PubSubConn, chan error, error) {
- c, err := p.dial()
- if err != nil {
- return nil, nil, err
- }
- errCh := make(chan error, 1)
- pc := newPubSub(c, errCh)
- for msgCh, channels := range p.subs.inverse() {
- if err := pc.Subscribe(msgCh, channels...); err != nil {
- pc.Close()
- return nil, nil, err
- }
- }
- for msgCh, patterns := range p.psubs.inverse() {
- if err := pc.PSubscribe(msgCh, patterns...); err != nil {
- pc.Close()
- return nil, nil, err
- }
- }
- return pc, errCh, nil
- }
- var attempts int
- for {
- var err error
- if p.curr, p.currErrCh, err = attempt(); err == nil {
- return nil
- }
- attempts++
- if p.opts.abortAfter > 0 && attempts >= p.opts.abortAfter {
- return err
- }
- time.Sleep(200 * time.Millisecond)
- }
- }
- func (p *persistentPubSub) execCmd(cmd pubSubCmd) error {
- if p.curr == nil {
- if err := p.refresh(); err != nil {
- return err
- }
- }
- // For all subscribe/unsubscribe/etc... commands the modifications to
- // p.subs/p.psubs are made first, so that if the actual call to curr fails
- // then refresh will still instate the new desired subscription.
- var err error
- switch {
- case len(cmd.subscribe) > 0:
- for _, channel := range cmd.subscribe {
- p.subs.add(channel, cmd.msgCh)
- }
- err = p.curr.Subscribe(cmd.msgCh, cmd.subscribe...)
- case len(cmd.unsubscribe) > 0:
- for _, channel := range cmd.unsubscribe {
- p.subs.del(channel, cmd.msgCh)
- }
- err = p.curr.Unsubscribe(cmd.msgCh, cmd.unsubscribe...)
- case len(cmd.psubscribe) > 0:
- for _, channel := range cmd.psubscribe {
- p.psubs.add(channel, cmd.msgCh)
- }
- err = p.curr.PSubscribe(cmd.msgCh, cmd.psubscribe...)
- case len(cmd.punsubscribe) > 0:
- for _, channel := range cmd.punsubscribe {
- p.psubs.del(channel, cmd.msgCh)
- }
- err = p.curr.PUnsubscribe(cmd.msgCh, cmd.punsubscribe...)
- case cmd.ping:
- err = p.curr.Ping()
- case cmd.close:
- if p.curr != nil {
- err = p.curr.Close()
- <-p.currErrCh
- }
- default:
- // don't do anything I guess
- }
- if err != nil {
- return p.refresh()
- }
- return nil
- }
- func (p *persistentPubSub) err(err error) {
- select {
- case p.opts.errCh <- err:
- default:
- }
- }
- func (p *persistentPubSub) spin() {
- for {
- select {
- case err := <-p.currErrCh:
- p.err(err)
- if err := p.refresh(); err != nil {
- p.err(err)
- }
- case cmd := <-p.cmdCh:
- cmd.resCh <- p.execCmd(cmd)
- if cmd.close {
- return
- }
- }
- }
- }
- func (p *persistentPubSub) cmd(cmd pubSubCmd) error {
- cmd.resCh = make(chan error, 1)
- select {
- case p.cmdCh <- cmd:
- return <-cmd.resCh
- case <-p.closeCh:
- return fmt.Errorf("closed")
- }
- }
- func (p *persistentPubSub) Subscribe(msgCh chan<- PubSubMessage, channels ...string) error {
- return p.cmd(pubSubCmd{
- msgCh: msgCh,
- subscribe: channels,
- })
- }
- func (p *persistentPubSub) Unsubscribe(msgCh chan<- PubSubMessage, channels ...string) error {
- return p.cmd(pubSubCmd{
- msgCh: msgCh,
- unsubscribe: channels,
- })
- }
- func (p *persistentPubSub) PSubscribe(msgCh chan<- PubSubMessage, channels ...string) error {
- return p.cmd(pubSubCmd{
- msgCh: msgCh,
- psubscribe: channels,
- })
- }
- func (p *persistentPubSub) PUnsubscribe(msgCh chan<- PubSubMessage, channels ...string) error {
- return p.cmd(pubSubCmd{
- msgCh: msgCh,
- punsubscribe: channels,
- })
- }
- func (p *persistentPubSub) Ping() error {
- return p.cmd(pubSubCmd{ping: true})
- }
- func (p *persistentPubSub) Close() error {
- p.closeOnce.Do(func() {
- p.closeErr = p.cmd(pubSubCmd{close: true})
- close(p.closeCh)
- if p.opts.errCh != nil {
- close(p.opts.errCh)
- }
- })
- return p.closeErr
- }
|