123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- /*
- * Copyright (c) 2013 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Seth Hoenig
- * Allan Stockdill-Mander
- * Mike Robertson
- */
- package mqtt
- import (
- "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
- "io"
- "io/ioutil"
- "os"
- "path"
- "sync"
- )
- const (
- msgExt = ".msg"
- bkpExt = ".bkp"
- )
- // FileStore implements the store interface using the filesystem to provide
- // true persistence, even across client failure. This is designed to use a
- // single directory per running client. If you are running multiple clients
- // on the same filesystem, you will need to be careful to specify unique
- // store directories for each.
- type FileStore struct {
- sync.RWMutex
- directory string
- opened bool
- }
- // NewFileStore will create a new FileStore which stores its messages in the
- // directory provided.
- func NewFileStore(directory string) *FileStore {
- store := &FileStore{
- directory: directory,
- opened: false,
- }
- return store
- }
- // Open will allow the FileStore to be used.
- func (store *FileStore) Open() {
- store.Lock()
- defer store.Unlock()
- // if no store directory was specified in ClientOpts, by default use the
- // current working directory
- if store.directory == "" {
- store.directory, _ = os.Getwd()
- }
- // if store dir exists, great, otherwise, create it
- if !exists(store.directory) {
- perms := os.FileMode(0770)
- merr := os.MkdirAll(store.directory, perms)
- chkerr(merr)
- }
- store.opened = true
- DEBUG.Println(STR, "store is opened at", store.directory)
- }
- // Close will disallow the FileStore from being used.
- func (store *FileStore) Close() {
- store.Lock()
- defer store.Unlock()
- store.opened = false
- WARN.Println(STR, "store is not open")
- }
- // Put will put a message into the store, associated with the provided
- // key value.
- func (store *FileStore) Put(key string, m packets.ControlPacket) {
- store.Lock()
- defer store.Unlock()
- chkcond(store.opened)
- full := fullpath(store.directory, key)
- if exists(full) {
- backup(store.directory, key) // make a copy of what already exists
- defer unbackup(store.directory, key)
- }
- write(store.directory, key, m)
- chkcond(exists(full))
- }
- // Get will retrieve a message from the store, the one associated with
- // the provided key value.
- func (store *FileStore) Get(key string) packets.ControlPacket {
- store.RLock()
- defer store.RUnlock()
- chkcond(store.opened)
- filepath := fullpath(store.directory, key)
- if !exists(filepath) {
- return nil
- }
- mfile, oerr := os.Open(filepath)
- chkerr(oerr)
- //all, rerr := ioutil.ReadAll(mfile)
- //chkerr(rerr)
- msg, rerr := packets.ReadPacket(mfile)
- chkerr(rerr)
- cerr := mfile.Close()
- chkerr(cerr)
- return msg
- }
- // All will provide a list of all of the keys associated with messages
- // currenly residing in the FileStore.
- func (store *FileStore) All() []string {
- store.RLock()
- defer store.RUnlock()
- return store.all()
- }
- // Del will remove the persisted message associated with the provided
- // key from the FileStore.
- func (store *FileStore) Del(key string) {
- store.Lock()
- defer store.Unlock()
- store.del(key)
- }
- // Reset will remove all persisted messages from the FileStore.
- func (store *FileStore) Reset() {
- store.Lock()
- defer store.Unlock()
- WARN.Println(STR, "FileStore Reset")
- for _, key := range store.all() {
- store.del(key)
- }
- }
- // lockless
- func (store *FileStore) all() []string {
- chkcond(store.opened)
- keys := []string{}
- files, rderr := ioutil.ReadDir(store.directory)
- chkerr(rderr)
- for _, f := range files {
- DEBUG.Println(STR, "file in All():", f.Name())
- key := f.Name()[0 : len(f.Name())-4] // remove file extension
- keys = append(keys, key)
- }
- return keys
- }
- // lockless
- func (store *FileStore) del(key string) {
- chkcond(store.opened)
- DEBUG.Println(STR, "store del filepath:", store.directory)
- DEBUG.Println(STR, "store delete key:", key)
- filepath := fullpath(store.directory, key)
- DEBUG.Println(STR, "path of deletion:", filepath)
- if !exists(filepath) {
- WARN.Println(STR, "store could not delete key:", key)
- return
- }
- rerr := os.Remove(filepath)
- chkerr(rerr)
- DEBUG.Println(STR, "del msg:", key)
- chkcond(!exists(filepath))
- }
- func fullpath(store string, key string) string {
- p := path.Join(store, key+msgExt)
- return p
- }
- func bkppath(store string, key string) string {
- p := path.Join(store, key+bkpExt)
- return p
- }
- // create file called "X.[messageid].msg" located in the store
- // the contents of the file is the bytes of the message
- // if a message with m's message id already exists, it will
- // be overwritten
- // X will be 'i' for inbound messages, and O for outbound messages
- func write(store, key string, m packets.ControlPacket) {
- filepath := fullpath(store, key)
- f, err := os.Create(filepath)
- chkerr(err)
- werr := m.Write(f)
- chkerr(werr)
- cerr := f.Close()
- chkerr(cerr)
- }
- func exists(file string) bool {
- if _, err := os.Stat(file); err != nil {
- if os.IsNotExist(err) {
- return false
- }
- chkerr(err)
- }
- return true
- }
- func backup(store, key string) {
- bkpp := bkppath(store, key)
- fulp := fullpath(store, key)
- backup, err := os.Create(bkpp)
- chkerr(err)
- mfile, oerr := os.Open(fulp)
- chkerr(oerr)
- _, cerr := io.Copy(backup, mfile)
- chkerr(cerr)
- clberr := backup.Close()
- chkerr(clberr)
- clmerr := mfile.Close()
- chkerr(clmerr)
- }
- // Identify .bkp files in the store and turn them into .msg files,
- // whether or not it overwrites an existing file. This is safe because
- // I'm copying the Paho Java client and they say it is.
- func restore(store string) {
- files, rderr := ioutil.ReadDir(store)
- chkerr(rderr)
- for _, f := range files {
- fname := f.Name()
- if len(fname) > 4 {
- if fname[len(fname)-4:] == bkpExt {
- key := fname[0 : len(fname)-4]
- fulp := fullpath(store, key)
- msg, cerr := os.Create(fulp)
- chkerr(cerr)
- bkpp := path.Join(store, fname)
- bkp, oerr := os.Open(bkpp)
- chkerr(oerr)
- n, cerr := io.Copy(msg, bkp)
- chkerr(cerr)
- chkcond(n > 0)
- clmerr := msg.Close()
- chkerr(clmerr)
- clberr := bkp.Close()
- chkerr(clberr)
- remerr := os.Remove(bkpp)
- chkerr(remerr)
- }
- }
- }
- }
- func unbackup(store, key string) {
- bkpp := bkppath(store, key)
- remerr := os.Remove(bkpp)
- chkerr(remerr)
- }
|