package internal

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/util/guid"

	"gxt-file-server/app/agent"
	context2 "gxt-file-server/app/context"
	"gxt-file-server/app/errors"
	"gxt-file-server/app/model"
	"gxt-file-server/app/schema"
)

// File bundles the dependencies used by the upload, chunk-upload, merge and
// cleanup operations defined in this file.
type File struct {
	historyModel model.IFileHistory // records of stored files
	chunkModel   model.IFileChunk   // records of uploaded chunks
	redisCli     *redis.Client      // kept for callers; not referenced by the methods in this file
	transModel   model.ITrans       // handed to ExecTrans by FileMerge
}

// NewFile builds a File from its model and redis dependencies.
func NewFile(
	hm model.IFileHistory,
	cm model.IFileChunk,
	redisCli *redis.Client,
	tm model.ITrans,
) *File {
	return &File{
		historyModel: hm,
		chunkModel:   cm,
		redisCli:     redisCli,
		transModel:   tm,
	}
}

// TODO: decide whether files should be renamed when they are uploaded.

// Upload stores the file found under formKey in r and records it in the file
// history.
//
// If ctx carries a file hash and a history record with that hash already
// exists, the existing record is returned and nothing is uploaded. When the
// agent reports no uploaded files the result is (nil, nil), so callers must
// check the returned pointer before using it.
func (f *File) Upload(ctx context.Context, r *http.Request, formKey, basePath string) (*schema.FileInfo, error) {
	uuid := guid.S()
	base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
	dir := strings.ToLower(basePath)

	ctx = context2.NewFileNameContext(ctx, func(fileName string) string {
		return fmt.Sprintf("%s/%s/%s/%s", base, dir, uuid, fileName)
	})

	// Reuse an existing record when the caller supplied a known hash. The
	// comma-ok assertion avoids a panic if the context value is not a string.
	if raw, found := context2.FromFileHashContext(ctx); found {
		if hash, ok := raw.(string); ok {
			result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: hash})
			if err != nil {
				return nil, err
			}
			if len(result.Data) > 0 {
				v := result.Data[0]
				return &schema.FileInfo{
					URL:  v.Path,
					Size: v.FileSize,
					Name: v.FileName,
					Hash: v.Hash,
				}, nil
			}
		}
	}

	infos, err := agent.DefaultAgent().Upload(ctx, r, formKey)
	if err != nil {
		return nil, err
	}
	if len(infos) == 0 {
		return nil, nil
	}
	uploaded := infos[0]

	fullName := fmt.Sprintf("%s/%s/%s/%s", base, dir, uuid, url.PathEscape(f.getFileName(uploaded.Name())))
	if !strings.HasPrefix(fullName, "/") {
		fullName = "/" + fullName
	}

	// An expire value of -1 in the context marks the file as persistent.
	isPersistent := schema.FALSE
	if raw, found := context2.FromFileExpireContext(ctx); found {
		if expire, ok := raw.(int); ok && expire == -1 {
			isPersistent = schema.TRUE
		}
	}

	err = f.historyModel.Create(ctx, schema.FileHistory{
		RecordID:     guid.S(),
		Hash:         uploaded.Hash(),
		Path:         fullName,
		Creator:      "",
		IsPersistent: isPersistent,
		FileSize:     uploaded.Size(),
		FileName:     uploaded.Name(),
		FileHash:     uploaded.Hash(),
	})
	if err != nil {
		return nil, err
	}

	return &schema.FileInfo{
		URL:  fullName,
		Name: uploaded.Name(),
		Size: uploaded.Size(),
		Hash: uploaded.Hash(),
	}, nil
}

// Download returns the binary content of filePath as provided by the default
// agent, together with the string and error the agent returns.
func (f *File) Download(ctx context.Context, filePath string) ([]byte, string, error) {
	return agent.DefaultAgent().Get(ctx, filePath)
}

