status.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. * Matt Brittan
  18. */
  19. package mqtt
  20. import (
  21. "errors"
  22. "sync"
  23. )
  24. // Status - Manage the connection status
  25. // Multiple go routines will want to access/set this. Previously status was implemented as a `uint32` and updated
  26. // with a mixture of atomic functions and a mutex (leading to some deadlock type issues that were very hard to debug).
  27. // In this new implementation `connectionStatus` takes over managing the state and provides functions that allow the
  28. // client to request a move to a particular state (it may reject these requests!). In some cases the 'state' is
  29. // transitory, for example `connecting`, in those cases a function will be returned that allows the client to move
  30. // to a more static state (`disconnected` or `connected`).
  31. // This "belts-and-braces" may be a little over the top but issues with the status have caused a number of difficult
  32. // to trace bugs in the past and the likelihood that introducing a new system would introduce bugs seemed high!
  33. // I have written this in a way that should make it very difficult to misuse it (but it does make things a little
  34. // complex with functions returning functions that return functions!).
  35. type status uint32
  36. const (
  37. disconnected status = iota // default (nil) status is disconnected
  38. disconnecting // Transitioning from one of the below states back to disconnected
  39. connecting
  40. reconnecting
  41. connected
  42. )
  43. // String simplify output of statuses
  44. func (s status) String() string {
  45. switch s {
  46. case disconnected:
  47. return "disconnected"
  48. case disconnecting:
  49. return "disconnecting"
  50. case connecting:
  51. return "connecting"
  52. case reconnecting:
  53. return "reconnecting"
  54. case connected:
  55. return "connected"
  56. default:
  57. return "invalid"
  58. }
  59. }
  60. type connCompletedFn func(success bool) error
  61. type disconnectCompletedFn func()
  62. type connectionLostHandledFn func(bool) (connCompletedFn, error)
