messageids.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. * Copyright (c) 2013 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. * Matt Brittan
  18. */
  19. package mqtt
  20. import (
  21. "fmt"
  22. "sync"
  23. "time"
  24. )
  25. // MId is 16 bit message id as specified by the MQTT spec.
  26. // In general, these values should not be depended upon by
  27. // the client application.
  28. type MId uint16
  29. type messageIds struct {
  30. mu sync.RWMutex // Named to prevent Mu from being accessible directly via client
  31. index map[uint16]tokenCompletor
  32. lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
  33. }
  34. const (
  35. midMin uint16 = 1
  36. midMax uint16 = 65535
  37. )
  38. // cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
  39. func (mids *messageIds) cleanUp() {
  40. mids.mu.Lock()
  41. for _, token := range mids.index {
  42. switch token.(type) {
  43. case *PublishToken:
  44. token.setError(fmt.Errorf("connection lost before Publish completed"))
  45. case *SubscribeToken:
  46. token.setError(fmt.Errorf("connection lost before Subscribe completed"))
  47. case *UnsubscribeToken:
  48. token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
  49. case nil: // should not be any nil entries
  50. continue
  51. }
  52. token.flowComplete()
  53. }
  54. mids.index = make(map[uint16]tokenCompletor)
  55. mids.mu.Unlock()
  56. DEBUG.Println(MID, "cleaned up")
  57. }
  58. // cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
  59. // This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
  60. func (mids *messageIds) cleanUpSubscribe() {
  61. mids.mu.Lock()
  62. for mid, token := range mids.index {
  63. switch token.(type) {
  64. case *SubscribeToken:
  65. token.setError(fmt.Errorf("connection lost before Subscribe completed"))
  66. delete(mids.index, mid)
  67. case *UnsubscribeToken:
  68. token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
  69. delete(mids.index, mid)
  70. }
  71. }
  72. mids.mu.Unlock()
  73. DEBUG.Println(MID, "cleaned up subs")
  74. }
  75. func (mids *messageIds) freeID(id uint16) {
  76. mids.mu.Lock()
  77. delete(mids.index, id)
  78. mids.mu.Unlock()
  79. }
  80. func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
  81. mids.mu.Lock()
  82. defer mids.mu.Unlock()
  83. if _, ok := mids.index[id]; !ok {
  84. mids.index[id] = token
  85. } else {
  86. old := mids.index[id]
  87. old.flowComplete()
  88. mids.index[id] = token
  89. }
  90. if id > mids.lastIssuedID {
  91. mids.lastIssuedID = id
  92. }
  93. }
  94. // getID will return an available id or 0 if none available
  95. // The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler)
  96. func (mids *messageIds) getID(t tokenCompletor) uint16 {
  97. mids.mu.Lock()
  98. defer mids.mu.Unlock()
  99. i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty
  100. looped := false // uint16 will loop from 65535->0
  101. for {
  102. i++
  103. if i == 0 { // skip 0 because its not a valid id (Control Packets MUST contain a non-zero 16-bit Packet Identifier [MQTT-2.3.1-1])
  104. i++
  105. looped = true
  106. }
  107. if _, ok := mids.index[i]; !ok {
  108. mids.index[i] = t
  109. mids.lastIssuedID = i
  110. return i
  111. }
  112. if (looped && i == mids.lastIssuedID) || (mids.lastIssuedID == 0 && i == midMax) { // lastIssuedID will be 0 at startup
  113. return 0 // no free ids
  114. }
  115. }
  116. }
  117. func (mids *messageIds) getToken(id uint16) tokenCompletor {
  118. mids.mu.RLock()
  119. defer mids.mu.RUnlock()
  120. if token, ok := mids.index[id]; ok {
  121. return token
  122. }
  123. return &DummyToken{id: id}
  124. }
  125. type DummyToken struct {
  126. id uint16
  127. }
  128. // Wait implements the Token Wait method.
  129. func (d *DummyToken) Wait() bool {
  130. return true
  131. }
  132. // WaitTimeout implements the Token WaitTimeout method.
  133. func (d *DummyToken) WaitTimeout(t time.Duration) bool {
  134. return true
  135. }
  136. // Done implements the Token Done method.
  137. func (d *DummyToken) Done() <-chan struct{} {
  138. ch := make(chan struct{})
  139. close(ch)
  140. return ch
  141. }
  142. func (d *DummyToken) flowComplete() {
  143. ERROR.Printf("A lookup for token %d returned nil\n", d.id)
  144. }
  145. func (d *DummyToken) Error() error {
  146. return nil
  147. }
  148. func (d *DummyToken) setError(e error) {}
  149. // PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved
  150. // it differs from DummyToken in that calling flowComplete does not generate an error (it
  151. // is expected that flowComplete will be called when the token is overwritten with a real token)
  152. type PlaceHolderToken struct {
  153. id uint16
  154. }
  155. // Wait implements the Token Wait method.
  156. func (p *PlaceHolderToken) Wait() bool {
  157. return true
  158. }
  159. // WaitTimeout implements the Token WaitTimeout method.
  160. func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
  161. return true
  162. }
  163. // Done implements the Token Done method.
  164. func (p *PlaceHolderToken) Done() <-chan struct{} {
  165. ch := make(chan struct{})
  166. close(ch)
  167. return ch
  168. }
  169. func (p *PlaceHolderToken) flowComplete() {
  170. }
  171. func (p *PlaceHolderToken) Error() error {
  172. return nil
  173. }
  174. func (p *PlaceHolderToken) setError(e error) {}