client.go 47 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. * Matt Brittan
  18. */
  19. // Portions copyright © 2018 TIBCO Software Inc.
  20. // Package mqtt provides an MQTT v3.1.1 client library.
  21. package mqtt
  22. import (
  23. "bytes"
  24. "context"
  25. "errors"
  26. "fmt"
  27. "net"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "golang.org/x/sync/semaphore"
  33. "github.com/eclipse/paho.mqtt.golang/packets"
  34. )
  35. // Client is the interface definition for a Client as used by this
  36. // library, the interface is primarily to allow mocking tests.
  37. //
  38. // It is an MQTT v3.1.1 client for communicating
  39. // with an MQTT server using non-blocking methods that allow work
  40. // to be done in the background.
  41. // An application may connect to an MQTT server using:
  42. //
  43. // A plain TCP socket (e.g. mqtt://test.mosquitto.org:1833)
  44. // A secure SSL/TLS socket (e.g. tls://test.mosquitto.org:8883)
  45. // A websocket (e.g ws://test.mosquitto.org:8080 or wss://test.mosquitto.org:8081)
  46. // Something else (using `options.CustomOpenConnectionFn`)
  47. //
  48. // To enable ensured message delivery at Quality of Service (QoS) levels
  49. // described in the MQTT spec, a message persistence mechanism must be
  50. // used. This is done by providing a type which implements the Store
  51. // interface. For convenience, FileStore and MemoryStore are provided
  52. // implementations that should be sufficient for most use cases. More
  53. // information can be found in their respective documentation.
  54. // Numerous connection options may be specified by configuring a
  55. // and then supplying a ClientOptions type.
  56. // Implementations of Client must be safe for concurrent use by multiple
  57. // goroutines
  58. type Client interface {
  59. // IsConnected returns a bool signifying whether
  60. // the client is connected or not.
  61. IsConnected() bool
  62. // IsConnectionOpen return a bool signifying whether the client has an active
  63. // connection to mqtt broker, i.e not in disconnected or reconnect mode
  64. IsConnectionOpen() bool
  65. // Connect will create a connection to the message broker, by default
  66. // it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
  67. // fails
  68. Connect() Token
  69. // Disconnect will end the connection with the server, but not before waiting
  70. // the specified number of milliseconds to wait for existing work to be
  71. // completed.
  72. Disconnect(quiesce uint)
  73. // Publish will publish a message with the specified QoS and content
  74. // to the specified topic.
  75. // Returns a token to track delivery of the message to the broker
  76. Publish(topic string, qos byte, retained bool, payload interface{}) Token
  77. // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
  78. // a message is published on the topic provided, or nil for the default handler.
  79. //
  80. // If options.OrderMatters is true (the default) then callback must not block or
  81. // call functions within this package that may block (e.g. Publish) other than in
  82. // a new go routine.
  83. // callback must be safe for concurrent use by multiple goroutines.
  84. Subscribe(topic string, qos byte, callback MessageHandler) Token
  85. // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
  86. // be executed when a message is published on one of the topics provided, or nil for the
  87. // default handler.
  88. //
  89. // If options.OrderMatters is true (the default) then callback must not block or
  90. // call functions within this package that may block (e.g. Publish) other than in
  91. // a new go routine.
  92. // callback must be safe for concurrent use by multiple goroutines.
  93. SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
  94. // Unsubscribe will end the subscription from each of the topics provided.
  95. // Messages published to those topics from other clients will no longer be
  96. // received.
  97. Unsubscribe(topics ...string) Token
  98. // AddRoute allows you to add a handler for messages on a specific topic
  99. // without making a subscription. For example having a different handler
  100. // for parts of a wildcard subscription or for receiving retained messages
  101. // upon connection (before Sub scribe can be processed).
  102. //
  103. // If options.OrderMatters is true (the default) then callback must not block or
  104. // call functions within this package that may block (e.g. Publish) other than in
  105. // a new go routine.
  106. // callback must be safe for concurrent use by multiple goroutines.
  107. AddRoute(topic string, callback MessageHandler)
  108. // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
  109. // in use by the client.
  110. OptionsReader() ClientOptionsReader
  111. }
  112. // client implements the Client interface
  113. // clients are safe for concurrent use by multiple
  114. // goroutines
  115. type client struct {
  116. lastSent atomic.Value // time.Time - the last time a packet was successfully sent to network
  117. lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
  118. pingOutstanding int32 // set to 1 if a ping has been sent but response not ret received
  119. status connectionStatus // see constants in status.go for values
  120. messageIds // effectively a map from message id to token completor
  121. obound chan *PacketAndToken // outgoing publish packet
  122. oboundP chan *PacketAndToken // outgoing 'priority' packet (anything other than publish)
  123. msgRouter *router // routes topics to handlers
  124. persist Store
  125. options ClientOptions
  126. optionsMu sync.Mutex // Protects the options in a few limited cases where needed for testing
  127. conn net.Conn // the network connection, must only be set with connMu locked (only used when starting/stopping workers)
  128. connMu sync.Mutex // mutex for the connection (again only used in two functions)
  129. stop chan struct{} // Closed to request that workers stop
  130. workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
  131. commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
  132. backoff *backoffController
  133. }
  134. // NewClient will create an MQTT v3.1.1 client with all of the options specified
  135. // in the provided ClientOptions. The client must have the Connect method called
  136. // on it before it may be used. This is to make sure resources (such as a net
  137. // connection) are created before the application is actually ready.
  138. func NewClient(o *ClientOptions) Client {
  139. c := &client{}
  140. c.options = *o
  141. if c.options.Store == nil {
  142. c.options.Store = NewMemoryStore()
  143. }
  144. switch c.options.ProtocolVersion {
  145. case 3, 4:
  146. c.options.protocolVersionExplicit = true
  147. case 0x83, 0x84:
  148. c.options.protocolVersionExplicit = true
  149. default:
  150. c.options.ProtocolVersion = 4
  151. c.options.protocolVersionExplicit = false
  152. }
  153. c.persist = c.options.Store
  154. c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
  155. c.msgRouter = newRouter()
  156. c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
  157. c.obound = make(chan *PacketAndToken)
  158. c.oboundP = make(chan *PacketAndToken)
  159. c.backoff = newBackoffController()
  160. return c
  161. }
  162. // AddRoute allows you to add a handler for messages on a specific topic
  163. // without making a subscription. For example having a different handler
  164. // for parts of a wildcard subscription
  165. //
  166. // If options.OrderMatters is true (the default) then callback must not block or
  167. // call functions within this package that may block (e.g. Publish) other than in
  168. // a new go routine.
  169. // callback must be safe for concurrent use by multiple goroutines.
  170. func (c *client) AddRoute(topic string, callback MessageHandler) {
  171. if callback != nil {
  172. c.msgRouter.addRoute(topic, callback)
  173. }
  174. }
  175. // IsConnected returns a bool signifying whether
  176. // the client is connected or not.
  177. // connected means that the connection is up now OR it will
  178. // be established/reestablished automatically when possible
  179. // Warning: The connection status may change at any time so use this with care!
  180. func (c *client) IsConnected() bool {
  181. // This will need to change if additional statuses are added
  182. s, r := c.status.ConnectionStatusRetry()
  183. switch {
  184. case s == connected:
  185. return true
  186. case c.options.ConnectRetry && s == connecting:
  187. return true
  188. case c.options.AutoReconnect:
  189. return s == reconnecting || (s == disconnecting && r) // r indicates we will reconnect
  190. default:
  191. return false
  192. }
  193. }
  194. // IsConnectionOpen return a bool signifying whether the client has an active
  195. // connection to mqtt broker, i.e. not in disconnected or reconnect mode
  196. // Warning: The connection status may change at any time so use this with care!
  197. func (c *client) IsConnectionOpen() bool {
  198. return c.status.ConnectionStatus() == connected
  199. }
  200. // ErrNotConnected is the error returned from function calls that are
  201. // made when the client is not connected to a broker
  202. var ErrNotConnected = errors.New("not Connected")
  203. // Connect will create a connection to the message broker, by default
  204. // it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
  205. // fails
  206. // Note: If using QOS1+ and CleanSession=false it is advisable to add
  207. // routes (or a DefaultPublishHandler) prior to calling Connect()
  208. // because queued messages may be delivered immediately post connection
  209. func (c *client) Connect() Token {
  210. t := newToken(packets.Connect).(*ConnectToken)
  211. DEBUG.Println(CLI, "Connect()")
  212. connectionUp, err := c.status.Connecting()
  213. if err != nil {
  214. if err == errAlreadyConnectedOrReconnecting && c.options.AutoReconnect {
  215. // When reconnection is active we don't consider calls tro Connect to ba an error (mainly for compatability)
  216. WARN.Println(CLI, "Connect() called but not disconnected")
  217. t.returnCode = packets.Accepted
  218. t.flowComplete()
  219. return t
  220. }
  221. ERROR.Println(CLI, err) // CONNECT should never be called unless we are disconnected
  222. t.setError(err)
  223. return t
  224. }
  225. c.persist.Open()
  226. if c.options.ConnectRetry {
  227. c.reserveStoredPublishIDs() // Reserve IDs to allow publishing before connect complete
  228. }
  229. go func() {
  230. if len(c.options.Servers) == 0 {
  231. t.setError(fmt.Errorf("no servers defined to connect to"))
  232. if err := connectionUp(false); err != nil {
  233. ERROR.Println(CLI, err.Error())
  234. }
  235. return
  236. }
  237. RETRYCONN:
  238. var conn net.Conn
  239. var rc byte
  240. var err error
  241. conn, rc, t.sessionPresent, err = c.attemptConnection()
  242. if err != nil {
  243. if c.options.ConnectRetry {
  244. DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
  245. time.Sleep(c.options.ConnectRetryInterval)
  246. if c.status.ConnectionStatus() == connecting { // Possible connection aborted elsewhere
  247. goto RETRYCONN
  248. }
  249. }
  250. ERROR.Println(CLI, "Failed to connect to a broker")
  251. c.persist.Close()
  252. t.returnCode = rc
  253. t.setError(err)
  254. if err := connectionUp(false); err != nil {
  255. ERROR.Println(CLI, err.Error())
  256. }
  257. return
  258. }
  259. inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
  260. if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected)
  261. // Take care of any messages in the store
  262. if !c.options.CleanSession {
  263. c.resume(c.options.ResumeSubs, inboundFromStore)
  264. } else {
  265. c.persist.Reset()
  266. }
  267. } else { // Note: With the new status subsystem this should only happen if Disconnect called simultaneously with the above
  268. WARN.Println(CLI, "Connect() called but connection established in another goroutine")
  269. }
  270. close(inboundFromStore)
  271. t.flowComplete()
  272. DEBUG.Println(CLI, "exit startClient")
  273. }()
  274. return t
  275. }
  276. // internal function used to reconnect the client when it loses its connection
  277. // The connection status MUST be reconnecting prior to calling this function (via call to status.connectionLost)
  278. func (c *client) reconnect(connectionUp connCompletedFn) {
  279. DEBUG.Println(CLI, "enter reconnect")
  280. var (
  281. initSleep = 1 * time.Second
  282. conn net.Conn
  283. )
  284. // If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
  285. // Sleep time is exponentially increased as the same situation continues
  286. if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
  287. DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
  288. }
  289. for {
  290. if nil != c.options.OnReconnecting {
  291. c.options.OnReconnecting(c, &c.options)
  292. }
  293. var err error
  294. conn, _, _, err = c.attemptConnection()
  295. if err == nil {
  296. break
  297. }
  298. sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
  299. DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
  300. if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
  301. if err := connectionUp(false); err != nil { // Should always return an error
  302. ERROR.Println(CLI, err.Error())
  303. }
  304. DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
  305. return
  306. }
  307. }
  308. inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
  309. if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected)
  310. c.resume(c.options.ResumeSubs, inboundFromStore)
  311. }
  312. close(inboundFromStore)
  313. }
  314. // attemptConnection makes a single attempt to connect to each of the brokers
  315. // the protocol version to use is passed in (as c.options.ProtocolVersion)
  316. // Note: Does not set c.conn in order to minimise race conditions
  317. // Returns:
  318. // net.Conn - Connected network connection
  319. // byte - Return code (packets.Accepted indicates a successful connection).
  320. // bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
  321. // err - Error (err != nil guarantees that conn has been set to active connection).
  322. func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
  323. protocolVersion := c.options.ProtocolVersion
  324. var (
  325. sessionPresent bool
  326. conn net.Conn
  327. err error
  328. rc byte
  329. )
  330. c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
  331. brokers := c.options.Servers
  332. c.optionsMu.Unlock()
  333. for _, broker := range brokers {
  334. cm := newConnectMsgFromOptions(&c.options, broker)
  335. DEBUG.Println(CLI, "about to write new connect msg")
  336. CONN:
  337. tlsCfg := c.options.TLSConfig
  338. if c.options.OnConnectAttempt != nil {
  339. DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
  340. tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
  341. }
  342. connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
  343. dialer := c.options.Dialer
  344. if dialer == nil { //
  345. WARN.Println(CLI, "dialer was nil, using default")
  346. dialer = &net.Dialer{Timeout: 30 * time.Second}
  347. }
  348. // Start by opening the network connection (tcp, tls, ws) etc
  349. if c.options.CustomOpenConnectionFn != nil {
  350. conn, err = c.options.CustomOpenConnectionFn(broker, c.options)
  351. } else {
  352. conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, dialer)
  353. }
  354. if err != nil {
  355. ERROR.Println(CLI, err.Error())
  356. WARN.Println(CLI, "failed to connect to broker, trying next")
  357. rc = packets.ErrNetworkError
  358. continue
  359. }
  360. DEBUG.Println(CLI, "socket connected to broker")
  361. // Now we perform the MQTT connection handshake ensuring that it does not exceed the timeout
  362. if err := conn.SetDeadline(connDeadline); err != nil {
  363. ERROR.Println(CLI, "set deadline for handshake ", err)
  364. }
  365. // Now we perform the MQTT connection handshake
  366. rc, sessionPresent, err = connectMQTT(conn, cm, protocolVersion)
  367. if rc == packets.Accepted {
  368. if err := conn.SetDeadline(time.Time{}); err != nil {
  369. ERROR.Println(CLI, "reset deadline following handshake ", err)
  370. }
  371. break // successfully connected
  372. }
  373. // We may have to attempt the connection with MQTT 3.1
  374. _ = conn.Close()
  375. if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
  376. DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
  377. protocolVersion = 3
  378. goto CONN
  379. }
  380. if c.options.protocolVersionExplicit { // to maintain logging from previous version
  381. ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
  382. }
  383. }
  384. // If the connection was successful we set member variable and lock in the protocol version for future connection attempts (and users)
  385. if rc == packets.Accepted {
  386. c.options.ProtocolVersion = protocolVersion
  387. c.options.protocolVersionExplicit = true
  388. } else {
  389. // Maintain same error format as used previously
  390. if rc != packets.ErrNetworkError { // mqtt error
  391. err = packets.ConnErrors[rc]
  392. } else { // network error (if this occurred in ConnectMQTT then err will be nil)
  393. err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)
  394. }
  395. }
  396. return conn, rc, sessionPresent, err
  397. }
  398. // Disconnect will end the connection with the server, but not before waiting
  399. // the specified number of milliseconds to wait for existing work to be
  400. // completed.
  401. // WARNING: `Disconnect` may return before all activities (goroutines) have completed. This means that
  402. // reusing the `client` may lead to panics. If you want to reconnect when the connection drops then use
  403. // `SetAutoReconnect` and/or `SetConnectRetry`options instead of implementing this yourself.
  404. func (c *client) Disconnect(quiesce uint) {
  405. done := make(chan struct{}) // Simplest way to ensure quiesce is always honoured
  406. go func() {
  407. defer close(done)
  408. disDone, err := c.status.Disconnecting()
  409. if err != nil {
  410. // Status has been set to disconnecting, but we had to wait for something else to complete
  411. WARN.Println(CLI, err.Error())
  412. return
  413. }
  414. defer func() {
  415. c.disconnect() // Force disconnection
  416. disDone() // Update status
  417. }()
  418. DEBUG.Println(CLI, "disconnecting")
  419. dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
  420. dt := newToken(packets.Disconnect)
  421. select {
  422. case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
  423. // wait for work to finish, or quiesce time consumed
  424. DEBUG.Println(CLI, "calling WaitTimeout")
  425. dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
  426. DEBUG.Println(CLI, "WaitTimeout done")
  427. // Below code causes a potential data race. Following status refactor it should no longer be required
  428. // but leaving in as need to check code further.
  429. // case <-c.commsStopped:
  430. // WARN.Println("Disconnect packet could not be sent because comms stopped")
  431. case <-time.After(time.Duration(quiesce) * time.Millisecond):
  432. WARN.Println("Disconnect packet not sent due to timeout")
  433. }
  434. }()
  435. // Return when done or after timeout expires (would like to change but this maintains compatibility)
  436. delay := time.NewTimer(time.Duration(quiesce) * time.Millisecond)
  437. select {
  438. case <-done:
  439. if !delay.Stop() {
  440. <-delay.C
  441. }
  442. case <-delay.C:
  443. }
  444. }
  445. // forceDisconnect will end the connection with the mqtt broker immediately (used for tests only)
  446. func (c *client) forceDisconnect() {
  447. disDone, err := c.status.Disconnecting()
  448. if err != nil {
  449. // Possible that we are not actually connected
  450. WARN.Println(CLI, err.Error())
  451. return
  452. }
  453. DEBUG.Println(CLI, "forcefully disconnecting")
  454. c.disconnect()
  455. disDone()
  456. }
  457. // disconnect cleans up after a final disconnection (user requested so no auto reconnection)
  458. func (c *client) disconnect() {
  459. done := c.stopCommsWorkers()
  460. if done != nil {
  461. <-done // Wait until the disconnect is complete (to limit chance that another connection will be started)
  462. DEBUG.Println(CLI, "forcefully disconnecting")
  463. c.messageIds.cleanUp()
  464. DEBUG.Println(CLI, "disconnected")
  465. c.persist.Close()
  466. }
  467. }
  468. // internalConnLost cleanup when connection is lost or an error occurs
  469. // Note: This function will not block
  470. func (c *client) internalConnLost(whyConnLost error) {
  471. // It is possible that internalConnLost will be called multiple times simultaneously
  472. // (including after sending a DisconnectPacket) as such we only do cleanup etc if the
  473. // routines were actually running and are not being disconnected at users request
  474. DEBUG.Println(CLI, "internalConnLost called")
  475. disDone, err := c.status.ConnectionLost(c.options.AutoReconnect && c.status.ConnectionStatus() > connecting)
  476. if err != nil {
  477. if err == errConnLossWhileDisconnecting || err == errAlreadyHandlingConnectionLoss {
  478. return // Loss of connection is expected or already being handled
  479. }
  480. ERROR.Println(CLI, fmt.Sprintf("internalConnLost unexpected status: %s", err.Error()))
  481. return
  482. }
  483. // c.stopCommsWorker returns a channel that is closed when the operation completes. This was required prior
  484. // to the implementation of proper status management but has been left in place, for now, to minimise change
  485. stopDone := c.stopCommsWorkers()
  486. // stopDone was required in previous versions because there was no connectionLost status (and there were
  487. // issues with status handling). This code has been left in place for the time being just in case the new
  488. // status handling contains bugs (refactoring required at some point).
  489. if stopDone == nil { // stopDone will be nil if workers already in the process of stopping or stopped
  490. ERROR.Println(CLI, "internalConnLost stopDone unexpectedly nil - BUG BUG")
  491. // Cannot really do anything other than leave things disconnected
  492. if _, err = disDone(false); err != nil { // Safest option - cannot leave status as connectionLost
  493. ERROR.Println(CLI, fmt.Sprintf("internalConnLost failed to set status to disconnected (stopDone): %s", err.Error()))
  494. }
  495. return
  496. }
  497. // It may take a while for the disconnection to complete whatever called us needs to exit cleanly so finnish in goRoutine
  498. go func() {
  499. DEBUG.Println(CLI, "internalConnLost waiting on workers")
  500. <-stopDone
  501. DEBUG.Println(CLI, "internalConnLost workers stopped")
  502. reConnDone, err := disDone(true)
  503. if err != nil {
  504. ERROR.Println(CLI, "failure whilst reporting completion of disconnect", err)
  505. } else if reConnDone == nil { // Should never happen
  506. ERROR.Println(CLI, "BUG BUG BUG reconnection function is nil", err)
  507. }
  508. reconnect := err == nil && reConnDone != nil
  509. if c.options.CleanSession && !reconnect {
  510. c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
  511. } else if !c.options.ResumeSubs {
  512. c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
  513. }
  514. if reconnect {
  515. go c.reconnect(reConnDone) // Will set connection status to reconnecting
  516. }
  517. if c.options.OnConnectionLost != nil {
  518. go c.options.OnConnectionLost(c, whyConnLost)
  519. }
  520. DEBUG.Println(CLI, "internalConnLost complete")
  521. }()
  522. }
  523. // startCommsWorkers is called when the connection is up.
  524. // It starts off the routines needed to process incoming and outgoing messages.
  525. // Returns true if the comms workers were started (i.e. successful connection)
  526. // connectionUp(true) will be called once everything is up; connectionUp(false) will be called on failure
  527. func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn, inboundFromStore <-chan packets.ControlPacket) bool {
  528. DEBUG.Println(CLI, "startCommsWorkers called")
  529. c.connMu.Lock()
  530. defer c.connMu.Unlock()
  531. if c.conn != nil { // Should never happen due to new status handling; leaving in for safety for the time being
  532. WARN.Println(CLI, "startCommsWorkers called when commsworkers already running BUG BUG")
  533. _ = conn.Close() // No use for the new network connection
  534. if err := connectionUp(false); err != nil {
  535. ERROR.Println(CLI, err.Error())
  536. }
  537. return false
  538. }
  539. c.conn = conn // Store the connection
  540. c.stop = make(chan struct{})
  541. if c.options.KeepAlive != 0 {
  542. atomic.StoreInt32(&c.pingOutstanding, 0)
  543. c.lastReceived.Store(time.Now())
  544. c.lastSent.Store(time.Now())
  545. c.workers.Add(1)
  546. go keepalive(c, conn)
  547. }
  548. // matchAndDispatch will process messages received from the network. It may generate acknowledgements
  549. // It will complete when incomingPubChan is closed and will close ackOut prior to exiting
  550. incomingPubChan := make(chan *packets.PublishPacket)
  551. c.workers.Add(1) // Done will be called when ackOut is closed
  552. ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
  553. // The connection is now ready for use (we spin up a few go routines below). It is possible that
  554. // Disconnect has been called in the interim...
  555. if err := connectionUp(true); err != nil {
  556. DEBUG.Println(CLI, err)
  557. close(c.stop) // Tidy up anything we have already started
  558. close(incomingPubChan)
  559. c.workers.Wait()
  560. c.conn.Close()
  561. c.conn = nil
  562. return false
  563. }
  564. DEBUG.Println(CLI, "client is connected/reconnected")
  565. if c.options.OnConnect != nil {
  566. go c.options.OnConnect(c)
  567. }
  568. // c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
  569. // messages may be published while the client is disconnected (they will block unless in a goroutine). However
  570. // to keep the comms routines clean we want to shutdown the input messages it uses so create out own channels
  571. // and copy data across.
  572. commsobound := make(chan *PacketAndToken) // outgoing publish packets
  573. commsoboundP := make(chan *PacketAndToken) // outgoing 'priority' packet
  574. c.workers.Add(1)
  575. go func() {
  576. defer c.workers.Done()
  577. for {
  578. select {
  579. case msg := <-c.oboundP:
  580. commsoboundP <- msg
  581. case msg := <-c.obound:
  582. commsobound <- msg
  583. case msg, ok := <-ackOut:
  584. if !ok {
  585. ackOut = nil // ignore channel going forward
  586. c.workers.Done() // matchAndDispatch has completed
  587. continue // await next message
  588. }
  589. commsoboundP <- msg
  590. case <-c.stop:
  591. // Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect)
  592. if ackOut != nil {
  593. for msg := range ackOut {
  594. commsoboundP <- msg
  595. }
  596. c.workers.Done() // matchAndDispatch has completed
  597. }
  598. close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
  599. close(commsobound)
  600. DEBUG.Println(CLI, "startCommsWorkers output redirector finished")
  601. return
  602. }
  603. }
  604. }()
  605. commsIncomingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
  606. c.commsStopped = make(chan struct{})
  607. go func() {
  608. for {
  609. if commsIncomingPub == nil && commsErrors == nil {
  610. break
  611. }
  612. select {
  613. case pub, ok := <-commsIncomingPub:
  614. if !ok {
  615. // Incoming comms has shutdown
  616. close(incomingPubChan) // stop the router
  617. commsIncomingPub = nil
  618. continue
  619. }
  620. // Care is needed here because an error elsewhere could trigger a deadlock
  621. sendPubLoop:
  622. for {
  623. select {
  624. case incomingPubChan <- pub:
  625. break sendPubLoop
  626. case err, ok := <-commsErrors:
  627. if !ok { // commsErrors has been closed so we can ignore it
  628. commsErrors = nil
  629. continue
  630. }
  631. ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
  632. c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
  633. continue
  634. }
  635. }
  636. case err, ok := <-commsErrors:
  637. if !ok {
  638. commsErrors = nil
  639. continue
  640. }
  641. ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
  642. c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
  643. continue
  644. }
  645. }
  646. DEBUG.Println(CLI, "incoming comms goroutine done")
  647. close(c.commsStopped)
  648. }()
  649. DEBUG.Println(CLI, "startCommsWorkers done")
  650. return true
  651. }
  652. // stopWorkersAndComms - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped
  653. // Returns nil if workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
  654. // Note: This may block so run as a go routine if calling from any of the comms routines
  655. // Note2: It should be possible to simplify this now that the new status management code is in place.
  656. func (c *client) stopCommsWorkers() chan struct{} {
  657. DEBUG.Println(CLI, "stopCommsWorkers called")
  658. // It is possible that this function will be called multiple times simultaneously due to the way things get shutdown
  659. c.connMu.Lock()
  660. if c.conn == nil {
  661. DEBUG.Println(CLI, "stopCommsWorkers done (not running)")
  662. c.connMu.Unlock()
  663. return nil
  664. }
  665. // It is important that everything is stopped in the correct order to avoid deadlocks. The main issue here is
  666. // the router because it both receives incoming publish messages and also sends outgoing acknowledgements. To
  667. // avoid issues we signal the workers to stop and close the connection (it is probably already closed but
  668. // there is no harm in being sure). We can then wait for the workers to finnish before closing outbound comms
  669. // channels which will allow the comms routines to exit.
  670. // We stop all non-comms related workers first (ping, keepalive, errwatch, resume etc) so they don't get blocked waiting on comms
  671. close(c.stop) // Signal for workers to stop
  672. c.conn.Close() // Possible that this is already closed but no harm in closing again
  673. c.conn = nil // Important that this is the only place that this is set to nil
  674. c.connMu.Unlock() // As the connection is now nil we can unlock the mu (allowing subsequent calls to exit immediately)
  675. doneChan := make(chan struct{})
  676. go func() {
  677. DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
  678. c.workers.Wait()
  679. // Stopping the workers will allow the comms routines to exit; we wait for these to complete
  680. DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
  681. <-c.commsStopped // wait for comms routine to stop
  682. DEBUG.Println(CLI, "stopCommsWorkers done")
  683. close(doneChan)
  684. }()
  685. return doneChan
  686. }
  687. // Publish will publish a message with the specified QoS and content
  688. // to the specified topic.
  689. // Returns a token to track delivery of the message to the broker
  690. func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token {
  691. token := newToken(packets.Publish).(*PublishToken)
  692. DEBUG.Println(CLI, "enter Publish")
  693. switch {
  694. case !c.IsConnected():
  695. token.setError(ErrNotConnected)
  696. return token
  697. case c.status.ConnectionStatus() == reconnecting && qos == 0:
  698. // message written to store and will be sent when connection comes up
  699. token.flowComplete()
  700. return token
  701. }
  702. pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
  703. pub.Qos = qos
  704. pub.TopicName = topic
  705. pub.Retain = retained
  706. switch p := payload.(type) {
  707. case string:
  708. pub.Payload = []byte(p)
  709. case []byte:
  710. pub.Payload = p
  711. case bytes.Buffer:
  712. pub.Payload = p.Bytes()
  713. default:
  714. token.setError(fmt.Errorf("unknown payload type"))
  715. return token
  716. }
  717. if pub.Qos != 0 && pub.MessageID == 0 {
  718. mID := c.getID(token)
  719. if mID == 0 {
  720. token.setError(fmt.Errorf("no message IDs available"))
  721. return token
  722. }
  723. pub.MessageID = mID
  724. token.messageID = mID
  725. }
  726. persistOutbound(c.persist, pub)
  727. switch c.status.ConnectionStatus() {
  728. case connecting:
  729. DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
  730. case reconnecting:
  731. DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
  732. case disconnecting:
  733. DEBUG.Println(CLI, "storing publish message (disconnecting), topic:", topic)
  734. default:
  735. DEBUG.Println(CLI, "sending publish message, topic:", topic)
  736. publishWaitTimeout := c.options.WriteTimeout
  737. if publishWaitTimeout == 0 {
  738. publishWaitTimeout = time.Second * 30
  739. }
  740. select {
  741. case c.obound <- &PacketAndToken{p: pub, t: token}:
  742. case <-time.After(publishWaitTimeout):
  743. token.setError(errors.New("publish was broken by timeout"))
  744. }
  745. }
  746. return token
  747. }
  748. // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
  749. // a message is published on the topic provided.
  750. //
  751. // If options.OrderMatters is true (the default) then callback must not block or
  752. // call functions within this package that may block (e.g. Publish) other than in
  753. // a new go routine.
  754. // callback must be safe for concurrent use by multiple goroutines.
  755. func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
  756. token := newToken(packets.Subscribe).(*SubscribeToken)
  757. DEBUG.Println(CLI, "enter Subscribe")
  758. if !c.IsConnected() {
  759. token.setError(ErrNotConnected)
  760. return token
  761. }
  762. if !c.IsConnectionOpen() {
  763. switch {
  764. case !c.options.ResumeSubs:
  765. // if not connected and resumeSubs not set this sub will be thrown away
  766. token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
  767. return token
  768. case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
  769. // if reconnecting and cleanSession is true this sub will be thrown away
  770. token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
  771. return token
  772. }
  773. }
  774. sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
  775. if err := validateTopicAndQos(topic, qos); err != nil {
  776. token.setError(err)
  777. return token
  778. }
  779. sub.Topics = append(sub.Topics, topic)
  780. sub.Qoss = append(sub.Qoss, qos)
  781. if strings.HasPrefix(topic, "$share/") {
  782. topic = strings.Join(strings.Split(topic, "/")[2:], "/")
  783. }
  784. if strings.HasPrefix(topic, "$queue/") {
  785. topic = strings.TrimPrefix(topic, "$queue/")
  786. }
  787. if callback != nil {
  788. c.msgRouter.addRoute(topic, callback)
  789. }
  790. token.subs = append(token.subs, topic)
  791. if sub.MessageID == 0 {
  792. mID := c.getID(token)
  793. if mID == 0 {
  794. token.setError(fmt.Errorf("no message IDs available"))
  795. return token
  796. }
  797. sub.MessageID = mID
  798. token.messageID = mID
  799. }
  800. DEBUG.Println(CLI, sub.String())
  801. if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
  802. persistOutbound(c.persist, sub)
  803. }
  804. switch c.status.ConnectionStatus() {
  805. case connecting:
  806. DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
  807. case reconnecting:
  808. DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
  809. case disconnecting:
  810. DEBUG.Println(CLI, "storing subscribe message (disconnecting), topic:", topic)
  811. default:
  812. DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
  813. subscribeWaitTimeout := c.options.WriteTimeout
  814. if subscribeWaitTimeout == 0 {
  815. subscribeWaitTimeout = time.Second * 30
  816. }
  817. select {
  818. case c.oboundP <- &PacketAndToken{p: sub, t: token}:
  819. case <-time.After(subscribeWaitTimeout):
  820. token.setError(errors.New("subscribe was broken by timeout"))
  821. }
  822. }
  823. DEBUG.Println(CLI, "exit Subscribe")
  824. return token
  825. }
  826. // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
  827. // be executed when a message is published on one of the topics provided.
  828. //
  829. // If options.OrderMatters is true (the default) then callback must not block or
  830. // call functions within this package that may block (e.g. Publish) other than in
  831. // a new go routine.
  832. // callback must be safe for concurrent use by multiple goroutines.
  833. func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
  834. var err error
  835. token := newToken(packets.Subscribe).(*SubscribeToken)
  836. DEBUG.Println(CLI, "enter SubscribeMultiple")
  837. if !c.IsConnected() {
  838. token.setError(ErrNotConnected)
  839. return token
  840. }
  841. if !c.IsConnectionOpen() {
  842. switch {
  843. case !c.options.ResumeSubs:
  844. // if not connected and resumesubs not set this sub will be thrown away
  845. token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
  846. return token
  847. case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
  848. // if reconnecting and cleanSession is true this sub will be thrown away
  849. token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
  850. return token
  851. }
  852. }
  853. sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
  854. if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil {
  855. token.setError(err)
  856. return token
  857. }
  858. if callback != nil {
  859. for topic := range filters {
  860. c.msgRouter.addRoute(topic, callback)
  861. }
  862. }
  863. token.subs = make([]string, len(sub.Topics))
  864. copy(token.subs, sub.Topics)
  865. if sub.MessageID == 0 {
  866. mID := c.getID(token)
  867. if mID == 0 {
  868. token.setError(fmt.Errorf("no message IDs available"))
  869. return token
  870. }
  871. sub.MessageID = mID
  872. token.messageID = mID
  873. }
  874. if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
  875. persistOutbound(c.persist, sub)
  876. }
  877. switch c.status.ConnectionStatus() {
  878. case connecting:
  879. DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
  880. case reconnecting:
  881. DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
  882. case disconnecting:
  883. DEBUG.Println(CLI, "storing subscribe message (disconnecting), topics:", sub.Topics)
  884. default:
  885. DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
  886. subscribeWaitTimeout := c.options.WriteTimeout
  887. if subscribeWaitTimeout == 0 {
  888. subscribeWaitTimeout = time.Second * 30
  889. }
  890. select {
  891. case c.oboundP <- &PacketAndToken{p: sub, t: token}:
  892. case <-time.After(subscribeWaitTimeout):
  893. token.setError(errors.New("subscribe was broken by timeout"))
  894. }
  895. }
  896. DEBUG.Println(CLI, "exit SubscribeMultiple")
  897. return token
  898. }
  899. // reserveStoredPublishIDs reserves the ids for publish packets in the persistent store to ensure these are not duplicated
  900. func (c *client) reserveStoredPublishIDs() {
  901. // The resume function sets the stored id for publish packets only (some other packets
  902. // will get new ids in net code). This means that the only keys we need to ensure are
  903. // unique are the publish ones (and these will completed/replaced in resume() )
  904. if !c.options.CleanSession {
  905. storedKeys := c.persist.All()
  906. for _, key := range storedKeys {
  907. packet := c.persist.Get(key)
  908. if packet == nil {
  909. continue
  910. }
  911. switch packet.(type) {
  912. case *packets.PublishPacket:
  913. details := packet.Details()
  914. token := &PlaceHolderToken{id: details.MessageID}
  915. c.claimID(token, details.MessageID)
  916. }
  917. }
  918. }
  919. }
  920. // Load all stored messages and resend them
  921. // Call this to ensure QOS > 1,2 even after an application crash
  922. // Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
  923. // other than that it does not return until all messages in the store have been sent (connect() does not complete its
  924. // token before this completes)
  925. func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
  926. DEBUG.Println(STR, "enter Resume")
  927. // Prior to sending a message getSemaphore will be called and once sent releaseSemaphore will be called
  928. // with the token (so semaphore can be released when ACK received if applicable).
  929. // Using a weighted semaphore rather than channels because this retains ordering
  930. getSemaphore := func() {} // Default = do nothing
  931. releaseSemaphore := func(_ *PublishToken) {} // Default = do nothing
  932. var sem *semaphore.Weighted
  933. if c.options.MaxResumePubInFlight > 0 {
  934. sem = semaphore.NewWeighted(int64(c.options.MaxResumePubInFlight))
  935. ctx, cancel := context.WithCancel(context.Background()) // Context needed for semaphore
  936. defer cancel() // ensure context gets cancelled
  937. go func() {
  938. select {
  939. case <-c.stop: // Request to stop (due to comm error etc)
  940. cancel()
  941. case <-ctx.Done(): // resume completed normally
  942. }
  943. }()
  944. getSemaphore = func() { sem.Acquire(ctx, 1) }
  945. releaseSemaphore = func(token *PublishToken) { // Note: If token never completes then resume() may stall (will still exit on ctx.Done())
  946. go func() {
  947. select {
  948. case <-token.Done():
  949. case <-ctx.Done():
  950. }
  951. sem.Release(1)
  952. }()
  953. }
  954. }
  955. storedKeys := c.persist.All()
  956. for _, key := range storedKeys {
  957. packet := c.persist.Get(key)
  958. if packet == nil {
  959. DEBUG.Println(STR, fmt.Sprintf("resume found NIL packet (%s)", key))
  960. continue
  961. }
  962. details := packet.Details()
  963. if isKeyOutbound(key) {
  964. switch p := packet.(type) {
  965. case *packets.SubscribePacket:
  966. if subscription {
  967. DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
  968. subPacket := packet.(*packets.SubscribePacket)
  969. token := newToken(packets.Subscribe).(*SubscribeToken)
  970. token.messageID = details.MessageID
  971. token.subs = append(token.subs, subPacket.Topics...)
  972. c.claimID(token, details.MessageID)
  973. select {
  974. case c.oboundP <- &PacketAndToken{p: packet, t: token}:
  975. case <-c.stop:
  976. DEBUG.Println(STR, "resume exiting due to stop")
  977. return
  978. }
  979. } else {
  980. c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
  981. }
  982. case *packets.UnsubscribePacket:
  983. if subscription {
  984. DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
  985. token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
  986. select {
  987. case c.oboundP <- &PacketAndToken{p: packet, t: token}:
  988. case <-c.stop:
  989. DEBUG.Println(STR, "resume exiting due to stop")
  990. return
  991. }
  992. } else {
  993. c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
  994. }
  995. case *packets.PubrelPacket:
  996. DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
  997. select {
  998. case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
  999. case <-c.stop:
  1000. DEBUG.Println(STR, "resume exiting due to stop")
  1001. return
  1002. }
  1003. case *packets.PublishPacket:
  1004. // spec: If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or
  1005. // Server has attempted to send this MQTT PUBLISH Packet. If the DUP flag is set to 1, it indicates that
  1006. // this might be re-delivery of an earlier attempt to send the Packet.
  1007. //
  1008. // If the message is in the store than an attempt at delivery has been made (note that the message may
  1009. // never have made it onto the wire but tracking that would be complicated!).
  1010. if p.Qos != 0 { // spec: The DUP flag MUST be set to 0 for all QoS 0 messages
  1011. p.Dup = true
  1012. }
  1013. token := newToken(packets.Publish).(*PublishToken)
  1014. token.messageID = details.MessageID
  1015. c.claimID(token, details.MessageID)
  1016. DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
  1017. DEBUG.Println(STR, details)
  1018. getSemaphore()
  1019. select {
  1020. case c.obound <- &PacketAndToken{p: p, t: token}:
  1021. case <-c.stop:
  1022. DEBUG.Println(STR, "resume exiting due to stop")
  1023. return
  1024. }
  1025. releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged
  1026. default:
  1027. ERROR.Println(STR, fmt.Sprintf("invalid message type (inbound - %T) in store (discarded)", packet))
  1028. c.persist.Del(key)
  1029. }
  1030. } else {
  1031. switch packet.(type) {
  1032. case *packets.PubrelPacket:
  1033. DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID))
  1034. select {
  1035. case ibound <- packet:
  1036. case <-c.stop:
  1037. DEBUG.Println(STR, "resume exiting due to stop (ibound <- packet)")
  1038. return
  1039. }
  1040. default:
  1041. ERROR.Println(STR, fmt.Sprintf("invalid message type (%T) in store (discarded)", packet))
  1042. c.persist.Del(key)
  1043. }
  1044. }
  1045. }
  1046. DEBUG.Println(STR, "exit resume")
  1047. }
  1048. // Unsubscribe will end the subscription from each of the topics provided.
  1049. // Messages published to those topics from other clients will no longer be
  1050. // received.
  1051. func (c *client) Unsubscribe(topics ...string) Token {
  1052. token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
  1053. DEBUG.Println(CLI, "enter Unsubscribe")
  1054. if !c.IsConnected() {
  1055. token.setError(ErrNotConnected)
  1056. return token
  1057. }
  1058. if !c.IsConnectionOpen() {
  1059. switch {
  1060. case !c.options.ResumeSubs:
  1061. // if not connected and resumeSubs not set this unsub will be thrown away
  1062. token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
  1063. return token
  1064. case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
  1065. // if reconnecting and cleanSession is true this unsub will be thrown away
  1066. token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
  1067. return token
  1068. }
  1069. }
  1070. unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
  1071. unsub.Topics = make([]string, len(topics))
  1072. copy(unsub.Topics, topics)
  1073. if unsub.MessageID == 0 {
  1074. mID := c.getID(token)
  1075. if mID == 0 {
  1076. token.setError(fmt.Errorf("no message IDs available"))
  1077. return token
  1078. }
  1079. unsub.MessageID = mID
  1080. token.messageID = mID
  1081. }
  1082. if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
  1083. persistOutbound(c.persist, unsub)
  1084. }
  1085. switch c.status.ConnectionStatus() {
  1086. case connecting:
  1087. DEBUG.Println(CLI, "storing unsubscribe message (connecting), topics:", topics)
  1088. case reconnecting:
  1089. DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
  1090. case disconnecting:
  1091. DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
  1092. default:
  1093. DEBUG.Println(CLI, "sending unsubscribe message, topics:", topics)
  1094. subscribeWaitTimeout := c.options.WriteTimeout
  1095. if subscribeWaitTimeout == 0 {
  1096. subscribeWaitTimeout = time.Second * 30
  1097. }
  1098. select {
  1099. case c.oboundP <- &PacketAndToken{p: unsub, t: token}:
  1100. for _, topic := range topics {
  1101. c.msgRouter.deleteRoute(topic)
  1102. }
  1103. case <-time.After(subscribeWaitTimeout):
  1104. token.setError(errors.New("unsubscribe was broken by timeout"))
  1105. }
  1106. }
  1107. DEBUG.Println(CLI, "exit Unsubscribe")
  1108. return token
  1109. }
  1110. // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
  1111. // in use by the client.
  1112. func (c *client) OptionsReader() ClientOptionsReader {
  1113. r := ClientOptionsReader{options: &c.options}
  1114. return r
  1115. }
  1116. // DefaultConnectionLostHandler is a definition of a function that simply
  1117. // reports to the DEBUG log the reason for the client losing a connection.
  1118. func DefaultConnectionLostHandler(client Client, reason error) {
  1119. DEBUG.Println("Connection lost:", reason.Error())
  1120. }
  1121. // UpdateLastReceived - Will be called whenever a packet is received off the network
  1122. // This is used by the keepalive routine to
  1123. func (c *client) UpdateLastReceived() {
  1124. if c.options.KeepAlive != 0 {
  1125. c.lastReceived.Store(time.Now())
  1126. }
  1127. }
  1128. // UpdateLastReceived - Will be called whenever a packet is successfully transmitted to the network
  1129. func (c *client) UpdateLastSent() {
  1130. if c.options.KeepAlive != 0 {
  1131. c.lastSent.Store(time.Now())
  1132. }
  1133. }
  1134. // getWriteTimeOut returns the writetimeout (duration to wait when writing to the connection) or 0 if none
  1135. func (c *client) getWriteTimeOut() time.Duration {
  1136. return c.options.WriteTimeout
  1137. }
  1138. // persistOutbound adds the packet to the outbound store
  1139. func (c *client) persistOutbound(m packets.ControlPacket) {
  1140. persistOutbound(c.persist, m)
  1141. }
  1142. // persistInbound adds the packet to the inbound store
  1143. func (c *client) persistInbound(m packets.ControlPacket) {
  1144. persistInbound(c.persist, m)
  1145. }
  1146. // pingRespReceived will be called by the network routines when a ping response is received
  1147. func (c *client) pingRespReceived() {
  1148. atomic.StoreInt32(&c.pingOutstanding, 0)
  1149. }