message.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package neffos
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "strconv"
  7. "strings"
  8. "time"
  9. )
  10. // The Message is the structure which describes the incoming and outcoming data.
  11. // Emitter's "body" argument is the `Message.Body` field.
  12. // Emitter's return non-nil error is the `Message.Err` field.
  13. // If native message sent then the `Message.Body` is filled with the body and
  14. // when incoming native message then the `Message.Event` is the `OnNativeMessage`,
  15. // native messages are allowed only when an empty namespace("") and its `OnNativeMessage` callback are present.
  16. //
  17. // The the raw data received/sent structured following this order:
  18. // <wait()>;
  19. // <namespace>;
  20. // <room>;
  21. // <event>;
  22. // <isError(0-1)>;
  23. // <isNoOp(0-1)>;
  24. // <body||error_message>
  25. //
  26. // Internal `serializeMessage` and
  27. // exported `DeserializeMessage` functions
  28. // do the job on `Conn#Write`, `NSConn#Emit` and `Room#Emit` calls.
  29. type Message struct {
  30. wait string
  31. // The Namespace that this message sent to/received from.
  32. Namespace string
  33. // The Room that this message sent to/received from.
  34. Room string
  35. // The Event that this message sent to/received from.
  36. Event string
  37. // The actual body of the incoming/outcoming data.
  38. Body []byte
  39. // The Err contains any message's error, if any.
  40. // Note that server-side and client-side connections can return an error instead of a message from each event callbacks,
  41. // except the clients's force Disconnect which its local event doesn't matter when disconnected manually.
  42. Err error
  43. // if true then `Err` is filled by the error message and
  44. // the last segment of incoming/outcoming serialized message is the error message instead of the body.
  45. isError bool
  46. isNoOp bool
  47. isInvalid bool
  48. // the CONN ID, filled automatically if `Server#Broadcast` first parameter of sender connection's ID is not empty,
  49. // not exposed to the subscribers (rest of the clients).
  50. // This is the ID across neffos servers when scale.
  51. from string
  52. // When sent by the same connection of the current running server instance.
  53. // This field is serialized/deserialized but it's clean on sending or receiving from a client
  54. // and it's only used on StackExchange feature.
  55. // It's serialized as the first parameter, instead of wait signal, if incoming starts with 0x.
  56. FromExplicit string // the exact Conn's pointer in this server instance.
  57. // Reports whether this message is coming from a stackexchange.
  58. // This field is not exposed and it's not serialized at all, ~local-use only~.
  59. //
  60. // The "wait" field can determinate if this message is coming from a stackexchange using its second char,
  61. // This value set based on "wait" on deserialization when coming from remote side.
  62. // Only server-side can actually set it.
  63. FromStackExchange bool
  64. // To is the connection ID of the receiver, used only when `Server#Broadcast` is called, indeed when we only need to send a message to a single connection.
  65. // The Namespace, Room are still respected at all.
  66. //
  67. // However, sending messages to a group of connections is done by the `Room` field for groups inside a namespace or just `Namespace` field as usual.
  68. // This field is not filled on sending/receiving.
  69. To string
  70. // True when event came from local (i.e client if running client) on force disconnection,
  71. // i.e OnNamespaceDisconnect and OnRoomLeave when closing a conn.
  72. // This field is not filled on sending/receiving.
  73. // Err does not matter and never sent to the other side.
  74. IsForced bool
  75. // True when asking the other side and fire the respond's event (which matches the sent for connect/disconnect/join/leave),
  76. // i.e if a client (or server) onnection want to connect
  77. // to a namespace or join to a room.
  78. // Should be used rarely, state can be checked by `Conn#IsClient() bool`.
  79. // This field is not filled on sending/receiving.
  80. IsLocal bool
  81. // True when user define it for writing, only its body is written as raw native websocket message, namespace, event and all other fields are empty.
  82. // The receiver should accept it on the `OnNativeMessage` event.
  83. // This field is not filled on sending/receiving.
  84. IsNative bool
  85. // Useful rarely internally on `Conn#Write` namespace and rooms checks, i.e `Conn#DisconnectAll` and `NSConn#RemoveAll`.
  86. // If true then the writer's checks will not lock connectedNamespacesMutex or roomsMutex again. May be useful in the future, keep that solution.
  87. locked bool
  88. // if server or client should write using Binary message or if the incoming message was readen as binary.
  89. SetBinary bool
  90. }
  91. func (m *Message) isConnect() bool {
  92. return m.Event == OnNamespaceConnect
  93. }
  94. func (m *Message) isDisconnect() bool {
  95. return m.Event == OnNamespaceDisconnect
  96. }
  97. func (m *Message) isRoomJoin() bool {
  98. return m.Event == OnRoomJoin
  99. }
  100. func (m *Message) isRoomLeft() bool {
  101. return m.Event == OnRoomLeft
  102. }
  103. // Serialize returns this message's transport format.
  104. func (m Message) Serialize() []byte {
  105. return serializeMessage(m)
  106. }
  107. type (
  108. // MessageObjectMarshaler is an optional interface that "objects"
  109. // can implement to customize their byte representation, see `Object` package-level function.
  110. MessageObjectMarshaler interface {
  111. Marshal() ([]byte, error)
  112. }
  113. // MessageObjectUnmarshaler is an optional interface that "objects"
  114. // can implement to customize their structure, see `Message.Object` method.
  115. MessageObjectUnmarshaler interface {
  116. Unmarshal(body []byte) error
  117. }
  118. )
  119. var (
  120. // DefaultMarshaler is a global, package-level alternative for `MessageObjectMarshaler`.
  121. // It's used when the `Marshal.v` parameter is not a `MessageObjectMarshaler`.
  122. DefaultMarshaler = json.Marshal
  123. // DefaultUnmarshaler is a global, package-level alternative for `MessageObjectMarshaler`.
  124. // It's used when the `Message.Unmarshal.outPtr` parameter is not a `MessageObjectUnmarshaler`.
  125. DefaultUnmarshaler = json.Unmarshal
  126. )
  127. // Marshal marshals the "v" value and returns a Message's Body.
  128. // If the "v" value is `MessageObjectMarshaler` then it returns the result of its `Marshal` method,
  129. // otherwise the DefaultMarshaler will be used instead.
  130. // Errors are pushed to the result, use the object's Marshal method to catch those when necessary.
  131. func Marshal(v interface{}) []byte {
  132. if v == nil {
  133. panic("nil assigment")
  134. }
  135. var (
  136. body []byte
  137. err error
  138. )
  139. if marshaler, ok := v.(MessageObjectMarshaler); ok {
  140. body, err = marshaler.Marshal()
  141. } else {
  142. body, err = DefaultMarshaler(v)
  143. }
  144. if err != nil {
  145. return []byte(err.Error())
  146. }
  147. return body
  148. }
  149. // Unmarshal unmarshals this Message's body to the "outPtr".
  150. // The "outPtr" must be a pointer to a value that can customize its decoded value
  151. // by implementing the `MessageObjectUnmarshaler`, otherwise the `DefaultUnmarshaler` will be used instead.
  152. func (m *Message) Unmarshal(outPtr interface{}) error {
  153. if outPtr == nil {
  154. panic("nil assigment")
  155. }
  156. if unmarshaler, ok := outPtr.(MessageObjectUnmarshaler); ok {
  157. return unmarshaler.Unmarshal(m.Body)
  158. }
  159. return DefaultUnmarshaler(m.Body, outPtr)
  160. }
  161. const (
  162. waitIsConfirmationPrefix = '#'
  163. waitComesFromClientPrefix = '$'
  164. waitComesFromStackExchange = '!'
  165. )
  166. // IsWait reports whether this message waits for a response back.
  167. func (m *Message) IsWait(isClientConn bool) bool {
  168. if m.wait == "" {
  169. return false
  170. }
  171. if m.wait[0] == waitIsConfirmationPrefix {
  172. // true even if it's not client-client but it's a confirmation message.
  173. return true
  174. }
  175. if m.wait[0] == waitComesFromClientPrefix {
  176. return isClientConn
  177. }
  178. return true
  179. }
  180. // ClearWait clears the wait token, rarely used.
  181. func (m *Message) ClearWait() bool {
  182. if m.FromExplicit == "" && m.wait != "" {
  183. m.wait = ""
  184. return true
  185. }
  186. return false
  187. }
  188. func genWait(isClientConn bool) string {
  189. now := time.Now().UnixNano()
  190. wait := strconv.FormatInt(now, 10)
  191. if isClientConn {
  192. wait = string(waitComesFromClientPrefix) + wait
  193. }
  194. return wait
  195. }
  196. // func genWaitConfirmation(wait string) string {
  197. // return string(waitIsConfirmationPrefix) + wait
  198. // }
  199. func genWaitStackExchange(wait string) string {
  200. if len(wait) < 2 {
  201. return ""
  202. }
  203. // This is the second special character.
  204. // If found, it is removed on the deserialization
  205. // and Message.FromStackExchange is set to true.
  206. return string(wait[0]+waitComesFromStackExchange) + wait[1:]
  207. }
  208. var (
  209. trueByte = []byte{'1'}
  210. falseByte = []byte{'0'}
  211. messageSeparatorString = ";"
  212. messageSeparator = []byte(messageSeparatorString)
  213. // we use this because has zero chance to be part of end-developer's Message.Namespace, Room, Event, To and Err fields,
  214. // semicolon has higher probability to exists on those values. See `escape` and `unescape`.
  215. messageFieldSeparatorReplacement = "@%!semicolon@%!"
  216. )
  217. // called on `serializeMessage` to all message's fields except the body (and error).
  218. func escape(s string) string {
  219. if len(s) == 0 {
  220. return s
  221. }
  222. return strings.Replace(s, messageSeparatorString, messageFieldSeparatorReplacement, -1)
  223. }
  224. // called on `DeserializeMessage` to all message's fields except the body (and error).
  225. func unescape(s string) string {
  226. if len(s) == 0 {
  227. return s
  228. }
  229. return strings.Replace(s, messageFieldSeparatorReplacement, messageSeparatorString, -1)
  230. }
  231. func serializeMessage(msg Message) (out []byte) {
  232. if msg.IsNative && msg.wait == "" {
  233. out = msg.Body
  234. } else {
  235. if msg.FromExplicit != "" {
  236. if msg.wait != "" {
  237. // this should never happen unless manual set of FromExplicit by end-developer which is forbidden by the higher level calls.
  238. panic("msg.wait and msg.FromExplicit cannot work together")
  239. }
  240. msg.wait = msg.FromExplicit
  241. }
  242. out = serializeOutput(msg.wait, escape(msg.Namespace), escape(msg.Room), escape(msg.Event), msg.Body, msg.Err, msg.isNoOp)
  243. }
  244. return out
  245. }
  246. func serializeOutput(wait, namespace, room, event string,
  247. body []byte,
  248. err error,
  249. isNoOp bool,
  250. ) []byte {
  251. var (
  252. isErrorByte = falseByte
  253. isNoOpByte = falseByte
  254. waitByte = []byte{}
  255. )
  256. if err != nil {
  257. if b, ok := isReply(err); ok {
  258. body = b
  259. } else {
  260. body = []byte(err.Error())
  261. isErrorByte = trueByte
  262. }
  263. }
  264. if isNoOp {
  265. isNoOpByte = trueByte
  266. }
  267. if wait != "" {
  268. waitByte = []byte(wait)
  269. }
  270. msg := bytes.Join([][]byte{ // this number of fields should match the deserializer's, see `validMessageSepCount`.
  271. waitByte,
  272. []byte(namespace),
  273. []byte(room),
  274. []byte(event),
  275. isErrorByte,
  276. isNoOpByte,
  277. body,
  278. }, messageSeparator)
  279. return msg
  280. }
  281. // DeserializeMessage accepts a serialized message []byte
  282. // and returns a neffos Message.
  283. // When allowNativeMessages only Body is filled and check about message format is skipped.
  284. func DeserializeMessage(msgTyp MessageType, b []byte, allowNativeMessages, shouldHandleOnlyNativeMessages bool) Message {
  285. wait, namespace, room, event, body, err, isNoOp, isInvalid := deserializeInput(b, allowNativeMessages, shouldHandleOnlyNativeMessages)
  286. fromExplicit := ""
  287. if isServerConnID(wait) {
  288. fromExplicit = wait
  289. wait = ""
  290. }
  291. fromStackExchange := len(wait) > 2 && wait[1] == waitComesFromStackExchange
  292. if fromStackExchange {
  293. // remove the second special char, we need to reform it,
  294. // this wait token is compared to the waiter side as it's without the information about stackexchnage.
  295. wait = string(wait[0]) + wait[2:]
  296. }
  297. return Message{
  298. wait: wait,
  299. Namespace: unescape(namespace),
  300. Room: unescape(room),
  301. Event: unescape(event),
  302. Body: body,
  303. Err: err,
  304. isError: err != nil,
  305. isNoOp: isNoOp,
  306. isInvalid: isInvalid,
  307. from: "",
  308. FromExplicit: fromExplicit,
  309. FromStackExchange: fromStackExchange,
  310. To: "",
  311. IsForced: false,
  312. IsLocal: false,
  313. IsNative: allowNativeMessages && event == OnNativeMessage,
  314. locked: false,
  315. SetBinary: msgTyp == BinaryMessage,
  316. }
  317. }
  318. const validMessageSepCount = 7
  319. var knownErrors = []error{ErrBadNamespace, ErrBadRoom, ErrWrite, ErrInvalidPayload}
  320. // RegisterKnownError registers an error that it's "known" to both server and client sides.
  321. // This simply adds an error to a list which, if its static text matches
  322. // an incoming error text then its value is set to the `Message.Error` field on the events callbacks.
  323. //
  324. // For dynamic text error, there is a special case which if
  325. // the error "err" contains
  326. // a `ResolveError(errorText string) bool` method then,
  327. // it is used to report whether this "err" is match to the incoming error text.
  328. func RegisterKnownError(err error) {
  329. for _, knownErr := range knownErrors {
  330. if err == knownErr {
  331. return
  332. }
  333. }
  334. knownErrors = append(knownErrors, err)
  335. }
  336. func resolveError(errorText string) error {
  337. for _, knownErr := range knownErrors {
  338. if resolver, ok := knownErr.(interface {
  339. ResolveError(errorText string) bool
  340. }); ok {
  341. if resolver.ResolveError(errorText) {
  342. return knownErr
  343. }
  344. }
  345. if knownErr.Error() == errorText {
  346. return knownErr
  347. }
  348. }
  349. return errors.New(errorText)
  350. }
  351. func deserializeInput(b []byte, allowNativeMessages, shouldHandleOnlyNativeMessages bool) ( // go-lint: ignore line
  352. wait,
  353. namespace,
  354. room,
  355. event string,
  356. body []byte,
  357. err error,
  358. isNoOp bool,
  359. isInvalid bool,
  360. ) {
  361. if len(b) == 0 {
  362. isInvalid = true
  363. return
  364. }
  365. if shouldHandleOnlyNativeMessages {
  366. event = OnNativeMessage
  367. body = b
  368. return
  369. }
  370. // Note: Go's SplitN returns the remainder in[6] but JavasSript's string.split behaves differently.
  371. dts := bytes.SplitN(b, messageSeparator, validMessageSepCount)
  372. if len(dts) != validMessageSepCount {
  373. if !allowNativeMessages {
  374. isInvalid = true
  375. return
  376. }
  377. event = OnNativeMessage
  378. body = b
  379. return
  380. }
  381. wait = string(dts[0])
  382. namespace = string(dts[1])
  383. room = string(dts[2])
  384. event = string(dts[3])
  385. isError := bytes.Equal(dts[4], trueByte)
  386. isNoOp = bytes.Equal(dts[5], trueByte)
  387. if b := dts[6]; len(b) > 0 {
  388. if isError {
  389. errorText := string(b)
  390. err = resolveError(errorText)
  391. } else {
  392. body = b // keep it like that.
  393. }
  394. }
  395. return
  396. }
  397. func genEmptyReplyToWait(wait string) []byte {
  398. return append([]byte(wait), bytes.Repeat(messageSeparator, validMessageSepCount-1)...)
  399. }