memstore_ordered.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. * Matt Brittan
  18. */
  19. package mqtt
  20. import (
  21. "sort"
  22. "sync"
  23. "time"
  24. "github.com/eclipse/paho.mqtt.golang/packets"
  25. )
  26. // OrderedMemoryStore uses a map internally, so on its own the order in which All()
  27. // would return keys is undefined. OrderedMemoryStore resolves this by storing the
  28. // time each message is added and sorting based upon this.
  29. // storedMessage encapsulates a message and the time it was initially stored
  30. type storedMessage struct {
  31. ts time.Time
  32. msg packets.ControlPacket
  33. }
  34. // OrderedMemoryStore implements the store interface to provide a "persistence"
  35. // mechanism wholly stored in memory. This is only useful for
  36. // as long as the client instance exists.
  37. type OrderedMemoryStore struct {
  38. sync.RWMutex
  39. messages map[string]storedMessage
  40. opened bool
  41. }
  42. // NewOrderedMemoryStore returns a pointer to a new instance of
  43. // OrderedMemoryStore, the instance is not initialized and ready to
  44. // use until Open() has been called on it.
  45. func NewOrderedMemoryStore() *OrderedMemoryStore {
  46. store := &OrderedMemoryStore{
  47. messages: make(map[string]storedMessage),
  48. opened: false,
  49. }
  50. return store
  51. }
  52. // Open initializes a OrderedMemoryStore instance.
  53. func (store *OrderedMemoryStore) Open() {
  54. store.Lock()
  55. defer store.Unlock()
  56. store.opened = true
  57. DEBUG.Println(STR, "OrderedMemoryStore initialized")
  58. }
  59. // Put takes a key and a pointer to a Message and stores the
  60. // message.
  61. func (store *OrderedMemoryStore) Put(key string, message packets.ControlPacket) {
  62. store.Lock()
  63. defer store.Unlock()
  64. if !store.opened {
  65. ERROR.Println(STR, "Trying to use memory store, but not open")
  66. return
  67. }
  68. store.messages[key] = storedMessage{ts: time.Now(), msg: message}
  69. }
  70. // Get takes a key and looks in the store for a matching Message
  71. // returning either the Message pointer or nil.
  72. func (store *OrderedMemoryStore) Get(key string) packets.ControlPacket {
  73. store.RLock()
  74. defer store.RUnlock()
  75. if !store.opened {
  76. ERROR.Println(STR, "Trying to use memory store, but not open")
  77. return nil
  78. }
  79. mid := mIDFromKey(key)
  80. m, ok := store.messages[key]
  81. if !ok || m.msg == nil {
  82. CRITICAL.Println(STR, "OrderedMemoryStore get: message", mid, "not found")
  83. } else {
  84. DEBUG.Println(STR, "OrderedMemoryStore get: message", mid, "found")
  85. }
  86. return m.msg
  87. }
  88. // All returns a slice of strings containing all the keys currently
  89. // in the OrderedMemoryStore.
  90. func (store *OrderedMemoryStore) All() []string {
  91. store.RLock()
  92. defer store.RUnlock()
  93. if !store.opened {
  94. ERROR.Println(STR, "Trying to use memory store, but not open")
  95. return nil
  96. }
  97. type tsAndKey struct {
  98. ts time.Time
  99. key string
  100. }
  101. tsKeys := make([]tsAndKey, 0, len(store.messages))
  102. for k, v := range store.messages {
  103. tsKeys = append(tsKeys, tsAndKey{ts: v.ts, key: k})
  104. }
  105. sort.Slice(tsKeys, func(a int, b int) bool { return tsKeys[a].ts.Before(tsKeys[b].ts) })
  106. keys := make([]string, len(tsKeys))
  107. for i := range tsKeys {
  108. keys[i] = tsKeys[i].key
  109. }
  110. return keys
  111. }
  112. // Del takes a key, searches the OrderedMemoryStore and if the key is found
  113. // deletes the Message pointer associated with it.
  114. func (store *OrderedMemoryStore) Del(key string) {
  115. store.Lock()
  116. defer store.Unlock()
  117. if !store.opened {
  118. ERROR.Println(STR, "Trying to use memory store, but not open")
  119. return
  120. }
  121. mid := mIDFromKey(key)
  122. _, ok := store.messages[key]
  123. if !ok {
  124. WARN.Println(STR, "OrderedMemoryStore del: message", mid, "not found")
  125. } else {
  126. delete(store.messages, key)
  127. DEBUG.Println(STR, "OrderedMemoryStore del: message", mid, "was deleted")
  128. }
  129. }
  130. // Close will disallow modifications to the state of the store.
  131. func (store *OrderedMemoryStore) Close() {
  132. store.Lock()
  133. defer store.Unlock()
  134. if !store.opened {
  135. ERROR.Println(STR, "Trying to close memory store, but not open")
  136. return
  137. }
  138. store.opened = false
  139. DEBUG.Println(STR, "OrderedMemoryStore closed")
  140. }
  141. // Reset eliminates all persisted message data in the store.
  142. func (store *OrderedMemoryStore) Reset() {
  143. store.Lock()
  144. defer store.Unlock()
  145. if !store.opened {
  146. ERROR.Println(STR, "Trying to reset memory store, but not open")
  147. }
  148. store.messages = make(map[string]storedMessage)
  149. WARN.Println(STR, "OrderedMemoryStore wiped")
  150. }