stackexchange_redis.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package redis
import (
	"context"
	"errors"
	"math/rand"
	"time"

	"github.com/kataras/neffos"
	"github.com/mediocregopher/radix/v3"
)
  9. // Config is used on the `StackExchange` package-level function.
  10. // Can be used to customize the redis client dialer.
  11. type Config struct {
  12. // Network to use.
  13. // Defaults to "tcp".
  14. Network string
  15. // Addr of a single redis server instance.
  16. // See "Clusters" field for clusters support.
  17. // Defaults to "127.0.0.1:6379".
  18. Addr string
  19. // Clusters a list of network addresses for clusters.
  20. // If not empty "Addr" is ignored.
  21. Clusters []string
  22. Password string
  23. DialTimeout time.Duration
  24. // MaxActive defines the size connection pool.
  25. // Defaults to 10.
  26. MaxActive int
  27. }
  28. // StackExchange is a `neffos.StackExchange` for redis.
  29. type StackExchange struct {
  30. channel string
  31. pool *radix.Pool
  32. connFunc radix.ConnFunc
  33. subscribers map[*neffos.Conn]*subscriber
  34. addSubscriber chan *subscriber
  35. subscribe chan subscribeAction
  36. unsubscribe chan unsubscribeAction
  37. delSubscriber chan closeAction
  38. }
  39. type (
  40. subscriber struct {
  41. conn *neffos.Conn
  42. pubSub radix.PubSubConn
  43. msgCh chan<- radix.PubSubMessage
  44. }
  45. subscribeAction struct {
  46. conn *neffos.Conn
  47. namespace string
  48. }
  49. unsubscribeAction struct {
  50. conn *neffos.Conn
  51. namespace string
  52. }
  53. closeAction struct {
  54. conn *neffos.Conn
  55. }
  56. )
  57. var _ neffos.StackExchange = (*StackExchange)(nil)
  58. // NewStackExchange returns a new redis StackExchange.
  59. // The "channel" input argument is the channel prefix for publish and subscribe.
  60. func NewStackExchange(cfg Config, channel string) (*StackExchange, error) {
  61. if cfg.Network == "" {
  62. cfg.Network = "tcp"
  63. }
  64. if cfg.Addr == "" && len(cfg.Clusters) == 0 {
  65. cfg.Addr = "127.0.0.1:6379"
  66. }
  67. if cfg.DialTimeout < 0 {
  68. cfg.DialTimeout = 30 * time.Second
  69. }
  70. if cfg.MaxActive == 0 {
  71. cfg.MaxActive = 10
  72. }
  73. var dialOptions []radix.DialOpt
  74. if cfg.Password != "" {
  75. dialOptions = append(dialOptions, radix.DialAuthPass(cfg.Password))
  76. }
  77. if cfg.DialTimeout > 0 {
  78. dialOptions = append(dialOptions, radix.DialTimeout(cfg.DialTimeout))
  79. }
  80. var connFunc radix.ConnFunc
  81. if len(cfg.Clusters) > 0 {
  82. cluster, err := radix.NewCluster(cfg.Clusters)
  83. if err != nil {
  84. // maybe an
  85. // ERR This instance has cluster support disabled
  86. return nil, err
  87. }
  88. connFunc = func(network, addr string) (radix.Conn, error) {
  89. topo := cluster.Topo()
  90. node := topo[rand.Intn(len(topo))]
  91. return radix.Dial(cfg.Network, node.Addr, dialOptions...)
  92. }
  93. } else {
  94. connFunc = func(network, addr string) (radix.Conn, error) {
  95. return radix.Dial(cfg.Network, cfg.Addr, dialOptions...)
  96. }
  97. }
  98. pool, err := radix.NewPool("", "", cfg.MaxActive, radix.PoolConnFunc(connFunc))
  99. if err != nil {
  100. return nil, err
  101. }
  102. exc := &StackExchange{
  103. pool: pool,
  104. connFunc: connFunc,
  105. // If you are using one redis server for multiple nefos servers,
  106. // use a different channel for each neffos server.
  107. // Otherwise a message sent from one server to all of its own clients will go
  108. // to all clients of all nefos servers that use the redis server.
  109. // We could use multiple channels but overcomplicate things here.
  110. channel: channel,
  111. subscribers: make(map[*neffos.Conn]*subscriber),
  112. addSubscriber: make(chan *subscriber),
  113. delSubscriber: make(chan closeAction),
  114. subscribe: make(chan subscribeAction),
  115. unsubscribe: make(chan unsubscribeAction),
  116. }
  117. go exc.run()
  118. return exc, nil
  119. }
  120. func (exc *StackExchange) run() {
  121. for {
  122. select {
  123. case s := <-exc.addSubscriber:
  124. exc.subscribers[s.conn] = s
  125. // neffos.Debugf("[%s] added to potential subscribers", s.conn.ID())
  126. case m := <-exc.subscribe:
  127. if sub, ok := exc.subscribers[m.conn]; ok {
  128. channel := exc.getChannel(m.namespace, "", "")
  129. sub.pubSub.PSubscribe(sub.msgCh, channel)
  130. // neffos.Debugf("[%s] subscribed to [%s] for namespace [%s]", m.conn.ID(), channel, m.namespace)
  131. // } else {
  132. // neffos.Debugf("[%s] tried to subscribe to [%s] namespace before 'OnConnect.addSubscriber'!", m.conn.ID(), m.namespace)
  133. }
  134. case m := <-exc.unsubscribe:
  135. if sub, ok := exc.subscribers[m.conn]; ok {
  136. channel := exc.getChannel(m.namespace, "", "")
  137. // neffos.Debugf("[%s] unsubscribed from [%s]", channel)
  138. sub.pubSub.PUnsubscribe(sub.msgCh, channel)
  139. }
  140. case m := <-exc.delSubscriber:
  141. if sub, ok := exc.subscribers[m.conn]; ok {
  142. // neffos.Debugf("[%s] disconnected", m.conn.ID())
  143. sub.pubSub.Close()
  144. close(sub.msgCh)
  145. delete(exc.subscribers, m.conn)
  146. }
  147. }
  148. }
  149. }
  150. func (exc *StackExchange) getChannel(namespace, room, connID string) string {
  151. if connID != "" {
  152. // publish direct and let the server-side do the checks
  153. // of valid or invalid message to send on this particular client.
  154. return exc.channel + "." + connID + "."
  155. }
  156. if namespace == "" && room != "" {
  157. // should never happen but give info for debugging.
  158. panic("namespace cannot be empty when sending to a namespace's room")
  159. }
  160. return exc.channel + "." + namespace + "."
  161. }
  162. // OnConnect prepares the connection redis subscriber
  163. // and subscribes to itself for direct neffos messages.
  164. // It's called automatically after the neffos server's OnConnect (if any)
  165. // on incoming client connections.
  166. func (exc *StackExchange) OnConnect(c *neffos.Conn) error {
  167. redisMsgCh := make(chan radix.PubSubMessage)
  168. go func() {
  169. for redisMsg := range redisMsgCh {
  170. // neffos.Debugf("[%s] send to client: [%s]", c.ID(), string(redisMsg.Message))
  171. msg := c.DeserializeMessage(neffos.TextMessage, redisMsg.Message)
  172. msg.FromStackExchange = true
  173. c.Write(msg)
  174. }
  175. }()
  176. pubSub := radix.PersistentPubSub("", "", exc.connFunc)
  177. s := &subscriber{
  178. conn: c,
  179. pubSub: pubSub,
  180. msgCh: redisMsgCh,
  181. }
  182. selfChannel := exc.getChannel("", "", c.ID())
  183. pubSub.PSubscribe(redisMsgCh, selfChannel)
  184. exc.addSubscriber <- s
  185. return nil
  186. }
  187. // Publish publishes messages through redis.
  188. // It's called automatically on neffos broadcasting.
  189. func (exc *StackExchange) Publish(msgs []neffos.Message) bool {
  190. for _, msg := range msgs {
  191. if !exc.publish(msg) {
  192. return false
  193. }
  194. }
  195. return true
  196. }
  197. func (exc *StackExchange) publish(msg neffos.Message) bool {
  198. // channel := exc.getMessageChannel(c.ID(), msg)
  199. channel := exc.getChannel(msg.Namespace, msg.Room, msg.To)
  200. // neffos.Debugf("[%s] publish to channel [%s] the data [%s]\n", msg.FromExplicit, channel, string(msg.Serialize()))
  201. err := exc.publishCommand(channel, msg.Serialize())
  202. return err == nil
  203. }
  204. func (exc *StackExchange) publishCommand(channel string, b []byte) error {
  205. cmd := radix.FlatCmd(nil, "PUBLISH", channel, b)
  206. return exc.pool.Do(cmd)
  207. }
  208. // Ask implements the server Ask feature for redis. It blocks until response.
  209. func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) {
  210. sub := radix.PersistentPubSub("", "", exc.connFunc)
  211. msgCh := make(chan radix.PubSubMessage)
  212. err = sub.Subscribe(msgCh, token)
  213. if err != nil {
  214. return
  215. }
  216. defer sub.Close()
  217. if !exc.publish(msg) {
  218. return response, neffos.ErrWrite
  219. }
  220. select {
  221. case <-ctx.Done():
  222. err = ctx.Err()
  223. case redisMsg := <-msgCh:
  224. response = neffos.DeserializeMessage(neffos.TextMessage, redisMsg.Message, false, false)
  225. err = response.Err
  226. }
  227. return
  228. }
  229. // NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result.
  230. func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error {
  231. msg.ClearWait()
  232. return exc.publishCommand(token, msg.Serialize())
  233. }
  234. // Subscribe subscribes to a specific namespace,
  235. // it's called automatically on neffos namespace connected.
  236. func (exc *StackExchange) Subscribe(c *neffos.Conn, namespace string) {
  237. exc.subscribe <- subscribeAction{
  238. conn: c,
  239. namespace: namespace,
  240. }
  241. }
  242. // Unsubscribe unsubscribes from a specific namespace,
  243. // it's called automatically on neffos namespace disconnect.
  244. func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) {
  245. exc.unsubscribe <- unsubscribeAction{
  246. conn: c,
  247. namespace: namespace,
  248. }
  249. }
  250. // OnDisconnect terminates the connection's subscriber that
  251. // created on the `OnConnect` method.
  252. // It unsubscribes to all opened channels and
  253. // closes the internal read messages channel.
  254. // It's called automatically when a connection goes offline,
  255. // manually by server or client or by network failure.
  256. func (exc *StackExchange) OnDisconnect(c *neffos.Conn) {
  257. exc.delSubscriber <- closeAction{conn: c}
  258. }