agent.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package agent
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/go-redis/redis/v8"
  6. "github.com/gogf/gf/os/grpool"
  7. context2 "gxt-file-server/app/context"
  8. "gxt-file-server/app/model"
  9. "gxt-file-server/app/schema"
  10. "gxt-file-server/pkg/logger"
  11. "gxt-file-server/pkg/store"
  12. "mime/multipart"
  13. "net/http"
  14. "os"
  15. "time"
  16. )
  17. var (
  18. // ErrMissingFile no such file
  19. ErrMissingFile = errors.New("no such file")
  20. defaultAgent = NewAgent()
  21. )
  22. // Agent 文件服务代理
  23. type Agent struct {
  24. config *Config
  25. backend store.Backend
  26. redisCli *redis.Client
  27. workPool *grpool.Pool
  28. fhm model.IFileHistory
  29. }
  30. type Config struct {
  31. DefaultExpireTime int // 默认临时文件过期时间(小时)
  32. MaxMemory int64
  33. }
  34. const (
  35. defaultMaxMemory = 32 << 20 // 32 MB
  36. )
  37. func DefaultAgent() *Agent {
  38. return defaultAgent
  39. }
  40. func NewAgent() *Agent {
  41. return &Agent{
  42. workPool: grpool.New(50),
  43. }
  44. }
  45. func (a *Agent) SetConfig(config *Config) {
  46. a.config = config
  47. }
  48. func (a *Agent) SetFileHistoryModel(fhm model.IFileHistory) {
  49. a.fhm = fhm
  50. }
  51. func (a *Agent) SetRedisClient(rdc *redis.Client) {
  52. a.redisCli = rdc
  53. }
  54. func (a *Agent) SetBackend(backend store.Backend) {
  55. a.backend = backend
  56. }
  57. func (a *Agent) maxMemory() int64 {
  58. if mm := a.config.MaxMemory; mm > 0 {
  59. return mm
  60. }
  61. return defaultMaxMemory
  62. }
  63. func (a *Agent) Upload(ctx context.Context, r *http.Request, key string) ([]store.FileInfo, error) {
  64. if r.MultipartForm == nil {
  65. err := r.ParseMultipartForm(a.maxMemory())
  66. if err != nil {
  67. return nil, err
  68. }
  69. }
  70. if r.MultipartForm == nil || r.MultipartForm.File == nil {
  71. return nil, ErrMissingFile
  72. }
  73. var infos []store.FileInfo
  74. for _, file := range r.MultipartForm.File[key] {
  75. info, err := a.doUpload(ctx, file)
  76. if err != nil {
  77. return infos, err
  78. }
  79. infos = append(infos, info)
  80. }
  81. return infos, nil
  82. }
  83. func (a *Agent) doUpload(ctx context.Context, fheader *multipart.FileHeader) (store.FileInfo, error) {
  84. file, err := fheader.Open()
  85. if err != nil {
  86. return nil, err
  87. }
  88. defer file.Close()
  89. size, err := a.fileSize(file)
  90. if err != nil {
  91. return nil, err
  92. }
  93. var fullName string
  94. if h, ok := context2.FromFileNameContext(ctx); ok {
  95. fullName = h(fheader.Filename)
  96. } else {
  97. fullName = fheader.Filename
  98. }
  99. hash, err := a.backend.Store(ctx, fullName, file, size)
  100. if err != nil {
  101. return nil, err
  102. }
  103. a.SetDefaultExpireTime(ctx, hash, fullName)
  104. return &fileInfo{
  105. fullName: fullName,
  106. name: fheader.Filename,
  107. size: size,
  108. hash: hash,
  109. }, nil
  110. }
  111. //SetDefaultExpireTime redis设置文件过期
  112. func (a *Agent) SetDefaultExpireTime(ctx context.Context, hash, fullName string) {
  113. var defaultExpireTime time.Duration
  114. if ctx == nil {
  115. ctx = context.Background()
  116. defaultExpireTime = time.Duration(a.config.DefaultExpireTime) * time.Hour
  117. } else {
  118. if v, b := context2.FromFileExpireContext(ctx); b {
  119. nV := v.(int)
  120. defaultExpireTime = time.Duration(nV) * time.Hour
  121. } else {
  122. defaultExpireTime = time.Duration(a.config.DefaultExpireTime) * time.Hour
  123. }
  124. }
  125. if defaultExpireTime > 0 {
  126. a.redisCli.Set(ctx, hash, fullName, defaultExpireTime)
  127. }
  128. }
  129. func (a *Agent) fileSize(file multipart.File) (int64, error) {
  130. var size int64
  131. if fsize, ok := file.(fsize); ok {
  132. size = fsize.Size()
  133. } else if fstat, ok := file.(fstat); ok {
  134. stat, err := fstat.Stat()
  135. if err != nil {
  136. return 0, err
  137. }
  138. size = stat.Size()
  139. }
  140. return size, nil
  141. }
  142. // Persistent persistent file
  143. func (a *Agent) Persistent(ctx context.Context, hash string) error {
  144. return a.redisCli.Del(ctx, hash).Err()
  145. }
  146. func (a *Agent) Start(ctx context.Context) error {
  147. channel := "__keyevent@0__:expired"
  148. pubSub := a.redisCli.PSubscribe(ctx, channel)
  149. go func() {
  150. _, err := pubSub.Receive(ctx)
  151. if err != nil {
  152. panic(err)
  153. }
  154. ch := pubSub.Channel()
  155. for msg := range ch {
  156. //
  157. fileHash := msg.Payload
  158. logger.Debugf(ctx, "文件过期删除:[%s]", fileHash)
  159. _ = a.workPool.Add(func() {
  160. data, err := a.fhm.Query(ctx, schema.FileHistoryQueryParam{FileHash: msg.Payload})
  161. if err != nil {
  162. return
  163. }
  164. if len(data.Data) > 0 {
  165. v := data.Data[0]
  166. _ = a.backend.Delete(ctx, v.Path)
  167. _ = a.fhm.Delete(ctx, v.RecordID)
  168. }
  169. })
  170. }
  171. }()
  172. return nil
  173. }
  174. func (a *Agent) Get(ctx context.Context, filePath string) ([]byte, string, error) {
  175. return a.backend.Get(ctx, filePath)
  176. }
  177. type fsize interface {
  178. Size() int64
  179. }
  180. type fstat interface {
  181. Stat() (os.FileInfo, error)
  182. }
  183. type fileInfo struct {
  184. fullName string
  185. name string
  186. size int64
  187. hash string
  188. }
  189. func (fi *fileInfo) Hash() string {
  190. return fi.hash
  191. }
  192. func (fi *fileInfo) FullName() string {
  193. return fi.fullName
  194. }
  195. func (fi *fileInfo) Name() string {
  196. return fi.name
  197. }
  198. func (fi *fileInfo) Size() int64 {
  199. return fi.size
  200. }
  201. // ComposeObject 通过使用服务端拷贝实现钭多个源对象合并创建成一个新的对象。
  202. func (a *Agent) ComposeObject(ctx context.Context, pathS []string, filePath string) error {
  203. return a.backend.ComposeObject(ctx, pathS, filePath)
  204. }
  205. //RemoveObject 删除文件
  206. func (a *Agent) RemoveObject(ctx context.Context, filePath string) error {
  207. return a.backend.RemoveObject(ctx, filePath)
  208. }
  209. //Stat 查看文件信息
  210. func (a *Agent) Stat(ctx context.Context, filePath string) (*schema.FileInfo, error) {
  211. stat, err := a.backend.Stat(filePath)
  212. if err != nil {
  213. return nil, err
  214. }
  215. if stat.Err != nil {
  216. return nil, errors.New("未找到文件信息")
  217. }
  218. return &schema.FileInfo{
  219. URL: filePath,
  220. Name: stat.Key,
  221. Hash: stat.ETag,
  222. Size: stat.Size,
  223. }, nil
  224. }