token.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. /*
  2. * Copyright (c) 2014 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Allan Stockdill-Mander
  11. */
  12. package mqtt
  13. import (
  14. "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
  15. "sync"
  16. "time"
  17. )
  18. //PacketAndToken is a struct that contains both a ControlPacket and a
  19. //Token. This struct is passed via channels between the client interface
  20. //code and the underlying code responsible for sending and receiving
  21. //MQTT messages.
  22. type PacketAndToken struct {
  23. p packets.ControlPacket
  24. t Token
  25. }
  26. //Token defines the interface for the tokens used to indicate when
  27. //actions have completed.
  28. type Token interface {
  29. Wait() bool
  30. WaitTimeout(time.Duration) bool
  31. flowComplete()
  32. Error() error
  33. }
  34. type baseToken struct {
  35. m sync.RWMutex
  36. complete chan struct{}
  37. ready bool
  38. err error
  39. }
  40. // Wait will wait indefinitely for the Token to complete, ie the Publish
  41. // to be sent and confirmed receipt from the broker
  42. func (b *baseToken) Wait() bool {
  43. b.m.Lock()
  44. defer b.m.Unlock()
  45. if !b.ready {
  46. <-b.complete
  47. b.ready = true
  48. }
  49. return b.ready
  50. }
  51. // WaitTimeout takes a time in ms to wait for the flow associated with the
  52. // Token to complete, returns true if it returned before the timeout or
  53. // returns false if the timeout occurred. In the case of a timeout the Token
  54. // does not have an error set in case the caller wishes to wait again
  55. func (b *baseToken) WaitTimeout(d time.Duration) bool {
  56. b.m.Lock()
  57. defer b.m.Unlock()
  58. if !b.ready {
  59. select {
  60. case <-b.complete:
  61. b.ready = true
  62. case <-time.After(d):
  63. }
  64. }
  65. return b.ready
  66. }
  67. func (b *baseToken) flowComplete() {
  68. close(b.complete)
  69. }
  70. func (b *baseToken) Error() error {
  71. b.m.RLock()
  72. defer b.m.RUnlock()
  73. return b.err
  74. }
  75. func newToken(tType byte) Token {
  76. switch tType {
  77. case packets.Connect:
  78. return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  79. case packets.Subscribe:
  80. return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
  81. case packets.Publish:
  82. return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
  83. case packets.Unsubscribe:
  84. return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
  85. case packets.Disconnect:
  86. return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  87. }
  88. return nil
  89. }
  90. //ConnectToken is an extension of Token containing the extra fields
  91. //required to provide information about calls to Connect()
  92. type ConnectToken struct {
  93. baseToken
  94. returnCode byte
  95. }
  96. //ReturnCode returns the acknowlegement code in the connack sent
  97. //in response to a Connect()
  98. func (c *ConnectToken) ReturnCode() byte {
  99. c.m.RLock()
  100. defer c.m.RUnlock()
  101. return c.returnCode
  102. }
  103. //PublishToken is an extension of Token containing the extra fields
  104. //required to provide information about calls to Publish()
  105. type PublishToken struct {
  106. baseToken
  107. messageID uint16
  108. }
  109. //MessageID returns the MQTT message ID that was assigned to the
  110. //Publish packet when it was sent to the broker
  111. func (p *PublishToken) MessageID() uint16 {
  112. return p.messageID
  113. }
  114. //SubscribeToken is an extension of Token containing the extra fields
  115. //required to provide information about calls to Subscribe()
  116. type SubscribeToken struct {
  117. baseToken
  118. subs []string
  119. subResult map[string]byte
  120. }
  121. //Result returns a map of topics that were subscribed to along with
  122. //the matching return code from the broker. This is either the Qos
  123. //value of the subscription or an error code.
  124. func (s *SubscribeToken) Result() map[string]byte {
  125. s.m.RLock()
  126. defer s.m.RUnlock()
  127. return s.subResult
  128. }
  129. //UnsubscribeToken is an extension of Token containing the extra fields
  130. //required to provide information about calls to Unsubscribe()
  131. type UnsubscribeToken struct {
  132. baseToken
  133. }
  134. //DisconnectToken is an extension of Token containing the extra fields
  135. //required to provide information about calls to Disconnect()
  136. type DisconnectToken struct {
  137. baseToken
  138. }