netconn.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. * Matt Brittan
  18. */
  19. package mqtt
  20. import (
  21. "crypto/tls"
  22. "errors"
  23. "net"
  24. "net/http"
  25. "net/url"
  26. "os"
  27. "time"
  28. "golang.org/x/net/proxy"
  29. )
  30. //
  31. // This just establishes the network connection; once established the type of connection should be irrelevant
  32. //
  33. // openConnection opens a network connection using the protocol indicated in the URL.
  34. // Does not carry out any MQTT specific handshakes.
  35. func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) {
  36. switch uri.Scheme {
  37. case "ws":
  38. dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
  39. dialURI.User = nil
  40. conn, err := NewWebsocket(dialURI.String(), nil, timeout, headers, websocketOptions)
  41. return conn, err
  42. case "wss":
  43. dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
  44. dialURI.User = nil
  45. conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
  46. return conn, err
  47. case "mqtt", "tcp":
  48. allProxy := os.Getenv("all_proxy")
  49. if len(allProxy) == 0 {
  50. conn, err := dialer.Dial("tcp", uri.Host)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return conn, nil
  55. }
  56. proxyDialer := proxy.FromEnvironment()
  57. conn, err := proxyDialer.Dial("tcp", uri.Host)
  58. if err != nil {
  59. return nil, err
  60. }
  61. return conn, nil
  62. case "unix":
  63. var conn net.Conn
  64. var err error
  65. // this check is preserved for compatibility with older versions
  66. // which used uri.Host only (it works for local paths, e.g. unix://socket.sock in current dir)
  67. if len(uri.Host) > 0 {
  68. conn, err = dialer.Dial("unix", uri.Host)
  69. } else {
  70. conn, err = dialer.Dial("unix", uri.Path)
  71. }
  72. if err != nil {
  73. return nil, err
  74. }
  75. return conn, nil
  76. case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps":
  77. allProxy := os.Getenv("all_proxy")
  78. if len(allProxy) == 0 {
  79. conn, err := tls.DialWithDialer(dialer, "tcp", uri.Host, tlsc)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return conn, nil
  84. }
  85. proxyDialer := proxy.FromEnvironment()
  86. conn, err := proxyDialer.Dial("tcp", uri.Host)
  87. if err != nil {
  88. return nil, err
  89. }
  90. tlsConn := tls.Client(conn, tlsc)
  91. err = tlsConn.Handshake()
  92. if err != nil {
  93. _ = conn.Close()
  94. return nil, err
  95. }
  96. return tlsConn, nil
  97. }
  98. return nil, errors.New("unknown protocol")
  99. }