package agent

import (
	"context"
	"errors"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gogf/gf/os/grpool"

	context2 "gxt-file-server/app/context"
	"gxt-file-server/app/model"
	"gxt-file-server/app/schema"
	"gxt-file-server/pkg/logger"
	"gxt-file-server/pkg/store"
)

var (
	// ErrMissingFile is returned by Upload when the request carries no
	// multipart file section.
	ErrMissingFile = errors.New("no such file")

	// defaultAgent is the package-wide Agent handed out by DefaultAgent.
	defaultAgent = NewAgent()
)

// Agent is the file service agent. It stores uploaded files in a backend,
// registers an expiry key for each stored file in Redis and removes files
// whose key has expired.
type Agent struct {
	config   *Config
	backend  store.Backend
	redisCli *redis.Client
	workPool *grpool.Pool
	fhm      model.IFileHistory
}

// Config holds the tunable settings of an Agent.
type Config struct {
	// DefaultExpireTime is the default lifetime of a temporary file, in hours.
	// A value <= 0 means no expiry key is written.
	DefaultExpireTime int
	// MaxMemory is passed to http.Request.ParseMultipartForm. A value <= 0
	// selects defaultMaxMemory.
	MaxMemory int64
}

const (
	defaultMaxMemory = 32 << 20 // 32 MB
)

// DefaultAgent returns the shared package-level Agent.
func DefaultAgent() *Agent {
	return defaultAgent
}

// NewAgent creates an Agent with a worker pool limited to 50 goroutines.
// Config, backend, Redis client and file history model are left unset and
// must be injected through the Set* methods.
func NewAgent() *Agent {
	return &Agent{
		workPool: grpool.New(50),
	}
}

// SetConfig replaces the agent configuration.
func (a *Agent) SetConfig(config *Config) {
	a.config = config
}

// SetFileHistoryModel sets the model used to query and delete file history
// records.
func (a *Agent) SetFileHistoryModel(fhm model.IFileHistory) {
	a.fhm = fhm
}

// SetRedisClient sets the Redis client used for expiry keys and for the
// expiry subscription opened by Start.
func (a *Agent) SetRedisClient(rdc *redis.Client) {
	a.redisCli = rdc
}

// SetBackend sets the storage backend.
func (a *Agent) SetBackend(backend store.Backend) {
	a.backend = backend
}

// maxMemory returns the configured multipart memory limit, or
// defaultMaxMemory when no config is set or the configured value is not
// positive.
func (a *Agent) maxMemory() int64 {
	if a.config != nil && a.config.MaxMemory > 0 {
		return a.config.MaxMemory
	}
	return defaultMaxMemory
}

// configuredExpire returns the configured default expiry as a duration, or 0
// when no config has been set.
func (a *Agent) configuredExpire() time.Duration {
	if a.config == nil {
		return 0
	}
	return time.Duration(a.config.DefaultExpireTime) * time.Hour
}

// Upload stores every file submitted under the multipart form field key.
//
// The form is parsed on demand when r.MultipartForm is nil. ErrMissingFile is
// returned when the request has no multipart file section. When a file fails
// to upload, the infos of the files stored before it are returned together
// with the error. When the form has no entry for key the result is (nil, nil).
func (a *Agent) Upload(ctx context.Context, r *http.Request, key string) ([]store.FileInfo, error) {
	if r.MultipartForm == nil {
		if err := r.ParseMultipartForm(a.maxMemory()); err != nil {
			return nil, err
		}
	}
	if r.MultipartForm == nil || r.MultipartForm.File == nil {
		return nil, ErrMissingFile
	}

	var infos []store.FileInfo
	for _, fh := range r.MultipartForm.File[key] {
		info, err := a.doUpload(ctx, fh)
		if err != nil {
			return infos, err
		}
		infos = append(infos, info)
	}
	return infos, nil
}

// doUpload stores a single multipart file in the backend and registers its
// expiry key. The stored name is fheader.Filename unless ctx carries a
// file-name hook, in which case the hook's result is used.
func (a *Agent) doUpload(ctx context.Context, fheader *multipart.FileHeader) (store.FileInfo, error) {
	file, err := fheader.Open()
	if err != nil {
		return nil, err
	}
	defer file.Close()

	size, err := a.fileSize(file)
	if err != nil {
		return nil, err
	}

	fullName := fheader.Filename
	if rename, ok := context2.FromFileNameContext(ctx); ok {
		fullName = rename(fheader.Filename)
	}

	hash, err := a.backend.Store(ctx, fullName, file, size)
	if err != nil {
		// %w keeps the message text unchanged while preserving the cause for
		// errors.Is / errors.As.
		return nil, fmt.Errorf("文件存储失败:%w", err)
	}

	a.SetDefaultExpireTime(ctx, hash, fullName)
	return &fileInfo{
		fullName: fullName,
		name:     fheader.Filename,
		size:     size,
		hash:     hash,
	}, nil
}

// SetDefaultExpireTime writes the key hash with value fullName to Redis with
// a TTL, marking the file as temporary.
//
// The TTL is the int number of hours carried by ctx when present, otherwise
// the configured default. A non-int context value is ignored in favour of the
// configured default. No key is written when the TTL is <= 0. A nil ctx is
// replaced by context.Background(). A failed Redis write is logged because
// this method has no error result.
func (a *Agent) SetDefaultExpireTime(ctx context.Context, hash, fullName string) {
	ttl := a.configuredExpire()
	if ctx == nil {
		ctx = context.Background()
	} else if v, ok := context2.FromFileExpireContext(ctx); ok {
		if hours, isInt := v.(int); isInt {
			ttl = time.Duration(hours) * time.Hour
		}
	}
	if ttl <= 0 {
		return
	}
	if err := a.redisCli.Set(ctx, hash, fullName, ttl).Err(); err != nil {
		logger.Debugf(ctx, "set expire key for file [%s] failed: %v", hash, err)
	}
}

