options.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. * Måns Ansgariusson
  18. */
  19. // Portions copyright © 2018 TIBCO Software Inc.
  20. package mqtt
  21. import (
  22. "crypto/tls"
  23. "net"
  24. "net/http"
  25. "net/url"
  26. "strings"
  27. "time"
  28. )
  29. // CredentialsProvider allows the username and password to be updated
  30. // before reconnecting. It should return the current username and password.
  31. type CredentialsProvider func() (username string, password string)
  32. // MessageHandler is a callback type which can be set to be
  33. // executed upon the arrival of messages published to topics
  34. // to which the client is subscribed.
  35. type MessageHandler func(Client, Message)
  36. // ConnectionLostHandler is a callback type which can be set to be
  37. // executed upon an unintended disconnection from the MQTT broker.
  38. // Disconnects caused by calling Disconnect or ForceDisconnect will
  39. // not cause an OnConnectionLost callback to execute.
  40. type ConnectionLostHandler func(Client, error)
  41. // OnConnectHandler is a callback that is called when the client
  42. // state changes from unconnected/disconnected to connected. Both
  43. // at initial connection and on reconnection
  44. type OnConnectHandler func(Client)
  45. // ReconnectHandler is invoked prior to reconnecting after
  46. // the initial connection is lost
  47. type ReconnectHandler func(Client, *ClientOptions)
  48. // ConnectionAttemptHandler is invoked prior to making the initial connection.
  49. type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config
  50. // OpenConnectionFunc is invoked to establish the underlying network connection
  51. // Its purpose if for custom network transports.
  52. // Does not carry out any MQTT specific handshakes.
  53. type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)
  54. // ClientOptions contains configurable options for an Client. Note that these should be set using the
  55. // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
  56. // WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
  57. // to create a configuration with difficult to trace issues (e.g. Mosquitto 2.0.12+ will reject connections
  58. // with KeepAlive=0 by default).
  59. type ClientOptions struct {
  60. Servers []*url.URL
  61. ClientID string
  62. Username string
  63. Password string
  64. CredentialsProvider CredentialsProvider
  65. CleanSession bool
  66. Order bool
  67. WillEnabled bool
  68. WillTopic string
  69. WillPayload []byte
  70. WillQos byte
  71. WillRetained bool
  72. ProtocolVersion uint
  73. protocolVersionExplicit bool
  74. TLSConfig *tls.Config
  75. KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0.
  76. PingTimeout time.Duration
  77. ConnectTimeout time.Duration
  78. MaxReconnectInterval time.Duration
  79. AutoReconnect bool
  80. ConnectRetryInterval time.Duration
  81. ConnectRetry bool
  82. Store Store
  83. DefaultPublishHandler MessageHandler
  84. OnConnect OnConnectHandler
  85. OnConnectionLost ConnectionLostHandler
  86. OnReconnecting ReconnectHandler
  87. OnConnectAttempt ConnectionAttemptHandler
  88. WriteTimeout time.Duration
  89. MessageChannelDepth uint
  90. ResumeSubs bool
  91. HTTPHeaders http.Header
  92. WebsocketOptions *WebsocketOptions
  93. MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
  94. Dialer *net.Dialer
  95. CustomOpenConnectionFn OpenConnectionFunc
  96. AutoAckDisabled bool
  97. }
  98. // NewClientOptions will create a new ClientClientOptions type with some
  99. // default values.
  100. // Port: 1883
  101. // CleanSession: True
  102. // Order: True (note: it is recommended that this be set to FALSE unless order is important)
  103. // KeepAlive: 30 (seconds)
  104. // ConnectTimeout: 30 (seconds)
  105. // MaxReconnectInterval 10 (minutes)
  106. // AutoReconnect: True
  107. func NewClientOptions() *ClientOptions {
  108. o := &ClientOptions{
  109. Servers: nil,
  110. ClientID: "",
  111. Username: "",
  112. Password: "",
  113. CleanSession: true,
  114. Order: true,
  115. WillEnabled: false,
  116. WillTopic: "",
  117. WillPayload: nil,
  118. WillQos: 0,
  119. WillRetained: false,
  120. ProtocolVersion: 0,
  121. protocolVersionExplicit: false,
  122. KeepAlive: 30,
  123. PingTimeout: 10 * time.Second,
  124. ConnectTimeout: 30 * time.Second,
  125. MaxReconnectInterval: 10 * time.Minute,
  126. AutoReconnect: true,
  127. ConnectRetryInterval: 30 * time.Second,
  128. ConnectRetry: false,
  129. Store: nil,
  130. OnConnect: nil,
  131. OnConnectionLost: DefaultConnectionLostHandler,
  132. OnConnectAttempt: nil,
  133. WriteTimeout: 0, // 0 represents timeout disabled
  134. ResumeSubs: false,
  135. HTTPHeaders: make(map[string][]string),
  136. WebsocketOptions: &WebsocketOptions{},
  137. Dialer: &net.Dialer{Timeout: 30 * time.Second},
  138. CustomOpenConnectionFn: nil,
  139. AutoAckDisabled: false,
  140. }
  141. return o
  142. }
  143. // AddBroker adds a broker URI to the list of brokers to be used. The format should be
  144. // scheme://host:port
  145. // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
  146. // and "port" is the port on which the broker is accepting connections.
  147. //
  148. // Default values for hostname is "127.0.0.1", for schema is "tcp://".
  149. //
  150. // An example broker URI would look like: tcp://foobar.com:1883
  151. func (o *ClientOptions) AddBroker(server string) *ClientOptions {
  152. if len(server) > 0 && server[0] == ':' {
  153. server = "127.0.0.1" + server
  154. }
  155. if !strings.Contains(server, "://") {
  156. server = "tcp://" + server
  157. }
  158. brokerURI, err := url.Parse(server)
  159. if err != nil {
  160. ERROR.Println(CLI, "Failed to parse %q broker address: %s", server, err)
  161. return o
  162. }
  163. o.Servers = append(o.Servers, brokerURI)
  164. return o
  165. }
  166. // SetResumeSubs will enable resuming of stored (un)subscribe messages when connecting
  167. // but not reconnecting if CleanSession is false. Otherwise these messages are discarded.
  168. func (o *ClientOptions) SetResumeSubs(resume bool) *ClientOptions {
  169. o.ResumeSubs = resume
  170. return o
  171. }
  172. // SetClientID will set the client id to be used by this client when
  173. // connecting to the MQTT broker. According to the MQTT v3.1 specification,
  174. // a client id must be no longer than 23 characters.
  175. func (o *ClientOptions) SetClientID(id string) *ClientOptions {
  176. o.ClientID = id
  177. return o
  178. }
  179. // SetUsername will set the username to be used by this client when connecting
  180. // to the MQTT broker. Note: without the use of SSL/TLS, this information will
  181. // be sent in plaintext across the wire.
  182. func (o *ClientOptions) SetUsername(u string) *ClientOptions {
  183. o.Username = u
  184. return o
  185. }
  186. // SetPassword will set the password to be used by this client when connecting
  187. // to the MQTT broker. Note: without the use of SSL/TLS, this information will
  188. // be sent in plaintext across the wire.
  189. func (o *ClientOptions) SetPassword(p string) *ClientOptions {
  190. o.Password = p
  191. return o
  192. }
  193. // SetCredentialsProvider will set a method to be called by this client when
  194. // connecting to the MQTT broker that provide the current username and password.
  195. // Note: without the use of SSL/TLS, this information will be sent
  196. // in plaintext across the wire.
  197. func (o *ClientOptions) SetCredentialsProvider(p CredentialsProvider) *ClientOptions {
  198. o.CredentialsProvider = p
  199. return o
  200. }
  201. // SetCleanSession will set the "clean session" flag in the connect message
  202. // when this client connects to an MQTT broker. By setting this flag, you are
  203. // indicating that no messages saved by the broker for this client should be
  204. // delivered. Any messages that were going to be sent by this client before
  205. // disconnecting previously but didn't will not be sent upon connecting to the
  206. // broker.
  207. func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
  208. o.CleanSession = clean
  209. return o
  210. }
  211. // SetOrderMatters will set the message routing to guarantee order within
  212. // each QoS level. By default, this value is true. If set to false (recommended),
  213. // this flag indicates that messages can be delivered asynchronously
  214. // from the client to the application and possibly arrive out of order.
  215. // Specifically, the message handler is called in its own go routine.
  216. // Note that setting this to true does not guarantee in-order delivery
  217. // (this is subject to broker settings like "max_inflight_messages=1" in mosquitto)
  218. // and if true then handlers must not block.
  219. func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
  220. o.Order = order
  221. return o
  222. }
  223. // SetTLSConfig will set an SSL/TLS configuration to be used when connecting
  224. // to an MQTT broker. Please read the official Go documentation for more
  225. // information.
  226. func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions {
  227. o.TLSConfig = t
  228. return o
  229. }
  230. // SetStore will set the implementation of the Store interface
  231. // used to provide message persistence in cases where QoS levels
  232. // QoS_ONE or QoS_TWO are used. If no store is provided, then the
  233. // client will use MemoryStore by default.
  234. func (o *ClientOptions) SetStore(s Store) *ClientOptions {
  235. o.Store = s
  236. return o
  237. }
  238. // SetKeepAlive will set the amount of time (in seconds) that the client
  239. // should wait before sending a PING request to the broker. This will
  240. // allow the client to know that a connection has not been lost with the
  241. // server.
  242. func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions {
  243. o.KeepAlive = int64(k / time.Second)
  244. return o
  245. }
  246. // SetPingTimeout will set the amount of time (in seconds) that the client
  247. // will wait after sending a PING request to the broker, before deciding
  248. // that the connection has been lost. Default is 10 seconds.
  249. func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions {
  250. o.PingTimeout = k
  251. return o
  252. }
  253. // SetProtocolVersion sets the MQTT version to be used to connect to the
  254. // broker. Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1
  255. func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions {
  256. if (pv >= 3 && pv <= 4) || (pv > 0x80) {
  257. o.ProtocolVersion = pv
  258. o.protocolVersionExplicit = true
  259. }
  260. return o
  261. }
  262. // UnsetWill will cause any set will message to be disregarded.
  263. func (o *ClientOptions) UnsetWill() *ClientOptions {
  264. o.WillEnabled = false
  265. return o
  266. }
  267. // SetWill accepts a string will message to be set. When the client connects,
  268. // it will give this will message to the broker, which will then publish the
  269. // provided payload (the will) to any clients that are subscribed to the provided
  270. // topic.
  271. func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions {
  272. o.SetBinaryWill(topic, []byte(payload), qos, retained)
  273. return o
  274. }
  275. // SetBinaryWill accepts a []byte will message to be set. When the client connects,
  276. // it will give this will message to the broker, which will then publish the
  277. // provided payload (the will) to any clients that are subscribed to the provided
  278. // topic.
  279. func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions {
  280. o.WillEnabled = true
  281. o.WillTopic = topic
  282. o.WillPayload = payload
  283. o.WillQos = qos
  284. o.WillRetained = retained
  285. return o
  286. }
  287. // SetDefaultPublishHandler sets the MessageHandler that will be called when a message
  288. // is received that does not match any known subscriptions.
  289. //
  290. // If OrderMatters is true (the defaultHandler) then callback must not block or
  291. // call functions within this package that may block (e.g. Publish) other than in
  292. // a new go routine.
  293. // defaultHandler must be safe for concurrent use by multiple goroutines.
  294. func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
  295. o.DefaultPublishHandler = defaultHandler
  296. return o
  297. }
  298. // SetOnConnectHandler sets the function to be called when the client is connected. Both
  299. // at initial connection time and upon automatic reconnect.
  300. func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions {
  301. o.OnConnect = onConn
  302. return o
  303. }
  304. // SetConnectionLostHandler will set the OnConnectionLost callback to be executed
  305. // in the case where the client unexpectedly loses connection with the MQTT broker.
  306. func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions {
  307. o.OnConnectionLost = onLost
  308. return o
  309. }
  310. // SetReconnectingHandler sets the OnReconnecting callback to be executed prior
  311. // to the client attempting a reconnect to the MQTT broker.
  312. func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptions {
  313. o.OnReconnecting = cb
  314. return o
  315. }
  316. // SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior
  317. // to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing
  318. // the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL).
  319. // This allows connection specific changes to be made to the *tls.Config.
  320. func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions {
  321. o.OnConnectAttempt = onConnectAttempt
  322. return o
  323. }
  324. // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
  325. // timeout error. A duration of 0 never times out. Default never times out
  326. func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
  327. o.WriteTimeout = t
  328. return o
  329. }
  330. // SetConnectTimeout limits how long the client will wait when trying to open a connection
  331. // to an MQTT server before timing out. A duration of 0 never times out.
  332. // Default 30 seconds. Currently only operational on TCP/TLS connections.
  333. func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions {
  334. o.ConnectTimeout = t
  335. o.Dialer.Timeout = t
  336. return o
  337. }
  338. // SetMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts
  339. // when connection is lost
  340. func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions {
  341. o.MaxReconnectInterval = t
  342. return o
  343. }
  344. // SetAutoReconnect sets whether the automatic reconnection logic should be used
  345. // when the connection is lost, even if disabled the ConnectionLostHandler is still
  346. // called
  347. func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions {
  348. o.AutoReconnect = a
  349. return o
  350. }
  351. // SetConnectRetryInterval sets the time that will be waited between connection attempts
  352. // when initially connecting if ConnectRetry is TRUE
  353. func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions {
  354. o.ConnectRetryInterval = t
  355. return o
  356. }
  357. // SetConnectRetry sets whether the connect function will automatically retry the connection
  358. // in the event of a failure (when true the token returned by the Connect function will
  359. // not complete until the connection is up or it is cancelled)
  360. // If ConnectRetry is true then subscriptions should be requested in OnConnect handler
  361. // Setting this to TRUE permits messages to be published before the connection is established
  362. func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
  363. o.ConnectRetry = a
  364. return o
  365. }
  366. // SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function
  367. // remains so the API is not altered.
  368. func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions {
  369. o.MessageChannelDepth = s
  370. return o
  371. }
  372. // SetHTTPHeaders sets the additional HTTP headers that will be sent in the WebSocket
  373. // opening handshake.
  374. func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions {
  375. o.HTTPHeaders = h
  376. return o
  377. }
  378. // SetWebsocketOptions sets the additional websocket options used in a WebSocket connection
  379. func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions {
  380. o.WebsocketOptions = w
  381. return o
  382. }
  383. // SetMaxResumePubInFlight sets the maximum simultaneous publish messages that will be sent while resuming. Note that
  384. // this only applies to messages coming from the store (so additional sends may push us over the limit)
  385. // Note that the connect token will not be flagged as complete until all messages have been sent from the
  386. // store. If broker does not respond to messages then resume may not complete.
  387. // This option was put in place because resuming after downtime can saturate low capacity links.
  388. func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *ClientOptions {
  389. o.MaxResumePubInFlight = MaxResumePubInFlight
  390. return o
  391. }
  392. // SetDialer sets the tcp dialer options used in a tcp connection
  393. func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions {
  394. o.Dialer = dialer
  395. return o
  396. }
  397. // SetCustomOpenConnectionFn replaces the inbuilt function that establishes a network connection with a custom function.
  398. // The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example)
  399. // It enables custom networking types in addition to the defaults (tcp, tls, websockets...)
  400. func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenConnectionFunc) *ClientOptions {
  401. if customOpenConnectionFn != nil {
  402. o.CustomOpenConnectionFn = customOpenConnectionFn
  403. }
  404. return o
  405. }
  406. // SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
  407. // By default it is set to false. Setting it to true will disable the auto-ack globally.
  408. func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions {
  409. o.AutoAckDisabled = autoAckDisabled
  410. return o
  411. }