b_file.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package internal
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-redis/redis/v8"
  6. "github.com/gogf/gf/frame/g"
  7. "github.com/gogf/gf/util/guid"
  8. "gxt-file-server/app/agent"
  9. context2 "gxt-file-server/app/context"
  10. "gxt-file-server/app/errors"
  11. "gxt-file-server/app/model"
  12. "gxt-file-server/app/schema"
  13. "log"
  14. "net/http"
  15. "net/url"
  16. "strings"
  17. "time"
  18. )
  19. type File struct {
  20. historyModel model.IFileHistory
  21. chunkModel model.IFileChunk
  22. redisCli *redis.Client
  23. transModel model.ITrans
  24. }
  25. func NewFile(
  26. hm model.IFileHistory,
  27. cm model.IFileChunk,
  28. redisCli *redis.Client,
  29. tm model.ITrans,
  30. ) *File {
  31. return &File{
  32. historyModel: hm,
  33. chunkModel: cm,
  34. redisCli: redisCli,
  35. transModel: tm,
  36. }
  37. }
  38. //TODO 文件上传时候的文件名是否重命名
  39. // Upload 文件上传
  40. func (f *File) Upload(ctx context.Context, r *http.Request, formKey, basePath string) (*schema.FileInfo, error) {
  41. uuid := guid.S()
  42. base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
  43. ctx = context2.NewFileNameContext(ctx, func(fileName string) string {
  44. return fmt.Sprintf("%s/%s/%s/%s",
  45. base, strings.ToLower(basePath), uuid, fileName)
  46. })
  47. if hash, b := context2.FromFileHashContext(ctx); b {
  48. result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: hash.(string)})
  49. if err != nil {
  50. return nil, err
  51. }
  52. if len(result.Data) > 0 {
  53. v := result.Data[0]
  54. return &schema.FileInfo{
  55. URL: v.Path,
  56. Size: v.FileSize,
  57. Name: v.FileName,
  58. Hash: v.Hash,
  59. }, nil
  60. }
  61. }
  62. infos, err := agent.DefaultAgent().Upload(ctx, r, formKey)
  63. if err != nil {
  64. return nil, err
  65. } else if len(infos) == 0 {
  66. return nil, nil
  67. }
  68. fmt.Println(infos[0])
  69. fullName := fmt.Sprintf("%s/%s/%s/%s", base, strings.ToLower(basePath), uuid, url.PathEscape(f.getFileName(infos[0].Name())))
  70. if fullName[0] != '/' {
  71. fullName = "/" + fullName
  72. }
  73. isPersistent := schema.FALSE
  74. if v, b := context2.FromFileExpireContext(ctx); b {
  75. nV := v.(int)
  76. if nV == -1 {
  77. isPersistent = schema.TRUE
  78. }
  79. }
  80. err = f.historyModel.Create(ctx, schema.FileHistory{
  81. RecordID: guid.S(),
  82. Hash: infos[0].Hash(),
  83. Path: fullName,
  84. Creator: "",
  85. IsPersistent: isPersistent,
  86. FileSize: infos[0].Size(),
  87. FileName: infos[0].Name(),
  88. FileHash: infos[0].Hash(),
  89. })
  90. if err != nil {
  91. return nil, err
  92. }
  93. info := &schema.FileInfo{
  94. URL: fullName,
  95. Name: infos[0].Name(),
  96. Size: infos[0].Size(),
  97. Hash: infos[0].Hash(),
  98. }
  99. return info, nil
  100. }
  101. // Download 获取文件二进制流
  102. func (f *File) Download(ctx context.Context, filePath string) ([]byte, string, error) {
  103. return agent.DefaultAgent().Get(ctx, filePath)
  104. }
  105. // 修正文件名,将半角 % 替换为全角 %(不替换的话文件将无法从浏览器中打开)
  106. func (f *File) getFileName(fileName string) string {
  107. return strings.ReplaceAll(fileName, "%", "%")
  108. }
  109. // Persistent 持久化文件
  110. func (f *File) Persistent(ctx context.Context, hash string) error {
  111. if err := agent.DefaultAgent().Persistent(ctx, hash); err != nil {
  112. return err
  113. }
  114. result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{
  115. Hash: hash,
  116. IsPersistent: schema.FALSE,
  117. })
  118. if err != nil {
  119. return err
  120. }
  121. if len(result.Data) > 0 {
  122. v := result.Data[0]
  123. v.IsPersistent = schema.TRUE
  124. err := f.historyModel.Update(ctx, v.RecordID, *v)
  125. if err != nil {
  126. return err
  127. }
  128. }
  129. return nil
  130. }
  131. // ChunkUpload 文件分块上传
  132. func (f *File) ChunkUpload(ctx context.Context, req schema.FileChunkParams) (*schema.FileChunkInfo, error) {
  133. base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
  134. chunkBase := g.Cfg().GetString("agent.DefaultChunkExpireTime")
  135. ctx = context2.NewFileNameContext(ctx, func(fileName string) string {
  136. return fmt.Sprintf("%s/%s/%s/%s_%d",
  137. base, strings.ToLower(req.BaseUrl), req.Hash, req.Hash, req.Index)
  138. })
  139. // check file exist
  140. result, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: req.Hash})
  141. if err != nil {
  142. return nil, err
  143. }
  144. if len(result.Data) > 0 {
  145. v := result.Data[0]
  146. return &schema.FileChunkInfo{
  147. URL: v.Path,
  148. Name: v.FileName,
  149. Hash: v.Hash,
  150. IsComplete: 2,
  151. }, nil
  152. }
  153. // check file chunk exist
  154. chunks, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{Hash: req.Hash, Current: req.Index})
  155. if err != nil {
  156. return nil, err
  157. }
  158. if len(chunks.Data) > 0 {
  159. v := chunks.Data[0]
  160. return &schema.FileChunkInfo{
  161. Total: v.Total,
  162. Current: v.Current,
  163. URL: v.Path,
  164. Name: v.Name,
  165. Hash: v.Hash,
  166. IsComplete: 1,
  167. }, nil
  168. }
  169. //分块上传不经过redis,每天扫表查询过期的文件块
  170. ctx = context2.NewFileExpireContext(ctx, -1)
  171. infos, err := agent.DefaultAgent().Upload(ctx, req.HttpRequest, req.FormKey)
  172. if err != nil {
  173. return nil, err
  174. } else if len(infos) == 0 {
  175. return nil, nil
  176. }
  177. fullName := fmt.Sprintf("%s/%s/%s/%s_%d", base, strings.ToLower(req.BaseUrl), req.Hash, req.Hash, req.Index)
  178. if fullName[0] != '/' {
  179. fullName = "/" + fullName
  180. }
  181. //设置块的过期时间
  182. pastTime, err := time.ParseDuration(chunkBase + "s")
  183. if err != nil {
  184. return nil, err
  185. }
  186. data := &schema.FileChunk{
  187. RecordID: guid.S(),
  188. Hash: req.Hash,
  189. Total: req.Total,
  190. Current: req.Index,
  191. Name: infos[0].Name(),
  192. Path: fullName,
  193. Size: infos[0].Size(),
  194. FileHash: infos[0].Hash(),
  195. PastTime: time.Now().Add(pastTime),
  196. }
  197. err = f.chunkModel.Create(ctx, *data)
  198. if err != nil {
  199. return nil, err
  200. }
  201. return &schema.FileChunkInfo{
  202. Current: data.Current,
  203. Total: data.Total,
  204. URL: data.Path,
  205. Name: data.Name,
  206. Hash: data.Hash,
  207. IsComplete: 1,
  208. }, nil
  209. }
  210. //FileMerge 文件合并
  211. func (f *File) FileMerge(ctx context.Context, req schema.FileMergeParams) (*schema.FileInfo, error) {
  212. if req.Total <= 0 || req.Total > 1000 {
  213. //minio中最多可操作1000个对象
  214. return nil, errors.New("非法的块数")
  215. }
  216. // check file exist
  217. hisResult, err := f.historyModel.Query(ctx, schema.FileHistoryQueryParam{Hash: req.Hash})
  218. if err != nil {
  219. return nil, err
  220. }
  221. if len(hisResult.Data) > 0 {
  222. v := hisResult.Data[0]
  223. return &schema.FileInfo{
  224. URL: v.Path,
  225. Name: v.FileName,
  226. Hash: v.Hash,
  227. Size: v.FileSize,
  228. }, nil
  229. }
  230. result, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{
  231. Hash: req.Hash,
  232. })
  233. if err != nil {
  234. return nil, err
  235. }
  236. if len(result.Data) < req.Total {
  237. return nil, errors.New400Response("不完整的文件对象")
  238. }
  239. filePaths := result.Data.FileChunkToPath()
  240. base := g.Cfg().GetString("agent.DefaultFilePathPrefix")
  241. fullName := fmt.Sprintf("%s/%s/%s/%s", base, req.BaseUrl, req.Hash, req.FileName)
  242. //分片合并
  243. err = agent.DefaultAgent().ComposeObject(ctx, filePaths, fullName)
  244. if err != nil {
  245. return nil, err
  246. }
  247. //分片删除
  248. for i := range filePaths {
  249. err = agent.DefaultAgent().RemoveObject(ctx, filePaths[i])
  250. if err != nil {
  251. return nil, err
  252. }
  253. }
  254. fileInfo, err := agent.DefaultAgent().Stat(ctx, fullName)
  255. if err != nil {
  256. return nil, err
  257. }
  258. err = ExecTrans(ctx, f.transModel, func(ctx context.Context) error {
  259. err = f.chunkModel.DeleteHash(ctx, req.Hash)
  260. if err != nil {
  261. return err
  262. }
  263. return f.historyModel.Create(ctx, schema.FileHistory{
  264. RecordID: guid.S(),
  265. Hash: req.Hash,
  266. Path: fullName,
  267. Creator: "",
  268. IsPersistent: 2,
  269. FileSize: fileInfo.Size,
  270. FileName: req.FileName,
  271. FileHash: fileInfo.Hash,
  272. })
  273. })
  274. //设置文件过期
  275. agent.DefaultAgent().SetDefaultExpireTime(ctx, fileInfo.Hash, fullName)
  276. if err != nil {
  277. return nil, err
  278. }
  279. return fileInfo, nil
  280. }
  281. //ClearChunks 清理过期的文件分块
  282. func (f *File) ClearChunks(ctx context.Context) {
  283. results, err := f.chunkModel.Query(ctx, schema.FileChunkQueryParam{
  284. IsClear: true,
  285. })
  286. if err != nil {
  287. log.Fatalln(err)
  288. }
  289. for _, v := range results.Data {
  290. //删除文件
  291. err = agent.DefaultAgent().RemoveObject(ctx, v.Path)
  292. if err != nil {
  293. log.Fatalln(err)
  294. }
  295. //删除数据
  296. err = f.chunkModel.Delete(ctx, v.RecordID)
  297. if err != nil {
  298. log.Fatalln(err)
  299. return
  300. }
  301. }
  302. }