pubsub.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package redis
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/go-redis/redis/internal"
  9. "github.com/go-redis/redis/internal/pool"
  10. "github.com/go-redis/redis/internal/proto"
  11. )
  12. var errPingTimeout = errors.New("redis: ping timeout")
  13. // PubSub implements Pub/Sub commands as described in
  14. // http://redis.io/topics/pubsub. Message receiving is NOT safe
  15. // for concurrent use by multiple goroutines.
  16. //
  17. // PubSub automatically reconnects to Redis Server and resubscribes
  18. // to the channels in case of network errors.
  19. type PubSub struct {
  20. opt *Options
  21. newConn func([]string) (*pool.Conn, error)
  22. closeConn func(*pool.Conn) error
  23. mu sync.Mutex
  24. cn *pool.Conn
  25. channels map[string]struct{}
  26. patterns map[string]struct{}
  27. closed bool
  28. exit chan struct{}
  29. cmd *Cmd
  30. chOnce sync.Once
  31. ch chan *Message
  32. ping chan struct{}
  33. }
  34. func (c *PubSub) String() string {
  35. channels := mapKeys(c.channels)
  36. channels = append(channels, mapKeys(c.patterns)...)
  37. return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
  38. }
  39. func (c *PubSub) init() {
  40. c.exit = make(chan struct{})
  41. }
  42. func (c *PubSub) conn() (*pool.Conn, error) {
  43. c.mu.Lock()
  44. cn, err := c._conn(nil)
  45. c.mu.Unlock()
  46. return cn, err
  47. }
  48. func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
  49. if c.closed {
  50. return nil, pool.ErrClosed
  51. }
  52. if c.cn != nil {
  53. return c.cn, nil
  54. }
  55. channels := mapKeys(c.channels)
  56. channels = append(channels, newChannels...)
  57. cn, err := c.newConn(channels)
  58. if err != nil {
  59. return nil, err
  60. }
  61. if err := c.resubscribe(cn); err != nil {
  62. _ = c.closeConn(cn)
  63. return nil, err
  64. }
  65. c.cn = cn
  66. return cn, nil
  67. }
  68. func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
  69. return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
  70. return writeCmd(wr, cmd)
  71. })
  72. }
  73. func (c *PubSub) resubscribe(cn *pool.Conn) error {
  74. var firstErr error
  75. if len(c.channels) > 0 {
  76. err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
  77. if err != nil && firstErr == nil {
  78. firstErr = err
  79. }
  80. }
  81. if len(c.patterns) > 0 {
  82. err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
  83. if err != nil && firstErr == nil {
  84. firstErr = err
  85. }
  86. }
  87. return firstErr
  88. }
  89. func mapKeys(m map[string]struct{}) []string {
  90. s := make([]string, len(m))
  91. i := 0
  92. for k := range m {
  93. s[i] = k
  94. i++
  95. }
  96. return s
  97. }
  98. func (c *PubSub) _subscribe(
  99. cn *pool.Conn, redisCmd string, channels []string,
  100. ) error {
  101. args := make([]interface{}, 0, 1+len(channels))
  102. args = append(args, redisCmd)
  103. for _, channel := range channels {
  104. args = append(args, channel)
  105. }
  106. cmd := NewSliceCmd(args...)
  107. return c.writeCmd(cn, cmd)
  108. }
  109. func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
  110. c.mu.Lock()
  111. c._releaseConn(cn, err, allowTimeout)
  112. c.mu.Unlock()
  113. }
  114. func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
  115. if c.cn != cn {
  116. return
  117. }
  118. if internal.IsBadConn(err, allowTimeout) {
  119. c._reconnect(err)
  120. }
  121. }
  122. func (c *PubSub) _reconnect(reason error) {
  123. _ = c._closeTheCn(reason)
  124. _, _ = c._conn(nil)
  125. }
  126. func (c *PubSub) _closeTheCn(reason error) error {
  127. if c.cn == nil {
  128. return nil
  129. }
  130. if !c.closed {
  131. internal.Logf("redis: discarding bad PubSub connection: %s", reason)
  132. }
  133. err := c.closeConn(c.cn)
  134. c.cn = nil
  135. return err
  136. }
  137. func (c *PubSub) Close() error {
  138. c.mu.Lock()
  139. defer c.mu.Unlock()
  140. if c.closed {
  141. return pool.ErrClosed
  142. }
  143. c.closed = true
  144. close(c.exit)
  145. err := c._closeTheCn(pool.ErrClosed)
  146. return err
  147. }
  148. // Subscribe the client to the specified channels. It returns
  149. // empty subscription if there are no channels.
  150. func (c *PubSub) Subscribe(channels ...string) error {
  151. c.mu.Lock()
  152. defer c.mu.Unlock()
  153. err := c.subscribe("subscribe", channels...)
  154. if c.channels == nil {
  155. c.channels = make(map[string]struct{})
  156. }
  157. for _, s := range channels {
  158. c.channels[s] = struct{}{}
  159. }
  160. return err
  161. }
  162. // PSubscribe the client to the given patterns. It returns
  163. // empty subscription if there are no patterns.
  164. func (c *PubSub) PSubscribe(patterns ...string) error {
  165. c.mu.Lock()
  166. defer c.mu.Unlock()
  167. err := c.subscribe("psubscribe", patterns...)
  168. if c.patterns == nil {
  169. c.patterns = make(map[string]struct{})
  170. }
  171. for _, s := range patterns {
  172. c.patterns[s] = struct{}{}
  173. }
  174. return err
  175. }
  176. // Unsubscribe the client from the given channels, or from all of
  177. // them if none is given.
  178. func (c *PubSub) Unsubscribe(channels ...string) error {
  179. c.mu.Lock()
  180. defer c.mu.Unlock()
  181. for _, channel := range channels {
  182. delete(c.channels, channel)
  183. }
  184. err := c.subscribe("unsubscribe", channels...)
  185. return err
  186. }
  187. // PUnsubscribe the client from the given patterns, or from all of
  188. // them if none is given.
  189. func (c *PubSub) PUnsubscribe(patterns ...string) error {
  190. c.mu.Lock()
  191. defer c.mu.Unlock()
  192. for _, pattern := range patterns {
  193. delete(c.patterns, pattern)
  194. }
  195. err := c.subscribe("punsubscribe", patterns...)
  196. return err
  197. }
  198. func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
  199. cn, err := c._conn(channels)
  200. if err != nil {
  201. return err
  202. }
  203. err = c._subscribe(cn, redisCmd, channels)
  204. c._releaseConn(cn, err, false)
  205. return err
  206. }
  207. func (c *PubSub) Ping(payload ...string) error {
  208. args := []interface{}{"ping"}
  209. if len(payload) == 1 {
  210. args = append(args, payload[0])
  211. }
  212. cmd := NewCmd(args...)
  213. cn, err := c.conn()
  214. if err != nil {
  215. return err
  216. }
  217. err = c.writeCmd(cn, cmd)
  218. c.releaseConn(cn, err, false)
  219. return err
  220. }
  221. // Subscription received after a successful subscription to channel.
  222. type Subscription struct {
  223. // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
  224. Kind string
  225. // Channel name we have subscribed to.
  226. Channel string
  227. // Number of channels we are currently subscribed to.
  228. Count int
  229. }
  230. func (m *Subscription) String() string {
  231. return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
  232. }
  233. // Message received as result of a PUBLISH command issued by another client.
  234. type Message struct {
  235. Channel string
  236. Pattern string
  237. Payload string
  238. }
  239. func (m *Message) String() string {
  240. return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
  241. }
  242. // Pong received as result of a PING command issued by another client.
  243. type Pong struct {
  244. Payload string
  245. }
  246. func (p *Pong) String() string {
  247. if p.Payload != "" {
  248. return fmt.Sprintf("Pong<%s>", p.Payload)
  249. }
  250. return "Pong"
  251. }
  252. func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
  253. switch reply := reply.(type) {
  254. case string:
  255. return &Pong{
  256. Payload: reply,
  257. }, nil
  258. case []interface{}:
  259. switch kind := reply[0].(string); kind {
  260. case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
  261. channel, _ := reply[1].(string)
  262. return &Subscription{
  263. Kind: kind,
  264. Channel: channel,
  265. Count: int(reply[2].(int64)),
  266. }, nil
  267. case "message":
  268. return &Message{
  269. Channel: reply[1].(string),
  270. Payload: reply[2].(string),
  271. }, nil
  272. case "pmessage":
  273. return &Message{
  274. Pattern: reply[1].(string),
  275. Channel: reply[2].(string),
  276. Payload: reply[3].(string),
  277. }, nil
  278. case "pong":
  279. return &Pong{
  280. Payload: reply[1].(string),
  281. }, nil
  282. default:
  283. return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
  284. }
  285. default:
  286. return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
  287. }
  288. }
  289. // ReceiveTimeout acts like Receive but returns an error if message
  290. // is not received in time. This is low-level API and in most cases
  291. // Channel should be used instead.
  292. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
  293. if c.cmd == nil {
  294. c.cmd = NewCmd()
  295. }
  296. cn, err := c.conn()
  297. if err != nil {
  298. return nil, err
  299. }
  300. err = cn.WithReader(timeout, func(rd *proto.Reader) error {
  301. return c.cmd.readReply(rd)
  302. })
  303. c.releaseConn(cn, err, timeout > 0)
  304. if err != nil {
  305. return nil, err
  306. }
  307. return c.newMessage(c.cmd.Val())
  308. }
  309. // Receive returns a message as a Subscription, Message, Pong or error.
  310. // See PubSub example for details. This is low-level API and in most cases
  311. // Channel should be used instead.
  312. func (c *PubSub) Receive() (interface{}, error) {
  313. return c.ReceiveTimeout(0)
  314. }
  315. // ReceiveMessage returns a Message or error ignoring Subscription and Pong
  316. // messages. This is low-level API and in most cases Channel should be used
  317. // instead.
  318. func (c *PubSub) ReceiveMessage() (*Message, error) {
  319. for {
  320. msg, err := c.Receive()
  321. if err != nil {
  322. return nil, err
  323. }
  324. switch msg := msg.(type) {
  325. case *Subscription:
  326. // Ignore.
  327. case *Pong:
  328. // Ignore.
  329. case *Message:
  330. return msg, nil
  331. default:
  332. err := fmt.Errorf("redis: unknown message: %T", msg)
  333. return nil, err
  334. }
  335. }
  336. }
  337. // Channel returns a Go channel for concurrently receiving messages.
  338. // It periodically sends Ping messages to test connection health.
  339. // The channel is closed with PubSub. Receive* APIs can not be used
  340. // after channel is created.
  341. //
  342. // If the Go channel is full for 30 seconds the message is dropped.
  343. func (c *PubSub) Channel() <-chan *Message {
  344. return c.channel(100)
  345. }
  346. // ChannelSize is like Channel, but creates a Go channel
  347. // with specified buffer size.
  348. func (c *PubSub) ChannelSize(size int) <-chan *Message {
  349. return c.channel(size)
  350. }
  351. func (c *PubSub) channel(size int) <-chan *Message {
  352. c.chOnce.Do(func() {
  353. c.initChannel(size)
  354. })
  355. if cap(c.ch) != size {
  356. err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
  357. panic(err)
  358. }
  359. return c.ch
  360. }
  361. func (c *PubSub) initChannel(size int) {
  362. const timeout = 30 * time.Second
  363. c.ch = make(chan *Message, size)
  364. c.ping = make(chan struct{}, 1)
  365. go func() {
  366. timer := time.NewTimer(timeout)
  367. timer.Stop()
  368. var errCount int
  369. for {
  370. msg, err := c.Receive()
  371. if err != nil {
  372. if err == pool.ErrClosed {
  373. close(c.ch)
  374. return
  375. }
  376. if errCount > 0 {
  377. time.Sleep(c.retryBackoff(errCount))
  378. }
  379. errCount++
  380. continue
  381. }
  382. errCount = 0
  383. // Any message is as good as a ping.
  384. select {
  385. case c.ping <- struct{}{}:
  386. default:
  387. }
  388. switch msg := msg.(type) {
  389. case *Subscription:
  390. // Ignore.
  391. case *Pong:
  392. // Ignore.
  393. case *Message:
  394. timer.Reset(timeout)
  395. select {
  396. case c.ch <- msg:
  397. if !timer.Stop() {
  398. <-timer.C
  399. }
  400. case <-timer.C:
  401. internal.Logf(
  402. "redis: %s channel is full for %s (message is dropped)",
  403. c, timeout)
  404. }
  405. default:
  406. internal.Logf("redis: unknown message type: %T", msg)
  407. }
  408. }
  409. }()
  410. go func() {
  411. timer := time.NewTimer(timeout)
  412. timer.Stop()
  413. healthy := true
  414. for {
  415. timer.Reset(timeout)
  416. select {
  417. case <-c.ping:
  418. healthy = true
  419. if !timer.Stop() {
  420. <-timer.C
  421. }
  422. case <-timer.C:
  423. pingErr := c.Ping()
  424. if healthy {
  425. healthy = false
  426. } else {
  427. if pingErr == nil {
  428. pingErr = errPingTimeout
  429. }
  430. c.mu.Lock()
  431. c._reconnect(pingErr)
  432. c.mu.Unlock()
  433. }
  434. case <-c.exit:
  435. return
  436. }
  437. }
  438. }()
  439. }
  440. func (c *PubSub) retryBackoff(attempt int) time.Duration {
  441. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  442. }