b_file.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package internal
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-redis/redis/v8"
  6. "github.com/gogf/gf/frame/g"
  7. "github.com/gogf/gf/util/guid"
  8. "gxt-file-server/app/agent"
  9. context2 "gxt-file-server/app/context"
  10. "gxt-file-server/app/errors"
  11. "gxt-file-server/app/model"
  12. "gxt-file-server/app/schema"
  13. "log"
  14. "net/http"
  15. "net/url"
  16. "strings"
  17. "time"
  18. )
  19. type File struct {
  20. historyModel model.IFileHistory
  21. chunkModel model.IFileChunk
  22. redisCli *redis.Client
  23. transModel model.ITrans
  24. }
  25. func NewFile(
  26. hm model.IFileHistory,
  27. cm model.IFileChunk,
  28. redisCli *redis.Client,
  29. tm model.ITrans,
  30. ) *File {
  31. return &File{
  32. historyModel: hm,
  33. chunkModel: cm,
  34. redisCli: redisCli,
  35. transModel: tm,
  36. }
  37. }
  38. //TODO 文件上传时候的文件名是否重命名
  39. // Upload 文件上传
  40. func (f *File) Upload(ctx context.Context, r *http.Request, formKey, basePath string) (*schema.FileInfo, error) {
  41. uuid := guid.S()
  42. base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
  43. ctx = context2.NewFileNameContext(ctx, func(fileName string) string {
  44. return fmt.Sprintf("%s/%s/%s/%s",
  45. base, strings.ToLower(basePath), uuid, fileName)
  46. })
  47. if hash, b := context2.FromFileHashContext(ctx); b {
  48. result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: hash.(string)})
  49. if err != nil {
  50. return nil, err
  51. }
  52. if len(result.Data) > 0 {
  53. v := result.Data[0]
  54. return &schema.FileInfo{
  55. URL: v.Path,
  56. Size: v.FileSize,
  57. Name: v.FileName,
  58. Hash: v.Hash,
  59. }, nil
  60. }
  61. }
  62. infos, err := agent.DefaultAgent().Upload(ctx, r, formKey)
  63. if err != nil {
  64. return nil, err
  65. } else if len(infos) == 0 {
  66. return nil, nil
  67. }
  68. fullName := fmt.Sprintf("%s/%s/%s/%s", base, strings.ToLower(basePath), uuid, url.PathEscape(f.getFileName(infos[0].Name())))
  69. if fullName[0] != '/' {
  70. fullName = "/" + fullName
  71. }
  72. isPersistent := schema.FALSE
  73. if v, b := context2.FromFileExpireContext(ctx); b {
  74. nV := v.(int)
  75. if nV == -1 {
  76. isPersistent = schema.TRUE
  77. }
  78. }
  79. err = f.historyModel.Create(ctx, schema.FileHistory{
  80. RecordID: guid.S(),
  81. Hash: infos[0].Hash(),
  82. Path: fullName,
  83. Creator: "",
  84. IsPersistent: isPersistent,
  85. FileSize: infos[0].Size(),
  86. FileName: infos[0].Name(),
  87. FileHash: infos[0].Hash(),
  88. })
  89. if err != nil {
  90. return nil, err
  91. }
  92. info := &schema.FileInfo{
  93. URL: fullName,
  94. Name: infos[0].Name(),
  95. Size: infos[0].Size(),
  96. Hash: infos[0].Hash(),
  97. }
  98. return info, nil
  99. }
  100. // Download 获取文件二进制流
  101. func (f *File) Download(ctx context.Context, filePath string) ([]byte, string, error) {
  102. return agent.DefaultAgent().Get(ctx, filePath)
  103. }
  104. // 修正文件名,将半角 % 替换为全角 %(不替换的话文件将无法从浏览器中打开)
  105. func (f *File) getFileName(fileName string) string {
  106. return strings.ReplaceAll(fileName, "%", "%")
  107. }
  108. // Persistent 持久化文件
  109. func (f *File) Persistent(ctx context.Context, hash string) error {
  110. if err := agent.DefaultAgent().Persistent(ctx, hash); err != nil {
  111. return err
  112. }
  113. result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{
  114. Hash: hash,
  115. IsPersistent: schema.FALSE,
  116. })
  117. if err != nil {
  118. return err
  119. }
  120. if len(result.Data) > 0 {
  121. v := result.Data[0]
  122. v.IsPersistent = schema.TRUE
  123. err := f.historyModel.Update(ctx, v.RecordID, *v)
  124. if err != nil {
  125. return err
  126. }
  127. }
  128. return nil
  129. }
  130. // ChunkUpload 文件分块上传
  131. func (f *File) ChunkUpload(ctx context.Context, req schema.FileChunkParams) (*schema.FileChunkInfo, error) {
  132. base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
  133. chunkBase := g.Cfg().GetString("agent.DefaultChunkExpireTime")
  134. ctx = context2.NewFileNameContext(ctx, func(fileName string) string {
  135. return fmt.Sprintf("%s/%s/%s/%s_%d",
  136. base, strings.ToLower(req.BaseUrl), req.Hash, req.Hash, req.Index)
  137. })
  138. // check file exist
  139. result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: req.Hash})
  140. if err != nil {
  141. return nil, err
  142. }
  143. if len(result.Data) > 0 {
  144. v := result.Data[0]
  145. return &schema.FileChunkInfo{
  146. URL: v.Path,
  147. Name: v.FileName,
  148. Hash: v.Hash,
  149. IsComplete: 2,
  150. }, nil
  151. }
  152. // check file chunk exist
  153. chunks, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{Hash: req.Hash, Current: req.Index})
  154. if err != nil {
  155. return nil, err
  156. }
  157. if len(chunks.Data) > 0 {
  158. v := chunks.Data[0]
  159. return &schema.FileChunkInfo{
  160. Total: v.Total,
  161. Current: v.Current,
  162. URL: v.Path,
  163. Name: v.Name,
  164. Hash: v.Hash,
  165. IsComplete: 1,
  166. }, nil
  167. }
  168. //分块上传不经过redis,每天扫表查询过期的文件块
  169. ctx = context2.NewFileExpireContext(ctx, -1)
  170. infos, err := agent.DefaultAgent().Upload(ctx, req.HttpRequest, req.FormKey)
  171. if err != nil {
  172. return nil, err
  173. } else if len(infos) == 0 {
  174. return nil, nil
  175. }
  176. fullName := fmt.Sprintf("%s/%s/%s/%s_%d", base, strings.ToLower(req.BaseUrl), req.Hash, req.Hash, req.Index)
  177. if fullName[0] != '/' {
  178. fullName = "/" + fullName
  179. }
  180. //设置块的过期时间
  181. pastTime, err := time.ParseDuration(chunkBase + "s")
  182. if err != nil {
  183. return nil, err
  184. }
  185. data := &schema.FileChunk{
  186. RecordID: guid.S(),
  187. Hash: req.Hash,
  188. Total: req.Total,
  189. Current: req.Index,
  190. Name: infos[0].Name(),
  191. Path: fullName,
  192. Size: infos[0].Size(),
  193. FileHash: infos[0].Hash(),
  194. PastTime: time.Now().Add(pastTime),
  195. }
  196. err = f.chunkModel.Create(ctx, *data)
  197. if err != nil {
  198. return nil, err
  199. }
  200. return &schema.FileChunkInfo{
  201. Current: data.Current,
  202. Total: data.Total,
  203. URL: data.Path,
  204. Name: data.Name,
  205. Hash: data.Hash,
  206. IsComplete: 1,
  207. }, nil
  208. }
  209. //FileMerge 文件合并
  210. func (f *File) FileMerge(ctx context.Context, req schema.FileMergeParams) (*schema.FileInfo, error) {
  211. if req.Total <= 0 || req.Total > 1000 {
  212. //minio中最多可操作1000个对象
  213. return nil, errors.New("非法的块数")
  214. }
  215. // check file exist
  216. hisResult, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: req.Hash})
  217. if err != nil {
  218. return nil, err
  219. }
  220. if len(hisResult.Data) > 0 {
  221. v := hisResult.Data[0]
  222. return &schema.FileInfo{
  223. URL: v.Path,
  224. Name: v.FileName,
  225. Hash: v.Hash,
  226. Size: v.FileSize,
  227. }, nil
  228. }
  229. result, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{
  230. Hash: req.Hash,
  231. })
  232. if err != nil {
  233. return nil, err
  234. }
  235. if len(result.Data) < req.Total {
  236. return nil, errors.New400Response("不完整的文件对象")
  237. }
  238. filePaths := result.Data.FileChunkToPath()
  239. base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
  240. fullName := fmt.Sprintf("%s/%s/%s/%s", base, req.BaseUrl, req.Hash, req.FileName)
  241. //分片合并
  242. err = agent.DefaultAgent().ComposeObject(ctx, filePaths, fullName)
  243. if err != nil {
  244. return nil, err
  245. }
  246. //分片删除
  247. for i := range filePaths {
  248. err = agent.DefaultAgent().RemoveObject(ctx, filePaths[i])
  249. if err != nil {
  250. return nil, err
  251. }
  252. }
  253. fileInfo, err := agent.DefaultAgent().Stat(ctx, fullName)
  254. if err != nil {
  255. return nil, err
  256. }
  257. err = ExecTrans(ctx, f.transModel, func(ctx context.Context) error {
  258. err = f.chunkModel.DeleteHash(ctx, req.Hash)
  259. if err != nil {
  260. return err
  261. }
  262. return f.historyModel.Create(ctx, schema.FileHistory{
  263. RecordID: guid.S(),
  264. Hash: req.Hash,
  265. Path: fullName,
  266. Creator: "",
  267. IsPersistent: 2,
  268. FileSize: fileInfo.Size,
  269. FileName: req.FileName,
  270. FileHash: fileInfo.Hash,
  271. })
  272. })
  273. //设置文件过期
  274. agent.DefaultAgent().SetDefaultExpireTime(ctx, fileInfo.Hash, fullName)
  275. if err != nil {
  276. return nil, err
  277. }
  278. return fileInfo, nil
  279. }
  280. //ClearChunks 清理过期的文件分块
  281. func (f *File) ClearChunks(ctx context.Context) {
  282. results, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{
  283. IsClear: true,
  284. })
  285. if err != nil {
  286. log.Fatalln(err)
  287. }
  288. for _, v := range results.Data {
  289. //删除文件
  290. err = agent.DefaultAgent().RemoveObject(ctx, v.Path)
  291. if err != nil {
  292. log.Fatalln(err)
  293. }
  294. //删除数据
  295. err = f.chunkModel.Delete(ctx, v.RecordID)
  296. if err != nil {
  297. log.Fatalln(err)
  298. return
  299. }
  300. }
  301. }