fileaccess.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "sparrow/pkg/redispool"
  10. "sparrow/pkg/rpcs"
  11. "sparrow/pkg/server"
  12. "sparrow/pkg/utils"
  13. "sync"
  14. "time"
  15. "github.com/garyburd/redigo/redis"
  16. "github.com/opentracing/opentracing-go"
  17. "github.com/opentracing/opentracing-go/ext"
  18. )
  19. const checkTimeOut = 30 * time.Minute
  20. const tempFilesKey = "tempfilelist"
  21. // FileAccess RPC服务
  22. type FileAccess struct {
  23. mu sync.RWMutex
  24. // store tmp file list
  25. tempFiles map[string]*tempFile
  26. redisHost string
  27. }
  28. // 临时文件
  29. type tempFile struct {
  30. //文件路径
  31. FileName string
  32. //创建时间
  33. CreateTime time.Time
  34. }
  35. // NewFileAccess create a FileAccessor instance
  36. func NewFileAccess(redis string) *FileAccess {
  37. return &FileAccess{
  38. tempFiles: make(map[string]*tempFile),
  39. redisHost: redis,
  40. }
  41. }
  42. // 增加一个tempfile
  43. func (f *FileAccess) addTempFile(fileName string) {
  44. obj := &tempFile{
  45. FileName: fileName,
  46. CreateTime: time.Now(),
  47. }
  48. f.mu.Lock()
  49. f.tempFiles[fileName] = obj
  50. f.mu.Unlock()
  51. // store redis
  52. err := f.saveToRedis(obj)
  53. if err != nil {
  54. server.Log.Error(err)
  55. }
  56. }
  57. func (f *FileAccess) delTempFile(fileName string) {
  58. f.mu.Lock()
  59. if obj, ok := f.tempFiles[fileName]; ok {
  60. conn, err := redispool.GetClient(f.redisHost)
  61. if err != nil {
  62. server.Log.Errorf("conn redis error :%v", err)
  63. f.mu.Unlock()
  64. return
  65. }
  66. bytes, err := json.Marshal(obj)
  67. if err != nil {
  68. server.Log.Errorf("json marshal error :%v", err)
  69. }
  70. conn.Do("SREM", tempFilesKey, string(bytes))
  71. }
  72. f.mu.Unlock()
  73. }
  74. //
  75. func (f *FileAccess) saveToRedis(obj *tempFile) error {
  76. conn, err := redispool.GetClient(f.redisHost)
  77. if err != nil {
  78. server.Log.Errorf("saveToRedis error :%v", err)
  79. return err
  80. }
  81. bytes, err := json.Marshal(obj)
  82. if err != nil {
  83. server.Log.Errorf("json marshal error :%v", err)
  84. return err
  85. }
  86. _, err = conn.Do("SADD", tempFilesKey, string(bytes))
  87. if err != nil {
  88. server.Log.Errorf("store to redis error :%v", err)
  89. return err
  90. }
  91. return nil
  92. }
  93. func (f *FileAccess) getTempFileFromRedis() error {
  94. conn, err := redispool.GetClient(f.redisHost)
  95. if err != nil {
  96. server.Log.Errorf("conn to redis error :%v", err)
  97. } else {
  98. // fill tempFile
  99. lists, err := redis.Strings(conn.Do("SMEMBERS", tempFilesKey))
  100. if err != nil {
  101. server.Log.Error(err)
  102. return err
  103. }
  104. for _, str := range lists {
  105. var obj tempFile
  106. json.Unmarshal([]byte(str), &obj)
  107. server.Log.Debugf("%v", obj)
  108. f.mu.Lock()
  109. f.tempFiles[obj.FileName] = &obj
  110. f.mu.Unlock()
  111. }
  112. }
  113. return nil
  114. }
  115. // TODO: 临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件
  116. func (f *FileAccess) checker() {
  117. server.Log.Info("start temp file checker")
  118. f.getTempFileFromRedis()
  119. for {
  120. for k, v := range f.tempFiles {
  121. if time.Now().Sub(v.CreateTime) > checkTimeOut {
  122. //delete file
  123. f.delTempFile(v.FileName)
  124. f.mu.Lock()
  125. delete(f.tempFiles, k)
  126. f.mu.Unlock()
  127. err := os.Remove(v.FileName)
  128. if err != nil {
  129. server.Log.Errorf("error while delete file:%v", err)
  130. }
  131. }
  132. }
  133. time.Sleep(15 * time.Minute)
  134. }
  135. }
  136. // DeleteFile 删除文件
  137. func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmptyResult) error {
  138. reg := regexp.MustCompile(`upload/\$*.*`)
  139. src := reg.FindString(args.FileName)
  140. err := os.Remove(src)
  141. return err
  142. }
  143. // MoveFile move a file to new path
  144. // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
  145. func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
  146. spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier))
  147. span := opentracing.StartSpan("movefile", ext.RPCServerOption(spanCtx))
  148. defer span.Finish()
  149. span.LogKV("tmpfile", args.Source)
  150. // check source file
  151. reg := regexp.MustCompile(`tmp/\$*.*`)
  152. src := reg.FindString(args.Source)
  153. fileName := filepath.Base(src)
  154. src = *conStaticPath + "/" + src
  155. b, err := utils.Exists(src)
  156. if err != nil {
  157. server.Log.Error(err)
  158. return err
  159. }
  160. if b {
  161. // copy file
  162. sourceFileStat, err := os.Stat(src)
  163. if err != nil {
  164. return err
  165. }
  166. if !sourceFileStat.Mode().IsRegular() {
  167. return fmt.Errorf("%s is not a regular file", src)
  168. }
  169. source, err := os.Open(src)
  170. if err != nil {
  171. return err
  172. }
  173. defer source.Close()
  174. dst := *conStaticPath + "/" + args.Target + "/" + fileName
  175. utils.CreateIfNotExist(dst)
  176. destination, err := os.Create(dst)
  177. if err != nil {
  178. return err
  179. }
  180. defer destination.Close()
  181. io.Copy(destination, source)
  182. fpath := fmt.Sprintf("http://%s/%s/%s/%s", server.GetHTTPHost(), *conStaticPath, args.Target, fileName)
  183. if *conDomain != "" {
  184. fpath = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName)
  185. }
  186. reply.FilePath = fpath
  187. span.LogKV("tartgetfile", fpath)
  188. //delete src
  189. os.Remove(src)
  190. return nil
  191. }
  192. return nil
  193. }