fileaccess.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "sparrow/pkg/redispool"
  10. "sparrow/pkg/rpcs"
  11. "sparrow/pkg/server"
  12. "sparrow/pkg/utils"
  13. "sync"
  14. "time"
  15. "github.com/garyburd/redigo/redis"
  16. )
  // Settings for temp-file bookkeeping.
  const (
  	// checkTimeOut is the age a tracked temp file must exceed before the
  	// checker deletes it.
  	checkTimeOut = 30 * time.Minute
  	// tempFilesKey is the Redis set key under which temp-file records are kept.
  	tempFilesKey = "tempfilelist"
  )
  19. // FileAccess RPC服务
  20. type FileAccess struct {
  21. mu sync.RWMutex
  22. // store tmp file list
  23. tempFiles map[string]*tempFile
  24. redisHost string
  25. }
  // tempFile is the record kept for one tracked temporary file.
  //
  // The JSON encoding of this struct is what gets added to and removed from the
  // Redis set, so the field names and their order must stay as they are for
  // existing set members to keep matching.
  type tempFile struct {
  	// FileName is the path of the file.
  	FileName string
  	// CreateTime is the moment the record was created.
  	CreateTime time.Time
  }
  33. // NewFileAccess create a FileAccessor instance
  34. func NewFileAccess(redis string) *FileAccess {
  35. return &FileAccess{
  36. tempFiles: make(map[string]*tempFile),
  37. redisHost: redis,
  38. }
  39. }
  40. // 增加一个tempfile
  41. func (f *FileAccess) addTempFile(fileName string) {
  42. obj := &tempFile{
  43. FileName: fileName,
  44. CreateTime: time.Now(),
  45. }
  46. f.mu.Lock()
  47. f.tempFiles[fileName] = obj
  48. f.mu.Unlock()
  49. // store redis
  50. err := f.saveToRedis(obj)
  51. if err != nil {
  52. server.Log.Error(err)
  53. }
  54. }
  55. func (f *FileAccess) delTempFile(fileName string) {
  56. f.mu.Lock()
  57. if obj, ok := f.tempFiles[fileName]; ok {
  58. conn, err := redispool.GetClient(f.redisHost)
  59. if err != nil {
  60. server.Log.Errorf("conn redis error :%v", err)
  61. f.mu.Unlock()
  62. return
  63. }
  64. bytes, err := json.Marshal(obj)
  65. if err != nil {
  66. server.Log.Errorf("json marshal error :%v", err)
  67. }
  68. conn.Do("SREM", tempFilesKey, string(bytes))
  69. }
  70. f.mu.Unlock()
  71. }
  72. //
  73. func (f *FileAccess) saveToRedis(obj *tempFile) error {
  74. conn, err := redispool.GetClient(f.redisHost)
  75. if err != nil {
  76. server.Log.Errorf("saveToRedis error :%v", err)
  77. return err
  78. }
  79. bytes, err := json.Marshal(obj)
  80. if err != nil {
  81. server.Log.Errorf("json marshal error :%v", err)
  82. return err
  83. }
  84. _, err = conn.Do("SADD", tempFilesKey, string(bytes))
  85. if err != nil {
  86. server.Log.Errorf("store to redis error :%v", err)
  87. return err
  88. }
  89. return nil
  90. }
  91. func (f *FileAccess) getTempFileFromRedis() error {
  92. conn, err := redispool.GetClient(f.redisHost)
  93. if err != nil {
  94. server.Log.Errorf("conn to redis error :%v", err)
  95. } else {
  96. // fill tempFile
  97. lists, err := redis.Strings(conn.Do("SMEMBERS", tempFilesKey))
  98. if err != nil {
  99. server.Log.Error(err)
  100. return err
  101. }
  102. for _, str := range lists {
  103. var obj tempFile
  104. json.Unmarshal([]byte(str), &obj)
  105. server.Log.Debugf("%v", obj)
  106. f.mu.Lock()
  107. f.tempFiles[obj.FileName] = &obj
  108. f.mu.Unlock()
  109. }
  110. }
  111. return nil
  112. }
  113. // TODO: 临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件
  114. func (f *FileAccess) checker() {
  115. server.Log.Info("start temp file checker")
  116. f.getTempFileFromRedis()
  117. server.Log.Info("start temp file checker")
  118. for {
  119. for k, v := range f.tempFiles {
  120. if time.Now().Sub(v.CreateTime) > checkTimeOut {
  121. //delete file
  122. f.delTempFile(v.FileName)
  123. f.mu.Lock()
  124. delete(f.tempFiles, k)
  125. f.mu.Unlock()
  126. err := os.Remove(v.FileName)
  127. if err != nil {
  128. server.Log.Errorf("error while delete file:%v", err)
  129. }
  130. }
  131. }
  132. time.Sleep(15 * time.Minute)
  133. }
  134. }
  135. // DeleteFile 删除文件
  136. func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmptyResult) error {
  137. return nil
  138. }
  139. // MoveFile move a file to new path
  140. // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
  141. func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
  142. // check source file
  143. reg := regexp.MustCompile(`tmp/\$*.*`)
  144. src := reg.FindString(args.Source)
  145. fileName := filepath.Base(src)
  146. src = *conStaticPath + "/" + src
  147. b, err := utils.Exists(src)
  148. if err != nil {
  149. server.Log.Error(err)
  150. return err
  151. }
  152. if b {
  153. // copy file
  154. sourceFileStat, err := os.Stat(src)
  155. if err != nil {
  156. return err
  157. }
  158. if !sourceFileStat.Mode().IsRegular() {
  159. return fmt.Errorf("%s is not a regular file", src)
  160. }
  161. source, err := os.Open(src)
  162. if err != nil {
  163. return err
  164. }
  165. defer source.Close()
  166. dst := *conStaticPath + "/" + args.Target + "/" + fileName
  167. utils.CreateIfNotExist(dst)
  168. destination, err := os.Create(dst)
  169. if err != nil {
  170. return err
  171. }
  172. defer destination.Close()
  173. io.Copy(destination, source)
  174. fpath := fmt.Sprintf("http://%s/%s/%s/%s", server.GetHTTPHost(), *conStaticPath, args.Target, fileName)
  175. if *conDomain != "" {
  176. fpath = fmt.Sprintf("http://%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName)
  177. }
  178. reply.FilePath = fpath
  179. //delete src
  180. os.Remove(src)
  181. return nil
  182. }
  183. return nil
  184. }