filestore.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
  17. "io"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "sync"
  22. )
  23. const (
  24. msgExt = ".msg"
  25. bkpExt = ".bkp"
  26. )
  27. // FileStore implements the store interface using the filesystem to provide
  28. // true persistence, even across client failure. This is designed to use a
  29. // single directory per running client. If you are running multiple clients
  30. // on the same filesystem, you will need to be careful to specify unique
  31. // store directories for each.
  32. type FileStore struct {
  33. sync.RWMutex
  34. directory string
  35. opened bool
  36. }
  37. // NewFileStore will create a new FileStore which stores its messages in the
  38. // directory provided.
  39. func NewFileStore(directory string) *FileStore {
  40. store := &FileStore{
  41. directory: directory,
  42. opened: false,
  43. }
  44. return store
  45. }
  46. // Open will allow the FileStore to be used.
  47. func (store *FileStore) Open() {
  48. store.Lock()
  49. defer store.Unlock()
  50. // if no store directory was specified in ClientOpts, by default use the
  51. // current working directory
  52. if store.directory == "" {
  53. store.directory, _ = os.Getwd()
  54. }
  55. // if store dir exists, great, otherwise, create it
  56. if !exists(store.directory) {
  57. perms := os.FileMode(0770)
  58. merr := os.MkdirAll(store.directory, perms)
  59. chkerr(merr)
  60. }
  61. store.opened = true
  62. DEBUG.Println(STR, "store is opened at", store.directory)
  63. }
  64. // Close will disallow the FileStore from being used.
  65. func (store *FileStore) Close() {
  66. store.Lock()
  67. defer store.Unlock()
  68. store.opened = false
  69. WARN.Println(STR, "store is not open")
  70. }
  71. // Put will put a message into the store, associated with the provided
  72. // key value.
  73. func (store *FileStore) Put(key string, m packets.ControlPacket) {
  74. store.Lock()
  75. defer store.Unlock()
  76. chkcond(store.opened)
  77. full := fullpath(store.directory, key)
  78. if exists(full) {
  79. backup(store.directory, key) // make a copy of what already exists
  80. defer unbackup(store.directory, key)
  81. }
  82. write(store.directory, key, m)
  83. chkcond(exists(full))
  84. }
  85. // Get will retrieve a message from the store, the one associated with
  86. // the provided key value.
  87. func (store *FileStore) Get(key string) packets.ControlPacket {
  88. store.RLock()
  89. defer store.RUnlock()
  90. chkcond(store.opened)
  91. filepath := fullpath(store.directory, key)
  92. if !exists(filepath) {
  93. return nil
  94. }
  95. mfile, oerr := os.Open(filepath)
  96. chkerr(oerr)
  97. //all, rerr := ioutil.ReadAll(mfile)
  98. //chkerr(rerr)
  99. msg, rerr := packets.ReadPacket(mfile)
  100. chkerr(rerr)
  101. cerr := mfile.Close()
  102. chkerr(cerr)
  103. return msg
  104. }
  105. // All will provide a list of all of the keys associated with messages
  106. // currenly residing in the FileStore.
  107. func (store *FileStore) All() []string {
  108. store.RLock()
  109. defer store.RUnlock()
  110. return store.all()
  111. }
  112. // Del will remove the persisted message associated with the provided
  113. // key from the FileStore.
  114. func (store *FileStore) Del(key string) {
  115. store.Lock()
  116. defer store.Unlock()
  117. store.del(key)
  118. }
  119. // Reset will remove all persisted messages from the FileStore.
  120. func (store *FileStore) Reset() {
  121. store.Lock()
  122. defer store.Unlock()
  123. WARN.Println(STR, "FileStore Reset")
  124. for _, key := range store.all() {
  125. store.del(key)
  126. }
  127. }
  128. // lockless
  129. func (store *FileStore) all() []string {
  130. chkcond(store.opened)
  131. keys := []string{}
  132. files, rderr := ioutil.ReadDir(store.directory)
  133. chkerr(rderr)
  134. for _, f := range files {
  135. DEBUG.Println(STR, "file in All():", f.Name())
  136. key := f.Name()[0 : len(f.Name())-4] // remove file extension
  137. keys = append(keys, key)
  138. }
  139. return keys
  140. }
  141. // lockless
  142. func (store *FileStore) del(key string) {
  143. chkcond(store.opened)
  144. DEBUG.Println(STR, "store del filepath:", store.directory)
  145. DEBUG.Println(STR, "store delete key:", key)
  146. filepath := fullpath(store.directory, key)
  147. DEBUG.Println(STR, "path of deletion:", filepath)
  148. if !exists(filepath) {
  149. WARN.Println(STR, "store could not delete key:", key)
  150. return
  151. }
  152. rerr := os.Remove(filepath)
  153. chkerr(rerr)
  154. DEBUG.Println(STR, "del msg:", key)
  155. chkcond(!exists(filepath))
  156. }
  157. func fullpath(store string, key string) string {
  158. p := path.Join(store, key+msgExt)
  159. return p
  160. }
  161. func bkppath(store string, key string) string {
  162. p := path.Join(store, key+bkpExt)
  163. return p
  164. }
  165. // create file called "X.[messageid].msg" located in the store
  166. // the contents of the file is the bytes of the message
  167. // if a message with m's message id already exists, it will
  168. // be overwritten
  169. // X will be 'i' for inbound messages, and O for outbound messages
  170. func write(store, key string, m packets.ControlPacket) {
  171. filepath := fullpath(store, key)
  172. f, err := os.Create(filepath)
  173. chkerr(err)
  174. werr := m.Write(f)
  175. chkerr(werr)
  176. cerr := f.Close()
  177. chkerr(cerr)
  178. }
  179. func exists(file string) bool {
  180. if _, err := os.Stat(file); err != nil {
  181. if os.IsNotExist(err) {
  182. return false
  183. }
  184. chkerr(err)
  185. }
  186. return true
  187. }
  188. func backup(store, key string) {
  189. bkpp := bkppath(store, key)
  190. fulp := fullpath(store, key)
  191. backup, err := os.Create(bkpp)
  192. chkerr(err)
  193. mfile, oerr := os.Open(fulp)
  194. chkerr(oerr)
  195. _, cerr := io.Copy(backup, mfile)
  196. chkerr(cerr)
  197. clberr := backup.Close()
  198. chkerr(clberr)
  199. clmerr := mfile.Close()
  200. chkerr(clmerr)
  201. }
  202. // Identify .bkp files in the store and turn them into .msg files,
  203. // whether or not it overwrites an existing file. This is safe because
  204. // I'm copying the Paho Java client and they say it is.
  205. func restore(store string) {
  206. files, rderr := ioutil.ReadDir(store)
  207. chkerr(rderr)
  208. for _, f := range files {
  209. fname := f.Name()
  210. if len(fname) > 4 {
  211. if fname[len(fname)-4:] == bkpExt {
  212. key := fname[0 : len(fname)-4]
  213. fulp := fullpath(store, key)
  214. msg, cerr := os.Create(fulp)
  215. chkerr(cerr)
  216. bkpp := path.Join(store, fname)
  217. bkp, oerr := os.Open(bkpp)
  218. chkerr(oerr)
  219. n, cerr := io.Copy(msg, bkp)
  220. chkerr(cerr)
  221. chkcond(n > 0)
  222. clmerr := msg.Close()
  223. chkerr(clmerr)
  224. clberr := bkp.Close()
  225. chkerr(clberr)
  226. remerr := os.Remove(bkpp)
  227. chkerr(remerr)
  228. }
  229. }
  230. }
  231. }
  232. func unbackup(store, key string) {
  233. bkpp := bkppath(store, key)
  234. remerr := os.Remove(bkpp)
  235. chkerr(remerr)
  236. }