// fileSize reports the total size of file in bytes. It prefers a Size()
// method, then Stat(), and finally measures by seeking to the end and
// restoring the previous offset.
func (a *Agent) fileSize(file multipart.File) (int64, error) {
	switch f := file.(type) {
	case fsize:
		return f.Size(), nil
	case fstat:
		stat, err := f.Stat()
		if err != nil {
			return 0, err
		}
		return stat.Size(), nil
	}

	// Neither helper interface is available: measure via io.Seeker, which
	// every multipart.File implements.
	cur, err := file.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, err
	}
	end, err := file.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, err
	}
	if _, err := file.Seek(cur, io.SeekStart); err != nil {
		return 0, err
	}
	return end, nil
}

// Persistent makes a file permanent by deleting its expiry key from Redis.
func (a *Agent) Persistent(ctx context.Context, hash string) error {
	return a.redisCli.Del(ctx, hash).Err()
}

// Start subscribes to the "__keyevent@0__:expired" pattern and, for every
// message received, schedules removal of the corresponding file on the worker
// pool.
//
// It blocks until the first reply on the subscription arrives and returns an
// error if that fails. The listener goroutine then runs until the
// subscription channel is closed; nothing in this file closes it.
// NOTE(review): presumably the Redis server must have keyspace notifications
// for expired events enabled — confirm against the deployment.
func (a *Agent) Start(ctx context.Context) error {
	const channel = "__keyevent@0__:expired"

	pubSub := a.redisCli.PSubscribe(ctx, channel)
	if _, err := pubSub.Receive(ctx); err != nil {
		_ = pubSub.Close() // best effort: the subscription is unusable anyway
		return fmt.Errorf("subscribe %q: %w", channel, err)
	}
	ch := pubSub.Channel()

	go func() {
		for msg := range ch {
			// Copy the payload so the pooled closure does not observe a later
			// iteration's message.
			fileHash := msg.Payload
			logger.Debugf(ctx, "文件过期删除:[%s]", fileHash)
			if err := a.workPool.Add(func() { a.removeExpired(ctx, fileHash) }); err != nil {
				logger.Debugf(ctx, "schedule removal of expired file [%s] failed: %v", fileHash, err)
			}
		}
	}()
	return nil
}

// removeExpired deletes the stored object and the history record of the first
// history entry matching fileHash. Failures are logged, not returned, because
// it runs detached on the worker pool.
func (a *Agent) removeExpired(ctx context.Context, fileHash string) {
	data, err := a.fhm.Query(ctx, schema.FileHistoryQueryParam{FileHash: fileHash})
	if err != nil {
		logger.Debugf(ctx, "query history of expired file [%s] failed: %v", fileHash, err)
		return
	}
	if len(data.Data) == 0 {
		return
	}

	record := data.Data[0]
	if err := a.backend.Delete(ctx, record.Path); err != nil {
		logger.Debugf(ctx, "delete expired file [%s] from backend failed: %v", fileHash, err)
	}
	if err := a.fhm.Delete(ctx, record.RecordID); err != nil {
		logger.Debugf(ctx, "delete history record of expired file [%s] failed: %v", fileHash, err)
	}
}

// Get delegates to the backend's Get for filePath and returns its results
// unchanged.
func (a *Agent) Get(ctx context.Context, filePath string) ([]byte, string, error) {
	return a.backend.Get(ctx, filePath)
}

// fsize is implemented by files that can report their size directly.
type fsize interface {
	Size() int64
}

// fstat is implemented by files that expose os.FileInfo through Stat.
type fstat interface {
	Stat() (os.FileInfo, error)
}

// fileInfo describes a stored upload; doUpload returns it as store.FileInfo.
type fileInfo struct {
	fullName string // name the file was stored under
	name     string // file name as submitted by the client
	size     int64  // size in bytes
	hash     string // hash returned by the backend
}

// Hash returns the hash the backend assigned to the file.
func (fi *fileInfo) Hash() string {
	return fi.hash
}

// FullName returns the name the file was stored under.
func (fi *fileInfo) FullName() string {
	return fi.fullName
}

// Name returns the file name as submitted by the client.
func (fi *fileInfo) Name() string {
	return fi.name
}

// Size returns the file size in bytes.
func (fi *fileInfo) Size() int64 {
	return fi.size
}

// ComposeObject asks the backend to merge the source objects pathS into the
// new object filePath.
func (a *Agent) ComposeObject(ctx context.Context, pathS []string, filePath string) error {
	return a.backend.ComposeObject(ctx, pathS, filePath)
}

// RemoveObject deletes the object at filePath from the backend.
func (a *Agent) RemoveObject(ctx context.Context, filePath string) error {
	return a.backend.RemoveObject(ctx, filePath)
}

// Stat returns the metadata of the object at filePath.
//
// A backend error is returned as is. When the backend call succeeds but the
// result carries a non-nil Err, a generic "file info not found" error is
// returned and the underlying Err is not included. ctx is not forwarded
// because the backend's Stat takes only the path.
func (a *Agent) Stat(ctx context.Context, filePath string) (*schema.FileInfo, error) {
	stat, err := a.backend.Stat(filePath)
	if err != nil {
		return nil, err
	}
	if stat.Err != nil {
		return nil, errors.New("未找到文件信息")
	}
	return &schema.FileInfo{
		URL:  filePath,
		Name: stat.Key,
		Hash: stat.ETag,
		Size: stat.Size,
	}, nil
}