fileaccess.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "sparrow/pkg/redispool"
  10. "sparrow/pkg/rpcs"
  11. "sparrow/pkg/server"
  12. "sparrow/pkg/utils"
  13. "sync"
  14. "time"
  15. "github.com/garyburd/redigo/redis"
  16. )
  17. const checkTimeOut = 30 * time.Minute
  18. const tempFilesKey = "tempfilelist"
  19. // FileAccess RPC服务
  20. type FileAccess struct {
  21. mu sync.RWMutex
  22. // store tmp file list
  23. tempFiles map[string]*tempFile
  24. redisHost string
  25. }
  26. // 临时文件
  27. type tempFile struct {
  28. //文件路径
  29. FileName string
  30. //创建时间
  31. CreateTime time.Time
  32. }
  33. // NewFileAccess create a FileAccessor instance
  34. func NewFileAccess(redis string) *FileAccess {
  35. return &FileAccess{
  36. tempFiles: make(map[string]*tempFile),
  37. redisHost: redis,
  38. }
  39. }
  40. // 增加一个tempfile
  41. func (f *FileAccess) addTempFile(fileName string) {
  42. obj := &tempFile{
  43. FileName: fileName,
  44. CreateTime: time.Now(),
  45. }
  46. f.mu.Lock()
  47. f.tempFiles[fileName] = obj
  48. f.mu.Unlock()
  49. // store redis
  50. err := f.saveToRedis(obj)
  51. if err != nil {
  52. server.Log.Error(err)
  53. }
  54. }
  55. func (f *FileAccess) delTempFile(fileName string) {
  56. f.mu.Lock()
  57. if obj, ok := f.tempFiles[fileName]; ok {
  58. conn, err := redispool.GetClient(f.redisHost)
  59. if err != nil {
  60. server.Log.Errorf("conn redis error :%v", err)
  61. f.mu.Unlock()
  62. return
  63. }
  64. bytes, err := json.Marshal(obj)
  65. if err != nil {
  66. server.Log.Errorf("json marshal error :%v", err)
  67. }
  68. conn.Do("SREM", tempFilesKey, string(bytes))
  69. }
  70. f.mu.Unlock()
  71. }
  72. //
  73. func (f *FileAccess) saveToRedis(obj *tempFile) error {
  74. conn, err := redispool.GetClient(f.redisHost)
  75. if err != nil {
  76. server.Log.Errorf("saveToRedis error :%v", err)
  77. return err
  78. }
  79. bytes, err := json.Marshal(obj)
  80. if err != nil {
  81. server.Log.Errorf("json marshal error :%v", err)
  82. return err
  83. }
  84. _, err = conn.Do("SADD", tempFilesKey, string(bytes))
  85. if err != nil {
  86. server.Log.Errorf("store to redis error :%v", err)
  87. return err
  88. }
  89. return nil
  90. }
  91. func (f *FileAccess) getTempFileFromRedis() error {
  92. conn, err := redispool.GetClient(f.redisHost)
  93. if err != nil {
  94. server.Log.Errorf("conn to redis error :%v", err)
  95. } else {
  96. // fill tempFile
  97. lists, err := redis.Strings(conn.Do("SMEMBERS", tempFilesKey))
  98. if err != nil {
  99. server.Log.Error(err)
  100. return err
  101. }
  102. for _, str := range lists {
  103. var obj tempFile
  104. json.Unmarshal([]byte(str), &obj)
  105. server.Log.Debugf("%v", obj)
  106. f.mu.Lock()
  107. f.tempFiles[obj.FileName] = &obj
  108. f.mu.Unlock()
  109. }
  110. }
  111. conn.Close()
  112. return nil
  113. }
  114. // TODO: 临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件
  115. func (f *FileAccess) checker() {
  116. server.Log.Info("start temp file checker")
  117. f.getTempFileFromRedis()
  118. for {
  119. for k, v := range f.tempFiles {
  120. if time.Now().Sub(v.CreateTime) > checkTimeOut {
  121. //delete file
  122. f.delTempFile(v.FileName)
  123. f.mu.Lock()
  124. delete(f.tempFiles, k)
  125. f.mu.Unlock()
  126. err := os.Remove(v.FileName)
  127. if err != nil {
  128. server.Log.Errorf("error while delete file:%v", err)
  129. }
  130. }
  131. }
  132. time.Sleep(15 * time.Minute)
  133. }
  134. }
  135. // DeleteFile 删除文件
  136. func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmptyResult) error {
  137. reg := regexp.MustCompile(`upload/\$*.*`)
  138. src := reg.FindString(args.FileName)
  139. err := os.Remove(src)
  140. return err
  141. }
  142. // MoveFile move a file to new path
  143. // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
  144. func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
  145. // check source file
  146. reg := regexp.MustCompile(`tmp/\$*.*`)
  147. src := reg.FindString(args.Source)
  148. fileName := filepath.Base(src)
  149. src = *conStaticPath + "/" + src
  150. b, err := utils.Exists(src)
  151. if err != nil {
  152. server.Log.Error(err)
  153. return err
  154. }
  155. if b {
  156. // copy file
  157. sourceFileStat, err := os.Stat(src)
  158. if err != nil {
  159. return err
  160. }
  161. if !sourceFileStat.Mode().IsRegular() {
  162. return fmt.Errorf("%s is not a regular file", src)
  163. }
  164. source, err := os.Open(src)
  165. if err != nil {
  166. return err
  167. }
  168. defer source.Close()
  169. dst := *conStaticPath + "/" + args.Target + "/" + fileName
  170. utils.CreateIfNotExist(dst)
  171. destination, err := os.Create(dst)
  172. if err != nil {
  173. return err
  174. }
  175. defer destination.Close()
  176. io.Copy(destination, source)
  177. fpath := fmt.Sprintf("http://%s/%s/%s/%s", server.GetHTTPHost(), *conStaticPath, args.Target, fileName)
  178. if *conDomain != "" {
  179. fpath = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName)
  180. }
  181. reply.FilePath = fpath
  182. //delete src
  183. os.Remove(src)
  184. return nil
  185. }
  186. return nil
  187. }