package main import ( "encoding/json" "fmt" "io" "os" "path/filepath" "regexp" "sparrow/pkg/redispool" "sparrow/pkg/rpcs" "sparrow/pkg/server" "sparrow/pkg/utils" "sync" "time" "github.com/garyburd/redigo/redis" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" ) const checkTimeOut = 30 * time.Minute const tempFilesKey = "tempfilelist" // FileAccess RPC服务 type FileAccess struct { mu sync.RWMutex // store tmp file list tempFiles map[string]*tempFile redisHost string } // 临时文件 type tempFile struct { //文件路径 FileName string //创建时间 CreateTime time.Time } // NewFileAccess create a FileAccessor instance func NewFileAccess(redis string) *FileAccess { return &FileAccess{ tempFiles: make(map[string]*tempFile), redisHost: redis, } } // 增加一个tempfile func (f *FileAccess) addTempFile(fileName string) { obj := &tempFile{ FileName: fileName, CreateTime: time.Now(), } f.mu.Lock() f.tempFiles[fileName] = obj f.mu.Unlock() // store redis err := f.saveToRedis(obj) if err != nil { server.Log.Error(err) } } func (f *FileAccess) delTempFile(fileName string) { f.mu.Lock() if obj, ok := f.tempFiles[fileName]; ok { conn, err := redispool.GetClient(f.redisHost) if err != nil { server.Log.Errorf("conn redis error :%v", err) f.mu.Unlock() return } bytes, err := json.Marshal(obj) if err != nil { server.Log.Errorf("json marshal error :%v", err) } conn.Do("SREM", tempFilesKey, string(bytes)) } f.mu.Unlock() } // func (f *FileAccess) saveToRedis(obj *tempFile) error { conn, err := redispool.GetClient(f.redisHost) if err != nil { server.Log.Errorf("saveToRedis error :%v", err) return err } bytes, err := json.Marshal(obj) if err != nil { server.Log.Errorf("json marshal error :%v", err) return err } _, err = conn.Do("SADD", tempFilesKey, string(bytes)) if err != nil { server.Log.Errorf("store to redis error :%v", err) return err } return nil } func (f *FileAccess) getTempFileFromRedis() error { conn, err := redispool.GetClient(f.redisHost) if err != nil { server.Log.Errorf("conn to redis error :%v", err) } else { // fill tempFile lists, err := redis.Strings(conn.Do("SMEMBERS", tempFilesKey)) if err != nil { server.Log.Error(err) return err } for _, str := range lists { var obj tempFile json.Unmarshal([]byte(str), &obj) server.Log.Debugf("%v", obj) f.mu.Lock() f.tempFiles[obj.FileName] = &obj f.mu.Unlock() } } return nil } // TODO: 临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件 func (f *FileAccess) checker() { server.Log.Info("start temp file checker") f.getTempFileFromRedis() for { for k, v := range f.tempFiles { if time.Now().Sub(v.CreateTime) > checkTimeOut { //delete file f.delTempFile(v.FileName) f.mu.Lock() delete(f.tempFiles, k) f.mu.Unlock() err := os.Remove(v.FileName) if err != nil { server.Log.Errorf("error while delete file:%v", err) } } } time.Sleep(15 * time.Minute) } } // DeleteFile 删除文件 func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmptyResult) error { reg := regexp.MustCompile(`upload/\$*.*`) src := reg.FindString(args.FileName) err := os.Remove(src) return err } // MoveFile move a file to new path // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error { spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier)) span := opentracing.StartSpan("movefile", ext.RPCServerOption(spanCtx)) defer span.Finish() span.LogKV("tmpfile", args.Source) // check source file reg := regexp.MustCompile(`tmp/\$*.*`) src := reg.FindString(args.Source) fileName := filepath.Base(src) src = *conStaticPath + "/" + src b, err := utils.Exists(src) if err != nil { server.Log.Error(err) return err } if b { // copy file sourceFileStat, err := os.Stat(src) if err != nil { return err } if !sourceFileStat.Mode().IsRegular() { return fmt.Errorf("%s is not a regular file", src) } source, err := os.Open(src) if err != nil { return err } defer source.Close() dst := *conStaticPath + "/" + args.Target + "/" + fileName utils.CreateIfNotExist(dst) destination, err := os.Create(dst) if err != nil { return err } defer destination.Close() io.Copy(destination, source) fpath := fmt.Sprintf("http://%s/%s/%s/%s", server.GetHTTPHost(), *conStaticPath, args.Target, fileName) if *conDomain != "" { fpath = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName) } reply.FilePath = fpath span.LogKV("tartgetfile", fpath) //delete src os.Remove(src) return nil } return nil }