filestore.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. */
  18. package mqtt
  19. import (
  20. "io/ioutil"
  21. "os"
  22. "path"
  23. "sort"
  24. "sync"
  25. "github.com/eclipse/paho.mqtt.golang/packets"
  26. )
  27. const (
  28. msgExt = ".msg"
  29. tmpExt = ".tmp"
  30. corruptExt = ".CORRUPT"
  31. )
  32. // FileStore implements the store interface using the filesystem to provide
  33. // true persistence, even across client failure. This is designed to use a
  34. // single directory per running client. If you are running multiple clients
  35. // on the same filesystem, you will need to be careful to specify unique
  36. // store directories for each.
  37. type FileStore struct {
  38. sync.RWMutex
  39. directory string
  40. opened bool
  41. }
  42. // NewFileStore will create a new FileStore which stores its messages in the
  43. // directory provided.
  44. func NewFileStore(directory string) *FileStore {
  45. store := &FileStore{
  46. directory: directory,
  47. opened: false,
  48. }
  49. return store
  50. }
  51. // Open will allow the FileStore to be used.
  52. func (store *FileStore) Open() {
  53. store.Lock()
  54. defer store.Unlock()
  55. // if no store directory was specified in ClientOpts, by default use the
  56. // current working directory
  57. if store.directory == "" {
  58. store.directory, _ = os.Getwd()
  59. }
  60. // if store dir exists, great, otherwise, create it
  61. if !exists(store.directory) {
  62. perms := os.FileMode(0770)
  63. merr := os.MkdirAll(store.directory, perms)
  64. chkerr(merr)
  65. }
  66. store.opened = true
  67. DEBUG.Println(STR, "store is opened at", store.directory)
  68. }
  69. // Close will disallow the FileStore from being used.
  70. func (store *FileStore) Close() {
  71. store.Lock()
  72. defer store.Unlock()
  73. store.opened = false
  74. DEBUG.Println(STR, "store is closed")
  75. }
  76. // Put will put a message into the store, associated with the provided
  77. // key value.
  78. func (store *FileStore) Put(key string, m packets.ControlPacket) {
  79. store.Lock()
  80. defer store.Unlock()
  81. if !store.opened {
  82. ERROR.Println(STR, "Trying to use file store, but not open")
  83. return
  84. }
  85. full := fullpath(store.directory, key)
  86. write(store.directory, key, m)
  87. if !exists(full) {
  88. ERROR.Println(STR, "file not created:", full)
  89. }
  90. }
  91. // Get will retrieve a message from the store, the one associated with
  92. // the provided key value.
  93. func (store *FileStore) Get(key string) packets.ControlPacket {
  94. store.RLock()
  95. defer store.RUnlock()
  96. if !store.opened {
  97. ERROR.Println(STR, "trying to use file store, but not open")
  98. return nil
  99. }
  100. filepath := fullpath(store.directory, key)
  101. if !exists(filepath) {
  102. return nil
  103. }
  104. mfile, oerr := os.Open(filepath)
  105. chkerr(oerr)
  106. msg, rerr := packets.ReadPacket(mfile)
  107. chkerr(mfile.Close())
  108. // Message was unreadable, return nil
  109. if rerr != nil {
  110. newpath := corruptpath(store.directory, key)
  111. WARN.Println(STR, "corrupted file detected:", rerr.Error(), "archived at:", newpath)
  112. if err := os.Rename(filepath, newpath); err != nil {
  113. ERROR.Println(STR, err)
  114. }
  115. return nil
  116. }
  117. return msg
  118. }
  119. // All will provide a list of all of the keys associated with messages
  120. // currently residing in the FileStore.
  121. func (store *FileStore) All() []string {
  122. store.RLock()
  123. defer store.RUnlock()
  124. return store.all()
  125. }
  126. // Del will remove the persisted message associated with the provided
  127. // key from the FileStore.
  128. func (store *FileStore) Del(key string) {
  129. store.Lock()
  130. defer store.Unlock()
  131. store.del(key)
  132. }
  133. // Reset will remove all persisted messages from the FileStore.
  134. func (store *FileStore) Reset() {
  135. store.Lock()
  136. defer store.Unlock()
  137. WARN.Println(STR, "FileStore Reset")
  138. for _, key := range store.all() {
  139. store.del(key)
  140. }
  141. }
  142. // lockless
  143. func (store *FileStore) all() []string {
  144. var err error
  145. var keys []string
  146. var files fileInfos
  147. if !store.opened {
  148. ERROR.Println(STR, "trying to use file store, but not open")
  149. return nil
  150. }
  151. files, err = ioutil.ReadDir(store.directory)
  152. chkerr(err)
  153. sort.Sort(files)
  154. for _, f := range files {
  155. DEBUG.Println(STR, "file in All():", f.Name())
  156. name := f.Name()
  157. if len(name) < len(msgExt) || name[len(name)-len(msgExt):] != msgExt {
  158. DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name)
  159. continue
  160. }
  161. key := name[0 : len(name)-4] // remove file extension
  162. keys = append(keys, key)
  163. }
  164. return keys
  165. }
  166. // lockless
  167. func (store *FileStore) del(key string) {
  168. if !store.opened {
  169. ERROR.Println(STR, "trying to use file store, but not open")
  170. return
  171. }
  172. DEBUG.Println(STR, "store del filepath:", store.directory)
  173. DEBUG.Println(STR, "store delete key:", key)
  174. filepath := fullpath(store.directory, key)
  175. DEBUG.Println(STR, "path of deletion:", filepath)
  176. if !exists(filepath) {
  177. WARN.Println(STR, "store could not delete key:", key)
  178. return
  179. }
  180. rerr := os.Remove(filepath)
  181. chkerr(rerr)
  182. DEBUG.Println(STR, "del msg:", key)
  183. if exists(filepath) {
  184. ERROR.Println(STR, "file not deleted:", filepath)
  185. }
  186. }
  187. func fullpath(store string, key string) string {
  188. p := path.Join(store, key+msgExt)
  189. return p
  190. }
  191. func tmppath(store string, key string) string {
  192. p := path.Join(store, key+tmpExt)
  193. return p
  194. }
  195. func corruptpath(store string, key string) string {
  196. p := path.Join(store, key+corruptExt)
  197. return p
  198. }
  199. // create file called "X.[messageid].tmp" located in the store
  200. // the contents of the file is the bytes of the message, then
  201. // rename it to "X.[messageid].msg", overwriting any existing
  202. // message with the same id
  203. // X will be 'i' for inbound messages, and O for outbound messages
  204. func write(store, key string, m packets.ControlPacket) {
  205. temppath := tmppath(store, key)
  206. f, err := os.Create(temppath)
  207. chkerr(err)
  208. werr := m.Write(f)
  209. chkerr(werr)
  210. cerr := f.Close()
  211. chkerr(cerr)
  212. rerr := os.Rename(temppath, fullpath(store, key))
  213. chkerr(rerr)
  214. }
  215. func exists(file string) bool {
  216. if _, err := os.Stat(file); err != nil {
  217. if os.IsNotExist(err) {
  218. return false
  219. }
  220. chkerr(err)
  221. }
  222. return true
  223. }
  224. type fileInfos []os.FileInfo
  225. func (f fileInfos) Len() int {
  226. return len(f)
  227. }
  228. func (f fileInfos) Swap(i, j int) {
  229. f[i], f[j] = f[j], f[i]
  230. }
  231. func (f fileInfos) Less(i, j int) bool {
  232. return f[i].ModTime().Before(f[j].ModTime())
  233. }