123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- /*
- * Copyright (c) 2013 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Seth Hoenig
- * Allan Stockdill-Mander
- * Mike Robertson
- */
- package mqtt
- import (
- "crypto/tls"
- "net/url"
- "time"
- )
- // MessageHandler is a callback type which can be set to be
- // executed upon the arrival of messages published to topics
- // to which the client is subscribed.
- type MessageHandler func(*Client, Message)
- // ConnectionLostHandler is a callback type which can be set to be
- // executed upon an unintended disconnection from the MQTT broker.
- // Disconnects caused by calling Disconnect or ForceDisconnect will
- // not cause an OnConnectionLost callback to execute.
- type ConnectionLostHandler func(*Client, error)
- // OnConnectHandler is a callback that is called when the client
- // state changes from unconnected/disconnected to connected. Both
- // at initial connection and on reconnection
- type OnConnectHandler func(*Client)
- // ClientOptions contains configurable options for an Client.
- type ClientOptions struct {
- Servers []*url.URL
- ClientID string
- Username string
- Password string
- CleanSession bool
- Order bool
- WillEnabled bool
- WillTopic string
- WillPayload []byte
- WillQos byte
- WillRetained bool
- ProtocolVersion uint
- protocolVersionExplicit bool
- TLSConfig tls.Config
- KeepAlive time.Duration
- ConnectTimeout time.Duration
- MaxReconnectInterval time.Duration
- AutoReconnect bool
- Store Store
- DefaultPublishHander MessageHandler
- OnConnect OnConnectHandler
- OnConnectionLost ConnectionLostHandler
- WriteTimeout time.Duration
- }
- // NewClientOptions will create a new ClientClientOptions type with some
- // default values.
- // Port: 1883
- // CleanSession: True
- // Order: True
- // KeepAlive: 30 (seconds)
- // ConnectTimeout: 30 (seconds)
- // MaxReconnectInterval 10 (minutes)
- // AutoReconnect: True
- func NewClientOptions() *ClientOptions {
- o := &ClientOptions{
- Servers: nil,
- ClientID: "",
- Username: "",
- Password: "",
- CleanSession: true,
- Order: true,
- WillEnabled: false,
- WillTopic: "",
- WillPayload: nil,
- WillQos: 0,
- WillRetained: false,
- ProtocolVersion: 0,
- protocolVersionExplicit: false,
- TLSConfig: tls.Config{},
- KeepAlive: 30 * time.Second,
- ConnectTimeout: 30 * time.Second,
- MaxReconnectInterval: 10 * time.Minute,
- AutoReconnect: true,
- Store: nil,
- OnConnect: nil,
- OnConnectionLost: DefaultConnectionLostHandler,
- WriteTimeout: 0, // 0 represents timeout disabled
- }
- return o
- }
- // AddBroker adds a broker URI to the list of brokers to be used. The format should be
- // scheme://host:port
- // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
- // and "port" is the port on which the broker is accepting connections.
- func (o *ClientOptions) AddBroker(server string) *ClientOptions {
- brokerURI, _ := url.Parse(server)
- o.Servers = append(o.Servers, brokerURI)
- return o
- }
- // SetClientID will set the client id to be used by this client when
- // connecting to the MQTT broker. According to the MQTT v3.1 specification,
- // a client id mus be no longer than 23 characters.
- func (o *ClientOptions) SetClientID(id string) *ClientOptions {
- o.ClientID = id
- return o
- }
- // SetUsername will set the username to be used by this client when connecting
- // to the MQTT broker. Note: without the use of SSL/TLS, this information will
- // be sent in plaintext accross the wire.
- func (o *ClientOptions) SetUsername(u string) *ClientOptions {
- o.Username = u
- return o
- }
- // SetPassword will set the password to be used by this client when connecting
- // to the MQTT broker. Note: without the use of SSL/TLS, this information will
- // be sent in plaintext accross the wire.
- func (o *ClientOptions) SetPassword(p string) *ClientOptions {
- o.Password = p
- return o
- }
- // SetCleanSession will set the "clean session" flag in the connect message
- // when this client connects to an MQTT broker. By setting this flag, you are
- // indicating that no messages saved by the broker for this client should be
- // delivered. Any messages that were going to be sent by this client before
- // diconnecting previously but didn't will not be sent upon connecting to the
- // broker.
- func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
- o.CleanSession = clean
- return o
- }
- // SetOrderMatters will set the message routing to guarantee order within
- // each QoS level. By default, this value is true. If set to false,
- // this flag indicates that messages can be delivered asynchronously
- // from the client to the application and possibly arrive out of order.
- func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
- o.Order = order
- return o
- }
- // SetTLSConfig will set an SSL/TLS configuration to be used when connecting
- // to an MQTT broker. Please read the official Go documentation for more
- // information.
- func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions {
- o.TLSConfig = *t
- return o
- }
- // SetStore will set the implementation of the Store interface
- // used to provide message persistence in cases where QoS levels
- // QoS_ONE or QoS_TWO are used. If no store is provided, then the
- // client will use MemoryStore by default.
- func (o *ClientOptions) SetStore(s Store) *ClientOptions {
- o.Store = s
- return o
- }
- // SetKeepAlive will set the amount of time (in seconds) that the client
- // should wait before sending a PING request to the broker. This will
- // allow the client to know that a connection has not been lost with the
- // server.
- func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions {
- o.KeepAlive = k
- return o
- }
- // SetProtocolVersion sets the MQTT version to be used to connect to the
- // broker. Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1
- func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions {
- if pv >= 3 && pv <= 4 {
- o.ProtocolVersion = pv
- o.protocolVersionExplicit = true
- }
- return o
- }
- // UnsetWill will cause any set will message to be disregarded.
- func (o *ClientOptions) UnsetWill() *ClientOptions {
- o.WillEnabled = false
- return o
- }
- // SetWill accepts a string will message to be set. When the client connects,
- // it will give this will message to the broker, which will then publish the
- // provided payload (the will) to any clients that are subscribed to the provided
- // topic.
- func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions {
- o.SetBinaryWill(topic, []byte(payload), qos, retained)
- return o
- }
- // SetBinaryWill accepts a []byte will message to be set. When the client connects,
- // it will give this will message to the broker, which will then publish the
- // provided payload (the will) to any clients that are subscribed to the provided
- // topic.
- func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions {
- o.WillEnabled = true
- o.WillTopic = topic
- o.WillPayload = payload
- o.WillQos = qos
- o.WillRetained = retained
- return o
- }
- // SetDefaultPublishHandler sets the MessageHandler that will be called when a message
- // is received that does not match any known subscriptions.
- func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
- o.DefaultPublishHander = defaultHandler
- return o
- }
- // SetOnConnectHandler sets the function to be called when the client is connected. Both
- // at initial connection time and upon automatic reconnect.
- func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions {
- o.OnConnect = onConn
- return o
- }
- // SetConnectionLostHandler will set the OnConnectionLost callback to be executed
- // in the case where the client unexpectedly loses connection with the MQTT broker.
- func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions {
- o.OnConnectionLost = onLost
- return o
- }
- // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
- // timeout error. A duration of 0 never times out. Default 30 seconds
- func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
- o.WriteTimeout = t
- return o
- }
- // SetConnectTimeout limits how long the client will wait when trying to open a connection
- // to an MQTT server before timeing out and erroring the attempt. A duration of 0 never times out.
- // Default 30 seconds. Currently only operational on TCP/TLS connections.
- func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions {
- o.ConnectTimeout = t
- return o
- }
- // SetMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts
- // when connection is lost
- func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions {
- o.MaxReconnectInterval = t
- return o
- }
- // SetAutoReconnect sets whether the automatic reconnection logic should be used
- // when the connection is lost, even if disabled the ConnectionLostHandler is still
- // called
- func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions {
- o.AutoReconnect = a
- return o
- }
|