token.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Allan Stockdill-Mander
  15. */
  16. package mqtt
  17. import (
  18. "sync"
  19. "time"
  20. "github.com/eclipse/paho.mqtt.golang/packets"
  21. )
  22. // PacketAndToken is a struct that contains both a ControlPacket and a
  23. // Token. This struct is passed via channels between the client interface
  24. // code and the underlying code responsible for sending and receiving
  25. // MQTT messages.
  26. type PacketAndToken struct {
  27. p packets.ControlPacket
  28. t tokenCompletor
  29. }
  30. // Token defines the interface for the tokens used to indicate when
  31. // actions have completed.
  32. type Token interface {
  33. // Wait will wait indefinitely for the Token to complete, ie the Publish
  34. // to be sent and confirmed receipt from the broker.
  35. Wait() bool
  36. // WaitTimeout takes a time.Duration to wait for the flow associated with the
  37. // Token to complete, returns true if it returned before the timeout or
  38. // returns false if the timeout occurred. In the case of a timeout the Token
  39. // does not have an error set in case the caller wishes to wait again.
  40. WaitTimeout(time.Duration) bool
  41. // Done returns a channel that is closed when the flow associated
  42. // with the Token completes. Clients should call Error after the
  43. // channel is closed to check if the flow completed successfully.
  44. //
  45. // Done is provided for use in select statements. Simple use cases may
  46. // use Wait or WaitTimeout.
  47. Done() <-chan struct{}
  48. Error() error
  49. }
  50. type TokenErrorSetter interface {
  51. setError(error)
  52. }
  53. type tokenCompletor interface {
  54. Token
  55. TokenErrorSetter
  56. flowComplete()
  57. }
  58. type baseToken struct {
  59. m sync.RWMutex
  60. complete chan struct{}
  61. err error
  62. }
  63. // Wait implements the Token Wait method.
  64. func (b *baseToken) Wait() bool {
  65. <-b.complete
  66. return true
  67. }
  68. // WaitTimeout implements the Token WaitTimeout method.
  69. func (b *baseToken) WaitTimeout(d time.Duration) bool {
  70. timer := time.NewTimer(d)
  71. select {
  72. case <-b.complete:
  73. if !timer.Stop() {
  74. <-timer.C
  75. }
  76. return true
  77. case <-timer.C:
  78. }
  79. return false
  80. }
  81. // Done implements the Token Done method.
  82. func (b *baseToken) Done() <-chan struct{} {
  83. return b.complete
  84. }
  85. func (b *baseToken) flowComplete() {
  86. select {
  87. case <-b.complete:
  88. default:
  89. close(b.complete)
  90. }
  91. }
  92. func (b *baseToken) Error() error {
  93. b.m.RLock()
  94. defer b.m.RUnlock()
  95. return b.err
  96. }
  97. func (b *baseToken) setError(e error) {
  98. b.m.Lock()
  99. b.err = e
  100. b.flowComplete()
  101. b.m.Unlock()
  102. }
  103. func newToken(tType byte) tokenCompletor {
  104. switch tType {
  105. case packets.Connect:
  106. return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  107. case packets.Subscribe:
  108. return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
  109. case packets.Publish:
  110. return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
  111. case packets.Unsubscribe:
  112. return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
  113. case packets.Disconnect:
  114. return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  115. }
  116. return nil
  117. }
  118. // ConnectToken is an extension of Token containing the extra fields
  119. // required to provide information about calls to Connect()
  120. type ConnectToken struct {
  121. baseToken
  122. returnCode byte
  123. sessionPresent bool
  124. }
  125. // ReturnCode returns the acknowledgement code in the connack sent
  126. // in response to a Connect()
  127. func (c *ConnectToken) ReturnCode() byte {
  128. c.m.RLock()
  129. defer c.m.RUnlock()
  130. return c.returnCode
  131. }
  132. // SessionPresent returns a bool representing the value of the
  133. // session present field in the connack sent in response to a Connect()
  134. func (c *ConnectToken) SessionPresent() bool {
  135. c.m.RLock()
  136. defer c.m.RUnlock()
  137. return c.sessionPresent
  138. }
  139. // PublishToken is an extension of Token containing the extra fields
  140. // required to provide information about calls to Publish()
  141. type PublishToken struct {
  142. baseToken
  143. messageID uint16
  144. }
  145. // MessageID returns the MQTT message ID that was assigned to the
  146. // Publish packet when it was sent to the broker
  147. func (p *PublishToken) MessageID() uint16 {
  148. return p.messageID
  149. }
  150. // SubscribeToken is an extension of Token containing the extra fields
  151. // required to provide information about calls to Subscribe()
  152. type SubscribeToken struct {
  153. baseToken
  154. subs []string
  155. subResult map[string]byte
  156. messageID uint16
  157. }
  158. // Result returns a map of topics that were subscribed to along with
  159. // the matching return code from the broker. This is either the Qos
  160. // value of the subscription or an error code.
  161. func (s *SubscribeToken) Result() map[string]byte {
  162. s.m.RLock()
  163. defer s.m.RUnlock()
  164. return s.subResult
  165. }
  166. // UnsubscribeToken is an extension of Token containing the extra fields
  167. // required to provide information about calls to Unsubscribe()
  168. type UnsubscribeToken struct {
  169. baseToken
  170. messageID uint16
  171. }
  172. // DisconnectToken is an extension of Token containing the extra fields
  173. // required to provide information about calls to Disconnect()
  174. type DisconnectToken struct {
  175. baseToken
  176. }