// getFileName fixes up a file name by replacing every half-width '%' with the
// full-width '%'; without the replacement the file cannot be opened from a
// browser.
func (f *File) getFileName(fileName string) string {
	return strings.ReplaceAll(fileName, "%", "%")
}

// Persistent marks the file identified by hash as persistent, first through
// the default agent and then on the first matching non-persistent history
// record, if there is one.
func (f *File) Persistent(ctx context.Context, hash string) error {
	if err := agent.DefaultAgent().Persistent(ctx, hash); err != nil {
		return err
	}

	result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{
		Hash:         hash,
		IsPersistent: schema.FALSE,
	})
	if err != nil {
		return err
	}
	if len(result.Data) == 0 {
		return nil
	}

	record := result.Data[0]
	record.IsPersistent = schema.TRUE
	return f.historyModel.Update(ctx, record.RecordID, *record)
}

// ChunkUpload uploads one chunk of a file and records it.
//
// The returned IsComplete is 2 when a history record for req.Hash already
// exists (nothing is uploaded), and 1 when the chunk was already recorded or
// has just been uploaded. When the agent reports no uploaded files the result
// is (nil, nil).
func (f *File) ChunkUpload(ctx context.Context, req schema.FileChunkParams) (*schema.FileChunkInfo, error) {
	base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
	chunkBase := g.Cfg().GetString("agent.DefaultChunkExpireTime")

	chunkPath := fmt.Sprintf("%s/%s/%s/%s_%d", base, strings.ToLower(req.BaseUrl), req.Hash, req.Hash, req.Index)
	ctx = context2.NewFileNameContext(ctx, func(fileName string) string {
		return chunkPath
	})

	// Check whether the whole file already exists.
	result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: req.Hash})
	if err != nil {
		return nil, err
	}
	if len(result.Data) > 0 {
		v := result.Data[0]
		return &schema.FileChunkInfo{
			URL:        v.Path,
			Name:       v.FileName,
			Hash:       v.Hash,
			IsComplete: 2,
		}, nil
	}

	// Check whether this chunk already exists.
	chunks, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{Hash: req.Hash, Current: req.Index})
	if err != nil {
		return nil, err
	}
	if len(chunks.Data) > 0 {
		v := chunks.Data[0]
		return &schema.FileChunkInfo{
			Total:      v.Total,
			Current:    v.Current,
			URL:        v.Path,
			Name:       v.Name,
			Hash:       v.Hash,
			IsComplete: 1,
		}, nil
	}

	// Parse the chunk expiry (configured as a number of seconds) before
	// uploading, so an invalid configuration value cannot leave an uploaded
	// chunk without a matching record.
	pastTime, err := time.ParseDuration(chunkBase + "s")
	if err != nil {
		return nil, err
	}

	// Chunk uploads do not go through redis; expired chunks are found by a
	// daily table scan.
	ctx = context2.NewFileExpireContext(ctx, -1)
	infos, err := agent.DefaultAgent().Upload(ctx, req.HttpRequest, req.FormKey)
	if err != nil {
		return nil, err
	}
	if len(infos) == 0 {
		return nil, nil
	}
	uploaded := infos[0]

	fullName := chunkPath
	if !strings.HasPrefix(fullName, "/") {
		fullName = "/" + fullName
	}

	data := schema.FileChunk{
		RecordID: guid.S(),
		Hash:     req.Hash,
		Total:    req.Total,
		Current:  req.Index,
		Name:     uploaded.Name(),
		Path:     fullName,
		Size:     uploaded.Size(),
		FileHash: uploaded.Hash(),
		PastTime: time.Now().Add(pastTime), // expiry time of the chunk
	}
	if err := f.chunkModel.Create(ctx, data); err != nil {
		return nil, err
	}

	return &schema.FileChunkInfo{
		Current:    data.Current,
		Total:      data.Total,
		URL:        data.Path,
		Name:       data.Name,
		Hash:       data.Hash,
		IsComplete: 1,
	}, nil
}

