stackexchange.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package neffos
  2. import (
  3. "context"
  4. )
  5. // StackExchange is an optional interface
  6. // that can be used to change the way neffos
  7. // sends messages to its clients, i.e
  8. // communication between multiple neffos servers.
  9. //
  10. // See the "kataras/neffos/stackexchange" subpackage for more details.
  11. // Real-World example and usage documentation
  12. // can be found at: "kataras/neffos/_examples/redis".
  13. type StackExchange interface {
  14. // OnConnect should prepare the connection's subscriber.
  15. // It's called automatically after the neffos server's OnConnect (if any)
  16. // on incoming client connections.
  17. OnConnect(c *Conn) error
  18. // OnDisconnect should close the connection's subscriber that
  19. // created on the `OnConnect` method.
  20. // It's called automatically when a connection goes offline,
  21. // manually by server or client or by network failure.
  22. OnDisconnect(c *Conn)
  23. // Publish should publish messages through a stackexchange.
  24. // It's called automatically on neffos broadcasting.
  25. Publish(msgs []Message) bool
  26. // Subscribe should subscribe to a specific namespace,
  27. // it's called automatically on neffos namespace connected.
  28. Subscribe(c *Conn, namespace string)
  29. // Unsubscribe should unsubscribe from a specific namespace,
  30. // it's called automatically on neffos namespace disconnect.
  31. Unsubscribe(c *Conn, namespace string) // should close the subscriber.
  32. // Ask should be able to perform a server Ask to a specific client or to all clients
  33. // It blocks until response from a specific client if msg.To is filled,
  34. // otherwise will return on the first responder's reply.
  35. Ask(ctx context.Context, msg Message, token string) (Message, error)
  36. // NotifyAsk should notify and unblock a subscribed connection for this
  37. // specific message, "token" is the neffos wait signal for this message.
  38. NotifyAsk(msg Message, token string) error
  39. }
  40. // StackExchangeInitializer is an optional interface for a `StackExchange`.
  41. // It contains a single `Init` method which accepts
  42. // the registered server namespaces and returns error.
  43. // It does not called on manual `Server.StackExchange` field set,
  44. // use the `Server.UseStackExchange` to make sure that this implementation is respected.
  45. type StackExchangeInitializer interface {
  46. // Init should initialize a stackexchange, it's optional.
  47. Init(Namespaces) error
  48. }
  49. func stackExchangeInit(s StackExchange, namespaces Namespaces) error {
  50. if s != nil {
  51. if sinit, ok := s.(StackExchangeInitializer); ok {
  52. return sinit.Init(namespaces)
  53. }
  54. }
  55. return nil
  56. }
  57. // internal use only when more than one stack exchanges are registered.
  58. type stackExchangeWrapper struct {
  59. // read-only fields.
  60. parent StackExchange
  61. current StackExchange
  62. }
  63. func wrapStackExchanges(existingExc StackExchange, newExc StackExchange) StackExchange {
  64. return &stackExchangeWrapper{
  65. parent: existingExc,
  66. current: newExc,
  67. }
  68. }
  69. func (s *stackExchangeWrapper) OnConnect(c *Conn) error {
  70. // return on first error, do not wrap errors,
  71. // the server should NOT run if at least one is errored.
  72. err := s.parent.OnConnect(c)
  73. if err != nil {
  74. return err
  75. }
  76. return s.current.OnConnect(c)
  77. }
  78. func (s *stackExchangeWrapper) OnDisconnect(c *Conn) {
  79. s.parent.OnDisconnect(c)
  80. s.current.OnDisconnect(c)
  81. }
  82. func (s *stackExchangeWrapper) Publish(msgs []Message) bool {
  83. // keep try on the next but return false on any failure.
  84. okParent := s.parent.Publish(msgs)
  85. okCurrent := s.current.Publish(msgs)
  86. return okParent && okCurrent
  87. }
  88. func (s *stackExchangeWrapper) Ask(ctx context.Context, msg Message, token string) (Message, error) {
  89. // we run Ask and if one is failing then we keep trying for all stackexchanges.
  90. msg, err := s.parent.Ask(ctx, msg, token)
  91. if err != nil {
  92. msg, err = s.current.Ask(ctx, msg, token)
  93. }
  94. return msg, err
  95. }
  96. func (s *stackExchangeWrapper) NotifyAsk(msg Message, token string) error {
  97. err := s.parent.NotifyAsk(msg, token)
  98. if err != nil {
  99. return s.current.NotifyAsk(msg, token)
  100. }
  101. return nil
  102. }
  103. func (s *stackExchangeWrapper) Subscribe(c *Conn, namespace string) {
  104. s.parent.Subscribe(c, namespace)
  105. s.current.Subscribe(c, namespace)
  106. }
  107. func (s *stackExchangeWrapper) Unsubscribe(c *Conn, namespace string) {
  108. s.parent.Unsubscribe(c, namespace)
  109. s.current.Unsubscribe(c, namespace)
  110. }