conn.go 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045
  1. package neffos
  2. import (
  3. "context"
  4. "errors"
  5. "net"
  6. "net/http"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. type (
  12. // Socket is the interface that an underline protocol implementation should implement.
  13. Socket interface {
  14. // NetConn returns the underline net connection.
  15. NetConn() net.Conn
  16. // Request returns the http request value.
  17. Request() *http.Request
  18. // ReadData reads binary or text messages from the remote connection.
  19. ReadData(timeout time.Duration) (body []byte, typ MessageType, err error)
  20. // WriteBinary sends a binary message to the remote connection.
  21. WriteBinary(body []byte, timeout time.Duration) error
  22. // WriteText sends a text message to the remote connection.
  23. WriteText(body []byte, timeout time.Duration) error
  24. }
  25. // MessageType is a type for readen and to-send data, helpful to set `msg.SetBinary`
  26. // to the rest of the clients through a Broadcast, as SetBinary is not part of the deserialization.
  27. MessageType uint8
  28. )
  29. // See `MessageType` definition for details.
  30. const (
  31. TextMessage = iota + 1
  32. BinaryMessage
  33. )
  34. // Conn contains the websocket connection and the neffos communication functionality.
  35. // Its `Connection` will return a new `NSConn` instance.
  36. // Each connection can connect to one or more declared namespaces.
  37. // Each `NSConn` can join to multiple rooms.
  38. type Conn struct {
  39. // the ID generated by `Server#IDGenerator`.
  40. id string
  41. // serverConnID is unique per server instance and it can be comparable only within the
  42. // same server instance. Even if Server#IDGenerator
  43. // returns the same ID from the request.
  44. serverConnID string
  45. // a context-scope storage, initialized on first `Set`.
  46. store map[string]interface{}
  47. storeMutex sync.RWMutex
  48. // the gorilla or gobwas socket.
  49. socket Socket
  50. // ReconnectTries, if > 0 then this connection is a result of a client-side reconnection,
  51. // see `WasReconnected() bool`.
  52. ReconnectTries int
  53. // non-nil if server-side connection.
  54. server *Server
  55. // when sever or client is ready to handle messages,
  56. // ack and queue is available,
  57. // see `Server#ServeHTTP.?OnConnect!=nil`.
  58. readiness *waiterOnce
  59. // maximum wait time allowed to read a message from the connection.
  60. // Defaults to no timeout.
  61. readTimeout time.Duration
  62. // maximum wait time allowed to write a message to the connection.
  63. // Defaults to no timeout.
  64. writeTimeout time.Duration
  65. // the defined namespaces, allowed to connect.
  66. namespaces Namespaces
  67. // more than 0 if acknowledged.
  68. acknowledged *uint32
  69. // the connection's current connected namespace.
  70. connectedNamespaces map[string]*NSConn
  71. connectedNamespacesMutex sync.RWMutex
  72. // used to block certain actions until other action is finished,
  73. // i.e `askConnect: myNamespace` blocks the `tryNamespace: myNamespace` until finish.
  74. processes *processes
  75. isInsideHandler *uint32
  76. // messages that this connection waits for a reply.
  77. waitingMessages map[string]chan Message
  78. waitingMessagesMutex sync.RWMutex
  79. allowNativeMessages bool
  80. shouldHandleOnlyNativeMessages bool
  81. queue map[MessageType][][]byte
  82. queueMutex sync.Mutex
  83. // used to fire `conn#Close` once.
  84. closed *uint32
  85. // useful to terminate the broadcaster, see `Server#ServeHTTP.waitMessages`.
  86. closeCh chan struct{}
  87. }
  88. func newConn(socket Socket, namespaces Namespaces) *Conn {
  89. c := &Conn{
  90. socket: socket,
  91. namespaces: namespaces,
  92. readiness: newWaiterOnce(),
  93. acknowledged: new(uint32),
  94. connectedNamespaces: make(map[string]*NSConn),
  95. processes: newProcesses(),
  96. isInsideHandler: new(uint32),
  97. waitingMessages: make(map[string]chan Message),
  98. allowNativeMessages: false,
  99. shouldHandleOnlyNativeMessages: false,
  100. closed: new(uint32),
  101. closeCh: make(chan struct{}),
  102. }
  103. if emptyNamespace := namespaces[""]; emptyNamespace != nil && emptyNamespace[OnNativeMessage] != nil {
  104. c.allowNativeMessages = true
  105. // if allow native messages and only this namespace empty namespaces is registered (via Events{} for example)
  106. // and the only one event is the `OnNativeMessage`
  107. // then no need to call Connect(...) because:
  108. // client-side can use raw websocket without the neffos.js library
  109. // so no access to connect to a namespace.
  110. if len(c.namespaces) == 1 && len(emptyNamespace) == 1 {
  111. c.connectedNamespaces[""] = newNSConn(c, "", emptyNamespace)
  112. c.shouldHandleOnlyNativeMessages = true
  113. atomic.StoreUint32(c.acknowledged, 1)
  114. c.readiness.unwait(nil)
  115. }
  116. }
  117. return c
  118. }
  119. // Is reports whether the "connID" is part of this server's connections and their IDs are equal.
  120. func (c *Conn) Is(connID string) bool {
  121. if connID == "" {
  122. return false
  123. }
  124. if c.IsClient() {
  125. return c.id == connID
  126. }
  127. return c.serverConnID == connID
  128. }
  129. // ID method returns the unique identifier of the connection.
  130. // If this is a server-side connection then this value is the generated one by the `Server#IDGenerator`.
  131. // If this is a client-side connection then this value is filled on the acknowledgment process which is done on the `Client#Dial`.
  132. func (c *Conn) ID() string {
  133. return c.id
  134. }
  135. // String method simply returns the ID(). Useful for fmt usage and
  136. // to a connection to be passed on `Server#Broadcast` method
  137. // to exclude itself from the broadcasted message's receivers.
  138. func (c *Conn) String() string {
  139. return c.ID()
  140. }
  141. // Socket method returns the underline socket implementation.
  142. func (c *Conn) Socket() Socket {
  143. return c.socket
  144. }
  145. // IsClient method reports whether this connections is a client-side connetion.
  146. func (c *Conn) IsClient() bool {
  147. return c.server == nil
  148. }
  149. // Server method returns the backend server, it returns null on client-side connections.
  150. func (c *Conn) Server() *Server {
  151. if c.IsClient() {
  152. return nil
  153. }
  154. return c.server
  155. }
  156. // Set sets a value to this connection's store.
  157. func (c *Conn) Set(key string, value interface{}) {
  158. c.storeMutex.Lock()
  159. if c.store == nil {
  160. c.store = make(map[string]interface{})
  161. }
  162. c.store[key] = value
  163. c.storeMutex.Unlock()
  164. }
  165. // Get retruns a value based on the given "key"
  166. func (c *Conn) Get(key string) interface{} {
  167. c.storeMutex.RLock()
  168. if c.store == nil {
  169. c.storeMutex.RUnlock()
  170. return nil
  171. }
  172. v := c.store[key]
  173. c.storeMutex.RUnlock()
  174. return v
  175. }
  176. // Increment works like `Set` method.
  177. // It's just a helper for incrementing integer values.
  178. // If value does exist,
  179. // and it's an integer then it increments it by 1,
  180. // otherwise the value is overridden to value 1.
  181. // If value does not exist,
  182. // then it assumes the default value is 0 and it increments it by one,
  183. // the result will be 1.
  184. //
  185. // Returns the incremented value.
  186. func (c *Conn) Increment(key string) int {
  187. value := c.Get(key)
  188. if value == nil {
  189. c.Set(key, 1)
  190. return 1
  191. }
  192. intValue, ok := value.(int)
  193. if !ok {
  194. // override.
  195. c.Set(key, 1)
  196. return 1
  197. }
  198. intValue++
  199. c.Set(key, intValue)
  200. return intValue
  201. }
  202. // Decrement works like `Set` method.
  203. // It's just a helper for decrementing integer values.
  204. // If value does exist,
  205. // and it's an integer then it decrements it by 1,
  206. // otherwise the value is overridden to value -1.
  207. // If value does not exist,
  208. // then it assumes the default value is 0 and it decrements it by one,
  209. // the result will be -1.
  210. //
  211. // Calling it twice for example it will set the value to -2,
  212. // even if doesn't exist before.
  213. //
  214. // Returns the decremented value.
  215. func (c *Conn) Decrement(key string) int {
  216. value := c.Get(key)
  217. if value == nil {
  218. c.Set(key, -1)
  219. return -1
  220. }
  221. intValue, ok := value.(int)
  222. if !ok {
  223. // override.
  224. c.Set(key, -1)
  225. return -1
  226. }
  227. intValue--
  228. c.Set(key, intValue)
  229. return intValue
  230. }
  231. // WasReconnected reports whether the current connection is a result of a client-side reconnection.
  232. // To get the numbers of total retries see the `ReconnectTries` field.
  233. func (c *Conn) WasReconnected() bool {
  234. return c.ReconnectTries > 0
  235. }
  236. func (c *Conn) isAcknowledged() bool {
  237. return atomic.LoadUint32(c.acknowledged) > 0
  238. }
  239. const (
  240. ackBinary = 'M' // byte(0x1) // comes from client to server at startup.
  241. ackIDBinary = 'A' // byte(0x2) // comes from server to client after ackBinary and ready as a prefix, the rest message is the conn's ID.
  242. // ackOKBinary = 'K' // byte(0x3) // comes from client to server when id received and set-ed.
  243. ackNotOKBinary = 'H' // byte(0x4) // comes from server to client if `Server#OnConnected` errored as a prefix, the rest message is the error text.
  244. )
  245. var (
  246. ackBinaryB = []byte{ackBinary}
  247. ackIDBinaryB = []byte{ackIDBinary}
  248. ackNotOKBinaryB = []byte{ackNotOKBinary}
  249. )
  250. func (c *Conn) sendClientACK() error {
  251. // if neffos client used but in reality nor of its features are used
  252. // because end-dev set it as native only sender and receiver so any webscoket client can be used
  253. // even the browser's default; we can't accept a custom ack neither a namespace connection or two-way error handling.
  254. if c.shouldHandleOnlyNativeMessages {
  255. return nil
  256. }
  257. ok := c.write(ackBinaryB, false)
  258. if !ok {
  259. c.Close()
  260. return ErrWrite
  261. }
  262. err := c.readiness.wait()
  263. if err != nil {
  264. c.Close()
  265. }
  266. return err
  267. }
  268. func (c *Conn) startReader() {
  269. if c.IsClosed() {
  270. return
  271. }
  272. defer c.Close()
  273. // CLIENT is ready when ACK done
  274. // SERVER is ready when ACK is done AND `Server#OnConnected` returns with nil error.
  275. for {
  276. b, msgTyp, err := c.socket.ReadData(c.readTimeout)
  277. if err != nil {
  278. c.readiness.unwait(err)
  279. return
  280. }
  281. if len(b) == 0 {
  282. continue
  283. }
  284. if !c.isAcknowledged() {
  285. if !c.handleACK(msgTyp, b) {
  286. return
  287. }
  288. continue
  289. }
  290. atomic.StoreUint32(c.isInsideHandler, 1)
  291. c.HandlePayload(msgTyp, b)
  292. atomic.StoreUint32(c.isInsideHandler, 0)
  293. }
  294. }
  295. func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool {
  296. switch typ := b[0]; typ {
  297. case ackBinary:
  298. // from client startup to server.
  299. err := c.readiness.wait()
  300. if err != nil {
  301. // it's not Ok, send error which client's Dial should return.
  302. c.write(append(ackNotOKBinaryB, []byte(err.Error())...), false)
  303. return false
  304. }
  305. atomic.StoreUint32(c.acknowledged, 1)
  306. c.handleQueue()
  307. // it's ok send ID.
  308. return c.write(append(ackIDBinaryB, []byte(c.id)...), false)
  309. // case ackOKBinary:
  310. // // from client to server.
  311. // atomic.StoreUint32(c.acknowledged, 1)
  312. // c.handleQueue()
  313. case ackIDBinary:
  314. // from server to client.
  315. id := string(b[1:])
  316. c.id = id
  317. atomic.StoreUint32(c.acknowledged, 1)
  318. c.readiness.unwait(nil)
  319. // c.write([]byte{ackOKBinary})
  320. // println("ackIDBinary: pass with nil")
  321. // c.handleQueue()
  322. case ackNotOKBinary:
  323. // from server to client.
  324. errText := string(b[1:])
  325. err := errors.New(errText)
  326. c.readiness.unwait(err)
  327. return false
  328. default:
  329. c.queueMutex.Lock()
  330. if c.queue == nil {
  331. c.queue = make(map[MessageType][][]byte)
  332. }
  333. c.queue[msgTyp] = append(c.queue[msgTyp], b)
  334. c.queueMutex.Unlock()
  335. }
  336. return true
  337. }
  338. func (c *Conn) handleQueue() {
  339. c.queueMutex.Lock()
  340. defer c.queueMutex.Unlock()
  341. for msgTyp, q := range c.queue {
  342. for _, b := range q {
  343. c.HandlePayload(msgTyp, b)
  344. }
  345. delete(c.queue, msgTyp)
  346. }
  347. }
  348. // ErrInvalidPayload can be returned by the internal `handleMessage`.
  349. // In the future it may be exposed by an error listener.
  350. var ErrInvalidPayload = errors.New("invalid payload")
  351. func (c *Conn) handleMessage(msg Message) error {
  352. if msg.isInvalid {
  353. return ErrInvalidPayload
  354. }
  355. if msg.IsNative && c.shouldHandleOnlyNativeMessages {
  356. ns := c.Namespace("")
  357. return ns.events.fireEvent(ns, msg)
  358. }
  359. if isClient := c.IsClient(); msg.IsWait(isClient) {
  360. if !isClient {
  361. if msg.FromStackExchange && c.server.usesStackExchange() {
  362. // Currently let's not export the wait field, instead
  363. // just accept it on the stackexchange.
  364. return c.server.StackExchange.NotifyAsk(msg, msg.wait)
  365. }
  366. c.server.waitingMessagesMutex.RLock()
  367. ch, ok := c.server.waitingMessages[msg.wait]
  368. c.server.waitingMessagesMutex.RUnlock()
  369. if ok {
  370. ch <- msg
  371. return nil
  372. }
  373. }
  374. c.waitingMessagesMutex.RLock()
  375. ch, ok := c.waitingMessages[msg.wait]
  376. c.waitingMessagesMutex.RUnlock()
  377. if ok {
  378. ch <- msg
  379. return nil
  380. }
  381. }
  382. switch msg.Event {
  383. case OnNamespaceConnect:
  384. c.replyConnect(msg)
  385. case OnNamespaceDisconnect:
  386. c.replyDisconnect(msg)
  387. case OnRoomJoin:
  388. if ns, ok := c.tryNamespace(msg); ok {
  389. ns.replyRoomJoin(msg)
  390. }
  391. case OnRoomLeave:
  392. if ns, ok := c.tryNamespace(msg); ok {
  393. ns.replyRoomLeave(msg)
  394. }
  395. default:
  396. ns, ok := c.tryNamespace(msg)
  397. if !ok {
  398. // println(msg.Namespace + " namespace and incoming message of event: " + msg.Event + " is not connected or not exists and wait?: " + msg.wait + "\n\n")
  399. return ErrBadNamespace
  400. }
  401. msg.IsLocal = false
  402. err := ns.events.fireEvent(ns, msg)
  403. if err != nil {
  404. msg.Err = err
  405. c.Write(msg)
  406. return err
  407. }
  408. }
  409. return nil
  410. }
  411. // DeserializeMessage returns a Message from the "payload".
  412. func (c *Conn) DeserializeMessage(msgTyp MessageType, payload []byte) Message {
  413. return DeserializeMessage(msgTyp, payload, c.allowNativeMessages, c.shouldHandleOnlyNativeMessages)
  414. }
  415. // HandlePayload fires manually a local event based on the "payload".
  416. func (c *Conn) HandlePayload(msgTyp MessageType, payload []byte) error {
  417. return c.handleMessage(c.DeserializeMessage(msgTyp, payload))
  418. }
  419. const syncWaitDur = 15 * time.Millisecond
  420. // 10 seconds is high value which is not realistic on healthy networks, but may useful for slow connections.
  421. // This value is used just for the ack(which is usually done before the Connect call itself) wait on Connect when on server-side only.
  422. const maxSyncWaitDur = 10 * time.Second
  423. // Connect method returns a new connected to the specific "namespace" `NSConn` value.
  424. // The "namespace" should be declared in the `connHandler` of both server and client sides.
  425. // If this is a client-side connection then the server-side namespace's `OnNamespaceConnect` event callback MUST return null
  426. // in order to allow this client-side connection to connect, otherwise a non-nil error is returned instead.
  427. func (c *Conn) Connect(ctx context.Context, namespace string) (*NSConn, error) {
  428. // if c.IsClosed() {
  429. // return nil, ErrWrite
  430. // }
  431. if !c.IsClient() {
  432. c.readiness.unwait(nil)
  433. // server-side check for ack-ed, it should be done almost immediately the client connected
  434. // but give it sometime for slow networks and add an extra check for closed after 5 seconds and a deadline of 10seconds.
  435. t := maxSyncWaitDur
  436. for !c.isAcknowledged() {
  437. time.Sleep(syncWaitDur)
  438. t -= syncWaitDur
  439. if t <= maxSyncWaitDur/2 { // check once after 5 seconds if closed.
  440. if c.IsClosed() {
  441. return nil, ErrWrite
  442. }
  443. }
  444. if t == 0 {
  445. // when maxSyncWaitDur passed,
  446. // we could use the context's deadline but it will make things slower (extracting its value slower than the sleep time).
  447. if c.IsClosed() {
  448. return nil, ErrWrite
  449. }
  450. return nil, context.DeadlineExceeded
  451. }
  452. }
  453. }
  454. return c.askConnect(ctx, namespace)
  455. }
  456. // const defaultNS = ""
  457. // func (c *Conn) DefaultNamespace() *NSConn {
  458. // ns, _ := c.Connect(nil, defaultNS)
  459. // return ns
  460. // }
  461. // WaitConnect method can be used instead of the `Connect` if the other side force-calls `Connect` to this connection
  462. // and this side wants to "waits" for that signal.
  463. //
  464. // Nil context means try without timeout, wait until it connects to the specific namespace.
  465. // Note that, this function will not return an `ErrBadNamespace` if namespace does not exist in the server-side
  466. // or it's not defined in the client-side, it waits until deadline (if any, or loop forever, so a context with deadline is highly recommended).
  467. func (c *Conn) WaitConnect(ctx context.Context, namespace string) (ns *NSConn, err error) {
  468. if ctx == nil {
  469. ctx = context.TODO()
  470. }
  471. for {
  472. select {
  473. case <-ctx.Done():
  474. return nil, ctx.Err()
  475. default:
  476. if ns == nil {
  477. ns = c.Namespace(namespace)
  478. }
  479. if ns != nil && c.isAcknowledged() {
  480. return
  481. }
  482. time.Sleep(syncWaitDur)
  483. }
  484. }
  485. }
  486. // Namespace method returns an already-connected `NSConn` value based on the given "namespace".
  487. func (c *Conn) Namespace(namespace string) *NSConn {
  488. c.connectedNamespacesMutex.RLock()
  489. ns := c.connectedNamespaces[namespace]
  490. c.connectedNamespacesMutex.RUnlock()
  491. return ns
  492. }
  493. func (c *Conn) tryNamespace(in Message) (*NSConn, bool) {
  494. c.processes.get(in.Namespace).Wait() // wait any `askConnect` process (if any) of that "in.Namespace".
  495. ns := c.Namespace(in.Namespace)
  496. if ns == nil {
  497. // if _, canConnect := c.namespaces[msg.Namespace]; !canConnect {
  498. // msg.Err = ErrForbiddenNamespace
  499. // }
  500. in.Err = ErrBadNamespace
  501. c.Write(in)
  502. return nil, false
  503. }
  504. return ns, true
  505. }
  506. // server#OnConnected -> conn#Connect
  507. // client#WaitConnect
  508. // or
  509. // client#Connect
  510. func (c *Conn) askConnect(ctx context.Context, namespace string) (*NSConn, error) {
  511. p := c.processes.get(namespace)
  512. p.Start() // block any `tryNamespace` with that "namespace".
  513. defer p.Done() // unblock.
  514. // defer c.processes.get(namespace).run()()
  515. // for !atomic.CompareAndSwapUint32(c.isConnectingProcess, 0, 1) {
  516. // }
  517. // defer atomic.StoreUint32(c.isConnectingProcess, 0)
  518. ns := c.Namespace(namespace)
  519. if ns != nil {
  520. return ns, nil
  521. }
  522. events, ok := c.namespaces[namespace]
  523. if !ok {
  524. return nil, ErrBadNamespace
  525. }
  526. connectMessage := Message{
  527. Namespace: namespace,
  528. Event: OnNamespaceConnect,
  529. IsLocal: true,
  530. }
  531. ns = newNSConn(c, namespace, events)
  532. err := events.fireEvent(ns, connectMessage)
  533. if err != nil {
  534. return nil, err
  535. }
  536. // println("ask connect")
  537. _, err = c.Ask(ctx, connectMessage) // waits for answer no matter if already connected on the other side.
  538. if err != nil {
  539. return nil, err
  540. }
  541. // println("got connect")
  542. // re-check, maybe connected so far (can happen by a simultaneously `Connect` calls on both server and client, which is not the standard way)
  543. // c.connectedNamespacesMutex.RLock()
  544. // ns, ok = c.connectedNamespaces[namespace]
  545. // c.connectedNamespacesMutex.RUnlock()
  546. // if ok {
  547. // return ns, nil
  548. // }
  549. c.connectedNamespacesMutex.Lock()
  550. c.connectedNamespaces[namespace] = ns
  551. c.connectedNamespacesMutex.Unlock()
  552. // println("we're connected")
  553. // c.writeEmptyReply(genWaitConfirmation(reply.wait))
  554. // println("wrote: " + genWaitConfirmation(reply.wait))
  555. // c.sendConfirmation(reply.wait)
  556. c.notifyNamespaceConnected(ns, connectMessage)
  557. return ns, nil
  558. }
  559. func (c *Conn) replyConnect(msg Message) {
  560. // must give answer even a noOp if already connected.
  561. if msg.wait == "" || msg.isNoOp {
  562. return
  563. }
  564. ns := c.Namespace(msg.Namespace)
  565. if ns != nil {
  566. c.writeEmptyReply(msg.wait)
  567. return
  568. }
  569. events, ok := c.namespaces[msg.Namespace]
  570. if !ok {
  571. msg.Err = ErrBadNamespace
  572. c.Write(msg)
  573. return
  574. }
  575. ns = newNSConn(c, msg.Namespace, events)
  576. err := events.fireEvent(ns, msg)
  577. if err != nil {
  578. msg.Err = err
  579. c.Write(msg)
  580. return
  581. }
  582. c.connectedNamespacesMutex.Lock()
  583. c.connectedNamespaces[msg.Namespace] = ns
  584. c.connectedNamespacesMutex.Unlock()
  585. c.writeEmptyReply(msg.wait)
  586. c.notifyNamespaceConnected(ns, msg)
  587. }
  588. func (c *Conn) notifyNamespaceConnected(ns *NSConn, connectMsg Message) {
  589. connectMsg.Event = OnNamespaceConnected
  590. ns.events.fireEvent(ns, connectMsg) // omit error, it's connected.
  591. if !c.IsClient() && c.server.usesStackExchange() {
  592. c.server.StackExchange.Subscribe(c, ns.namespace)
  593. }
  594. }
  595. func (c *Conn) notifyNamespaceDisconnect(ns *NSConn, disconnectMsg Message) {
  596. if !c.IsClient() && c.server.usesStackExchange() {
  597. c.server.StackExchange.Unsubscribe(c, disconnectMsg.Namespace)
  598. }
  599. }
  600. // DisconnectAll method disconnects from all namespaces,
  601. // `OnNamespaceDisconnect` even will be fired and its `Message.IsLocal` will be true.
  602. // The remote side gets notified.
  603. func (c *Conn) DisconnectAll(ctx context.Context) error {
  604. if c.shouldHandleOnlyNativeMessages {
  605. return nil
  606. }
  607. c.connectedNamespacesMutex.Lock()
  608. defer c.connectedNamespacesMutex.Unlock()
  609. disconnectMsg := Message{Event: OnNamespaceDisconnect, IsLocal: true, locked: true}
  610. for namespace := range c.connectedNamespaces {
  611. disconnectMsg.Namespace = namespace
  612. if err := c.askDisconnect(ctx, disconnectMsg, false); err != nil {
  613. return err
  614. }
  615. }
  616. return nil
  617. }
  618. func (c *Conn) askDisconnect(ctx context.Context, msg Message, lock bool) error {
  619. if lock {
  620. c.connectedNamespacesMutex.RLock()
  621. }
  622. ns := c.connectedNamespaces[msg.Namespace]
  623. if lock {
  624. c.connectedNamespacesMutex.RUnlock()
  625. }
  626. if ns == nil {
  627. return ErrBadNamespace
  628. }
  629. _, err := c.Ask(ctx, msg)
  630. if err != nil {
  631. return err
  632. }
  633. // if disconnect is allowed then leave rooms first with force property
  634. // before namespace's deletion.
  635. ns.forceLeaveAll(true)
  636. if lock {
  637. c.connectedNamespacesMutex.Lock()
  638. }
  639. delete(c.connectedNamespaces, msg.Namespace)
  640. if lock {
  641. c.connectedNamespacesMutex.Unlock()
  642. }
  643. msg.IsLocal = true
  644. ns.events.fireEvent(ns, msg)
  645. c.notifyNamespaceDisconnect(ns, msg)
  646. return nil
  647. }
  648. func (c *Conn) replyDisconnect(msg Message) {
  649. if msg.wait == "" || msg.isNoOp {
  650. return
  651. }
  652. ns := c.Namespace(msg.Namespace)
  653. if ns == nil {
  654. c.writeEmptyReply(msg.wait)
  655. return
  656. }
  657. // if client then we need to respond to server and delete the namespace without ask the local event.
  658. if c.IsClient() {
  659. // if disconnect is allowed then leave rooms first with force property
  660. // before namespace's deletion.
  661. ns.forceLeaveAll(false)
  662. c.connectedNamespacesMutex.Lock()
  663. delete(c.connectedNamespaces, msg.Namespace)
  664. c.connectedNamespacesMutex.Unlock()
  665. c.writeEmptyReply(msg.wait)
  666. ns.events.fireEvent(ns, msg)
  667. return
  668. }
  669. // server-side, check for error on the local event first.
  670. err := ns.events.fireEvent(ns, msg)
  671. if err != nil {
  672. msg.Err = err
  673. c.Write(msg)
  674. return
  675. }
  676. ns.forceLeaveAll(false)
  677. c.connectedNamespacesMutex.Lock()
  678. delete(c.connectedNamespaces, msg.Namespace)
  679. c.connectedNamespacesMutex.Unlock()
  680. c.notifyNamespaceDisconnect(ns, msg)
  681. c.writeEmptyReply(msg.wait)
  682. }
  683. func (c *Conn) write(b []byte, binary bool) bool {
  684. var err error
  685. if binary {
  686. err = c.socket.WriteBinary(b, c.writeTimeout)
  687. } else {
  688. err = c.socket.WriteText(b, c.writeTimeout)
  689. }
  690. if err != nil {
  691. if IsCloseError(err) {
  692. c.Close()
  693. }
  694. return false
  695. }
  696. return true
  697. }
  698. func (c *Conn) canWrite(msg Message) bool {
  699. if c.IsClosed() {
  700. return false
  701. }
  702. if !c.IsClient() {
  703. // for server-side if tries to send, then error will be not ignored but events should continue.
  704. c.readiness.unwait(nil)
  705. }
  706. if !msg.isConnect() && !msg.isDisconnect() {
  707. if !msg.locked {
  708. c.connectedNamespacesMutex.RLock()
  709. }
  710. ns := c.connectedNamespaces[msg.Namespace]
  711. if !msg.locked {
  712. c.connectedNamespacesMutex.RUnlock()
  713. }
  714. if ns == nil {
  715. return false
  716. }
  717. if msg.Room != "" && !msg.isRoomJoin() && !msg.isRoomLeft() {
  718. if !msg.locked {
  719. ns.roomsMutex.RLock()
  720. }
  721. _, ok := ns.rooms[msg.Room]
  722. if !msg.locked {
  723. ns.roomsMutex.RUnlock()
  724. }
  725. if !ok {
  726. // tried to send to a not joined room.
  727. return false
  728. }
  729. }
  730. }
  731. // if !c.IsClient() && !msg.FromStackExchange {
  732. // if exc := c.Server().StackExchange; exc != nil {
  733. // if exc.Publish(c, msg) {
  734. // return true
  735. // }
  736. // }
  737. // }
  738. // don't write if explicit "from" field is set
  739. // to this server's instance client connection ~~~but give a chance to Publish
  740. // it to other instances with the same conn ID, if any~~~.
  741. if c.Is(msg.FromExplicit) {
  742. return false
  743. }
  744. return true
  745. }
  746. // Write method sends a message to the remote side,
  747. // reports whether the connection is still available
  748. // or when this message is not allowed to be sent to the remote side.
  749. func (c *Conn) Write(msg Message) bool {
  750. if !c.canWrite(msg) {
  751. return false
  752. }
  753. msg.FromExplicit = ""
  754. return c.write(serializeMessage(msg), msg.SetBinary)
  755. }
  756. // used when `Ask` caller cares only for successful call and not the message, for performance reasons we just use raw bytes.
  757. func (c *Conn) writeEmptyReply(wait string) bool {
  758. return c.write(genEmptyReplyToWait(wait), false)
  759. }
  760. // func (c *Conn) waitConfirmation(wait string) {
  761. // wait = genWaitConfirmation(wait)
  762. // ch := make(chan Message)
  763. // c.waitingMessagesMutex.Lock()
  764. // c.waitingMessages[wait] = ch
  765. // c.waitingMessagesMutex.Unlock()
  766. // <-ch
  767. // }
  768. // func (c *Conn) sendConfirmation(wait string) {
  769. // wait = genWaitConfirmation(wait)
  770. // c.writeEmptyReply(wait)
  771. // }
  772. // Ask method sends a message to the remote side and blocks until a response or an error received from the specific `Message.Event`.
  773. func (c *Conn) Ask(ctx context.Context, msg Message) (Message, error) {
  774. mustWaitOnlyTheNextMessage := atomic.LoadUint32(c.isInsideHandler) == 1
  775. return c.ask(ctx, msg, mustWaitOnlyTheNextMessage)
  776. }
  777. func (c *Conn) ask(ctx context.Context, msg Message, mustWaitOnlyTheNextMessage bool) (Message, error) {
  778. if c.shouldHandleOnlyNativeMessages {
  779. // should panic or...
  780. return Message{}, nil
  781. }
  782. if c.IsClosed() {
  783. return msg, CloseError{Code: -1, error: ErrWrite}
  784. }
  785. if ctx == nil {
  786. ctx = context.TODO()
  787. } else if ctx == context.TODO() {
  788. } else {
  789. if deadline, has := ctx.Deadline(); has {
  790. if deadline.Before(time.Now().Add(-1 * time.Second)) {
  791. return Message{}, context.DeadlineExceeded
  792. }
  793. }
  794. }
  795. ch := make(chan Message, 1)
  796. msg.wait = genWait(c.IsClient())
  797. if mustWaitOnlyTheNextMessage {
  798. // msg.wait is not required on this state
  799. // but we still set it.
  800. go func() {
  801. b, msgTyp, err := c.Socket().ReadData(c.readTimeout)
  802. if err != nil {
  803. ch <- Message{Err: err, isError: true}
  804. return
  805. }
  806. ch <- c.DeserializeMessage(msgTyp, b)
  807. }()
  808. } else {
  809. c.waitingMessagesMutex.Lock()
  810. c.waitingMessages[msg.wait] = ch
  811. c.waitingMessagesMutex.Unlock()
  812. }
  813. if !c.Write(msg) {
  814. return Message{}, ErrWrite
  815. }
  816. select {
  817. case <-ctx.Done():
  818. if c.IsClosed() {
  819. return Message{}, ErrWrite
  820. }
  821. return Message{}, ctx.Err()
  822. case receive := <-ch:
  823. if !mustWaitOnlyTheNextMessage {
  824. c.waitingMessagesMutex.Lock()
  825. delete(c.waitingMessages, msg.wait)
  826. c.waitingMessagesMutex.Unlock()
  827. }
  828. return receive, receive.Err
  829. }
  830. }
  831. // Close method will force-disconnect from all connected namespaces and force-leave from all joined rooms
  832. // and finally will terminate the underline websocket connection.
  833. // After this method call the `Conn` is not usable anymore, a new `Dial` call is required.
  834. func (c *Conn) Close() {
  835. if atomic.CompareAndSwapUint32(c.closed, 0, 1) {
  836. if !c.shouldHandleOnlyNativeMessages {
  837. disconnectMsg := Message{Event: OnNamespaceDisconnect, IsForced: true, IsLocal: true}
  838. c.connectedNamespacesMutex.Lock()
  839. for namespace, ns := range c.connectedNamespaces {
  840. // leave rooms first with force and local property before remove the namespace completely.
  841. ns.forceLeaveAll(true)
  842. disconnectMsg.Namespace = ns.namespace
  843. ns.events.fireEvent(ns, disconnectMsg)
  844. delete(c.connectedNamespaces, namespace)
  845. }
  846. c.connectedNamespacesMutex.Unlock()
  847. c.waitingMessagesMutex.Lock()
  848. for wait := range c.waitingMessages {
  849. delete(c.waitingMessages, wait)
  850. }
  851. c.waitingMessagesMutex.Unlock()
  852. }
  853. atomic.StoreUint32(c.acknowledged, 0)
  854. if !c.IsClient() {
  855. go func() {
  856. c.server.disconnect <- c
  857. }()
  858. }
  859. close(c.closeCh)
  860. c.socket.NetConn().Close()
  861. }
  862. }
  863. // IsClosed method reports whether this connection is remotely or manually terminated.
  864. func (c *Conn) IsClosed() bool {
  865. return atomic.LoadUint32(c.closed) > 0
  866. }