// FileMerge composes the uploaded chunks of req.Hash into a single object.
//
// If a history record for req.Hash already exists it is returned unchanged.
// Otherwise the chunks are composed, the chunk objects are removed, and the
// chunk records are replaced by a history record inside one transaction.
// req.Total must be in the range 1..1000.
func (f *File) FileMerge(ctx context.Context, req schema.FileMergeParams) (*schema.FileInfo, error) {
	// minio can operate on at most 1000 objects.
	if req.Total <= 0 || req.Total > 1000 {
		return nil, errors.New("非法的块数")
	}

	// Check whether the whole file already exists.
	hisResult, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: req.Hash})
	if err != nil {
		return nil, err
	}
	if len(hisResult.Data) > 0 {
		v := hisResult.Data[0]
		return &schema.FileInfo{
			URL:  v.Path,
			Name: v.FileName,
			Hash: v.Hash,
			Size: v.FileSize,
		}, nil
	}

	result, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{
		Hash: req.Hash,
	})
	if err != nil {
		return nil, err
	}
	if len(result.Data) < req.Total {
		return nil, errors.New400Response("不完整的文件对象")
	}

	filePaths := result.Data.FileChunkToPath()
	base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
	// NOTE(review): unlike ChunkUpload, req.BaseUrl is not lower-cased here and
	// no leading "/" is added — confirm this difference is intended.
	fullName := fmt.Sprintf("%s/%s/%s/%s", base, req.BaseUrl, req.Hash, req.FileName)

	// Merge the chunks.
	if err := agent.DefaultAgent().ComposeObject(ctx, filePaths, fullName); err != nil {
		return nil, err
	}

	// Remove the chunk objects.
	for _, chunkPath := range filePaths {
		if err := agent.DefaultAgent().RemoveObject(ctx, chunkPath); err != nil {
			return nil, err
		}
	}

	fileInfo, err := agent.DefaultAgent().Stat(ctx, fullName)
	if err != nil {
		return nil, err
	}

	err = ExecTrans(ctx, f.transModel, func(ctx context.Context) error {
		// Use a local error so the closure does not write to the caller's err.
		if err := f.chunkModel.DeleteHash(ctx, req.Hash); err != nil {
			return err
		}
		return f.historyModel.Create(ctx, schema.FileHistory{
			RecordID:     guid.S(),
			Hash:         req.Hash,
			Path:         fullName,
			Creator:      "",
			IsPersistent: 2,
			FileSize:     fileInfo.Size,
			FileName:     req.FileName,
			FileHash:     fileInfo.Hash,
		})
	})

	// Set the file expiry. This runs before the transaction error is inspected,
	// so it is applied even when the transaction failed.
	// NOTE(review): confirm that ordering is intended.
	agent.DefaultAgent().SetDefaultExpireTime(ctx, fileInfo.Hash, fullName)
	if err != nil {
		return nil, err
	}
	return fileInfo, nil
}

// ClearChunks removes expired file chunks: for every chunk returned by the
// IsClear query it deletes the stored object and then the chunk record.
//
// Failures are logged instead of terminating the process. A failed query ends
// the run; a failure on one chunk skips that chunk so the remaining ones are
// still cleaned up. A chunk whose object could not be removed keeps its record.
func (f *File) ClearChunks(ctx context.Context) {
	results, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{
		IsClear: true,
	})
	if err != nil {
		log.Printf("clear chunks: query expired chunks: %v", err)
		return
	}

	for _, chunk := range results.Data {
		// Delete the stored object.
		if err := agent.DefaultAgent().RemoveObject(ctx, chunk.Path); err != nil {
			log.Printf("clear chunks: remove object %v: %v", chunk.Path, err)
			continue
		}
		// Delete the record.
		if err := f.chunkModel.Delete(ctx, chunk.RecordID); err != nil {
			log.Printf("clear chunks: delete record %v: %v", chunk.RecordID, err)
		}
	}
}