conn_namespace.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package neffos
  2. import (
  3. "context"
  4. "reflect"
  5. "sync"
  6. )
  7. // NSConn describes a connection connected to a specific namespace,
  8. // it emits with the `Message.Namespace` filled and it can join to multiple rooms.
  9. // A single `Conn` can be connected to one or more namespaces,
  10. // each connected namespace is described by this structure.
  11. type NSConn struct {
  12. Conn *Conn
  13. // Static from server, client can select which to use or not.
  14. // Client and server can ask to connect.
  15. // Server can forcely disconnect.
  16. namespace string
  17. // Static from server, client can select which to use or not.
  18. events Events
  19. // Dynamically channels/rooms for each connected namespace.
  20. // Client can ask to join, server can forcely join a connection to a room.
  21. // Namespace(room(fire event)).
  22. rooms map[string]*Room
  23. roomsMutex sync.RWMutex
  24. // value is just a temporarily value.
  25. // Storage across event callbacks for this namespace.
  26. value reflect.Value
  27. }
  28. func newNSConn(c *Conn, namespace string, events Events) *NSConn {
  29. return &NSConn{
  30. Conn: c,
  31. namespace: namespace,
  32. events: events,
  33. rooms: make(map[string]*Room),
  34. }
  35. }
  36. // String method simply returns the Conn's ID().
  37. // Useful method to this connected to a namespace connection to be passed on `Server#Broadcast` method
  38. // to exclude itself from the broadcasted message's receivers.
  39. func (ns *NSConn) String() string {
  40. return ns.Conn.String()
  41. }
  42. // Emit method sends a message to the remote side
  43. // with its `Message.Namespace` filled to this specific namespace.
  44. func (ns *NSConn) Emit(event string, body []byte) bool {
  45. if ns == nil {
  46. return false
  47. }
  48. return ns.Conn.Write(Message{Namespace: ns.namespace, Event: event, Body: body})
  49. }
  50. // EmitBinary acts like `Emit` but it sets the `Message.SetBinary` to true
  51. // and sends the data as binary, the receiver's Message in javascript-side is Uint8Array.
  52. func (ns *NSConn) EmitBinary(event string, body []byte) bool {
  53. if ns == nil {
  54. return false
  55. }
  56. return ns.Conn.Write(Message{Namespace: ns.namespace, Event: event, Body: body, SetBinary: true})
  57. }
  58. // Ask method writes a message to the remote side and blocks until a response or an error received.
  59. func (ns *NSConn) Ask(ctx context.Context, event string, body []byte) (Message, error) {
  60. if ns == nil {
  61. return Message{}, ErrWrite
  62. }
  63. return ns.Conn.Ask(ctx, Message{Namespace: ns.namespace, Event: event, Body: body})
  64. }
  65. // JoinRoom method can be used to join a connection to a specific room, rooms are dynamic.
  66. // Returns the joined `Room`.
  67. func (ns *NSConn) JoinRoom(ctx context.Context, roomName string) (*Room, error) {
  68. if ns == nil {
  69. return nil, ErrWrite
  70. }
  71. return ns.askRoomJoin(ctx, roomName)
  72. }
  73. // Room method returns a joined `Room`.
  74. func (ns *NSConn) Room(roomName string) *Room {
  75. if ns == nil {
  76. return nil
  77. }
  78. ns.roomsMutex.RLock()
  79. room := ns.rooms[roomName]
  80. ns.roomsMutex.RUnlock()
  81. return room
  82. }
  83. // Rooms returns a slice copy of the joined rooms.
  84. func (ns *NSConn) Rooms() []*Room {
  85. ns.roomsMutex.RLock()
  86. rooms := make([]*Room, len(ns.rooms))
  87. i := 0
  88. for _, room := range ns.rooms {
  89. rooms[i] = room
  90. i++
  91. }
  92. ns.roomsMutex.RUnlock()
  93. return rooms
  94. }
  95. // LeaveAll method sends a remote and local leave room signal `OnRoomLeave` to and for all rooms
  96. // and fires the `OnRoomLeft` event if succeed.
  97. func (ns *NSConn) LeaveAll(ctx context.Context) error {
  98. if ns == nil {
  99. return nil
  100. }
  101. ns.roomsMutex.Lock()
  102. defer ns.roomsMutex.Unlock()
  103. leaveMsg := Message{Namespace: ns.namespace, Event: OnRoomLeave, IsLocal: true, locked: true}
  104. for room := range ns.rooms {
  105. leaveMsg.Room = room
  106. if err := ns.askRoomLeave(ctx, leaveMsg, false); err != nil {
  107. return err
  108. }
  109. }
  110. return nil
  111. }
  112. func (ns *NSConn) forceLeaveAll(isLocal bool) {
  113. ns.roomsMutex.Lock()
  114. defer ns.roomsMutex.Unlock()
  115. leaveMsg := Message{Namespace: ns.namespace, Event: OnRoomLeave, IsForced: true, IsLocal: isLocal}
  116. for room := range ns.rooms {
  117. leaveMsg.Room = room
  118. ns.events.fireEvent(ns, leaveMsg)
  119. delete(ns.rooms, room)
  120. leaveMsg.Event = OnRoomLeft
  121. ns.events.fireEvent(ns, leaveMsg)
  122. leaveMsg.Event = OnRoomLeave
  123. }
  124. }
  125. // Disconnect method sends a disconnect signal to the remote side and fires the local `OnNamespaceDisconnect` event.
  126. func (ns *NSConn) Disconnect(ctx context.Context) error {
  127. if ns == nil {
  128. return nil
  129. }
  130. return ns.Conn.askDisconnect(ctx, Message{
  131. Namespace: ns.namespace,
  132. Event: OnNamespaceDisconnect,
  133. }, true)
  134. }
  135. func (ns *NSConn) askRoomJoin(ctx context.Context, roomName string) (*Room, error) {
  136. ns.roomsMutex.RLock()
  137. room, ok := ns.rooms[roomName]
  138. ns.roomsMutex.RUnlock()
  139. if ok {
  140. return room, nil
  141. }
  142. joinMsg := Message{
  143. Namespace: ns.namespace,
  144. Room: roomName,
  145. Event: OnRoomJoin,
  146. IsLocal: true,
  147. }
  148. _, err := ns.Conn.Ask(ctx, joinMsg)
  149. if err != nil {
  150. return nil, err
  151. }
  152. err = ns.events.fireEvent(ns, joinMsg)
  153. if err != nil {
  154. return nil, err
  155. }
  156. room = newRoom(ns, roomName)
  157. ns.roomsMutex.Lock()
  158. ns.rooms[roomName] = room
  159. ns.roomsMutex.Unlock()
  160. joinMsg.Event = OnRoomJoined
  161. ns.events.fireEvent(ns, joinMsg)
  162. return room, nil
  163. }
  164. func (ns *NSConn) replyRoomJoin(msg Message) {
  165. if ns == nil || msg.wait == "" || msg.isNoOp {
  166. return
  167. }
  168. ns.roomsMutex.RLock()
  169. _, ok := ns.rooms[msg.Room]
  170. ns.roomsMutex.RUnlock()
  171. if !ok {
  172. err := ns.events.fireEvent(ns, msg)
  173. if err != nil {
  174. msg.Err = err
  175. ns.Conn.Write(msg)
  176. return
  177. }
  178. ns.roomsMutex.Lock()
  179. ns.rooms[msg.Room] = newRoom(ns, msg.Room)
  180. ns.roomsMutex.Unlock()
  181. msg.Event = OnRoomJoined
  182. ns.events.fireEvent(ns, msg)
  183. }
  184. ns.Conn.writeEmptyReply(msg.wait)
  185. }
  186. func (ns *NSConn) askRoomLeave(ctx context.Context, msg Message, lock bool) error {
  187. if ns == nil {
  188. return nil
  189. }
  190. if lock {
  191. ns.roomsMutex.RLock()
  192. }
  193. _, ok := ns.rooms[msg.Room]
  194. if lock {
  195. ns.roomsMutex.RUnlock()
  196. }
  197. if !ok {
  198. return ErrBadRoom
  199. }
  200. _, err := ns.Conn.Ask(ctx, msg)
  201. if err != nil {
  202. return err
  203. }
  204. // msg.IsLocal = true
  205. err = ns.events.fireEvent(ns, msg)
  206. if err != nil {
  207. return err
  208. }
  209. if lock {
  210. ns.roomsMutex.Lock()
  211. }
  212. delete(ns.rooms, msg.Room)
  213. if lock {
  214. ns.roomsMutex.Unlock()
  215. }
  216. msg.Event = OnRoomLeft
  217. ns.events.fireEvent(ns, msg)
  218. return nil
  219. }
  220. func (ns *NSConn) replyRoomLeave(msg Message) {
  221. if ns == nil || msg.wait == "" || msg.isNoOp {
  222. return
  223. }
  224. room := ns.Room(msg.Room)
  225. if room == nil {
  226. ns.Conn.writeEmptyReply(msg.wait)
  227. return
  228. }
  229. // if client then we need to respond to server and delete the room without ask the local event.
  230. if ns.Conn.IsClient() {
  231. ns.events.fireEvent(ns, msg)
  232. ns.roomsMutex.Lock()
  233. delete(ns.rooms, msg.Room)
  234. ns.roomsMutex.Unlock()
  235. ns.Conn.writeEmptyReply(msg.wait)
  236. msg.Event = OnRoomLeft
  237. ns.events.fireEvent(ns, msg)
  238. return
  239. }
  240. // server-side, check for error on the local event first.
  241. err := ns.events.fireEvent(ns, msg)
  242. if err != nil {
  243. msg.Err = err
  244. ns.Conn.Write(msg)
  245. return
  246. }
  247. ns.roomsMutex.Lock()
  248. delete(ns.rooms, msg.Room)
  249. ns.roomsMutex.Unlock()
  250. msg.Event = OnRoomLeft
  251. ns.events.fireEvent(ns, msg)
  252. ns.Conn.writeEmptyReply(msg.wait)
  253. }