123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package gobwas
- import (
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "sync"
- "time"
- "github.com/kataras/neffos"
- gobwas "github.com/gobwas/ws"
- "github.com/gobwas/ws/wsutil"
- )
- // Socket completes the `neffos.Socket` interface,
- // it describes the underline websocket connection.
- type Socket struct {
- UnderlyingConn net.Conn
- request *http.Request
- reader *wsutil.Reader
- controlHandler wsutil.FrameHandlerFunc
- state gobwas.State
- mu sync.Mutex
- }
- func newSocket(underline net.Conn, request *http.Request, client bool) *Socket {
- state := gobwas.StateServerSide
- if client {
- state = gobwas.StateClientSide
- }
- controlHandler := wsutil.ControlFrameHandler(underline, state)
- reader := &wsutil.Reader{
- Source: underline,
- State: state,
- CheckUTF8: true,
- SkipHeaderCheck: false,
- // "intermediate" frames, that possibly could
- // be received between text/binary continuation frames.
- // Read `gobwas/wsutil/reader#NextReader`.
- //
- OnIntermediate: controlHandler,
- }
- return &Socket{
- UnderlyingConn: underline,
- request: request,
- state: state,
- reader: reader,
- controlHandler: controlHandler,
- }
- }
- // NetConn returns the underline net connection.
- func (s *Socket) NetConn() net.Conn {
- return s.UnderlyingConn
- }
- // Request returns the http request value.
- func (s *Socket) Request() *http.Request {
- return s.request
- }
- // ReadData reads binary or text messages from the remote connection.
- func (s *Socket) ReadData(timeout time.Duration) ([]byte, neffos.MessageType, error) {
- for {
- if timeout > 0 {
- s.UnderlyingConn.SetReadDeadline(time.Now().Add(timeout))
- }
- hdr, err := s.reader.NextFrame()
- if err != nil {
- if err == io.EOF {
- return nil, 0, io.ErrUnexpectedEOF // for io.ReadAll to return an error if connection remotely closed.
- }
- return nil, 0, err
- }
- if hdr.OpCode == gobwas.OpClose {
- return nil, 0, io.ErrUnexpectedEOF // for io.ReadAll to return an error if connection remotely closed.
- }
- if hdr.OpCode.IsControl() {
- err = s.controlHandler(hdr, s.reader)
- if err != nil {
- return nil, 0, err
- }
- continue
- }
- if hdr.OpCode&gobwas.OpBinary == 0 && hdr.OpCode&gobwas.OpText == 0 {
- err = s.reader.Discard()
- if err != nil {
- return nil, 0, err
- }
- continue
- }
- b, err := ioutil.ReadAll(s.reader)
- if err != nil {
- return nil, 0, err
- }
- return b, neffos.MessageType(hdr.OpCode), nil
- }
- // for {
- // if timeout > 0 {
- // s.UnderlyingConn.SetReadDeadline(time.Now().Add(timeout))
- // }
- // b, code, err := wsutil.ReadData(s.UnderlyingConn, s.state)
- // if err != nil {
- // return nil, err
- // }
- // if code != defaultOp {
- // continue
- // }
- // return b, nil
- // }
- }
- // WriteBinary sends a binary message to the remote connection.
- func (s *Socket) WriteBinary(body []byte, timeout time.Duration) error {
- return s.write(body, gobwas.OpBinary, timeout)
- }
- // WriteText sends a text message to the remote connection.
- func (s *Socket) WriteText(body []byte, timeout time.Duration) error {
- return s.write(body, gobwas.OpText, timeout)
- }
- func (s *Socket) write(body []byte, op gobwas.OpCode, timeout time.Duration) error {
- s.mu.Lock()
- if timeout > 0 {
- s.UnderlyingConn.SetWriteDeadline(time.Now().Add(timeout))
- }
- // println("write: " + string(body))
- err := wsutil.WriteMessage(s.UnderlyingConn, s.state, op, body)
- s.mu.Unlock()
- return err
- }
|