/* State transitions

Static states are `disconnected` and `connected`. For every other state a process holds a function that will move
the state to one of those. That function effectively owns the state, and no other change may proceed until it
completes. The one exception is that the state can always be moved to `disconnecting`. This signals that transitions
to `connected` will be rejected, which is required because a Disconnect can be requested while in the `connecting`
state.

# Basic Operations

The standard workflows are:

disconnected -> `Connecting()` -> connecting -> `connCompletedFn(true)` -> connected
connected -> `Disconnecting()` -> disconnecting -> `disconnectCompletedFn()` -> disconnected
connected -> `ConnectionLost(false)` -> disconnecting -> `connectionLostHandledFn(true/false)` -> disconnected
connected -> `ConnectionLost(true)` -> disconnecting -> `connectionLostHandledFn(true)` -> reconnecting -> `connCompletedFn(true)` -> connected

Unfortunately these workflows are complicated by the fact that `Disconnecting()` or `ConnectionLost()` may be called
at any time, including in the middle of a transition between states. If this happens:
  - The state will be set to `disconnecting`, which prevents any request to move the status to `connected`.
  - The call to `Disconnecting()`/`ConnectionLost()` blocks until the previously active operation completes, and then
    handles the disconnection.

Reading the tests (unit_status_test.go) may help in understanding these rules.
*/
  82. var (
  83. errAbortConnection = errors.New("disconnect called whist connection attempt in progress")
  84. errAlreadyConnectedOrReconnecting = errors.New("status is already connected or reconnecting")
  85. errStatusMustBeDisconnected = errors.New("status can only transition to connecting from disconnected")
  86. errAlreadyDisconnected = errors.New("status is already disconnected")
  87. errDisconnectionRequested = errors.New("disconnection was requested whilst the action was in progress")
  88. errDisconnectionInProgress = errors.New("disconnection already in progress")
  89. errAlreadyHandlingConnectionLoss = errors.New("status is already Connection Lost")
  90. errConnLossWhileDisconnecting = errors.New("connection status is disconnecting so loss of connection is expected")
  91. )
  92. // connectionStatus encapsulates, and protects, the connection status.
  93. type connectionStatus struct {
  94. sync.RWMutex // Protects the variables below
  95. status status
  96. willReconnect bool // only used when status == disconnecting. Indicates that an attempt will be made to reconnect (allows us to abort that)
  97. // Some statuses are transitional (e.g. connecting, connectionLost, reconnecting, disconnecting), that is, whatever
  98. // process moves us into that status will move us out of it when an action is complete. Sometimes other users
  99. // will need to know when the action is complete (e.g. the user calls `Disconnect()` whilst the status is
  100. // `connecting`). `actionCompleted` will be set whenever we move into one of the above statues and the channel
  101. // returned to anything else requesting a status change. The channel will be closed when the operation is complete.
  102. actionCompleted chan struct{} // Only valid whilst status is Connecting or Reconnecting; will be closed when connection completed (success or failure)
  103. }
  104. // ConnectionStatus returns the connection status.
  105. // WARNING: the status may change at any time so users should not assume they are the only goroutine touching this
  106. func (c *connectionStatus) ConnectionStatus() status {
  107. c.RLock()
  108. defer c.RUnlock()
  109. return c.status
  110. }
  111. // ConnectionStatusRetry returns the connection status and retry flag (indicates that we expect to reconnect).
  112. // WARNING: the status may change at any time so users should not assume they are the only goroutine touching this
  113. func (c *connectionStatus) ConnectionStatusRetry() (status, bool) {
  114. c.RLock()
  115. defer c.RUnlock()
  116. return c.status, c.willReconnect
  117. }
  118. // Connecting - Changes the status to connecting if that is a permitted operation
  119. // Will do nothing unless the current status is disconnected
  120. // Returns a function that MUST be called when the operation is complete (pass in true if successful)
  121. func (c *connectionStatus) Connecting() (connCompletedFn, error) {
  122. c.Lock()
  123. defer c.Unlock()
  124. // Calling Connect when already connecting (or if reconnecting) may not always be considered an error
  125. if c.status == connected || c.status == reconnecting {
  126. return nil, errAlreadyConnectedOrReconnecting
  127. }
  128. if c.status != disconnected {
  129. return nil, errStatusMustBeDisconnected
  130. }
  131. c.status = connecting
  132. c.actionCompleted = make(chan struct{})
  133. return c.connected, nil
  134. }
  135. // connected is an internal function (it is returned by functions that set the status to connecting or reconnecting,
  136. // calling it completes the operation). `success` is used to indicate whether the operation was successfully completed.
  137. func (c *connectionStatus) connected(success bool) error {
  138. c.Lock()
  139. defer func() {
  140. close(c.actionCompleted) // Alert anything waiting on the connection process to complete
  141. c.actionCompleted = nil // Be tidy
  142. c.Unlock()
  143. }()
  144. // Status may have moved to disconnecting in the interim (i.e. at users request)
  145. if c.status == disconnecting {
  146. return errAbortConnection
  147. }
  148. if success {
  149. c.status = connected
  150. } else {
  151. c.status = disconnected
  152. }
  153. return nil
  154. }
  155. // Disconnecting - should be called when beginning the disconnection process (cleanup etc.).
  156. // Can be called from ANY status and the end result will always be a status of disconnected
  157. // Note that if a connection/reconnection attempt is in progress this function will set the status to `disconnecting`
  158. // then block until the connection process completes (or aborts).
  159. // Returns a function that MUST be called when the operation is complete (assumed to always be successful!)
  160. func (c *connectionStatus) Disconnecting() (disconnectCompletedFn, error) {
  161. c.Lock()
  162. if c.status == disconnected {
  163. c.Unlock()
  164. return nil, errAlreadyDisconnected // May not always be treated as an error
  165. }
  166. if c.status == disconnecting { // Need to wait for existing process to complete
  167. c.willReconnect = false // Ensure that the existing disconnect process will not reconnect
  168. disConnectDone := c.actionCompleted
  169. c.Unlock()
  170. <-disConnectDone // Wait for existing operation to complete
  171. return nil, errAlreadyDisconnected // Well we are now!
  172. }
  173. prevStatus := c.status
  174. c.status = disconnecting
  175. // We may need to wait for connection/reconnection process to complete (they should regularly check the status)
  176. if prevStatus == connecting || prevStatus == reconnecting {
  177. connectDone := c.actionCompleted
  178. c.Unlock() // Safe because the only way to leave the disconnecting status is via this function
  179. <-connectDone
  180. if prevStatus == reconnecting && !c.willReconnect {
  181. return nil, errAlreadyDisconnected // Following connectionLost process we will be disconnected
  182. }
  183. c.Lock()
  184. }
  185. c.actionCompleted = make(chan struct{})
  186. c.Unlock()
  187. return c.disconnectionCompleted, nil
  188. }
  189. // disconnectionCompleted is an internal function (it is returned by functions that set the status to disconnecting)
  190. func (c *connectionStatus) disconnectionCompleted() {
  191. c.Lock()
  192. defer c.Unlock()
  193. c.status = disconnected
  194. close(c.actionCompleted) // Alert anything waiting on the connection process to complete
  195. c.actionCompleted = nil
  196. }
  197. // ConnectionLost - should be called when the connection is lost.
  198. // This really only differs from Disconnecting in that we may transition into a reconnection (but that could be
  199. // cancelled something else calls Disconnecting in the meantime).
  200. // The returned function should be called when cleanup is completed. It will return a function to be called when
  201. // reconnect completes (or nil if no reconnect requested/disconnect called in the interim).
  202. // Note: This function may block if a connection is in progress (the move to connected will be rejected)
  203. func (c *connectionStatus) ConnectionLost(willReconnect bool) (connectionLostHandledFn, error) {
  204. c.Lock()
  205. defer c.Unlock()
  206. if c.status == disconnected {
  207. return nil, errAlreadyDisconnected
  208. }
  209. if c.status == disconnecting { // its expected that connection lost will be called during the disconnection process
  210. return nil, errDisconnectionInProgress
  211. }
  212. c.willReconnect = willReconnect
  213. prevStatus := c.status
  214. c.status = disconnecting
  215. // There is a slight possibility that a connection attempt is in progress (connection up and goroutines started but
  216. // status not yet changed). By changing the status we ensure that process will exit cleanly
  217. if prevStatus == connecting || prevStatus == reconnecting {
  218. connectDone := c.actionCompleted
  219. c.Unlock() // Safe because the only way to leave the disconnecting status is via this function
  220. <-connectDone
  221. c.Lock()
  222. if !willReconnect {
  223. // In this case the connection will always be aborted so there is nothing more for us to do
  224. return nil, errAlreadyDisconnected
  225. }
  226. }
  227. c.actionCompleted = make(chan struct{})
  228. return c.getConnectionLostHandler(willReconnect), nil
  229. }
  230. // getConnectionLostHandler is an internal function. It returns the function to be returned by ConnectionLost
  231. func (c *connectionStatus) getConnectionLostHandler(reconnectRequested bool) connectionLostHandledFn {
  232. return func(proceed bool) (connCompletedFn, error) {
  233. // Note that connCompletedFn will only be provided if both reconnectRequested and proceed are true
  234. c.Lock()
  235. defer c.Unlock()
  236. // `Disconnecting()` may have been called while the disconnection was being processed (this makes it permanent!)
  237. if !c.willReconnect || !proceed {
  238. c.status = disconnected
  239. close(c.actionCompleted) // Alert anything waiting on the connection process to complete
  240. c.actionCompleted = nil
  241. if !reconnectRequested || !proceed {
  242. return nil, nil
  243. }
  244. return nil, errDisconnectionRequested
  245. }
  246. c.status = reconnecting
  247. return c.connected, nil // Note that c.actionCompleted is still live and will be closed in connected
  248. }
  249. }
  250. // forceConnectionStatus - forces the connection status to the specified value.
  251. // This should only be used when there is no alternative (i.e. only in tests and to recover from situations that
  252. // are unexpected)
  253. func (c *connectionStatus) forceConnectionStatus(s status) {
  254. c.Lock()
  255. defer c.Unlock()
  256. c.status = s
  257. }