Selaa lähdekoodia

更新fileaccess服务,添加自动清理文件功能

lijian 6 vuotta sitten
vanhempi
commit
135d0d9a23

+ 5 - 0
pkg/rpcs/fileaccess.go

@@ -10,3 +10,8 @@ type ArgsMoveFile struct {
 type ReplyMoveFile struct {
 	FilePath string
 }
+
+// ArgsDeleteFile 删除文件参数
+type ArgsDeleteFile struct {
+	FileName string
+}

+ 7 - 11
services/fileaccess/file.go

@@ -9,7 +9,6 @@ import (
 	"sparrow/pkg/server"
 	"sparrow/pkg/utils"
 	"strconv"
-	"time"
 
 	"github.com/kataras/iris"
 )
@@ -22,7 +21,7 @@ func registerRouter(app *iris.Application, fc *FileAccess) {
 				server.Log.Error(err)
 				ctx.JSON(map[string]interface{}{
 					"code":    -1,
-					"message": "文件大小超过限制(" + strconv.Itoa(*conMaxSize) + "字节)",
+					"message": "无法找到文件或文件大小超过限制(" + strconv.Itoa(*conMaxSize) + "字节)",
 				})
 				return
 			}
@@ -39,7 +38,7 @@ func registerRouter(app *iris.Application, fc *FileAccess) {
 				return
 			}
 			newname := fmt.Sprintf("%s%s", utils.UUID(), fileSuffix)
-			newfile := "./upload/tmp/" + newname
+			newfile := fmt.Sprintf("%s/%s/%s", *conStaticPath, "tmp", newname)
 			utils.CreateIfNotExist(newfile)
 			out, err := os.OpenFile(newfile,
 				os.O_WRONLY|os.O_CREATE, 0666)
@@ -55,16 +54,13 @@ func registerRouter(app *iris.Application, fc *FileAccess) {
 			io.Copy(out, file)
 			var fileURL string
 			if *conDomain == "" {
-				fileURL = "http://" + server.GetHTTPHost() + "/upload/tmp/" + newname
+				//fileURL = "http://" + server.GetHTTPHost() + "/upload/tmp/" + newname
+				fileURL = fmt.Sprintf("http://%s/%s/%s/%s", server.GetHTTPHost(), *conStaticPath, "tmp", newname)
 			} else {
-				fileURL = "http://" + *conDomain + "/upload/tmp/" + newname
+				fileURL = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, "tmp", newname)
 			}
-			fc.mu.Lock()
-			fc.tempFiles[newname] = &tempFile{
-				fileName:   newfile,
-				createTime: time.Now(),
-			}
-			fc.mu.Unlock()
+			// save
+			fc.addTempFile(newfile)
 			ctx.JSON(map[string]interface{}{
 				"code":    0,
 				"message": "success",

+ 84 - 5
services/fileaccess/fileaccess.go

@@ -1,11 +1,13 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"regexp"
+	"sparrow/pkg/redispool"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 	"sparrow/pkg/utils"
@@ -14,11 +16,14 @@ import (
 )
 
 const checkTimeOut = 30 * time.Minute
+const tempFilesKey = "tempfilelist"
 
 // FileAccess RPC服务
 type FileAccess struct {
-	mu        sync.RWMutex
+	mu sync.RWMutex
+	// store tmp file list
 	tempFiles map[string]*tempFile
+	redisHost string
 }
 
 // 代表一个临时文件
@@ -30,25 +35,95 @@ type tempFile struct {
 }
 
 // NewFileAccess create a FileAccessor instance
-func NewFileAccess() *FileAccess {
+func NewFileAccess(redis string) *FileAccess {
 	return &FileAccess{
 		tempFiles: make(map[string]*tempFile),
+		redisHost: redis,
 	}
 }
 
+// 增加一个tempfile
+func (f *FileAccess) addTempFile(fileName string) {
+	obj := &tempFile{
+		fileName:   fileName,
+		createTime: time.Now(),
+	}
+	f.mu.Lock()
+	f.tempFiles[fileName] = obj
+	f.mu.Unlock()
+	// store redis
+	err := f.saveToRedis(obj)
+	if err != nil {
+		server.Log.Error(err)
+	}
+}
+
+func (f *FileAccess) delTempFile(fileName string) {
+	f.mu.Lock()
+	if obj, ok := f.tempFiles[fileName]; ok {
+		conn, err := redispool.GetClient(f.redisHost)
+		if err != nil {
+			server.Log.Errorf("conn redis error :%v", err)
+			f.mu.Unlock()
+			return
+		}
+		bytes, err := json.Marshal(obj)
+		if err != nil {
+			server.Log.Errorf("json marshal error :%v", err)
+		}
+		conn.Do("SREM", tempFilesKey, string(bytes))
+	}
+	f.mu.Unlock()
+}
+
+//
+func (f *FileAccess) saveToRedis(tmp *tempFile) error {
+	conn, err := redispool.GetClient(f.redisHost)
+	if err != nil {
+		server.Log.Errorf("saveToRedis error :%v", err)
+		return err
+	}
+	bytes, err := json.Marshal(tmp)
+	if err != nil {
+		server.Log.Errorf("json marshal error :%v", err)
+		return err
+	}
+	_, err = conn.Do("SADD", tempFilesKey, string(bytes))
+	if err != nil {
+		server.Log.Errorf("store to redis error :%v", err)
+		return err
+	}
+	return nil
+}
+
+func (f *FileAccess) getTempFileFromRedis() map[string]*tempFile {
+	return nil
+}
+
 // TODO:  临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件
 func (f *FileAccess) checker() {
-	server.Log.Info("开始文件过期检测")
+	conn, err := redispool.GetClient(f.redisHost)
+	if err != nil {
+		server.Log.Errorf("conn to redis error :%v", err)
+	} else {
+		// fill tempFile
+		_, err := conn.Do("GET", tempFilesKey)
+		if err != nil {
+			server.Log.Error(err)
+		}
+	}
+	server.Log.Info("start temp file checker")
 	for {
 		for k, v := range f.tempFiles {
 			if time.Now().Sub(v.createTime) > checkTimeOut {
 				//delete file
+				f.delTempFile(v.fileName)
 				f.mu.Lock()
 				delete(f.tempFiles, k)
 				f.mu.Unlock()
 				err := os.Remove(v.fileName)
 				if err != nil {
-					server.Log.Errorf("自动清理文件失败:%v", err)
+					server.Log.Errorf("error while delete file:%v", err)
 				}
 			}
 		}
@@ -56,6 +131,11 @@ func (f *FileAccess) checker() {
 	}
 }
 
+// DeleteFile 删除文件
+func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmptyResult) error {
+	return nil
+}
+
 // MoveFile move a file to new path
 // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
 func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
@@ -63,7 +143,6 @@ func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile
 	reg := regexp.MustCompile(`tmp/\$*.*`)
 	src := reg.FindString(args.Source)
 	fileName := filepath.Base(src)
-	server.Log.Debug(src)
 	src = *conStaticPath + "/" + src
 	b, err := utils.Exists(src)
 	if err != nil {

+ 4 - 1
services/fileaccess/flags.go

@@ -7,15 +7,18 @@ const (
 	flagMaxSize    = "maxsize"  //最大允许上传的文件大小
 	flagAllowExt   = "allowext" //允许上传的文件格式
 	flagDomain     = "domain"   // 文件服务域名
+	flagRedis      = "redis"    //redis服务
 
 	defaultStaticPath = "upload"
 	defaultMaxSize    = 300 << 10         //默认300K
 	defaultAllowExt   = ".jpeg|.jpg|.png" //注意.号
+	defaultRedisHost  = "192.168.175.60:6379"
 )
 
 var (
 	conStaticPath = flag.String(flagStaticPath, defaultStaticPath, "static file path")
 	conMaxSize    = flag.Int(flagMaxSize, defaultMaxSize, "允许上传的最大文件尺寸")
 	conAllowExt   = flag.String(flagAllowExt, defaultAllowExt, "允许上传的文件格式")
-	conDomain     = flag.String(flagDomain, "", "文件服务器域名")
+	conDomain     = flag.String(flagDomain, "", "文件服务器域名 http://xxxxxx.xxx")
+	conRedisHost  = flag.String(flagRedis, defaultRedisHost, "redis 服务器地址 ip:host")
 )

+ 1 - 1
services/fileaccess/main.go

@@ -14,7 +14,7 @@ func main() {
 		return
 	}
 
-	fileaccess := NewFileAccess()
+	fileaccess := NewFileAccess(*conRedisHost)
 	err = server.RegisterRPCHandler(fileaccess)
 	if err != nil {
 		server.Log.Errorf("RegisterRPCHandler Error: %s", err)