socket.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package gobwas
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "sync"
  8. "time"
  9. "github.com/kataras/neffos"
  10. gobwas "github.com/gobwas/ws"
  11. "github.com/gobwas/ws/wsutil"
  12. )
  13. // Socket completes the `neffos.Socket` interface,
  14. // it describes the underline websocket connection.
  15. type Socket struct {
  16. UnderlyingConn net.Conn
  17. request *http.Request
  18. reader *wsutil.Reader
  19. controlHandler wsutil.FrameHandlerFunc
  20. state gobwas.State
  21. mu sync.Mutex
  22. }
  23. func newSocket(underline net.Conn, request *http.Request, client bool) *Socket {
  24. state := gobwas.StateServerSide
  25. if client {
  26. state = gobwas.StateClientSide
  27. }
  28. controlHandler := wsutil.ControlFrameHandler(underline, state)
  29. reader := &wsutil.Reader{
  30. Source: underline,
  31. State: state,
  32. CheckUTF8: true,
  33. SkipHeaderCheck: false,
  34. // "intermediate" frames, that possibly could
  35. // be received between text/binary continuation frames.
  36. // Read `gobwas/wsutil/reader#NextReader`.
  37. //
  38. OnIntermediate: controlHandler,
  39. }
  40. return &Socket{
  41. UnderlyingConn: underline,
  42. request: request,
  43. state: state,
  44. reader: reader,
  45. controlHandler: controlHandler,
  46. }
  47. }
  48. // NetConn returns the underline net connection.
  49. func (s *Socket) NetConn() net.Conn {
  50. return s.UnderlyingConn
  51. }
  52. // Request returns the http request value.
  53. func (s *Socket) Request() *http.Request {
  54. return s.request
  55. }
  56. // ReadData reads binary or text messages from the remote connection.
  57. func (s *Socket) ReadData(timeout time.Duration) ([]byte, neffos.MessageType, error) {
  58. for {
  59. if timeout > 0 {
  60. s.UnderlyingConn.SetReadDeadline(time.Now().Add(timeout))
  61. }
  62. hdr, err := s.reader.NextFrame()
  63. if err != nil {
  64. if err == io.EOF {
  65. return nil, 0, io.ErrUnexpectedEOF // for io.ReadAll to return an error if connection remotely closed.
  66. }
  67. return nil, 0, err
  68. }
  69. if hdr.OpCode == gobwas.OpClose {
  70. return nil, 0, io.ErrUnexpectedEOF // for io.ReadAll to return an error if connection remotely closed.
  71. }
  72. if hdr.OpCode.IsControl() {
  73. err = s.controlHandler(hdr, s.reader)
  74. if err != nil {
  75. return nil, 0, err
  76. }
  77. continue
  78. }
  79. if hdr.OpCode&gobwas.OpBinary == 0 && hdr.OpCode&gobwas.OpText == 0 {
  80. err = s.reader.Discard()
  81. if err != nil {
  82. return nil, 0, err
  83. }
  84. continue
  85. }
  86. b, err := ioutil.ReadAll(s.reader)
  87. if err != nil {
  88. return nil, 0, err
  89. }
  90. return b, neffos.MessageType(hdr.OpCode), nil
  91. }
  92. // for {
  93. // if timeout > 0 {
  94. // s.UnderlyingConn.SetReadDeadline(time.Now().Add(timeout))
  95. // }
  96. // b, code, err := wsutil.ReadData(s.UnderlyingConn, s.state)
  97. // if err != nil {
  98. // return nil, err
  99. // }
  100. // if code != defaultOp {
  101. // continue
  102. // }
  103. // return b, nil
  104. // }
  105. }
  106. // WriteBinary sends a binary message to the remote connection.
  107. func (s *Socket) WriteBinary(body []byte, timeout time.Duration) error {
  108. return s.write(body, gobwas.OpBinary, timeout)
  109. }
  110. // WriteText sends a text message to the remote connection.
  111. func (s *Socket) WriteText(body []byte, timeout time.Duration) error {
  112. return s.write(body, gobwas.OpText, timeout)
  113. }
  114. func (s *Socket) write(body []byte, op gobwas.OpCode, timeout time.Duration) error {
  115. s.mu.Lock()
  116. if timeout > 0 {
  117. s.UnderlyingConn.SetWriteDeadline(time.Now().Add(timeout))
  118. }
  119. // println("write: " + string(body))
  120. err := wsutil.WriteMessage(s.UnderlyingConn, s.state, op, body)
  121. s.mu.Unlock()
  122. return err
  123. }