123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io"
- "os"
- "path/filepath"
- "regexp"
- "sparrow/pkg/redispool"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "sparrow/pkg/utils"
- "sync"
- "time"
- "github.com/garyburd/redigo/redis"
- "github.com/opentracing/opentracing-go"
- "github.com/opentracing/opentracing-go/ext"
- )
const (
	// checkTimeOut is the age beyond which the checker loop treats a temp
	// file as expired and deletes it.
	checkTimeOut = 30 * time.Minute

	// tempFilesKey is the Redis set key under which the JSON-encoded temp
	// file records are stored (SADD / SREM / SMEMBERS).
	tempFilesKey = "tempfilelist"
)
- // FileAccess RPC服务
- type FileAccess struct {
- mu sync.RWMutex
- // store tmp file list
- tempFiles map[string]*tempFile
- redisHost string
- }
// tempFile describes one temporary file tracked by FileAccess.
//
// The JSON encoding of this struct is used verbatim as the Redis set member,
// so the field names and their order must stay stable: the string produced
// when removing an entry has to match the one produced when it was added.
type tempFile struct {
	// FileName is the path of the file.
	FileName string

	// CreateTime is the moment the file was registered.
	CreateTime time.Time
}
- // NewFileAccess create a FileAccessor instance
- func NewFileAccess(redis string) *FileAccess {
- return &FileAccess{
- tempFiles: make(map[string]*tempFile),
- redisHost: redis,
- }
- }
- // 增加一个tempfile
- func (f *FileAccess) addTempFile(fileName string) {
- obj := &tempFile{
- FileName: fileName,
- CreateTime: time.Now(),
- }
- f.mu.Lock()
- f.tempFiles[fileName] = obj
- f.mu.Unlock()
- // store redis
- err := f.saveToRedis(obj)
- if err != nil {
- server.Log.Error(err)
- }
- }
- func (f *FileAccess) delTempFile(fileName string) {
- f.mu.Lock()
- if obj, ok := f.tempFiles[fileName]; ok {
- conn, err := redispool.GetClient(f.redisHost)
- if err != nil {
- server.Log.Errorf("conn redis error :%v", err)
- f.mu.Unlock()
- return
- }
- bytes, err := json.Marshal(obj)
- if err != nil {
- server.Log.Errorf("json marshal error :%v", err)
- }
- conn.Do("SREM", tempFilesKey, string(bytes))
- }
- f.mu.Unlock()
- }
- //
- func (f *FileAccess) saveToRedis(obj *tempFile) error {
- conn, err := redispool.GetClient(f.redisHost)
- if err != nil {
- server.Log.Errorf("saveToRedis error :%v", err)
- return err
- }
- bytes, err := json.Marshal(obj)
- if err != nil {
- server.Log.Errorf("json marshal error :%v", err)
- return err
- }
- _, err = conn.Do("SADD", tempFilesKey, string(bytes))
- if err != nil {
- server.Log.Errorf("store to redis error :%v", err)
- return err
- }
- return nil
- }
- func (f *FileAccess) getTempFileFromRedis() error {
- conn, err := redispool.GetClient(f.redisHost)
- if err != nil {
- server.Log.Errorf("conn to redis error :%v", err)
- } else {
- // fill tempFile
- lists, err := redis.Strings(conn.Do("SMEMBERS", tempFilesKey))
- if err != nil {
- server.Log.Error(err)
- return err
- }
- for _, str := range lists {
- var obj tempFile
- json.Unmarshal([]byte(str), &obj)
- server.Log.Debugf("%v", obj)
- f.mu.Lock()
- f.tempFiles[obj.FileName] = &obj
- f.mu.Unlock()
- }
- }
- return nil
- }
- // TODO: 临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件
- func (f *FileAccess) checker() {
- server.Log.Info("start temp file checker")
- f.getTempFileFromRedis()
- for {
- for k, v := range f.tempFiles {
- if time.Now().Sub(v.CreateTime) > checkTimeOut {
- //delete file
- f.delTempFile(v.FileName)
- f.mu.Lock()
- delete(f.tempFiles, k)
- f.mu.Unlock()
- err := os.Remove(v.FileName)
- if err != nil {
- server.Log.Errorf("error while delete file:%v", err)
- }
- }
- }
- time.Sleep(15 * time.Minute)
- }
- }
- // DeleteFile 删除文件
- func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmptyResult) error {
- reg := regexp.MustCompile(`upload/\$*.*`)
- src := reg.FindString(args.FileName)
- err := os.Remove(src)
- return err
- }
- // MoveFile move a file to new path
- // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
- func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
- spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier))
- span := opentracing.StartSpan("movefile", ext.RPCServerOption(spanCtx))
- defer span.Finish()
- span.LogKV("tmpfile", args.Source)
- // check source file
- reg := regexp.MustCompile(`tmp/\$*.*`)
- src := reg.FindString(args.Source)
- fileName := filepath.Base(src)
- src = *conStaticPath + "/" + src
- b, err := utils.Exists(src)
- if err != nil {
- server.Log.Error(err)
- return err
- }
- if b {
- // copy file
- sourceFileStat, err := os.Stat(src)
- if err != nil {
- return err
- }
- if !sourceFileStat.Mode().IsRegular() {
- return fmt.Errorf("%s is not a regular file", src)
- }
- source, err := os.Open(src)
- if err != nil {
- return err
- }
- defer source.Close()
- dst := *conStaticPath + "/" + args.Target + "/" + fileName
- utils.CreateIfNotExist(dst)
- destination, err := os.Create(dst)
- if err != nil {
- return err
- }
- defer destination.Close()
- io.Copy(destination, source)
- fpath := fmt.Sprintf("http://%s/%s/%s/%s", server.GetHTTPHost(), *conStaticPath, args.Target, fileName)
- if *conDomain != "" {
- fpath = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName)
- }
- reply.FilePath = fpath
- span.LogKV("tartgetfile", fpath)
- //delete src
- os.Remove(src)
- return nil
- }
- return nil
- }
|