package internal import ( "context" "fmt" "path/filepath" "strings" "time" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/guid" "yx-dataset-server/app/errors" "yx-dataset-server/app/model" "yx-dataset-server/app/schema" "yx-dataset-server/library/ragflow" ) // NewDatasetFile 创建DatasetFile func NewDatasetFile( mDatasetFile model.IDatasetFile, mDataset model.IDataset, mTrans model.ITrans, mUser model.IUser, ) *DatasetFile { return &DatasetFile{ DatasetFileModel: mDatasetFile, datasetModel: mDataset, transModel: mTrans, userModel: mUser, } } // DatasetFile 创建DatasetFile对象 type DatasetFile struct { DatasetFileModel model.IDatasetFile datasetModel model.IDataset transModel model.ITrans userModel model.IUser } // Query 查询数据 func (a *DatasetFile) Query(ctx context.Context, params schema.DatasetFileQueryParam, opts ...schema.DatasetFileQueryOptions) (*schema.DatasetFileQueryResult, error) { result, err := a.DatasetFileModel.Query(ctx, params, opts...) if err != nil { return nil, err } if len(result.Data) > 0 { user, err := a.userModel.Query(ctx, schema.UserQueryParam{RoleCode: []string{"11", "12"}}) if err != nil { return nil, err } result.Data.FillCreator(user.Data) } return result, nil } // Get 查询指定数据 func (a *DatasetFile) Get(ctx context.Context, recordID string, opts ...schema.DatasetFileQueryOptions) (*schema.DatasetFile, error) { item, err := a.DatasetFileModel.Get(ctx, recordID, opts...) if err != nil { return nil, err } else if item == nil { return nil, errors.ErrNotFound } return item, nil } func (a *DatasetFile) getUpdate(ctx context.Context, recordID string) (*schema.DatasetFile, error) { return a.Get(ctx, recordID) } // Create 创建数据 // 流程: // 1. 事务内:写 DatasetFile、累加 Dataset.FileCount、调用 ragflow 上传与解析; // 2. 事务成功后,异步轮询 ragflow 获取解析状态; // // 注意: // - 上传与解析只发生在事务中一次; // - 轮询使用独立 background context,避免父请求结束后 ctx 取消导致的 ragflow 调用失败; // - 只有事务成功才会启动轮询。 func (a *DatasetFile) Create(ctx context.Context, item schema.DatasetFile) error { item.RecordID = guid.S() dataset, err := a.datasetModel.Get(ctx, item.DatasetId) if err != nil { return err } if dataset == nil || dataset.RagDataId == "" { return errors.New("知识库不存在") } err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error { fileInfo := ragflow.FileInfo{ FileName: item.Name, Url: fmt.Sprintf("%s%s", "https://app.yongxulvjian.com", item.Url), } uploadResp, err := ragflow.GetHttpClient().UploadDocument(ctx, dataset.RagDataId, ragflow.UploadFileReq{ File: []*ragflow.FileInfo{&fileInfo}, }) if err != nil { return errors.New(fmt.Sprintf("文件上传失败:%s", err.Error())) } if uploadResp == nil || len(uploadResp.Data) == 0 { return errors.New("文件上传失败:ragflow 返回为空") } item.RagFileId = uploadResp.Data[0].ID item.ParseStatus = 1 if _, err := ragflow.GetHttpClient().ParseDocuments(ctx, dataset.RagDataId, []string{item.RagFileId}); err != nil { glog.Errorf(ctx, "文件解析触发失败:%s", err.Error()) } if err := a.DatasetFileModel.Create(ctx, item); err != nil { return err } dataset.FileCount += 1 return a.datasetModel.Update(ctx, dataset.RecordID, *dataset) }) if err != nil { return err } // 异步轮询解析状态,使用 Background 避免请求 ctx 提前取消 go a.waitParse(context.Background(), item.RecordID, item.RagFileId, dataset.RagDataId) return nil } // CreateV2 创建数据(V2版本:直接从内存上传到 RagFlow,无需经 Minio / 临时文件) // 流程: // 1. 参数校验 & 知识库校验; // 2. 事务外调用 RagFlow 上传(IO 密集,避免长事务占用 DB 连接); // 3. 事务内只做 DB 操作:写 DatasetFile + 累加 Dataset.FileCount; // 4. 若入库失败,尝试反向删除 RagFlow 已上传的文档,避免数据残留; // 5. 事务成功后,异步轮询 RagFlow 解析状态。 func (a *DatasetFile) CreateV2(ctx context.Context, param schema.UpdateFileParam) error { if len(param.FileData) == 0 { return errors.New("文件内容不能为空") } if param.DatasetId == "" { return errors.New("知识库ID不能为空") } dataset, err := a.datasetModel.Get(ctx, param.DatasetId) if err != nil { return err } if dataset == nil || dataset.RagDataId == "" { return errors.New("知识库不存在") } ragClient := ragflow.GetHttpClient() ragResp, err := ragClient.UploadDocumentV2(ctx, dataset.RagDataId, &ragflow.UploadDocumentV2Req{ FileName: param.FileName, FileData: param.FileData, }) if err != nil { return fmt.Errorf("RagFlow上传失败: %s", err.Error()) } if ragResp == nil || len(ragResp.Data) == 0 { return errors.New("RagFlow上传失败:返回为空") } ragFileId := ragResp.Data[0].ID // 解析失败不回滚上传,后续由 waitParse 反映真实状态 if _, err := ragClient.ParseDocuments(ctx, dataset.RagDataId, []string{ragFileId}); err != nil { glog.Errorf(ctx, "文件解析触发失败:%s", err.Error()) } fileSize := param.FileSize if fileSize <= 0 { fileSize = int64(len(param.FileData)) } item := schema.DatasetFile{ RecordID: guid.S(), Name: param.FileName, DatasetId: param.DatasetId, Size: fileSize, Type: a.getFileType(param.FileName), Enabled: true, CreatorId: param.CreatorId, RagFileId: ragFileId, ParseStatus: 1, } err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error { if err := a.DatasetFileModel.Create(ctx, item); err != nil { return err } dataset.FileCount += 1 return a.datasetModel.Update(ctx, dataset.RecordID, *dataset) }) if err != nil { if _, delErr := ragClient.DeleteDocuments(ctx, dataset.RagDataId, []string{ragFileId}); delErr != nil { glog.Errorf(ctx, "入库失败后清理 RagFlow 文档失败:%s", delErr.Error()) } return err } go a.waitParse(context.Background(), item.RecordID, ragFileId, dataset.RagDataId) return nil } // getFileType 根据文件名获取文件类型 func (a *DatasetFile) getFileType(fileName string) string { ext := strings.ToLower(filepath.Ext(fileName)) switch ext { case ".pdf": return "pdf" case ".doc", ".docx": return "word" case ".xls", ".xlsx": return "excel" case ".txt": return "txt" case ".md": return "markdown" case ".jpg", ".jpeg": return "jpg" case ".png": return "png" default: return "other" } } // waitParse 轮询 ragflow 获取指定文档的解析状态并更新本地 parse_status // 1 解析中 / 2 解析完成 / 3 解析失败 func (a *DatasetFile) waitParse(ctx context.Context, recordId, fileId, ragDataId string) { if fileId == "" || ragDataId == "" { return } const maxAttempts = 600 // ~20 分钟上限,防止死循环 for i := 0; i < maxAttempts; i++ { res, err := ragflow.GetHttpClient().ListDocuments(ctx, ragDataId, &ragflow.ListDocumentReq{DocumentID: fileId}) if err != nil || res == nil || len(res.Data.Docs) == 0 { if err != nil { glog.Errorf(ctx, "查询文件解析状态失败:%s", err.Error()) } time.Sleep(2 * time.Second) continue } doc := res.Data.Docs[0] switch doc.Run { case "RUNNING": time.Sleep(2 * time.Second) continue case "DONE": glog.Info(ctx, fmt.Sprintf("%s:【%s】文件解析完成", fileId, doc.Name)) _ = a.DatasetFileModel.UpdateParseStatus(ctx, recordId, 2) return default: glog.Error(ctx, fmt.Sprintf("%s:【%s】文件解析失败,Run=%s", fileId, doc.Name, doc.Run)) _ = a.DatasetFileModel.UpdateParseStatus(ctx, recordId, 3) return } } glog.Error(ctx, fmt.Sprintf("%s: 文件解析状态轮询超时", fileId)) _ = a.DatasetFileModel.UpdateParseStatus(ctx, recordId, 3) } // BatchCreate 批量创建数据 func (a *DatasetFile) BatchCreate(ctx context.Context, files schema.DatasetFiles) error { if len(files) == 0 { return errors.New("文件不能为空") } for _, v := range files { err := a.Create(ctx, *v) if err != nil { return err } } return nil } // Update 更新数据 func (a *DatasetFile) Update(ctx context.Context, recordID string, item schema.DatasetFile) error { oldItem, err := a.DatasetFileModel.Get(ctx, recordID) if err != nil { return err } else if oldItem == nil { return errors.ErrNotFound } return a.DatasetFileModel.Update(ctx, recordID, item) } // Delete 删除数据 func (a *DatasetFile) Delete(ctx context.Context, recordID string) error { oldItem, err := a.DatasetFileModel.Get(ctx, recordID) if err != nil { return err } else if oldItem == nil { return errors.ErrNotFound } dataset, err := a.datasetModel.Get(ctx, oldItem.DatasetId) if err != nil { return err } if dataset == nil || dataset.RagDataId == "" { return errors.New("知识库不存在") } err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error { dataset, err := a.datasetModel.Get(ctx, oldItem.DatasetId) if err != nil { return err } dataset.FileCount -= 1 err = a.datasetModel.Update(ctx, dataset.RecordID, *dataset) if err != nil { return err } _, err = ragflow.GetHttpClient().DeleteDocuments(ctx, dataset.RagDataId, []string{oldItem.RagFileId}) if err != nil { return err } return a.DatasetFileModel.Delete(ctx, recordID) }) return err } // UpdateStatus 更新状态 func (a *DatasetFile) UpdateStatus(ctx context.Context, recordID string, status int) error { oldItem, err := a.DatasetFileModel.Get(ctx, recordID) if err != nil { return err } else if oldItem == nil { return errors.ErrNotFound } return a.DatasetFileModel.UpdateStatus(ctx, recordID, status) } // UpdateEnabled 更新启用状态 func (a *DatasetFile) UpdateEnabled(ctx context.Context, recordID string, status bool) error { oldItem, err := a.DatasetFileModel.Get(ctx, recordID) if err != nil { return err } else if oldItem == nil { return errors.ErrNotFound } dataset, err := a.datasetModel.Get(ctx, oldItem.DatasetId) if err != nil { return err } if dataset == nil || dataset.RagDataId == "" { return errors.New("知识库不存在") } enabled := 0 if status { enabled = 1 } // 更新ragFlow _, err = ragflow.GetHttpClient().UpdateDocument(ctx, dataset.RagDataId, oldItem.RagFileId, &ragflow.UpdateDocumentReq{Enabled: enabled}) if err != nil { return err } return a.DatasetFileModel.UpdateEnabled(ctx, recordID, status) } // BatchDelete 批量删除 func (a *DatasetFile) BatchDelete(ctx context.Context, fileIDs []string) error { if len(fileIDs) == 0 { return errors.New("文件不能为空") } file, err := a.DatasetFileModel.Query(ctx, schema.DatasetFileQueryParam{RecordIDs: fileIDs}) if err != nil { return err } if len(file.Data) == 0 { return nil } dataset, err := a.datasetModel.Get(ctx, file.Data[0].DatasetId) if err != nil { return err } _, err = ragflow.GetHttpClient().DeleteDocuments(ctx, dataset.RagDataId, file.Data.ToRagFileIds()) if err != nil { return err } return a.DatasetFileModel.BatchDelete(ctx, fileIDs) } // BatchCreateV2 批量上传多个文件到指定知识库 // 入参 files 通常由 controller 从 multipart 表单解析而来,已包含文件名与内容。 // 单文件失败不会中断整批;最终成功数和失败明细统一返回给调用方。 func (a *DatasetFile) BatchCreateV2(ctx context.Context, datasetId string, files []schema.UpdateFileParam) error { if datasetId == "" { return errors.New("dataset_id 不能为空") } if len(files) == 0 { return errors.New("文件列表不能为空") } dataset, err := a.datasetModel.Get(ctx, datasetId) if err != nil { return err } if dataset == nil || dataset.RagDataId == "" { return errors.New("知识库不存在") } var ( failed []string success int ) for _, f := range files { // 保证 datasetId 一致,避免调用方遗漏 f.DatasetId = datasetId if err := a.CreateV2(ctx, f); err != nil { glog.Errorf(ctx, "批量导入【%s】失败:%s", f.FileName, err.Error()) failed = append(failed, f.FileName) continue } success++ } if len(failed) > 0 { return errors.New(fmt.Sprintf("部分文件导入失败(成功 %d 个):%v", success, failed)) } return nil }