123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- package nats
- import (
- "context"
- "strings"
- "sync"
- "github.com/kataras/neffos"
- "github.com/nats-io/nats.go"
- )
- // StackExchange is a `neffos.StackExchange` for nats
- // based on https://nats-io.github.io/docs/developer/tutorials/pubsub.html.
- type StackExchange struct {
- // options holds the nats options for clients.
- // Defaults to the `nats.GetDefaultOptions()` which
- // can be overridden by the `With` function on `NewStackExchange`.
- opts nats.Options
- // If you use the same nats server instance for multiple neffos apps,
- // set this to different values across your apps.
- SubjectPrefix string
- publisher *nats.Conn
- subscribers map[*neffos.Conn]*subscriber
- addSubscriber chan *subscriber
- subscribe chan subscribeAction
- unsubscribe chan unsubscribeAction
- delSubscriber chan closeAction
- }
- var _ neffos.StackExchange = (*StackExchange)(nil)
- type (
- subscriber struct {
- conn *neffos.Conn
- subConn *nats.Conn
- // To unsubscribe a connection per namespace, set on subscribe channel.
- // Key is the subject pattern, with lock for any case, although
- // they shouldn't execute in parallel from neffos conn itself.
- subscriptions map[string]*nats.Subscription
- mu sync.RWMutex
- }
- subscribeAction struct {
- conn *neffos.Conn
- namespace string
- }
- unsubscribeAction struct {
- conn *neffos.Conn
- namespace string
- }
- closeAction struct {
- conn *neffos.Conn
- }
- )
- // With accepts a nats.Options structure
- // which contains the whole configuration
- // and returns a nats.Option which can be passed
- // to the `NewStackExchange`'s second input variadic argument.
- // Note that use this method only when you want to override the default options
- // at once.
- func With(options nats.Options) nats.Option {
- return func(opts *nats.Options) error {
- *opts = options
- return nil
- }
- }
- // NewStackExchange returns a new nats StackExchange.
- // The required field is "url" which should be in the form
- // of nats connection string, e.g. nats://username:pass@localhost:4222.
- // Other option is to leave the url with localhost:4222 and pass
- // authentication options such as `nats.UserInfo(username, pass)` or
- // nats.UserCredentials("./userCredsFile") at the second variadic input argument.
- //
- // Options can be used to register nats error and close handlers too.
- //
- // Alternatively, use the `With(nats.Options)` function to
- // customize the client through struct fields.
- func NewStackExchange(url string, options ...nats.Option) (*StackExchange, error) {
- // For subscribing:
- // Use a single client or create new for each new incoming websocket connection?
- // - nats does not have a connection pool and
- // - it uses callbacks for subscribers and
- // so I assumed it's tend to be uses as single client BUT inside its source code:
- // - the connect itself is done under its nats.go/Conn.connect()
- // - the reading is done through loop waits for each server message
- // and it parses and stores field data using connection-level locks.
- // - and the subscriber at nats.go/Conn#waitForMsgs(s *Subscription) for channel use
- // also uses connection-level locks. ^ this is slower than callbacks,
- // callbacks are more low level there as far as my research goes.
- // So I will proceed with making a new nats connection for each websocket connection,
- // if anyone with more experience on nats than me has a different approach
- // we should listen to and process with actions on making it more efficient.
- // For publishing:
- // Create a connection, here, which will only be used to Publish.
- // Cache the options to be used on every client and
- // respect any customization by caller.
- opts := nats.GetDefaultOptions()
- if url == "" {
- url = nats.DefaultURL
- }
- opts.Url = url
- // TODO: export the neffos.debugEnabled
- // and set that:
- // opts.Verbose = true
- opts.NoEcho = true
- for _, opt := range options {
- if opt == nil {
- continue
- }
- if err := opt(&opts); err != nil {
- return nil, err
- }
- }
- // opts.Url may change from caller, use the struct's field to respect it.
- servers := strings.Split(opts.Url, ",")
- for i, s := range servers {
- servers[i] = strings.TrimSpace(s)
- }
- // append to make sure that any custom servers from caller
- // are respected, no check for duplications.
- opts.Servers = append(opts.Servers, servers...)
- pubConn, err := opts.Connect()
- if err != nil {
- return nil, err
- }
- exc := &StackExchange{
- opts: opts,
- SubjectPrefix: "neffos",
- publisher: pubConn,
- subscribers: make(map[*neffos.Conn]*subscriber),
- addSubscriber: make(chan *subscriber),
- delSubscriber: make(chan closeAction),
- subscribe: make(chan subscribeAction),
- unsubscribe: make(chan unsubscribeAction),
- }
- go exc.run()
- return exc, nil
- }
- func (exc *StackExchange) run() {
- for {
- select {
- case s := <-exc.addSubscriber:
- // neffos.Debugf("[%s] added to potential subscribers", s.conn.ID())
- exc.subscribers[s.conn] = s
- case m := <-exc.subscribe:
- if sub, ok := exc.subscribers[m.conn]; ok {
- if sub.subConn.IsClosed() {
- // neffos.Debugf("[%s] has an unexpected nats connection closing on subscribe", m.conn.ID())
- delete(exc.subscribers, m.conn)
- continue
- }
- subject := exc.getSubject(m.namespace, "", "")
- // neffos.Debugf("[%s] subscribed to [%s]", m.conn.ID(), subject)
- subscription, err := sub.subConn.Subscribe(subject, makeMsgHandler(sub.conn))
- if err != nil {
- continue
- }
- sub.subConn.Flush()
- if err = sub.subConn.LastError(); err != nil {
- // neffos.Debugf("[%s] OnSubscribe [%s] Last Error: %v", m.conn, subject, err)
- continue
- }
- sub.mu.Lock()
- if sub.subscriptions == nil {
- sub.subscriptions = make(map[string]*nats.Subscription)
- }
- sub.subscriptions[subject] = subscription
- sub.mu.Unlock()
- }
- case m := <-exc.unsubscribe:
- if sub, ok := exc.subscribers[m.conn]; ok {
- if sub.subConn.IsClosed() {
- // neffos.Debugf("[%s] has an unexpected nats connection closing on unsubscribe", m.conn.ID())
- delete(exc.subscribers, m.conn)
- continue
- }
- subject := exc.getSubject(m.namespace, "", "")
- // neffos.Debugf("[%s] unsubscribed from [%s]", subject)
- if sub.subscriptions == nil {
- continue
- }
- sub.mu.RLock()
- subscription, ok := sub.subscriptions[subject]
- sub.mu.RUnlock()
- if ok {
- subscription.Unsubscribe()
- }
- }
- case m := <-exc.delSubscriber:
- if sub, ok := exc.subscribers[m.conn]; ok {
- // neffos.Debugf("[%s] disconnected", m.conn.ID())
- if sub.subConn.IsConnected() {
- sub.subConn.Close()
- }
- delete(exc.subscribers, m.conn)
- }
- }
- }
- }
- // Nats does not allow ending with ".", it uses pattern matching.
- func (exc *StackExchange) getSubject(namespace, room, connID string) string {
- if connID != "" {
- // publish direct and let the server-side do the checks
- // of valid or invalid message to send on this particular client.
- return exc.SubjectPrefix + "." + connID
- }
- if namespace == "" && room != "" {
- // should never happen but give info for debugging.
- panic("namespace cannot be empty when sending to a namespace's room")
- }
- return exc.SubjectPrefix + "." + namespace
- }
- func makeMsgHandler(c *neffos.Conn) nats.MsgHandler {
- return func(m *nats.Msg) {
- msg := c.DeserializeMessage(neffos.TextMessage, m.Data)
- msg.FromStackExchange = true
- c.Write(msg)
- }
- }
- // OnConnect prepares the connection nats subscriber
- // and subscribes to itself for direct neffos messages.
- // It's called automatically after the neffos server's OnConnect (if any)
- // on incoming client connections.
- func (exc *StackExchange) OnConnect(c *neffos.Conn) error {
- subConn, err := exc.opts.Connect()
- if err != nil {
- // neffos.Debugf("[%s] OnConnect Error: %v", c, err)
- return err
- }
- selfSubject := exc.getSubject("", "", c.ID())
- // unsubscribes automatically on close.
- _, err = subConn.Subscribe(selfSubject, makeMsgHandler(c))
- if err != nil {
- // neffos.Debugf("[%s] OnConnect.SelfSubscribe Error: %v", c, err)
- return err
- }
- subConn.Flush()
- if err = subConn.LastError(); err != nil {
- // maybe an invalid subject, send back to the client which will window.alert it.
- // neffos.Debugf("[%s] OnConnect.SelfSubscribe Last Error: %v", c, err)
- return err
- }
- s := &subscriber{
- conn: c,
- subConn: subConn,
- }
- exc.addSubscriber <- s
- return nil
- }
- // Publish publishes messages through nats.
- // It's called automatically on neffos broadcasting.
- func (exc *StackExchange) Publish(msgs []neffos.Message) bool {
- for _, msg := range msgs {
- if !exc.publish(msg) {
- return false
- }
- }
- return true
- }
- func (exc *StackExchange) publish(msg neffos.Message) bool {
- subject := exc.getSubject(msg.Namespace, msg.Room, msg.To)
- b := msg.Serialize()
- err := exc.publisher.Publish(subject, b)
- // Let's not add logging options, let
- // any custom nats error handler alone.
- return err == nil
- }
- // Ask implements server Ask for nats. It blocks.
- func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) {
- // for some reason we can't use the exc.publisher.Subscribe,
- // so create a new connection for subscription which will be terminated on message receive or timeout.
- subConn, err := exc.opts.Connect()
- if err != nil {
- return
- }
- ch := make(chan neffos.Message)
- sub, err := subConn.Subscribe(token, func(m *nats.Msg) {
- ch <- neffos.DeserializeMessage(neffos.TextMessage, m.Data, false, false)
- })
- if err != nil {
- return response, err
- }
- defer sub.Unsubscribe()
- defer subConn.Close()
- if !exc.publish(msg) {
- return response, neffos.ErrWrite
- }
- select {
- case <-ctx.Done():
- return response, ctx.Err()
- case response = <-ch:
- return response, response.Err
- }
- }
- // NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result.
- func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error {
- msg.ClearWait()
- err := exc.publisher.Publish(token, msg.Serialize())
- if err != nil {
- return err
- }
- exc.publisher.Flush()
- return exc.publisher.LastError()
- }
- // Subscribe subscribes to a specific namespace,
- // it's called automatically on neffos namespace connected.
- func (exc *StackExchange) Subscribe(c *neffos.Conn, namespace string) {
- exc.subscribe <- subscribeAction{
- conn: c,
- namespace: namespace,
- }
- }
- // Unsubscribe unsubscribes from a specific namespace,
- // it's called automatically on neffos namespace disconnect.
- func (exc *StackExchange) Unsubscribe(c *neffos.Conn, namespace string) {
- exc.unsubscribe <- unsubscribeAction{
- conn: c,
- namespace: namespace,
- }
- }
- // OnDisconnect terminates the connection's subscriber that
- // created on the `OnConnect` method.
- // It unsubscribes to all opened channels and
- // closes the internal read messages channel.
- // It's called automatically when a connection goes offline,
- // manually by server or client or by network failure.
- func (exc *StackExchange) OnDisconnect(c *neffos.Conn) {
- exc.delSubscriber <- closeAction{conn: c}
- }
|