| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- package internal
- import (
- "context"
- "fmt"
- "path/filepath"
- "strings"
- "time"
- "github.com/gogf/gf/v2/os/glog"
- "github.com/gogf/gf/v2/util/guid"
- "yx-dataset-server/app/errors"
- "yx-dataset-server/app/model"
- "yx-dataset-server/app/schema"
- "yx-dataset-server/library/ragflow"
- )
- // NewDatasetFile 创建DatasetFile
- func NewDatasetFile(
- mDatasetFile model.IDatasetFile,
- mDataset model.IDataset,
- mTrans model.ITrans,
- mUser model.IUser,
- ) *DatasetFile {
- return &DatasetFile{
- DatasetFileModel: mDatasetFile,
- datasetModel: mDataset,
- transModel: mTrans,
- userModel: mUser,
- }
- }
// DatasetFile is the business-layer service for dataset files. It keeps the
// local records and the remote ragflow document store in step: uploads,
// parse-status polling, enable/disable, and deletion.
type DatasetFile struct {
	DatasetFileModel model.IDatasetFile // persistence for dataset-file records
	datasetModel     model.IDataset     // parent dataset records (FileCount bookkeeping)
	transModel       model.ITrans       // DB transaction executor used via ExecTrans
	userModel        model.IUser        // resolves creator info for query results
}
- // Query 查询数据
- func (a *DatasetFile) Query(ctx context.Context, params schema.DatasetFileQueryParam, opts ...schema.DatasetFileQueryOptions) (*schema.DatasetFileQueryResult, error) {
- result, err := a.DatasetFileModel.Query(ctx, params, opts...)
- if err != nil {
- return nil, err
- }
- if len(result.Data) > 0 {
- user, err := a.userModel.Query(ctx, schema.UserQueryParam{RoleCode: []string{"11", "12"}})
- if err != nil {
- return nil, err
- }
- result.Data.FillCreator(user.Data)
- }
- return result, nil
- }
- // Get 查询指定数据
- func (a *DatasetFile) Get(ctx context.Context, recordID string, opts ...schema.DatasetFileQueryOptions) (*schema.DatasetFile, error) {
- item, err := a.DatasetFileModel.Get(ctx, recordID, opts...)
- if err != nil {
- return nil, err
- } else if item == nil {
- return nil, errors.ErrNotFound
- }
- return item, nil
- }
- func (a *DatasetFile) getUpdate(ctx context.Context, recordID string) (*schema.DatasetFile, error) {
- return a.Get(ctx, recordID)
- }
- // Create 创建数据
- // 流程:
- // 1. 事务内:写 DatasetFile、累加 Dataset.FileCount、调用 ragflow 上传与解析;
- // 2. 事务成功后,异步轮询 ragflow 获取解析状态;
- //
- // 注意:
- // - 上传与解析只发生在事务中一次;
- // - 轮询使用独立 background context,避免父请求结束后 ctx 取消导致的 ragflow 调用失败;
- // - 只有事务成功才会启动轮询。
- func (a *DatasetFile) Create(ctx context.Context, item schema.DatasetFile) error {
- item.RecordID = guid.S()
- dataset, err := a.datasetModel.Get(ctx, item.DatasetId)
- if err != nil {
- return err
- }
- if dataset == nil || dataset.RagDataId == "" {
- return errors.New("知识库不存在")
- }
- err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
- fileInfo := ragflow.FileInfo{
- FileName: item.Name,
- Url: fmt.Sprintf("%s%s", "https://app.yongxulvjian.com", item.Url),
- }
- uploadResp, err := ragflow.GetHttpClient().UploadDocument(ctx, dataset.RagDataId, ragflow.UploadFileReq{
- File: []*ragflow.FileInfo{&fileInfo},
- })
- if err != nil {
- return errors.New(fmt.Sprintf("文件上传失败:%s", err.Error()))
- }
- if uploadResp == nil || len(uploadResp.Data) == 0 {
- return errors.New("文件上传失败:ragflow 返回为空")
- }
- item.RagFileId = uploadResp.Data[0].ID
- item.ParseStatus = 1
- if _, err := ragflow.GetHttpClient().ParseDocuments(ctx, dataset.RagDataId, []string{item.RagFileId}); err != nil {
- glog.Errorf(ctx, "文件解析触发失败:%s", err.Error())
- }
- if err := a.DatasetFileModel.Create(ctx, item); err != nil {
- return err
- }
- dataset.FileCount += 1
- return a.datasetModel.Update(ctx, dataset.RecordID, *dataset)
- })
- if err != nil {
- return err
- }
- // 异步轮询解析状态,使用 Background 避免请求 ctx 提前取消
- go a.waitParse(context.Background(), item.RecordID, item.RagFileId, dataset.RagDataId)
- return nil
- }
- // CreateV2 创建数据(V2版本:直接从内存上传到 RagFlow,无需经 Minio / 临时文件)
- // 流程:
- // 1. 参数校验 & 知识库校验;
- // 2. 事务外调用 RagFlow 上传(IO 密集,避免长事务占用 DB 连接);
- // 3. 事务内只做 DB 操作:写 DatasetFile + 累加 Dataset.FileCount;
- // 4. 若入库失败,尝试反向删除 RagFlow 已上传的文档,避免数据残留;
- // 5. 事务成功后,异步轮询 RagFlow 解析状态。
- func (a *DatasetFile) CreateV2(ctx context.Context, param schema.UpdateFileParam) error {
- if len(param.FileData) == 0 {
- return errors.New("文件内容不能为空")
- }
- if param.DatasetId == "" {
- return errors.New("知识库ID不能为空")
- }
- dataset, err := a.datasetModel.Get(ctx, param.DatasetId)
- if err != nil {
- return err
- }
- if dataset == nil || dataset.RagDataId == "" {
- return errors.New("知识库不存在")
- }
- ragClient := ragflow.GetHttpClient()
- ragResp, err := ragClient.UploadDocumentV2(ctx, dataset.RagDataId, &ragflow.UploadDocumentV2Req{
- FileName: param.FileName,
- FileData: param.FileData,
- })
- if err != nil {
- return fmt.Errorf("RagFlow上传失败: %s", err.Error())
- }
- if ragResp == nil || len(ragResp.Data) == 0 {
- return errors.New("RagFlow上传失败:返回为空")
- }
- ragFileId := ragResp.Data[0].ID
- // 解析失败不回滚上传,后续由 waitParse 反映真实状态
- if _, err := ragClient.ParseDocuments(ctx, dataset.RagDataId, []string{ragFileId}); err != nil {
- glog.Errorf(ctx, "文件解析触发失败:%s", err.Error())
- }
- fileSize := param.FileSize
- if fileSize <= 0 {
- fileSize = int64(len(param.FileData))
- }
- item := schema.DatasetFile{
- RecordID: guid.S(),
- Name: param.FileName,
- DatasetId: param.DatasetId,
- Size: fileSize,
- Type: a.getFileType(param.FileName),
- Enabled: true,
- CreatorId: param.CreatorId,
- RagFileId: ragFileId,
- ParseStatus: 1,
- }
- err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
- if err := a.DatasetFileModel.Create(ctx, item); err != nil {
- return err
- }
- dataset.FileCount += 1
- return a.datasetModel.Update(ctx, dataset.RecordID, *dataset)
- })
- if err != nil {
- if _, delErr := ragClient.DeleteDocuments(ctx, dataset.RagDataId, []string{ragFileId}); delErr != nil {
- glog.Errorf(ctx, "入库失败后清理 RagFlow 文档失败:%s", delErr.Error())
- }
- return err
- }
- go a.waitParse(context.Background(), item.RecordID, ragFileId, dataset.RagDataId)
- return nil
- }
- // getFileType 根据文件名获取文件类型
- func (a *DatasetFile) getFileType(fileName string) string {
- ext := strings.ToLower(filepath.Ext(fileName))
- switch ext {
- case ".pdf":
- return "pdf"
- case ".doc", ".docx":
- return "word"
- case ".xls", ".xlsx":
- return "excel"
- case ".txt":
- return "txt"
- case ".md":
- return "markdown"
- case ".jpg", ".jpeg":
- return "jpg"
- case ".png":
- return "png"
- default:
- return "other"
- }
- }
- // waitParse 轮询 ragflow 获取指定文档的解析状态并更新本地 parse_status
- // 1 解析中 / 2 解析完成 / 3 解析失败
- func (a *DatasetFile) waitParse(ctx context.Context, recordId, fileId, ragDataId string) {
- if fileId == "" || ragDataId == "" {
- return
- }
- const maxAttempts = 600 // ~20 分钟上限,防止死循环
- for i := 0; i < maxAttempts; i++ {
- res, err := ragflow.GetHttpClient().ListDocuments(ctx, ragDataId, &ragflow.ListDocumentReq{DocumentID: fileId})
- if err != nil || res == nil || len(res.Data.Docs) == 0 {
- if err != nil {
- glog.Errorf(ctx, "查询文件解析状态失败:%s", err.Error())
- }
- time.Sleep(2 * time.Second)
- continue
- }
- doc := res.Data.Docs[0]
- switch doc.Run {
- case "RUNNING":
- time.Sleep(2 * time.Second)
- continue
- case "DONE":
- glog.Info(ctx, fmt.Sprintf("%s:【%s】文件解析完成", fileId, doc.Name))
- _ = a.DatasetFileModel.UpdateParseStatus(ctx, recordId, 2)
- return
- default:
- glog.Error(ctx, fmt.Sprintf("%s:【%s】文件解析失败,Run=%s", fileId, doc.Name, doc.Run))
- _ = a.DatasetFileModel.UpdateParseStatus(ctx, recordId, 3)
- return
- }
- }
- glog.Error(ctx, fmt.Sprintf("%s: 文件解析状态轮询超时", fileId))
- _ = a.DatasetFileModel.UpdateParseStatus(ctx, recordId, 3)
- }
- // BatchCreate 批量创建数据
- func (a *DatasetFile) BatchCreate(ctx context.Context, files schema.DatasetFiles) error {
- if len(files) == 0 {
- return errors.New("文件不能为空")
- }
- for _, v := range files {
- err := a.Create(ctx, *v)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // Update 更新数据
- func (a *DatasetFile) Update(ctx context.Context, recordID string, item schema.DatasetFile) error {
- oldItem, err := a.DatasetFileModel.Get(ctx, recordID)
- if err != nil {
- return err
- } else if oldItem == nil {
- return errors.ErrNotFound
- }
- return a.DatasetFileModel.Update(ctx, recordID, item)
- }
- // Delete 删除数据
- func (a *DatasetFile) Delete(ctx context.Context, recordID string) error {
- oldItem, err := a.DatasetFileModel.Get(ctx, recordID)
- if err != nil {
- return err
- } else if oldItem == nil {
- return errors.ErrNotFound
- }
- dataset, err := a.datasetModel.Get(ctx, oldItem.DatasetId)
- if err != nil {
- return err
- }
- if dataset == nil || dataset.RagDataId == "" {
- return errors.New("知识库不存在")
- }
- err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
- dataset, err := a.datasetModel.Get(ctx, oldItem.DatasetId)
- if err != nil {
- return err
- }
- dataset.FileCount -= 1
- err = a.datasetModel.Update(ctx, dataset.RecordID, *dataset)
- if err != nil {
- return err
- }
- _, err = ragflow.GetHttpClient().DeleteDocuments(ctx, dataset.RagDataId, []string{oldItem.RagFileId})
- if err != nil {
- return err
- }
- return a.DatasetFileModel.Delete(ctx, recordID)
- })
- return err
- }
- // UpdateStatus 更新状态
- func (a *DatasetFile) UpdateStatus(ctx context.Context, recordID string, status int) error {
- oldItem, err := a.DatasetFileModel.Get(ctx, recordID)
- if err != nil {
- return err
- } else if oldItem == nil {
- return errors.ErrNotFound
- }
- return a.DatasetFileModel.UpdateStatus(ctx, recordID, status)
- }
- // UpdateEnabled 更新启用状态
- func (a *DatasetFile) UpdateEnabled(ctx context.Context, recordID string, status bool) error {
- oldItem, err := a.DatasetFileModel.Get(ctx, recordID)
- if err != nil {
- return err
- } else if oldItem == nil {
- return errors.ErrNotFound
- }
- dataset, err := a.datasetModel.Get(ctx, oldItem.DatasetId)
- if err != nil {
- return err
- }
- if dataset == nil || dataset.RagDataId == "" {
- return errors.New("知识库不存在")
- }
- enabled := 0
- if status {
- enabled = 1
- }
- // 更新ragFlow
- _, err = ragflow.GetHttpClient().UpdateDocument(ctx, dataset.RagDataId, oldItem.RagFileId, &ragflow.UpdateDocumentReq{Enabled: enabled})
- if err != nil {
- return err
- }
- return a.DatasetFileModel.UpdateEnabled(ctx, recordID, status)
- }
- // BatchDelete 批量删除
- func (a *DatasetFile) BatchDelete(ctx context.Context, fileIDs []string) error {
- if len(fileIDs) == 0 {
- return errors.New("文件不能为空")
- }
- file, err := a.DatasetFileModel.Query(ctx, schema.DatasetFileQueryParam{RecordIDs: fileIDs})
- if err != nil {
- return err
- }
- if len(file.Data) == 0 {
- return nil
- }
- dataset, err := a.datasetModel.Get(ctx, file.Data[0].DatasetId)
- if err != nil {
- return err
- }
- _, err = ragflow.GetHttpClient().DeleteDocuments(ctx, dataset.RagDataId, file.Data.ToRagFileIds())
- if err != nil {
- return err
- }
- return a.DatasetFileModel.BatchDelete(ctx, fileIDs)
- }
- // BatchCreateV2 批量上传多个文件到指定知识库
- // 入参 files 通常由 controller 从 multipart 表单解析而来,已包含文件名与内容。
- // 单文件失败不会中断整批;最终成功数和失败明细统一返回给调用方。
- func (a *DatasetFile) BatchCreateV2(ctx context.Context, datasetId string, files []schema.UpdateFileParam) error {
- if datasetId == "" {
- return errors.New("dataset_id 不能为空")
- }
- if len(files) == 0 {
- return errors.New("文件列表不能为空")
- }
- dataset, err := a.datasetModel.Get(ctx, datasetId)
- if err != nil {
- return err
- }
- if dataset == nil || dataset.RagDataId == "" {
- return errors.New("知识库不存在")
- }
- var (
- failed []string
- success int
- )
- for _, f := range files {
- // 保证 datasetId 一致,避免调用方遗漏
- f.DatasetId = datasetId
- if err := a.CreateV2(ctx, f); err != nil {
- glog.Errorf(ctx, "批量导入【%s】失败:%s", f.FileName, err.Error())
- failed = append(failed, f.FileName)
- continue
- }
- success++
- }
- if len(failed) > 0 {
- return errors.New(fmt.Sprintf("部分文件导入失败(成功 %d 个):%v", success, failed))
- }
- return nil
- }
|