websocket.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. /*
  2. * This program and the accompanying materials
  3. * are made available under the terms of the Eclipse Public License v2.0
  4. * and Eclipse Distribution License v1.0 which accompany this distribution.
  5. *
  6. * The Eclipse Public License is available at
  7. * https://www.eclipse.org/legal/epl-2.0/
  8. * and the Eclipse Distribution License is available at
  9. * http://www.eclipse.org/org/documents/edl-v10.php.
  10. *
  11. * Contributors:
  12. */
  13. package mqtt
  14. import (
  15. "crypto/tls"
  16. "fmt"
  17. "io"
  18. "net"
  19. "net/http"
  20. "net/url"
  21. "sync"
  22. "time"
  23. "github.com/gorilla/websocket"
  24. )
  25. // WebsocketOptions are config options for a websocket dialer
  26. type WebsocketOptions struct {
  27. ReadBufferSize int
  28. WriteBufferSize int
  29. Proxy ProxyFunction
  30. }
  // ProxyFunction picks the proxy URL to use for a given outgoing request.
  // It has the same shape as http.ProxyFromEnvironment.
  type ProxyFunction func(*http.Request) (*url.URL, error)
  32. // NewWebsocket returns a new websocket and returns a net.Conn compatible interface using the gorilla/websocket package
  33. func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header, options *WebsocketOptions) (net.Conn, error) {
  34. if timeout == 0 {
  35. timeout = 10 * time.Second
  36. }
  37. if options == nil {
  38. // Apply default options
  39. options = &WebsocketOptions{}
  40. }
  41. if options.Proxy == nil {
  42. options.Proxy = http.ProxyFromEnvironment
  43. }
  44. dialer := &websocket.Dialer{
  45. Proxy: options.Proxy,
  46. HandshakeTimeout: timeout,
  47. EnableCompression: false,
  48. TLSClientConfig: tlsc,
  49. Subprotocols: []string{"mqtt"},
  50. ReadBufferSize: options.ReadBufferSize,
  51. WriteBufferSize: options.WriteBufferSize,
  52. }
  53. ws, resp, err := dialer.Dial(host, requestHeader)
  54. if err != nil {
  55. if resp != nil {
  56. WARN.Println(CLI, fmt.Sprintf("Websocket handshake failure. StatusCode: %d. Body: %s", resp.StatusCode, resp.Body))
  57. }
  58. return nil, err
  59. }
  60. wrapper := &websocketConnector{
  61. Conn: ws,
  62. }
  63. return wrapper, err
  64. }
  65. // websocketConnector is a websocket wrapper so it satisfies the net.Conn interface so it is a
  66. // drop in replacement of the golang.org/x/net/websocket package.
  67. // Implementation guide taken from https://github.com/gorilla/websocket/issues/282
  68. type websocketConnector struct {
  69. *websocket.Conn
  70. r io.Reader
  71. rio sync.Mutex
  72. wio sync.Mutex
  73. }
  74. // SetDeadline sets both the read and write deadlines
  75. func (c *websocketConnector) SetDeadline(t time.Time) error {
  76. if err := c.SetReadDeadline(t); err != nil {
  77. return err
  78. }
  79. err := c.SetWriteDeadline(t)
  80. return err
  81. }
  82. // Write writes data to the websocket
  83. func (c *websocketConnector) Write(p []byte) (int, error) {
  84. c.wio.Lock()
  85. defer c.wio.Unlock()
  86. err := c.WriteMessage(websocket.BinaryMessage, p)
  87. if err != nil {
  88. return 0, err
  89. }
  90. return len(p), nil
  91. }
  92. // Read reads the current websocket frame
  93. func (c *websocketConnector) Read(p []byte) (int, error) {
  94. c.rio.Lock()
  95. defer c.rio.Unlock()
  96. for {
  97. if c.r == nil {
  98. // Advance to next message.
  99. var err error
  100. _, c.r, err = c.NextReader()
  101. if err != nil {
  102. return 0, err
  103. }
  104. }
  105. n, err := c.r.Read(p)
  106. if err == io.EOF {
  107. // At end of message.
  108. c.r = nil
  109. if n > 0 {
  110. return n, nil
  111. }
  112. // No data read, continue to next message.
  113. continue
  114. }
  115. return n, err
  116. }
  117. }