pubsub_persistent.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package radix
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. type persistentPubSubOpts struct {
  8. connFn ConnFunc
  9. abortAfter int
  10. errCh chan<- error
  11. }
  12. // PersistentPubSubOpt is an optional parameter which can be passed into
  13. // PersistentPubSub in order to affect its behavior.
  14. type PersistentPubSubOpt func(*persistentPubSubOpts)
  15. // PersistentPubSubConnFunc causes PersistentPubSub to use the given ConnFunc
  16. // when connecting to its destination.
  17. func PersistentPubSubConnFunc(connFn ConnFunc) PersistentPubSubOpt {
  18. return func(opts *persistentPubSubOpts) {
  19. opts.connFn = connFn
  20. }
  21. }
  22. // PersistentPubSubAbortAfter changes PersistentPubSub's reconnect behavior.
  23. // Usually PersistentPubSub will try to reconnect forever upon a disconnect,
  24. // blocking any methods which have been called until reconnect is successful.
  25. //
  26. // When PersistentPubSubAbortAfter is used, it will give up after that many
  27. // attempts and return the error to the method which has been blocked the
  28. // longest. Another method will need to be called in order for PersistentPubSub
  29. // to resume trying to reconnect.
  30. func PersistentPubSubAbortAfter(attempts int) PersistentPubSubOpt {
  31. return func(opts *persistentPubSubOpts) {
  32. opts.abortAfter = attempts
  33. }
  34. }
  35. // PersistentPubSubErrCh takes a channel which asynchronous errors
  36. // encountered by the PersistentPubSub can be read off of. If the channel blocks
  37. // the error will be dropped. The channel will be closed when PersistentPubSub
  38. // is closed.
  39. func PersistentPubSubErrCh(errCh chan<- error) PersistentPubSubOpt {
  40. return func(opts *persistentPubSubOpts) {
  41. opts.errCh = errCh
  42. }
  43. }
  44. type pubSubCmd struct {
  45. // msgCh can be set along with one of subscribe/unsubscribe/etc...
  46. msgCh chan<- PubSubMessage
  47. subscribe, unsubscribe, psubscribe, punsubscribe []string
  48. // ... or one of ping or close can be set
  49. ping, close bool
  50. // resCh is always set
  51. resCh chan error
  52. }
  53. type persistentPubSub struct {
  54. dial func() (Conn, error)
  55. opts persistentPubSubOpts
  56. subs, psubs chanSet
  57. curr PubSubConn
  58. currErrCh chan error
  59. cmdCh chan pubSubCmd
  60. closeErr error
  61. closeCh chan struct{}
  62. closeOnce sync.Once
  63. }
  64. // PersistentPubSubWithOpts is like PubSub, but instead of taking in an existing
  65. // Conn to wrap it will create one on the fly. If the connection is ever
  66. // terminated then a new one will be created and will be reset to the previous
  67. // connection's state.
  68. //
  69. // This is effectively a way to have a permanent PubSubConn established which
  70. // supports subscribing/unsubscribing but without the hassle of implementing
  71. // reconnect/re-subscribe logic.
  72. //
  73. // With default options, neither this function nor any of the methods on the
  74. // returned PubSubConn will ever return an error, they will instead block until
  75. // a connection can be successfully reinstated.
  76. //
  77. // PersistentPubSubWithOpts takes in a number of options which can overwrite its
  78. // default behavior. The default options PersistentPubSubWithOpts uses are:
  79. //
  80. // PersistentPubSubConnFunc(DefaultConnFunc)
  81. //
  82. func PersistentPubSubWithOpts(
  83. network, addr string, options ...PersistentPubSubOpt,
  84. ) (
  85. PubSubConn, error,
  86. ) {
  87. opts := persistentPubSubOpts{
  88. connFn: DefaultConnFunc,
  89. }
  90. for _, opt := range options {
  91. opt(&opts)
  92. }
  93. p := &persistentPubSub{
  94. dial: func() (Conn, error) { return opts.connFn(network, addr) },
  95. opts: opts,
  96. subs: chanSet{},
  97. psubs: chanSet{},
  98. cmdCh: make(chan pubSubCmd),
  99. closeCh: make(chan struct{}),
  100. }
  101. if err := p.refresh(); err != nil {
  102. return nil, err
  103. }
  104. go p.spin()
  105. return p, nil
  106. }
  107. // PersistentPubSub is deprecated in favor of PersistentPubSubWithOpts instead.
  108. func PersistentPubSub(network, addr string, connFn ConnFunc) PubSubConn {
  109. var opts []PersistentPubSubOpt
  110. if connFn != nil {
  111. opts = append(opts, PersistentPubSubConnFunc(connFn))
  112. }
  113. // since PersistentPubSubAbortAfter isn't used, this will never return an
  114. // error, panic if it does
  115. p, err := PersistentPubSubWithOpts(network, addr, opts...)
  116. if err != nil {
  117. panic(fmt.Sprintf("PersistentPubSubWithOpts impossibly returned an error: %v", err))
  118. }
  119. return p
  120. }
  121. // refresh only returns an error if the connection could not be made.
  122. func (p *persistentPubSub) refresh() error {
  123. if p.curr != nil {
  124. p.curr.Close()
  125. <-p.currErrCh
  126. p.curr = nil
  127. p.currErrCh = nil
  128. }
  129. attempt := func() (PubSubConn, chan error, error) {
  130. c, err := p.dial()
  131. if err != nil {
  132. return nil, nil, err
  133. }
  134. errCh := make(chan error, 1)
  135. pc := newPubSub(c, errCh)
  136. for msgCh, channels := range p.subs.inverse() {
  137. if err := pc.Subscribe(msgCh, channels...); err != nil {
  138. pc.Close()
  139. return nil, nil, err
  140. }
  141. }
  142. for msgCh, patterns := range p.psubs.inverse() {
  143. if err := pc.PSubscribe(msgCh, patterns...); err != nil {
  144. pc.Close()
  145. return nil, nil, err
  146. }
  147. }
  148. return pc, errCh, nil
  149. }
  150. var attempts int
  151. for {
  152. var err error
  153. if p.curr, p.currErrCh, err = attempt(); err == nil {
  154. return nil
  155. }
  156. attempts++
  157. if p.opts.abortAfter > 0 && attempts >= p.opts.abortAfter {
  158. return err
  159. }
  160. time.Sleep(200 * time.Millisecond)
  161. }
  162. }
  163. func (p *persistentPubSub) execCmd(cmd pubSubCmd) error {
  164. if p.curr == nil {
  165. if err := p.refresh(); err != nil {
  166. return err
  167. }
  168. }
  169. // For all subscribe/unsubscribe/etc... commands the modifications to
  170. // p.subs/p.psubs are made first, so that if the actual call to curr fails
  171. // then refresh will still instate the new desired subscription.
  172. var err error
  173. switch {
  174. case len(cmd.subscribe) > 0:
  175. for _, channel := range cmd.subscribe {
  176. p.subs.add(channel, cmd.msgCh)
  177. }
  178. err = p.curr.Subscribe(cmd.msgCh, cmd.subscribe...)
  179. case len(cmd.unsubscribe) > 0:
  180. for _, channel := range cmd.unsubscribe {
  181. p.subs.del(channel, cmd.msgCh)
  182. }
  183. err = p.curr.Unsubscribe(cmd.msgCh, cmd.unsubscribe...)
  184. case len(cmd.psubscribe) > 0:
  185. for _, channel := range cmd.psubscribe {
  186. p.psubs.add(channel, cmd.msgCh)
  187. }
  188. err = p.curr.PSubscribe(cmd.msgCh, cmd.psubscribe...)
  189. case len(cmd.punsubscribe) > 0:
  190. for _, channel := range cmd.punsubscribe {
  191. p.psubs.del(channel, cmd.msgCh)
  192. }
  193. err = p.curr.PUnsubscribe(cmd.msgCh, cmd.punsubscribe...)
  194. case cmd.ping:
  195. err = p.curr.Ping()
  196. case cmd.close:
  197. if p.curr != nil {
  198. err = p.curr.Close()
  199. <-p.currErrCh
  200. }
  201. default:
  202. // don't do anything I guess
  203. }
  204. if err != nil {
  205. return p.refresh()
  206. }
  207. return nil
  208. }
  209. func (p *persistentPubSub) err(err error) {
  210. select {
  211. case p.opts.errCh <- err:
  212. default:
  213. }
  214. }
  215. func (p *persistentPubSub) spin() {
  216. for {
  217. select {
  218. case err := <-p.currErrCh:
  219. p.err(err)
  220. if err := p.refresh(); err != nil {
  221. p.err(err)
  222. }
  223. case cmd := <-p.cmdCh:
  224. cmd.resCh <- p.execCmd(cmd)
  225. if cmd.close {
  226. return
  227. }
  228. }
  229. }
  230. }
  231. func (p *persistentPubSub) cmd(cmd pubSubCmd) error {
  232. cmd.resCh = make(chan error, 1)
  233. select {
  234. case p.cmdCh <- cmd:
  235. return <-cmd.resCh
  236. case <-p.closeCh:
  237. return fmt.Errorf("closed")
  238. }
  239. }
  240. func (p *persistentPubSub) Subscribe(msgCh chan<- PubSubMessage, channels ...string) error {
  241. return p.cmd(pubSubCmd{
  242. msgCh: msgCh,
  243. subscribe: channels,
  244. })
  245. }
  246. func (p *persistentPubSub) Unsubscribe(msgCh chan<- PubSubMessage, channels ...string) error {
  247. return p.cmd(pubSubCmd{
  248. msgCh: msgCh,
  249. unsubscribe: channels,
  250. })
  251. }
  252. func (p *persistentPubSub) PSubscribe(msgCh chan<- PubSubMessage, channels ...string) error {
  253. return p.cmd(pubSubCmd{
  254. msgCh: msgCh,
  255. psubscribe: channels,
  256. })
  257. }
  258. func (p *persistentPubSub) PUnsubscribe(msgCh chan<- PubSubMessage, channels ...string) error {
  259. return p.cmd(pubSubCmd{
  260. msgCh: msgCh,
  261. punsubscribe: channels,
  262. })
  263. }
  264. func (p *persistentPubSub) Ping() error {
  265. return p.cmd(pubSubCmd{ping: true})
  266. }
  267. func (p *persistentPubSub) Close() error {
  268. p.closeOnce.Do(func() {
  269. p.closeErr = p.cmd(pubSubCmd{close: true})
  270. close(p.closeCh)
  271. if p.opts.errCh != nil {
  272. close(p.opts.errCh)
  273. }
  274. })
  275. return p.closeErr
  276. }