websocket.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package websocket
  2. import (
  3. "net/http"
  4. "github.com/kataras/iris/v12/context"
  5. "github.com/kataras/neffos"
  6. "github.com/kataras/neffos/gobwas"
  7. "github.com/kataras/neffos/gorilla"
  8. "github.com/kataras/neffos/stackexchange/nats"
  9. "github.com/kataras/neffos/stackexchange/redis"
  10. )
  11. var (
  12. // EnableDebug enables debug mode for websocket module,
  13. // for MVC this is done automatically
  14. // when the app's logger level is set to "debug".
  15. EnableDebug = neffos.EnableDebug
  16. // GorillaUpgrader is an upgrader type for the gorilla/websocket subprotocol implementation.
  17. // Should be used on `New` to construct the websocket server.
  18. GorillaUpgrader = gorilla.Upgrader
  19. // GobwasUpgrader is an upgrader type for the gobwas/ws subprotocol implementation.
  20. // Should be used on `New` to construct the websocket server.
  21. GobwasUpgrader = gobwas.Upgrader
  22. // DefaultGorillaUpgrader is a gorilla/websocket Upgrader with all fields set to the default values.
  23. DefaultGorillaUpgrader = gorilla.DefaultUpgrader
  24. // DefaultGobwasUpgrader is a gobwas/ws Upgrader with all fields set to the default values.
  25. DefaultGobwasUpgrader = gobwas.DefaultUpgrader
  26. // New constructs and returns a new websocket server.
  27. // Listens to incoming connections automatically, no further action is required from the caller.
  28. // The second parameter is the "connHandler", it can be
  29. // filled as `Namespaces`, `Events` or `WithTimeout`, same namespaces and events can be used on the client-side as well,
  30. // Use the `Conn#IsClient` on any event callback to determinate if it's a client-side connection or a server-side one.
  31. //
  32. // See examples for more.
  33. New = neffos.New
  34. // DefaultIDGenerator returns a universal unique identifier for a new connection.
  35. // It's the default `IDGenerator` if missing.
  36. DefaultIDGenerator = func(ctx *context.Context) string {
  37. return neffos.DefaultIDGenerator(ctx.ResponseWriter(), ctx.Request())
  38. }
  39. // NewRedisStackExchange returns a new redis StackExchange.
  40. // The "channel" input argument is the channel prefix for publish and subscribe.
  41. NewRedisStackExchange = redis.NewStackExchange
  42. // NewNatsStackExchange returns a new nats StackExchange.
  43. // The "url" input argument is the connection string of your nats server.
  44. // The second variadic input argument can be used to use custom `nats.Option`s
  45. // such as authentication and more nats servers addresses.
  46. NewNatsStackExchange = nats.NewStackExchange
  47. // WithNatsOptions can be used as the second input argument of `NewNatsStackExchange`
  48. // to declare a struct-based configuration for the nats server(s).
  49. WithNatsOptions = nats.With
  50. // GorillaDialer is a `Dialer` type for the gorilla/websocket subprotocol implementation.
  51. // Should be used on `Dial` to create a new client/client-side connection.
  52. GorillaDialer = gorilla.Dialer
  53. // GobwasDialer is a `Dialer` type for the gobwas/ws subprotocol implementation.
  54. // Should be used on `Dial` to create a new client/client-side connection.
  55. GobwasDialer = gobwas.Dialer
  56. // DefaultGorillaDialer is a gorilla/websocket dialer with all fields set to the default values.
  57. DefaultGorillaDialer = gorilla.DefaultDialer
  58. // DefaultGobwasDialer is a gobwas/ws dialer with all fields set to the default values.
  59. DefaultGobwasDialer = gobwas.DefaultDialer
  60. // Dial establishes a new websocket client connection.
  61. // Context "ctx" is used for handshake timeout.
  62. // Dialer "dial" can be either `GorillaDialer` or `GobwasDialer`,
  63. // custom dialers can be used as well when complete the `Socket` and `Dialer` interfaces for valid client.
  64. // URL "url" is the endpoint of the websocket server, i.e "ws://localhost:8080/echo".
  65. // The last parameter, and the most important one is the "connHandler", it can be
  66. // filled as `Namespaces`, `Events` or `WithTimeout`, same namespaces and events can be used on the server-side as well.
  67. //
  68. // See examples for more.
  69. Dial = neffos.Dial
  70. // IsTryingToReconnect reports whether the returning "err" from the `Server#Upgrade`
  71. // is from a client that was trying to reconnect to the websocket server.
  72. //
  73. // Look the `Conn#WasReconnected` and `Conn#ReconnectTries` too.
  74. IsTryingToReconnect = neffos.IsTryingToReconnect
  75. // NewStruct returns the `Struct` Conn Handler based on ptr value.
  76. NewStruct = neffos.NewStruct
  77. // JoinConnHandlers combines two or more ConnHandlers as one.
  78. JoinConnHandlers = neffos.JoinConnHandlers
  79. // OnNamespaceConnect is the event name which its callback is fired right before namespace connect,
  80. // if non-nil error then the remote connection's `Conn.Connect` will fail and send that error text.
  81. // Connection is not ready to emit data to the namespace.
  82. OnNamespaceConnect = neffos.OnNamespaceConnect
  83. // OnNamespaceConnected is the event name which its callback is fired after namespace successfully connected.
  84. // Connection is ready to emit data back to the namespace.
  85. OnNamespaceConnected = neffos.OnNamespaceConnected
  86. // OnNamespaceDisconnect is the event name which its callback is fired when
  87. // remote namespace disconnection or local namespace disconnection is happening.
  88. // For server-side connections the reply matters, so if error returned then the client-side cannot disconnect yet,
  89. // for client-side the return value does not matter.
  90. OnNamespaceDisconnect = neffos.OnNamespaceDisconnect // if allowed to connect then it's allowed to disconnect as well.
  91. // OnRoomJoin is the event name which its callback is fired right before room join.
  92. OnRoomJoin = neffos.OnRoomJoin // able to check if allowed to join.
  93. // OnRoomJoined is the event name which its callback is fired after the connection has successfully joined to a room.
  94. OnRoomJoined = neffos.OnRoomJoined // able to broadcast messages to room.
  95. // OnRoomLeave is the event name which its callback is fired right before room leave.
  96. OnRoomLeave = neffos.OnRoomLeave // able to broadcast bye-bye messages to room.
  97. // OnRoomLeft is the event name which its callback is fired after the connection has successfully left from a room.
  98. OnRoomLeft = neffos.OnRoomLeft // if allowed to join to a room, then its allowed to leave from it.
  99. // OnAnyEvent is the event name which its callback is fired when incoming message's event is not declared to the ConnHandler(`Events` or `Namespaces`).
  100. OnAnyEvent = neffos.OnAnyEvent // when event no match.
  101. // OnNativeMessage is fired on incoming native/raw websocket messages.
  102. // If this event defined then an incoming message can pass the check (it's an invalid message format)
  103. // with just the Message's Body filled, the Event is "OnNativeMessage" and IsNative always true.
  104. // This event should be defined under an empty namespace in order this to work.
  105. OnNativeMessage = neffos.OnNativeMessage
  106. // IsSystemEvent reports whether the "event" is a system event,
  107. // OnNamespaceConnect, OnNamespaceConnected, OnNamespaceDisconnect,
  108. // OnRoomJoin, OnRoomJoined, OnRoomLeave and OnRoomLeft.
  109. IsSystemEvent = neffos.IsSystemEvent
  110. // Reply is a special type of custom error which sends a message back to the other side
  111. // with the exact same incoming Message's Namespace (and Room if specified)
  112. // except its body which would be the given "body".
  113. Reply = neffos.Reply
  114. // Marshal marshals the "v" value and returns a Message's Body.
  115. // The "v" value's serialized value can be customized by implementing a `Marshal() ([]byte, error) ` method,
  116. // otherwise the default one will be used instead ( see `SetDefaultMarshaler` and `SetDefaultUnmarshaler`).
  117. // Errors are pushed to the result, use the object's Marshal method to catch those when necessary.
  118. Marshal = neffos.Marshal
  119. )
  120. // SetDefaultMarshaler changes the default json marshaler.
  121. // See `Marshal` package-level function and `Message.Unmarshal` method for more.
  122. func SetDefaultMarshaler(fn func(v interface{}) ([]byte, error)) {
  123. neffos.DefaultMarshaler = fn
  124. }
  125. // SetDefaultUnmarshaler changes the default json unmarshaler.
  126. // See `Message.Unmarshal` method and package-level `Marshal` function for more.
  127. func SetDefaultUnmarshaler(fn func(data []byte, v interface{}) error) {
  128. neffos.DefaultUnmarshaler = fn
  129. }
  130. // IDGenerator is an iris-specific IDGenerator for new connections.
  131. type IDGenerator func(*context.Context) string
  132. func wrapIDGenerator(idGen IDGenerator) func(ctx *context.Context) neffos.IDGenerator {
  133. return func(ctx *context.Context) neffos.IDGenerator {
  134. return func(w http.ResponseWriter, r *http.Request) string {
  135. return idGen(ctx)
  136. }
  137. }
  138. }
  139. // Handler returns an Iris handler to be served in a route of an Iris application.
  140. // Accepts the neffos websocket server as its first input argument
  141. // and optionally an Iris-specific `IDGenerator` as its second one.
  142. //
  143. // This SHOULD be the last handler in the route's chain as it hijacks the connection and the context.
  144. func Handler(s *neffos.Server, idGenerator ...IDGenerator) context.Handler {
  145. idGen := DefaultIDGenerator
  146. if len(idGenerator) > 0 {
  147. idGen = idGenerator[0]
  148. }
  149. if cb := s.OnDisconnect; cb != nil {
  150. s.OnDisconnect = func(c *neffos.Conn) {
  151. cb(c)
  152. manualReleaseWithoutResp(GetContext(c))
  153. }
  154. } else {
  155. s.OnDisconnect = func(c *neffos.Conn) {
  156. manualReleaseWithoutResp(GetContext(c))
  157. }
  158. }
  159. return func(ctx *context.Context) {
  160. if ctx.IsStopped() {
  161. // let the framework release it as always;
  162. // socket was not created so disconnect event will not called and the
  163. // DisablePoolRelease was not even called yet.
  164. return
  165. }
  166. Upgrade(ctx, idGen, s)
  167. }
  168. }
  169. // Upgrade upgrades the request and returns a new websocket Conn.
  170. // Use `Handler` for higher-level implementation instead.
  171. func Upgrade(ctx *context.Context, idGen IDGenerator, s *neffos.Server) *neffos.Conn {
  172. /* Do NOT return the error as it is captured on the OnUpgradeError listener,
  173. the end-developer should not be able to write to this http client directly. */
  174. ctx.DisablePoolRelease()
  175. conn, upgradeErr := s.Upgrade(ctx.ResponseWriter(), ctx.Request(), func(socket neffos.Socket) neffos.Socket {
  176. return &socketWrapper{
  177. Socket: socket,
  178. ctx: ctx,
  179. }
  180. }, wrapIDGenerator(idGen)(ctx))
  181. if upgradeErr != nil {
  182. manualReleaseWithoutResp(ctx)
  183. }
  184. return conn
  185. }
  186. func manualReleaseWithoutResp(ctx *context.Context) {
  187. ctx.ResponseWriter().EndResponse() // relases the response writer (common, recorder & compress).
  188. ctx.Application().GetContextPool().ReleaseLight(ctx) // just releases the context.
  189. }
  190. type socketWrapper struct {
  191. neffos.Socket
  192. ctx *context.Context
  193. }
  194. // GetContext returns the Iris Context from a websocket connection.
  195. //
  196. // Note that writing to the client connection through this Context is not allowed
  197. // from a websocket event because the connection is hijacked.
  198. // If used, you are limited for read-only access of the request e.g. read the request headers.
  199. func GetContext(c *neffos.Conn) *context.Context {
  200. if sw, ok := c.Socket().(*socketWrapper); ok {
  201. return sw.ctx
  202. }
  203. return nil
  204. }