1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045 |
- package neffos
- import (
- "context"
- "errors"
- "net"
- "net/http"
- "sync"
- "sync/atomic"
- "time"
- )
- type (
-
- Socket interface {
-
- NetConn() net.Conn
-
- Request() *http.Request
-
- ReadData(timeout time.Duration) (body []byte, typ MessageType, err error)
-
- WriteBinary(body []byte, timeout time.Duration) error
-
- WriteText(body []byte, timeout time.Duration) error
- }
-
-
- MessageType uint8
- )
- const (
- TextMessage = iota + 1
- BinaryMessage
- )
- type Conn struct {
-
- id string
-
-
-
- serverConnID string
-
- store map[string]interface{}
- storeMutex sync.RWMutex
-
- socket Socket
-
-
- ReconnectTries int
-
- server *Server
-
-
-
- readiness *waiterOnce
-
-
- readTimeout time.Duration
-
-
- writeTimeout time.Duration
-
- namespaces Namespaces
-
- acknowledged *uint32
-
- connectedNamespaces map[string]*NSConn
- connectedNamespacesMutex sync.RWMutex
-
-
- processes *processes
- isInsideHandler *uint32
-
- waitingMessages map[string]chan Message
- waitingMessagesMutex sync.RWMutex
- allowNativeMessages bool
- shouldHandleOnlyNativeMessages bool
- queue map[MessageType][][]byte
- queueMutex sync.Mutex
-
- closed *uint32
-
- closeCh chan struct{}
- }
- func newConn(socket Socket, namespaces Namespaces) *Conn {
- c := &Conn{
- socket: socket,
- namespaces: namespaces,
- readiness: newWaiterOnce(),
- acknowledged: new(uint32),
- connectedNamespaces: make(map[string]*NSConn),
- processes: newProcesses(),
- isInsideHandler: new(uint32),
- waitingMessages: make(map[string]chan Message),
- allowNativeMessages: false,
- shouldHandleOnlyNativeMessages: false,
- closed: new(uint32),
- closeCh: make(chan struct{}),
- }
- if emptyNamespace := namespaces[""]; emptyNamespace != nil && emptyNamespace[OnNativeMessage] != nil {
- c.allowNativeMessages = true
-
-
-
-
-
- if len(c.namespaces) == 1 && len(emptyNamespace) == 1 {
- c.connectedNamespaces[""] = newNSConn(c, "", emptyNamespace)
- c.shouldHandleOnlyNativeMessages = true
- atomic.StoreUint32(c.acknowledged, 1)
- c.readiness.unwait(nil)
- }
- }
- return c
- }
- func (c *Conn) Is(connID string) bool {
- if connID == "" {
- return false
- }
- if c.IsClient() {
- return c.id == connID
- }
- return c.serverConnID == connID
- }
- func (c *Conn) ID() string {
- return c.id
- }
- func (c *Conn) String() string {
- return c.ID()
- }
- func (c *Conn) Socket() Socket {
- return c.socket
- }
- func (c *Conn) IsClient() bool {
- return c.server == nil
- }
- func (c *Conn) Server() *Server {
- if c.IsClient() {
- return nil
- }
- return c.server
- }
- func (c *Conn) Set(key string, value interface{}) {
- c.storeMutex.Lock()
- if c.store == nil {
- c.store = make(map[string]interface{})
- }
- c.store[key] = value
- c.storeMutex.Unlock()
- }
- func (c *Conn) Get(key string) interface{} {
- c.storeMutex.RLock()
- if c.store == nil {
- c.storeMutex.RUnlock()
- return nil
- }
- v := c.store[key]
- c.storeMutex.RUnlock()
- return v
- }
- func (c *Conn) Increment(key string) int {
- value := c.Get(key)
- if value == nil {
- c.Set(key, 1)
- return 1
- }
- intValue, ok := value.(int)
- if !ok {
-
- c.Set(key, 1)
- return 1
- }
- intValue++
- c.Set(key, intValue)
- return intValue
- }
- func (c *Conn) Decrement(key string) int {
- value := c.Get(key)
- if value == nil {
- c.Set(key, -1)
- return -1
- }
- intValue, ok := value.(int)
- if !ok {
-
- c.Set(key, -1)
- return -1
- }
- intValue--
- c.Set(key, intValue)
- return intValue
- }
- func (c *Conn) WasReconnected() bool {
- return c.ReconnectTries > 0
- }
- func (c *Conn) isAcknowledged() bool {
- return atomic.LoadUint32(c.acknowledged) > 0
- }
- const (
- ackBinary = 'M'
- ackIDBinary = 'A'
-
- ackNotOKBinary = 'H'
- )
- var (
- ackBinaryB = []byte{ackBinary}
- ackIDBinaryB = []byte{ackIDBinary}
- ackNotOKBinaryB = []byte{ackNotOKBinary}
- )
- func (c *Conn) sendClientACK() error {
-
-
-
- if c.shouldHandleOnlyNativeMessages {
- return nil
- }
- ok := c.write(ackBinaryB, false)
- if !ok {
- c.Close()
- return ErrWrite
- }
- err := c.readiness.wait()
- if err != nil {
- c.Close()
- }
- return err
- }
- func (c *Conn) startReader() {
- if c.IsClosed() {
- return
- }
- defer c.Close()
-
-
- for {
- b, msgTyp, err := c.socket.ReadData(c.readTimeout)
- if err != nil {
- c.readiness.unwait(err)
- return
- }
- if len(b) == 0 {
- continue
- }
- if !c.isAcknowledged() {
- if !c.handleACK(msgTyp, b) {
- return
- }
- continue
- }
- atomic.StoreUint32(c.isInsideHandler, 1)
- c.HandlePayload(msgTyp, b)
- atomic.StoreUint32(c.isInsideHandler, 0)
- }
- }
- func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool {
- switch typ := b[0]; typ {
- case ackBinary:
-
- err := c.readiness.wait()
- if err != nil {
-
- c.write(append(ackNotOKBinaryB, []byte(err.Error())...), false)
- return false
- }
- atomic.StoreUint32(c.acknowledged, 1)
- c.handleQueue()
-
- return c.write(append(ackIDBinaryB, []byte(c.id)...), false)
-
-
-
-
- case ackIDBinary:
-
- id := string(b[1:])
- c.id = id
- atomic.StoreUint32(c.acknowledged, 1)
- c.readiness.unwait(nil)
-
-
-
- case ackNotOKBinary:
-
- errText := string(b[1:])
- err := errors.New(errText)
- c.readiness.unwait(err)
- return false
- default:
- c.queueMutex.Lock()
- if c.queue == nil {
- c.queue = make(map[MessageType][][]byte)
- }
- c.queue[msgTyp] = append(c.queue[msgTyp], b)
- c.queueMutex.Unlock()
- }
- return true
- }
- func (c *Conn) handleQueue() {
- c.queueMutex.Lock()
- defer c.queueMutex.Unlock()
- for msgTyp, q := range c.queue {
- for _, b := range q {
- c.HandlePayload(msgTyp, b)
- }
- delete(c.queue, msgTyp)
- }
- }
- var ErrInvalidPayload = errors.New("invalid payload")
- func (c *Conn) handleMessage(msg Message) error {
- if msg.isInvalid {
- return ErrInvalidPayload
- }
- if msg.IsNative && c.shouldHandleOnlyNativeMessages {
- ns := c.Namespace("")
- return ns.events.fireEvent(ns, msg)
- }
- if isClient := c.IsClient(); msg.IsWait(isClient) {
- if !isClient {
- if msg.FromStackExchange && c.server.usesStackExchange() {
-
-
- return c.server.StackExchange.NotifyAsk(msg, msg.wait)
- }
- c.server.waitingMessagesMutex.RLock()
- ch, ok := c.server.waitingMessages[msg.wait]
- c.server.waitingMessagesMutex.RUnlock()
- if ok {
- ch <- msg
- return nil
- }
- }
- c.waitingMessagesMutex.RLock()
- ch, ok := c.waitingMessages[msg.wait]
- c.waitingMessagesMutex.RUnlock()
- if ok {
- ch <- msg
- return nil
- }
- }
- switch msg.Event {
- case OnNamespaceConnect:
- c.replyConnect(msg)
- case OnNamespaceDisconnect:
- c.replyDisconnect(msg)
- case OnRoomJoin:
- if ns, ok := c.tryNamespace(msg); ok {
- ns.replyRoomJoin(msg)
- }
- case OnRoomLeave:
- if ns, ok := c.tryNamespace(msg); ok {
- ns.replyRoomLeave(msg)
- }
- default:
- ns, ok := c.tryNamespace(msg)
- if !ok {
-
- return ErrBadNamespace
- }
- msg.IsLocal = false
- err := ns.events.fireEvent(ns, msg)
- if err != nil {
- msg.Err = err
- c.Write(msg)
- return err
- }
- }
- return nil
- }
- func (c *Conn) DeserializeMessage(msgTyp MessageType, payload []byte) Message {
- return DeserializeMessage(msgTyp, payload, c.allowNativeMessages, c.shouldHandleOnlyNativeMessages)
- }
- func (c *Conn) HandlePayload(msgTyp MessageType, payload []byte) error {
- return c.handleMessage(c.DeserializeMessage(msgTyp, payload))
- }
- const syncWaitDur = 15 * time.Millisecond
- const maxSyncWaitDur = 10 * time.Second
- func (c *Conn) Connect(ctx context.Context, namespace string) (*NSConn, error) {
-
-
-
- if !c.IsClient() {
- c.readiness.unwait(nil)
-
-
- t := maxSyncWaitDur
- for !c.isAcknowledged() {
- time.Sleep(syncWaitDur)
- t -= syncWaitDur
- if t <= maxSyncWaitDur/2 {
- if c.IsClosed() {
- return nil, ErrWrite
- }
- }
- if t == 0 {
-
-
- if c.IsClosed() {
- return nil, ErrWrite
- }
- return nil, context.DeadlineExceeded
- }
- }
- }
- return c.askConnect(ctx, namespace)
- }
- func (c *Conn) WaitConnect(ctx context.Context, namespace string) (ns *NSConn, err error) {
- if ctx == nil {
- ctx = context.TODO()
- }
- for {
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- default:
- if ns == nil {
- ns = c.Namespace(namespace)
- }
- if ns != nil && c.isAcknowledged() {
- return
- }
- time.Sleep(syncWaitDur)
- }
- }
- }
- func (c *Conn) Namespace(namespace string) *NSConn {
- c.connectedNamespacesMutex.RLock()
- ns := c.connectedNamespaces[namespace]
- c.connectedNamespacesMutex.RUnlock()
- return ns
- }
- func (c *Conn) tryNamespace(in Message) (*NSConn, bool) {
- c.processes.get(in.Namespace).Wait()
- ns := c.Namespace(in.Namespace)
- if ns == nil {
-
-
-
- in.Err = ErrBadNamespace
- c.Write(in)
- return nil, false
- }
- return ns, true
- }
- func (c *Conn) askConnect(ctx context.Context, namespace string) (*NSConn, error) {
- p := c.processes.get(namespace)
- p.Start()
- defer p.Done()
-
-
-
-
- ns := c.Namespace(namespace)
- if ns != nil {
- return ns, nil
- }
- events, ok := c.namespaces[namespace]
- if !ok {
- return nil, ErrBadNamespace
- }
- connectMessage := Message{
- Namespace: namespace,
- Event: OnNamespaceConnect,
- IsLocal: true,
- }
- ns = newNSConn(c, namespace, events)
- err := events.fireEvent(ns, connectMessage)
- if err != nil {
- return nil, err
- }
-
- _, err = c.Ask(ctx, connectMessage)
- if err != nil {
- return nil, err
- }
-
-
-
-
-
-
-
-
- c.connectedNamespacesMutex.Lock()
- c.connectedNamespaces[namespace] = ns
- c.connectedNamespacesMutex.Unlock()
-
-
-
-
- c.notifyNamespaceConnected(ns, connectMessage)
- return ns, nil
- }
- func (c *Conn) replyConnect(msg Message) {
-
- if msg.wait == "" || msg.isNoOp {
- return
- }
- ns := c.Namespace(msg.Namespace)
- if ns != nil {
- c.writeEmptyReply(msg.wait)
- return
- }
- events, ok := c.namespaces[msg.Namespace]
- if !ok {
- msg.Err = ErrBadNamespace
- c.Write(msg)
- return
- }
- ns = newNSConn(c, msg.Namespace, events)
- err := events.fireEvent(ns, msg)
- if err != nil {
- msg.Err = err
- c.Write(msg)
- return
- }
- c.connectedNamespacesMutex.Lock()
- c.connectedNamespaces[msg.Namespace] = ns
- c.connectedNamespacesMutex.Unlock()
- c.writeEmptyReply(msg.wait)
- c.notifyNamespaceConnected(ns, msg)
- }
- func (c *Conn) notifyNamespaceConnected(ns *NSConn, connectMsg Message) {
- connectMsg.Event = OnNamespaceConnected
- ns.events.fireEvent(ns, connectMsg)
- if !c.IsClient() && c.server.usesStackExchange() {
- c.server.StackExchange.Subscribe(c, ns.namespace)
- }
- }
- func (c *Conn) notifyNamespaceDisconnect(ns *NSConn, disconnectMsg Message) {
- if !c.IsClient() && c.server.usesStackExchange() {
- c.server.StackExchange.Unsubscribe(c, disconnectMsg.Namespace)
- }
- }
- func (c *Conn) DisconnectAll(ctx context.Context) error {
- if c.shouldHandleOnlyNativeMessages {
- return nil
- }
- c.connectedNamespacesMutex.Lock()
- defer c.connectedNamespacesMutex.Unlock()
- disconnectMsg := Message{Event: OnNamespaceDisconnect, IsLocal: true, locked: true}
- for namespace := range c.connectedNamespaces {
- disconnectMsg.Namespace = namespace
- if err := c.askDisconnect(ctx, disconnectMsg, false); err != nil {
- return err
- }
- }
- return nil
- }
- func (c *Conn) askDisconnect(ctx context.Context, msg Message, lock bool) error {
- if lock {
- c.connectedNamespacesMutex.RLock()
- }
- ns := c.connectedNamespaces[msg.Namespace]
- if lock {
- c.connectedNamespacesMutex.RUnlock()
- }
- if ns == nil {
- return ErrBadNamespace
- }
- _, err := c.Ask(ctx, msg)
- if err != nil {
- return err
- }
-
-
- ns.forceLeaveAll(true)
- if lock {
- c.connectedNamespacesMutex.Lock()
- }
- delete(c.connectedNamespaces, msg.Namespace)
- if lock {
- c.connectedNamespacesMutex.Unlock()
- }
- msg.IsLocal = true
- ns.events.fireEvent(ns, msg)
- c.notifyNamespaceDisconnect(ns, msg)
- return nil
- }
- func (c *Conn) replyDisconnect(msg Message) {
- if msg.wait == "" || msg.isNoOp {
- return
- }
- ns := c.Namespace(msg.Namespace)
- if ns == nil {
- c.writeEmptyReply(msg.wait)
- return
- }
-
- if c.IsClient() {
-
-
- ns.forceLeaveAll(false)
- c.connectedNamespacesMutex.Lock()
- delete(c.connectedNamespaces, msg.Namespace)
- c.connectedNamespacesMutex.Unlock()
- c.writeEmptyReply(msg.wait)
- ns.events.fireEvent(ns, msg)
- return
- }
-
- err := ns.events.fireEvent(ns, msg)
- if err != nil {
- msg.Err = err
- c.Write(msg)
- return
- }
- ns.forceLeaveAll(false)
- c.connectedNamespacesMutex.Lock()
- delete(c.connectedNamespaces, msg.Namespace)
- c.connectedNamespacesMutex.Unlock()
- c.notifyNamespaceDisconnect(ns, msg)
- c.writeEmptyReply(msg.wait)
- }
- func (c *Conn) write(b []byte, binary bool) bool {
- var err error
- if binary {
- err = c.socket.WriteBinary(b, c.writeTimeout)
- } else {
- err = c.socket.WriteText(b, c.writeTimeout)
- }
- if err != nil {
- if IsCloseError(err) {
- c.Close()
- }
- return false
- }
- return true
- }
- func (c *Conn) canWrite(msg Message) bool {
- if c.IsClosed() {
- return false
- }
- if !c.IsClient() {
-
- c.readiness.unwait(nil)
- }
- if !msg.isConnect() && !msg.isDisconnect() {
- if !msg.locked {
- c.connectedNamespacesMutex.RLock()
- }
- ns := c.connectedNamespaces[msg.Namespace]
- if !msg.locked {
- c.connectedNamespacesMutex.RUnlock()
- }
- if ns == nil {
- return false
- }
- if msg.Room != "" && !msg.isRoomJoin() && !msg.isRoomLeft() {
- if !msg.locked {
- ns.roomsMutex.RLock()
- }
- _, ok := ns.rooms[msg.Room]
- if !msg.locked {
- ns.roomsMutex.RUnlock()
- }
- if !ok {
-
- return false
- }
- }
- }
-
-
-
-
-
-
-
-
-
-
- if c.Is(msg.FromExplicit) {
- return false
- }
- return true
- }
- func (c *Conn) Write(msg Message) bool {
- if !c.canWrite(msg) {
- return false
- }
- msg.FromExplicit = ""
- return c.write(serializeMessage(msg), msg.SetBinary)
- }
- func (c *Conn) writeEmptyReply(wait string) bool {
- return c.write(genEmptyReplyToWait(wait), false)
- }
- func (c *Conn) Ask(ctx context.Context, msg Message) (Message, error) {
- mustWaitOnlyTheNextMessage := atomic.LoadUint32(c.isInsideHandler) == 1
- return c.ask(ctx, msg, mustWaitOnlyTheNextMessage)
- }
- func (c *Conn) ask(ctx context.Context, msg Message, mustWaitOnlyTheNextMessage bool) (Message, error) {
- if c.shouldHandleOnlyNativeMessages {
-
- return Message{}, nil
- }
- if c.IsClosed() {
- return msg, CloseError{Code: -1, error: ErrWrite}
- }
- if ctx == nil {
- ctx = context.TODO()
- } else if ctx == context.TODO() {
- } else {
- if deadline, has := ctx.Deadline(); has {
- if deadline.Before(time.Now().Add(-1 * time.Second)) {
- return Message{}, context.DeadlineExceeded
- }
- }
- }
- ch := make(chan Message, 1)
- msg.wait = genWait(c.IsClient())
- if mustWaitOnlyTheNextMessage {
-
-
- go func() {
- b, msgTyp, err := c.Socket().ReadData(c.readTimeout)
- if err != nil {
- ch <- Message{Err: err, isError: true}
- return
- }
- ch <- c.DeserializeMessage(msgTyp, b)
- }()
- } else {
- c.waitingMessagesMutex.Lock()
- c.waitingMessages[msg.wait] = ch
- c.waitingMessagesMutex.Unlock()
- }
- if !c.Write(msg) {
- return Message{}, ErrWrite
- }
- select {
- case <-ctx.Done():
- if c.IsClosed() {
- return Message{}, ErrWrite
- }
- return Message{}, ctx.Err()
- case receive := <-ch:
- if !mustWaitOnlyTheNextMessage {
- c.waitingMessagesMutex.Lock()
- delete(c.waitingMessages, msg.wait)
- c.waitingMessagesMutex.Unlock()
- }
- return receive, receive.Err
- }
- }
- func (c *Conn) Close() {
- if atomic.CompareAndSwapUint32(c.closed, 0, 1) {
- if !c.shouldHandleOnlyNativeMessages {
- disconnectMsg := Message{Event: OnNamespaceDisconnect, IsForced: true, IsLocal: true}
- c.connectedNamespacesMutex.Lock()
- for namespace, ns := range c.connectedNamespaces {
-
- ns.forceLeaveAll(true)
- disconnectMsg.Namespace = ns.namespace
- ns.events.fireEvent(ns, disconnectMsg)
- delete(c.connectedNamespaces, namespace)
- }
- c.connectedNamespacesMutex.Unlock()
- c.waitingMessagesMutex.Lock()
- for wait := range c.waitingMessages {
- delete(c.waitingMessages, wait)
- }
- c.waitingMessagesMutex.Unlock()
- }
- atomic.StoreUint32(c.acknowledged, 0)
- if !c.IsClient() {
- go func() {
- c.server.disconnect <- c
- }()
- }
- close(c.closeCh)
- c.socket.NetConn().Close()
- }
- }
- func (c *Conn) IsClosed() bool {
- return atomic.LoadUint32(c.closed) > 0
- }
|