stackexchange_nats.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. package nats
  2. import (
  3. "context"
  4. "strings"
  5. "sync"
  6. "github.com/kataras/neffos"
  7. "github.com/nats-io/nats.go"
  8. )
  9. // StackExchange is a `neffos.StackExchange` for nats
  10. // based on https://nats-io.github.io/docs/developer/tutorials/pubsub.html.
  11. type StackExchange struct {
  12. // options holds the nats options for clients.
  13. // Defaults to the `nats.GetDefaultOptions()` which
  14. // can be overridden by the `With` function on `NewStackExchange`.
  15. opts nats.Options
  16. // If you use the same nats server instance for multiple neffos apps,
  17. // set this to different values across your apps.
  18. SubjectPrefix string
  19. publisher *nats.Conn
  20. subscribers map[*neffos.Conn]*subscriber
  21. addSubscriber chan *subscriber
  22. subscribe chan subscribeAction
  23. unsubscribe chan unsubscribeAction
  24. delSubscriber chan closeAction
  25. }
  26. var _ neffos.StackExchange = (*StackExchange)(nil)
  27. type (
  28. subscriber struct {
  29. conn *neffos.Conn
  30. subConn *nats.Conn
  31. // To unsubscribe a connection per namespace, set on subscribe channel.
  32. // Key is the subject pattern, with lock for any case, although
  33. // they shouldn't execute in parallel from neffos conn itself.
  34. subscriptions map[string]*nats.Subscription
  35. mu sync.RWMutex
  36. }
  37. subscribeAction struct {
  38. conn *neffos.Conn
  39. namespace string
  40. }
  41. unsubscribeAction struct {
  42. conn *neffos.Conn
  43. namespace string
  44. }
  45. closeAction struct {
  46. conn *neffos.Conn
  47. }
  48. )
  49. // With accepts a nats.Options structure
  50. // which contains the whole configuration
  51. // and returns a nats.Option which can be passed
  52. // to the `NewStackExchange`'s second input variadic argument.
  53. // Note that use this method only when you want to override the default options
  54. // at once.
  55. func With(options nats.Options) nats.Option {
  56. return func(opts *nats.Options) error {
  57. *opts = options
  58. return nil
  59. }
  60. }
  61. // NewStackExchange returns a new nats StackExchange.
  62. // The required field is "url" which should be in the form
  63. // of nats connection string, e.g. nats://username:pass@localhost:4222.
  64. // Other option is to leave the url with localhost:4222 and pass
  65. // authentication options such as `nats.UserInfo(username, pass)` or
  66. // nats.UserCredentials("./userCredsFile") at the second variadic input argument.
  67. //
  68. // Options can be used to register nats error and close handlers too.
  69. //
  70. // Alternatively, use the `With(nats.Options)` function to
  71. // customize the client through struct fields.
  72. func NewStackExchange(url string, options ...nats.Option) (*StackExchange, error) {
  73. // For subscribing:
  74. // Use a single client or create new for each new incoming websocket connection?
  75. // - nats does not have a connection pool and
  76. // - it uses callbacks for subscribers and
  77. // so I assumed it's tend to be uses as single client BUT inside its source code:
  78. // - the connect itself is done under its nats.go/Conn.connect()
  79. // - the reading is done through loop waits for each server message
  80. // and it parses and stores field data using connection-level locks.
  81. // - and the subscriber at nats.go/Conn#waitForMsgs(s *Subscription) for channel use
  82. // also uses connection-level locks. ^ this is slower than callbacks,
  83. // callbacks are more low level there as far as my research goes.
  84. // So I will proceed with making a new nats connection for each websocket connection,
  85. // if anyone with more experience on nats than me has a different approach
  86. // we should listen to and process with actions on making it more efficient.
  87. // For publishing:
  88. // Create a connection, here, which will only be used to Publish.
  89. // Cache the options to be used on every client and
  90. // respect any customization by caller.
  91. opts := nats.GetDefaultOptions()
  92. if url == "" {
  93. url = nats.DefaultURL
  94. }
  95. opts.Url = url
  96. // TODO: export the neffos.debugEnabled
  97. // and set that:
  98. // opts.Verbose = true
  99. opts.NoEcho = true
  100. for _, opt := range options {
  101. if opt == nil {
  102. continue
  103. }
  104. if err := opt(&opts); err != nil {
  105. return nil, err
  106. }
  107. }
  108. // opts.Url may change from caller, use the struct's field to respect it.
  109. servers := strings.Split(opts.Url, ",")
  110. for i, s := range servers {
  111. servers[i] = strings.TrimSpace(s)
  112. }
  113. // append to make sure that any custom servers from caller
  114. // are respected, no check for duplications.
  115. opts.Servers = append(opts.Servers, servers...)
  116. pubConn, err := opts.Connect()
  117. if err != nil {
  118. return nil, err
  119. }
  120. exc := &StackExchange{
  121. opts: opts,
  122. SubjectPrefix: "neffos",
  123. publisher: pubConn,
  124. subscribers: make(map[*neffos.Conn]*subscriber),
  125. addSubscriber: make(chan *subscriber),
  126. delSubscriber: make(chan closeAction),
  127. subscribe: make(chan subscribeAction),
  128. unsubscribe: make(chan unsubscribeAction),
  129. }
  130. go exc.run()
  131. return exc, nil
  132. }
  133. func (exc *StackExchange) run() {
  134. for {
  135. select {
  136. case s := <-exc.addSubscriber:
  137. // neffos.Debugf("[%s] added to potential subscribers", s.conn.ID())
  138. exc.subscribers[s.conn] = s
  139. case m := <-exc.subscribe:
  140. if sub, ok := exc.subscribers[m.conn]; ok {
  141. if sub.subConn.IsClosed() {
  142. // neffos.Debugf("[%s] has an unexpected nats connection closing on subscribe", m.conn.ID())
  143. delete(exc.subscribers, m.conn)
  144. continue
  145. }
  146. subject := exc.getSubject(m.namespace, "", "")
  147. // neffos.Debugf("[%s] subscribed to [%s]", m.conn.ID(), subject)
  148. subscription, err := sub.subConn.Subscribe(subject, makeMsgHandler(sub.conn))
  149. if err != nil {
  150. continue
  151. }
  152. sub.subConn.Flush()
  153. if err = sub.subConn.LastError(); err != nil {
  154. // neffos.Debugf("[%s] OnSubscribe [%s] Last Error: %v", m.conn, subject, err)
  155. continue
  156. }
  157. sub.mu.Lock()
  158. if sub.subscriptions == nil {
  159. sub.subscriptions = make(map[string]*nats.Subscription)
  160. }
  161. sub.subscriptions[subject] = subscription
  162. sub.mu.Unlock()
  163. }
  164. case m := <-exc.unsubscribe:
  165. if sub, ok := exc.subscribers[m.conn]; ok {
  166. if sub.subConn.IsClosed() {
  167. // neffos.Debugf("[%s] has an unexpected nats connection closing on unsubscribe", m.conn.ID())
  168. delete(exc.subscribers, m.conn)
  169. continue
  170. }
  171. subject := exc.getSubject(m.namespace, "", "")
  172. // neffos.Debugf("[%s] unsubscribed from [%s]", subject)
  173. if sub.subscriptions == nil {
  174. continue
  175. }
  176. sub.mu.RLock()
  177. subscription, ok := sub.subscriptions[subject]
  178. sub.mu.RUnlock()
  179. if ok {
  180. subscription.Unsubscribe()
  181. }
  182. }
  183. case m := <-exc.delSubscriber:
  184. if sub, ok := exc.subscribers[m.conn]; ok {
  185. // neffos.Debugf("[%s] disconnected", m.conn.ID())
  186. if sub.subConn.IsConnected() {
  187. sub.subConn.Close()
  188. }
  189. delete(exc.subscribers, m.conn)
  190. }
  191. }
  192. }
  193. }
  194. // Nats does not allow ending with ".", it uses pattern matching.
  195. func (exc *StackExchange) getSubject(namespace, room, connID string) string {
  196. if connID != "" {
  197. // publish direct and let the server-side do the checks
  198. // of valid or invalid message to send on this particular client.
  199. return exc.SubjectPrefix + "." + connID
  200. }
  201. if namespace == "" && room != "" {
  202. // should never happen but give info for debugging.
  203. panic("namespace cannot be empty when sending to a namespace's room")
  204. }
  205. return exc.SubjectPrefix + "." + namespace
  206. }
  207. func makeMsgHandler(c *neffos.Conn) nats.MsgHandler {
  208. return func(m *nats.Msg) {
  209. msg := c.DeserializeMessage(neffos.TextMessage, m.Data)
  210. msg.FromStackExchange = true
  211. c.Write(msg)
  212. }
  213. }
  214. // OnConnect prepares the connection nats subscriber
  215. // and subscribes to itself for direct neffos messages.
  216. // It's called automatically after the neffos server's OnConnect (if any)
  217. // on incoming client connections.
  218. func (exc *StackExchange) OnConnect(c *neffos.Conn) error {
  219. subConn, err := exc.opts.Connect()
  220. if err != nil {
  221. // neffos.Debugf("[%s] OnConnect Error: %v", c, err)
  222. return err
  223. }
  224. selfSubject := exc.getSubject("", "", c.ID())
  225. // unsubscribes automatically on close.
  226. _, err = subConn.Subscribe(selfSubject, makeMsgHandler(c))
  227. if err != nil {
  228. // neffos.Debugf("[%s] OnConnect.SelfSubscribe Error: %v", c, err)
  229. return err
  230. }
  231. subConn.Flush()
  232. if err = subConn.LastError(); err != nil {
  233. // maybe an invalid subject, send back to the client which will window.alert it.
  234. // neffos.Debugf("[%s] OnConnect.SelfSubscribe Last Error: %v", c, err)
  235. return err
  236. }
  237. s := &subscriber{
  238. conn: c,
  239. subConn: subConn,
  240. }
  241. exc.addSubscriber <- s
  242. return nil
  243. }
  244. // Publish publishes messages through nats.
  245. // It's called automatically on neffos broadcasting.
  246. func (exc *StackExchange) Publish(msgs []neffos.Message) bool {
  247. for _, msg := range msgs {
  248. if !exc.publish(msg) {
  249. return false
  250. }
  251. }
  252. return true
  253. }
  254. func (exc *StackExchange) publish(msg neffos.Message) bool {
  255. subject := exc.getSubject(msg.Namespace, msg.Room, msg.To)
  256. b := msg.Serialize()
  257. err := exc.publisher.Publish(subject, b)
  258. // Let's not add logging options, let
  259. // any custom nats error handler alone.
  260. return err == nil
  261. }
  262. // Ask implements server Ask for nats. It blocks.
  263. func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) {
  264. // for some reason we can't use the exc.publisher.Subscribe,
  265. // so create a new connection for subscription which will be terminated on message receive or timeout.
  266. subConn, err := exc.opts.Connect()
  267. if err != nil {
  268. return
  269. }
  270. ch := make(chan neffos.Message)
  271. sub, err := subConn.Subscribe(token, func(m *nats.Msg) {
  272. ch <- neffos.DeserializeMessage(neffos.TextMessage, m.Data, false, false)
  273. })
  274. if err != nil {
  275. return response, err
  276. }
  277. defer sub.Unsubscribe()
  278. defer subConn.Close()
  279. if !exc.publish(msg) {
  280. return response, neffos.ErrWrite
  281. }
  282. select {
  283. case <-ctx.Done():
  284. return response, ctx.Err()
  285. case response = <-ch:
  286. return response, response.Err
  287. }
  288. }
  289. // NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result.
  290. func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error {
  291. msg.ClearWait()
  292. err := exc.publisher.Publish(token, msg.Serialize())
  293. if err != nil {
  294. return err
  295. }
  296. exc.publisher.Flush()
  297. return exc.publisher.LastError()
  298. }
  299. // Subscribe subscribes to a specific namespace,
  300. // it's called automatically on neffos namespace connected.
  301. func (exc *StackExchange) Subscribe(c *neffos.Conn, namespace string) {
  302. exc.subscribe <- subscribeAction{
  303. conn: c,
  304. namespace: namespace,
  305. }
  306. }
  307. // Unsubscribe unsubscribes from a specific namespace,
  308. // it's called automatically on neffos namespace disconnect.
  309. func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) {
  310. exc.unsubscribe <- unsubscribeAction{
  311. conn: c,
  312. namespace: namespace,
  313. }
  314. }
  315. // OnDisconnect terminates the connection's subscriber that
  316. // created on the `OnConnect` method.
  317. // It unsubscribes to all opened channels and
  318. // closes the internal read messages channel.
  319. // It's called automatically when a connection goes offline,
  320. // manually by server or client or by network failure.
  321. func (exc *StackExchange) OnDisconnect(c *neffos.Conn) {
  322. exc.delSubscriber <- closeAction{conn: c}
  323. }