ws.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. // Copyright 2021-2023 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "bufio"
  16. "bytes"
  17. "crypto/rand"
  18. "crypto/sha1"
  19. "encoding/base64"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. "io"
  24. mrand "math/rand"
  25. "net/http"
  26. "net/url"
  27. "strings"
  28. "time"
  29. "unicode/utf8"
  30. "github.com/klauspost/compress/flate"
  31. )
  32. type wsOpCode int
  33. const (
  34. // From https://tools.ietf.org/html/rfc6455#section-5.2
  35. wsTextMessage = wsOpCode(1)
  36. wsBinaryMessage = wsOpCode(2)
  37. wsCloseMessage = wsOpCode(8)
  38. wsPingMessage = wsOpCode(9)
  39. wsPongMessage = wsOpCode(10)
  40. wsFinalBit = 1 << 7
  41. wsRsv1Bit = 1 << 6 // Used for compression, from https://tools.ietf.org/html/rfc7692#section-6
  42. wsRsv2Bit = 1 << 5
  43. wsRsv3Bit = 1 << 4
  44. wsMaskBit = 1 << 7
  45. wsContinuationFrame = 0
  46. wsMaxFrameHeaderSize = 14
  47. wsMaxControlPayloadSize = 125
  48. wsCloseSatusSize = 2
  49. // From https://tools.ietf.org/html/rfc6455#section-11.7
  50. wsCloseStatusNormalClosure = 1000
  51. wsCloseStatusNoStatusReceived = 1005
  52. wsCloseStatusAbnormalClosure = 1006
  53. wsCloseStatusInvalidPayloadData = 1007
  54. wsScheme = "ws"
  55. wsSchemeTLS = "wss"
  56. wsPMCExtension = "permessage-deflate" // per-message compression
  57. wsPMCSrvNoCtx = "server_no_context_takeover"
  58. wsPMCCliNoCtx = "client_no_context_takeover"
  59. wsPMCReqHeaderValue = wsPMCExtension + "; " + wsPMCSrvNoCtx + "; " + wsPMCCliNoCtx
  60. )
  61. // From https://tools.ietf.org/html/rfc6455#section-1.3
  62. var wsGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
  63. var compressFinalBlock = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff}
  64. type websocketReader struct {
  65. r io.Reader
  66. pending [][]byte
  67. ib []byte
  68. ff bool
  69. fc bool
  70. nl bool
  71. dc *wsDecompressor
  72. nc *Conn
  73. }
  74. type wsDecompressor struct {
  75. flate io.ReadCloser
  76. bufs [][]byte
  77. off int
  78. }
  79. type websocketWriter struct {
  80. w io.Writer
  81. compress bool
  82. compressor *flate.Writer
  83. ctrlFrames [][]byte // pending frames that should be sent at the next Write()
  84. cm []byte // close message that needs to be sent when everything else has been sent
  85. cmDone bool // a close message has been added or sent (never going back to false)
  86. noMoreSend bool // if true, even if there is a Write() call, we should not send anything
  87. }
  88. func (d *wsDecompressor) Read(dst []byte) (int, error) {
  89. if len(dst) == 0 {
  90. return 0, nil
  91. }
  92. if len(d.bufs) == 0 {
  93. return 0, io.EOF
  94. }
  95. copied := 0
  96. rem := len(dst)
  97. for buf := d.bufs[0]; buf != nil && rem > 0; {
  98. n := len(buf[d.off:])
  99. if n > rem {
  100. n = rem
  101. }
  102. copy(dst[copied:], buf[d.off:d.off+n])
  103. copied += n
  104. rem -= n
  105. d.off += n
  106. buf = d.nextBuf()
  107. }
  108. return copied, nil
  109. }
  110. func (d *wsDecompressor) nextBuf() []byte {
  111. // We still have remaining data in the first buffer
  112. if d.off != len(d.bufs[0]) {
  113. return d.bufs[0]
  114. }
  115. // We read the full first buffer. Reset offset.
  116. d.off = 0
  117. // We were at the last buffer, so we are done.
  118. if len(d.bufs) == 1 {
  119. d.bufs = nil
  120. return nil
  121. }
  122. // Here we move to the next buffer.
  123. d.bufs = d.bufs[1:]
  124. return d.bufs[0]
  125. }
  126. func (d *wsDecompressor) ReadByte() (byte, error) {
  127. if len(d.bufs) == 0 {
  128. return 0, io.EOF
  129. }
  130. b := d.bufs[0][d.off]
  131. d.off++
  132. d.nextBuf()
  133. return b, nil
  134. }
  135. func (d *wsDecompressor) addBuf(b []byte) {
  136. d.bufs = append(d.bufs, b)
  137. }
  138. func (d *wsDecompressor) decompress() ([]byte, error) {
  139. d.off = 0
  140. // As per https://tools.ietf.org/html/rfc7692#section-7.2.2
  141. // add 0x00, 0x00, 0xff, 0xff and then a final block so that flate reader
  142. // does not report unexpected EOF.
  143. d.bufs = append(d.bufs, compressFinalBlock)
  144. // Create or reset the decompressor with his object (wsDecompressor)
  145. // that provides Read() and ReadByte() APIs that will consume from
  146. // the compressed buffers (d.bufs).
  147. if d.flate == nil {
  148. d.flate = flate.NewReader(d)
  149. } else {
  150. d.flate.(flate.Resetter).Reset(d, nil)
  151. }
  152. b, err := io.ReadAll(d.flate)
  153. // Now reset the compressed buffers list
  154. d.bufs = nil
  155. return b, err
  156. }
  157. func wsNewReader(r io.Reader) *websocketReader {
  158. return &websocketReader{r: r, ff: true}
  159. }
  160. // From now on, reads will be from the readLoop and we will need to
  161. // acquire the connection lock should we have to send/write a control
  162. // message from handleControlFrame.
  163. //
  164. // Note: this runs under the connection lock.
  165. func (r *websocketReader) doneWithConnect() {
  166. r.nl = true
  167. }
  168. func (r *websocketReader) Read(p []byte) (int, error) {
  169. var err error
  170. var buf []byte
  171. if l := len(r.ib); l > 0 {
  172. buf = r.ib
  173. r.ib = nil
  174. } else {
  175. if len(r.pending) > 0 {
  176. return r.drainPending(p), nil
  177. }
  178. // Get some data from the underlying reader.
  179. n, err := r.r.Read(p)
  180. if err != nil {
  181. return 0, err
  182. }
  183. buf = p[:n]
  184. }
  185. // Now parse this and decode frames. We will possibly read more to
  186. // ensure that we get a full frame.
  187. var (
  188. tmpBuf []byte
  189. pos int
  190. max = len(buf)
  191. rem = 0
  192. )
  193. for pos < max {
  194. b0 := buf[pos]
  195. frameType := wsOpCode(b0 & 0xF)
  196. final := b0&wsFinalBit != 0
  197. compressed := b0&wsRsv1Bit != 0
  198. pos++
  199. tmpBuf, pos, err = wsGet(r.r, buf, pos, 1)
  200. if err != nil {
  201. return 0, err
  202. }
  203. b1 := tmpBuf[0]
  204. // Store size in case it is < 125
  205. rem = int(b1 & 0x7F)
  206. switch frameType {
  207. case wsPingMessage, wsPongMessage, wsCloseMessage:
  208. if rem > wsMaxControlPayloadSize {
  209. return 0, fmt.Errorf(
  210. fmt.Sprintf("control frame length bigger than maximum allowed of %v bytes",
  211. wsMaxControlPayloadSize))
  212. }
  213. if compressed {
  214. return 0, errors.New("control frame should not be compressed")
  215. }
  216. if !final {
  217. return 0, errors.New("control frame does not have final bit set")
  218. }
  219. case wsTextMessage, wsBinaryMessage:
  220. if !r.ff {
  221. return 0, errors.New("new message started before final frame for previous message was received")
  222. }
  223. r.ff = final
  224. r.fc = compressed
  225. case wsContinuationFrame:
  226. // Compressed bit must be only set in the first frame
  227. if r.ff || compressed {
  228. return 0, errors.New("invalid continuation frame")
  229. }
  230. r.ff = final
  231. default:
  232. return 0, fmt.Errorf("unknown opcode %v", frameType)
  233. }
  234. // If the encoded size is <= 125, then `rem` is simply the remainder size of the
  235. // frame. If it is 126, then the actual size is encoded as a uint16. For larger
  236. // frames, `rem` will initially be 127 and the actual size is encoded as a uint64.
  237. switch rem {
  238. case 126:
  239. tmpBuf, pos, err = wsGet(r.r, buf, pos, 2)
  240. if err != nil {
  241. return 0, err
  242. }
  243. rem = int(binary.BigEndian.Uint16(tmpBuf))
  244. case 127:
  245. tmpBuf, pos, err = wsGet(r.r, buf, pos, 8)
  246. if err != nil {
  247. return 0, err
  248. }
  249. rem = int(binary.BigEndian.Uint64(tmpBuf))
  250. }
  251. // Handle control messages in place...
  252. if wsIsControlFrame(frameType) {
  253. pos, err = r.handleControlFrame(frameType, buf, pos, rem)
  254. if err != nil {
  255. return 0, err
  256. }
  257. rem = 0
  258. continue
  259. }
  260. var b []byte
  261. // This ensures that we get the full payload for this frame.
  262. b, pos, err = wsGet(r.r, buf, pos, rem)
  263. if err != nil {
  264. return 0, err
  265. }
  266. // We read the full frame.
  267. rem = 0
  268. addToPending := true
  269. if r.fc {
  270. // Don't add to pending if we are not dealing with the final frame.
  271. addToPending = r.ff
  272. // Add the compressed payload buffer to the list.
  273. r.addCBuf(b)
  274. // Decompress only when this is the final frame.
  275. if r.ff {
  276. b, err = r.dc.decompress()
  277. if err != nil {
  278. return 0, err
  279. }
  280. r.fc = false
  281. }
  282. }
  283. // Add to the pending list if dealing with uncompressed frames or
  284. // after we have received the full compressed message and decompressed it.
  285. if addToPending {
  286. r.pending = append(r.pending, b)
  287. }
  288. }
  289. // In case of compression, there may be nothing to drain
  290. if len(r.pending) > 0 {
  291. return r.drainPending(p), nil
  292. }
  293. return 0, nil
  294. }
  295. func (r *websocketReader) addCBuf(b []byte) {
  296. if r.dc == nil {
  297. r.dc = &wsDecompressor{}
  298. }
  299. // Add a copy of the incoming buffer to the list of compressed buffers.
  300. r.dc.addBuf(append([]byte(nil), b...))
  301. }
  302. func (r *websocketReader) drainPending(p []byte) int {
  303. var n int
  304. var max = len(p)
  305. for i, buf := range r.pending {
  306. if n+len(buf) <= max {
  307. copy(p[n:], buf)
  308. n += len(buf)
  309. } else {
  310. // Is there room left?
  311. if n < max {
  312. // Write the partial and update this slice.
  313. rem := max - n
  314. copy(p[n:], buf[:rem])
  315. n += rem
  316. r.pending[i] = buf[rem:]
  317. }
  318. // These are the remaining slices that will need to be used at
  319. // the next Read() call.
  320. r.pending = r.pending[i:]
  321. return n
  322. }
  323. }
  324. r.pending = r.pending[:0]
  325. return n
  326. }
  327. func wsGet(r io.Reader, buf []byte, pos, needed int) ([]byte, int, error) {
  328. avail := len(buf) - pos
  329. if avail >= needed {
  330. return buf[pos : pos+needed], pos + needed, nil
  331. }
  332. b := make([]byte, needed)
  333. start := copy(b, buf[pos:])
  334. for start != needed {
  335. n, err := r.Read(b[start:cap(b)])
  336. start += n
  337. if err != nil {
  338. return b, start, err
  339. }
  340. }
  341. return b, pos + avail, nil
  342. }
  343. func (r *websocketReader) handleControlFrame(frameType wsOpCode, buf []byte, pos, rem int) (int, error) {
  344. var payload []byte
  345. var err error
  346. if rem > 0 {
  347. payload, pos, err = wsGet(r.r, buf, pos, rem)
  348. if err != nil {
  349. return pos, err
  350. }
  351. }
  352. switch frameType {
  353. case wsCloseMessage:
  354. status := wsCloseStatusNoStatusReceived
  355. var body string
  356. lp := len(payload)
  357. // If there is a payload, the status is represented as a 2-byte
  358. // unsigned integer (in network byte order). Then, there may be an
  359. // optional body.
  360. hasStatus, hasBody := lp >= wsCloseSatusSize, lp > wsCloseSatusSize
  361. if hasStatus {
  362. // Decode the status
  363. status = int(binary.BigEndian.Uint16(payload[:wsCloseSatusSize]))
  364. // Now if there is a body, capture it and make sure this is a valid UTF-8.
  365. if hasBody {
  366. body = string(payload[wsCloseSatusSize:])
  367. if !utf8.ValidString(body) {
  368. // https://tools.ietf.org/html/rfc6455#section-5.5.1
  369. // If body is present, it must be a valid utf8
  370. status = wsCloseStatusInvalidPayloadData
  371. body = "invalid utf8 body in close frame"
  372. }
  373. }
  374. }
  375. r.nc.wsEnqueueCloseMsg(r.nl, status, body)
  376. // Return io.EOF so that readLoop will close the connection as client closed
  377. // after processing pending buffers.
  378. return pos, io.EOF
  379. case wsPingMessage:
  380. r.nc.wsEnqueueControlMsg(r.nl, wsPongMessage, payload)
  381. case wsPongMessage:
  382. // Nothing to do..
  383. }
  384. return pos, nil
  385. }
  386. func (w *websocketWriter) Write(p []byte) (int, error) {
  387. if w.noMoreSend {
  388. return 0, nil
  389. }
  390. var total int
  391. var n int
  392. var err error
  393. // If there are control frames, they can be sent now. Actually spec says
  394. // that they should be sent ASAP, so we will send before any application data.
  395. if len(w.ctrlFrames) > 0 {
  396. n, err = w.writeCtrlFrames()
  397. if err != nil {
  398. return n, err
  399. }
  400. total += n
  401. }
  402. // Do the following only if there is something to send.
  403. // We will end with checking for need to send close message.
  404. if len(p) > 0 {
  405. if w.compress {
  406. buf := &bytes.Buffer{}
  407. if w.compressor == nil {
  408. w.compressor, _ = flate.NewWriter(buf, flate.BestSpeed)
  409. } else {
  410. w.compressor.Reset(buf)
  411. }
  412. if n, err = w.compressor.Write(p); err != nil {
  413. return n, err
  414. }
  415. if err = w.compressor.Flush(); err != nil {
  416. return n, err
  417. }
  418. b := buf.Bytes()
  419. p = b[:len(b)-4]
  420. }
  421. fh, key := wsCreateFrameHeader(w.compress, wsBinaryMessage, len(p))
  422. wsMaskBuf(key, p)
  423. n, err = w.w.Write(fh)
  424. total += n
  425. if err == nil {
  426. n, err = w.w.Write(p)
  427. total += n
  428. }
  429. }
  430. if err == nil && w.cm != nil {
  431. n, err = w.writeCloseMsg()
  432. total += n
  433. }
  434. return total, err
  435. }
  436. func (w *websocketWriter) writeCtrlFrames() (int, error) {
  437. var (
  438. n int
  439. total int
  440. i int
  441. err error
  442. )
  443. for ; i < len(w.ctrlFrames); i++ {
  444. buf := w.ctrlFrames[i]
  445. n, err = w.w.Write(buf)
  446. total += n
  447. if err != nil {
  448. break
  449. }
  450. }
  451. if i != len(w.ctrlFrames) {
  452. w.ctrlFrames = w.ctrlFrames[i+1:]
  453. } else {
  454. w.ctrlFrames = w.ctrlFrames[:0]
  455. }
  456. return total, err
  457. }
  458. func (w *websocketWriter) writeCloseMsg() (int, error) {
  459. n, err := w.w.Write(w.cm)
  460. w.cm, w.noMoreSend = nil, true
  461. return n, err
  462. }
  463. func wsMaskBuf(key, buf []byte) {
  464. for i := 0; i < len(buf); i++ {
  465. buf[i] ^= key[i&3]
  466. }
  467. }
  468. // Create the frame header.
  469. // Encodes the frame type and optional compression flag, and the size of the payload.
  470. func wsCreateFrameHeader(compressed bool, frameType wsOpCode, l int) ([]byte, []byte) {
  471. fh := make([]byte, wsMaxFrameHeaderSize)
  472. n, key := wsFillFrameHeader(fh, compressed, frameType, l)
  473. return fh[:n], key
  474. }
  475. func wsFillFrameHeader(fh []byte, compressed bool, frameType wsOpCode, l int) (int, []byte) {
  476. var n int
  477. b := byte(frameType)
  478. b |= wsFinalBit
  479. if compressed {
  480. b |= wsRsv1Bit
  481. }
  482. b1 := byte(wsMaskBit)
  483. switch {
  484. case l <= 125:
  485. n = 2
  486. fh[0] = b
  487. fh[1] = b1 | byte(l)
  488. case l < 65536:
  489. n = 4
  490. fh[0] = b
  491. fh[1] = b1 | 126
  492. binary.BigEndian.PutUint16(fh[2:], uint16(l))
  493. default:
  494. n = 10
  495. fh[0] = b
  496. fh[1] = b1 | 127
  497. binary.BigEndian.PutUint64(fh[2:], uint64(l))
  498. }
  499. var key []byte
  500. var keyBuf [4]byte
  501. if _, err := io.ReadFull(rand.Reader, keyBuf[:4]); err != nil {
  502. kv := mrand.Int31()
  503. binary.LittleEndian.PutUint32(keyBuf[:4], uint32(kv))
  504. }
  505. copy(fh[n:], keyBuf[:4])
  506. key = fh[n : n+4]
  507. n += 4
  508. return n, key
  509. }
  510. func (nc *Conn) wsInitHandshake(u *url.URL) error {
  511. compress := nc.Opts.Compression
  512. tlsRequired := u.Scheme == wsSchemeTLS || nc.Opts.Secure || nc.Opts.TLSConfig != nil || nc.Opts.TLSCertCB != nil || nc.Opts.RootCAsCB != nil
  513. // Do TLS here as needed.
  514. if tlsRequired {
  515. if err := nc.makeTLSConn(); err != nil {
  516. return err
  517. }
  518. } else {
  519. nc.bindToNewConn()
  520. }
  521. var err error
  522. // For http request, we need the passed URL to contain either http or https scheme.
  523. scheme := "http"
  524. if tlsRequired {
  525. scheme = "https"
  526. }
  527. ustr := fmt.Sprintf("%s://%s", scheme, u.Host)
  528. if nc.Opts.ProxyPath != "" {
  529. proxyPath := nc.Opts.ProxyPath
  530. if !strings.HasPrefix(proxyPath, "/") {
  531. proxyPath = "/" + proxyPath
  532. }
  533. ustr += proxyPath
  534. }
  535. u, err = url.Parse(ustr)
  536. if err != nil {
  537. return err
  538. }
  539. req := &http.Request{
  540. Method: "GET",
  541. URL: u,
  542. Proto: "HTTP/1.1",
  543. ProtoMajor: 1,
  544. ProtoMinor: 1,
  545. Header: make(http.Header),
  546. Host: u.Host,
  547. }
  548. wsKey, err := wsMakeChallengeKey()
  549. if err != nil {
  550. return err
  551. }
  552. req.Header["Upgrade"] = []string{"websocket"}
  553. req.Header["Connection"] = []string{"Upgrade"}
  554. req.Header["Sec-WebSocket-Key"] = []string{wsKey}
  555. req.Header["Sec-WebSocket-Version"] = []string{"13"}
  556. if compress {
  557. req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
  558. }
  559. if err := req.Write(nc.conn); err != nil {
  560. return err
  561. }
  562. var resp *http.Response
  563. br := bufio.NewReaderSize(nc.conn, 4096)
  564. nc.conn.SetReadDeadline(time.Now().Add(nc.Opts.Timeout))
  565. resp, err = http.ReadResponse(br, req)
  566. if err == nil &&
  567. (resp.StatusCode != 101 ||
  568. !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
  569. !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
  570. resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
  571. err = fmt.Errorf("invalid websocket connection")
  572. }
  573. // Check compression extension...
  574. if err == nil && compress {
  575. // Check that not only permessage-deflate extension is present, but that
  576. // we also have server and client no context take over.
  577. srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header)
  578. // If server does not support compression, then simply disable it in our side.
  579. if !srvCompress {
  580. compress = false
  581. } else if !noCtxTakeover {
  582. err = fmt.Errorf("compression negotiation error")
  583. }
  584. }
  585. if resp != nil {
  586. resp.Body.Close()
  587. }
  588. nc.conn.SetReadDeadline(time.Time{})
  589. if err != nil {
  590. return err
  591. }
  592. wsr := wsNewReader(nc.br.r)
  593. wsr.nc = nc
  594. // We have to slurp whatever is in the bufio reader and copy to br.r
  595. if n := br.Buffered(); n != 0 {
  596. wsr.ib, _ = br.Peek(n)
  597. }
  598. nc.br.r = wsr
  599. nc.bw.w = &websocketWriter{w: nc.bw.w, compress: compress}
  600. nc.ws = true
  601. return nil
  602. }
  603. func (nc *Conn) wsClose() {
  604. nc.mu.Lock()
  605. defer nc.mu.Unlock()
  606. if !nc.ws {
  607. return
  608. }
  609. nc.wsEnqueueCloseMsgLocked(wsCloseStatusNormalClosure, _EMPTY_)
  610. }
  611. func (nc *Conn) wsEnqueueCloseMsg(needsLock bool, status int, payload string) {
  612. // In some low-level unit tests it will happen...
  613. if nc == nil {
  614. return
  615. }
  616. if needsLock {
  617. nc.mu.Lock()
  618. defer nc.mu.Unlock()
  619. }
  620. nc.wsEnqueueCloseMsgLocked(status, payload)
  621. }
  622. func (nc *Conn) wsEnqueueCloseMsgLocked(status int, payload string) {
  623. wr, ok := nc.bw.w.(*websocketWriter)
  624. if !ok || wr.cmDone {
  625. return
  626. }
  627. statusAndPayloadLen := 2 + len(payload)
  628. frame := make([]byte, 2+4+statusAndPayloadLen)
  629. n, key := wsFillFrameHeader(frame, false, wsCloseMessage, statusAndPayloadLen)
  630. // Set the status
  631. binary.BigEndian.PutUint16(frame[n:], uint16(status))
  632. // If there is a payload, copy
  633. if len(payload) > 0 {
  634. copy(frame[n+2:], payload)
  635. }
  636. // Mask status + payload
  637. wsMaskBuf(key, frame[n:n+statusAndPayloadLen])
  638. wr.cm = frame
  639. wr.cmDone = true
  640. nc.bw.flush()
  641. if c := wr.compressor; c != nil {
  642. c.Close()
  643. }
  644. }
  645. func (nc *Conn) wsEnqueueControlMsg(needsLock bool, frameType wsOpCode, payload []byte) {
  646. // In some low-level unit tests it will happen...
  647. if nc == nil {
  648. return
  649. }
  650. if needsLock {
  651. nc.mu.Lock()
  652. defer nc.mu.Unlock()
  653. }
  654. wr, ok := nc.bw.w.(*websocketWriter)
  655. if !ok {
  656. return
  657. }
  658. fh, key := wsCreateFrameHeader(false, frameType, len(payload))
  659. wr.ctrlFrames = append(wr.ctrlFrames, fh)
  660. if len(payload) > 0 {
  661. wsMaskBuf(key, payload)
  662. wr.ctrlFrames = append(wr.ctrlFrames, payload)
  663. }
  664. nc.bw.flush()
  665. }
  666. func wsPMCExtensionSupport(header http.Header) (bool, bool) {
  667. for _, extensionList := range header["Sec-Websocket-Extensions"] {
  668. extensions := strings.Split(extensionList, ",")
  669. for _, extension := range extensions {
  670. extension = strings.Trim(extension, " \t")
  671. params := strings.Split(extension, ";")
  672. for i, p := range params {
  673. p = strings.Trim(p, " \t")
  674. if strings.EqualFold(p, wsPMCExtension) {
  675. var snc bool
  676. var cnc bool
  677. for j := i + 1; j < len(params); j++ {
  678. p = params[j]
  679. p = strings.Trim(p, " \t")
  680. if strings.EqualFold(p, wsPMCSrvNoCtx) {
  681. snc = true
  682. } else if strings.EqualFold(p, wsPMCCliNoCtx) {
  683. cnc = true
  684. }
  685. if snc && cnc {
  686. return true, true
  687. }
  688. }
  689. return true, false
  690. }
  691. }
  692. }
  693. }
  694. return false, false
  695. }
  696. func wsMakeChallengeKey() (string, error) {
  697. p := make([]byte, 16)
  698. if _, err := io.ReadFull(rand.Reader, p); err != nil {
  699. return "", err
  700. }
  701. return base64.StdEncoding.EncodeToString(p), nil
  702. }
  703. func wsAcceptKey(key string) string {
  704. h := sha1.New()
  705. h.Write([]byte(key))
  706. h.Write(wsGUID)
  707. return base64.StdEncoding.EncodeToString(h.Sum(nil))
  708. }
  709. // Returns true if the op code corresponds to a control frame.
  710. func wsIsControlFrame(frameType wsOpCode) bool {
  711. return frameType >= wsCloseMessage
  712. }
  713. func isWebsocketScheme(u *url.URL) bool {
  714. return u.Scheme == wsScheme || u.Scheme == wsSchemeTLS
  715. }