package internal

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/util/guid"

	"yx-dataset-server/app/errors"
	"yx-dataset-server/app/model"
	"yx-dataset-server/app/schema"
	"yx-dataset-server/library/ragflow"
)

// NewChatAssistant builds a ChatAssistant service from the given model
// implementations.
func NewChatAssistant(
	mChatAssistant model.IChatAssistant,
	mChatDataset model.IChatDataset,
	mTrans model.ITrans,
	mDataset model.IDataset,
	mUser model.IUser,
	mSession model.IChatSession,
	mMessage model.IChatMessage,
	mRelation model.IDatasetRelation,
	mFile model.IDatasetFile,
) *ChatAssistant {
	return &ChatAssistant{
		ChatAssistantModel: mChatAssistant,
		chatDatasetModel:   mChatDataset,
		transModel:         mTrans,
		datasetModel:       mDataset,
		userModel:          mUser,
		sessionModel:       mSession,
		messageModel:       mMessage,
		relationModel:      mRelation,
		fileModel:          mFile,
	}
}

// ChatAssistant is the business-logic layer for chat assistants. It combines
// the local persistence models with the remote ragflow chat API.
type ChatAssistant struct {
	ChatAssistantModel model.IChatAssistant
	chatDatasetModel   model.IChatDataset
	transModel         model.ITrans
	datasetModel       model.IDataset
	userModel          model.IUser
	sessionModel       model.IChatSession
	messageModel       model.IChatMessage
	relationModel      model.IDatasetRelation
	fileModel          model.IDatasetFile
}

// Query lists chat assistants and fills each result with its dataset ids,
// datasets, creator and sessions.
//
// For non-root callers both the assistant query and the creator lookup are
// restricted to the caller's organisation. Creators are looked up among users
// with role codes "11" and "12" (the meaning of those codes is not visible in
// this file).
//
// NOTE(review): chat-dataset links, datasets and sessions are loaded without
// any filter and joined in memory; confirm this is acceptable for large tables.
func (a *ChatAssistant) Query(ctx context.Context, params schema.ChatAssistantQueryParam, opts ...schema.ChatAssistantQueryOptions) (*schema.ChatAssistantQueryResult, error) {
	userQueryParam := schema.UserQueryParam{RoleCode: []string{"11", "12"}}

	if !CheckIsRootUser(ctx) {
		user, err := a.userModel.Get(ctx, GetUserID(ctx))
		if err != nil {
			return nil, err
		}
		// The model may report "no such user" as (nil, nil); other methods in
		// this file guard for that, so do the same here instead of panicking.
		if user == nil {
			return nil, errors.ErrNotFound
		}
		params.OrgId = user.OrgId
		userQueryParam.OrgId = user.OrgId
	}

	result, err := a.ChatAssistantModel.Query(ctx, params, opts...)
	if err != nil {
		return nil, err
	}

	chatDataset, err := a.chatDatasetModel.Query(ctx, schema.ChatDatasetQueryParam{})
	if err != nil {
		return nil, err
	}
	result.Data.FillDatasetId(chatDataset.Data)

	dataset, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{})
	if err != nil {
		return nil, err
	}
	result.Data.FillDataset(dataset.Data)

	users, err := a.userModel.Query(ctx, userQueryParam)
	if err != nil {
		return nil, err
	}
	result.Data.FillCreator(users.Data)

	sessions, err := a.sessionModel.Query(ctx, schema.ChatSessionQueryParam{})
	if err != nil {
		return nil, err
	}
	result.Data.FillSession(sessions.Data)

	return result, nil
}

// Get returns a single chat assistant together with its creator name, bound
// datasets (with their creators filled in) and sessions.
//
// It returns errors.ErrNotFound when no assistant with recordID exists. A
// missing creator record is tolerated: CreatorName is simply left empty.
func (a *ChatAssistant) Get(ctx context.Context, recordID string, opts ...schema.ChatAssistantQueryOptions) (*schema.ChatAssistant, error) {
	item, err := a.ChatAssistantModel.Get(ctx, recordID, opts...)
	if err != nil {
		return nil, err
	}
	if item == nil {
		return nil, errors.ErrNotFound
	}

	creator, err := a.userModel.Get(ctx, item.CreatorId)
	if err != nil {
		return nil, err
	}
	// The creator may have been removed since the assistant was created;
	// do not dereference a nil user.
	if creator != nil {
		item.CreatorName = creator.RealName
	}

	chatDataset, err := a.chatDatasetModel.Query(ctx, schema.ChatDatasetQueryParam{ChatAssistantId: recordID})
	if err != nil {
		return nil, err
	}
	dataset, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: chatDataset.Data.ToDatasetIds()})
	if err != nil {
		return nil, err
	}
	users, err := a.userModel.Query(ctx, schema.UserQueryParam{RoleCode: []string{"11", "12"}})
	if err != nil {
		return nil, err
	}
	dataset.Data.FillCreator(users.Data)
	item.Datasets = dataset.Data

	sessions, err := a.sessionModel.Query(ctx, schema.ChatSessionQueryParam{AssistantId: item.RecordID})
	if err != nil {
		return nil, err
	}
	item.Sessions = sessions.Data

	return item, nil
}

// Create creates a chat assistant for the current (non-root) user, binds it to
// every dataset related to the user or the user's organisation, creates the
// matching ragflow chat and returns the stored assistant.
//
// When item.Name is empty a default name "<real name>对话助手:<timestamp>" is
// generated. When item.Source is 1 the new assistant is also recorded as the
// user's H5 assistant.
//
// NOTE(review): unlike loadCurrentUserDatasets, this method does not
// short-circuit when the user has no dataset relations; how the dataset model
// treats an empty RecordIds filter is not visible here.
// NOTE(review): the ragflow chat is created inside the local transaction; if a
// later local write fails the remote chat is not removed.
func (a *ChatAssistant) Create(ctx context.Context, item schema.ChatAssistant) (*schema.ChatAssistant, error) {
	// Root users are not allowed to create chat assistants.
	if CheckIsRootUser(ctx) {
		return nil, errors.New400Response("permission denied")
	}

	item.RecordID = guid.S()
	item.CreatorId = GetUserID(ctx)

	// Load the creator to obtain the organisation id and display name.
	user, err := a.userModel.Get(ctx, item.CreatorId)
	if err != nil {
		return nil, err
	}
	if user == nil {
		return nil, errors.ErrNotFound
	}
	item.OrgId = user.OrgId

	if item.Name == "" {
		item.Name = fmt.Sprintf("%s%s:%s", user.RealName, "对话助手", time.Now().Format("2006-01-02 15:04:05"))
	}

	// Collect every dataset related to the user or to the user's organisation.
	bizIds := []string{item.CreatorId}
	if user.OrgId != "" {
		bizIds = append(bizIds, user.OrgId)
	}
	rel, err := a.relationModel.Query(ctx, schema.DatasetRelationQueryParam{BizIds: bizIds})
	if err != nil {
		return nil, err
	}
	dataset, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: rel.Data.ToDatasetIds()})
	if err != nil {
		return nil, err
	}
	if err := a.CheckFiles(ctx, dataset.Data.ToRecordIds()); err != nil {
		return nil, err
	}
	ragDatasetIds := dataset.Data.ToRagDataIds()

	err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		for _, d := range dataset.Data {
			if err := a.chatDatasetModel.Create(ctx, schema.ChatDataset{
				RecordID:        guid.S(),
				ChatAssistantId: item.RecordID,
				DatasetId:       d.RecordID,
			}); err != nil {
				return err
			}
		}

		resp, err := ragflow.GetHttpClient().CreateChat(ctx, &ragflow.CreateChatReq{
			Name:       item.Name,
			DatasetIDs: ragDatasetIds,
		})
		if err != nil {
			return err
		}
		item.RagChatId = resp.Data.ID

		if err := a.ChatAssistantModel.Create(ctx, item); err != nil {
			return err
		}

		if item.Source == 1 {
			user.H5AssistantId = item.RecordID
			if err := a.userModel.Update(ctx, user.RecordID, *user); err != nil {
				return err
			}
		}
		return nil
	})
	// Surface transaction failures instead of falling through to Get, which
	// would otherwise hide the real cause behind a not-found error.
	if err != nil {
		return nil, err
	}

	return a.Get(ctx, item.RecordID)
}

// CheckFiles verifies that the given datasets contain at least one file that
// is enabled and has ParseStatus 2 (treated as "parsed", per the error text
// returned otherwise). It returns nil on the first such file and an error when
// none is found.
func (a *ChatAssistant) CheckFiles(ctx context.Context, datasetIds []string) error {
	files, err := a.fileModel.Query(ctx, schema.DatasetFileQueryParam{DatasetIds: datasetIds})
	if err != nil {
		return err
	}
	for _, f := range files.Data {
		if f.Enabled && f.ParseStatus == 2 {
			return nil
		}
	}
	return errors.New("用户关联知识库没有已解析的文件,不能使用对话助手,请联系管理员确认。")
}

// Update stores item under recordID and, when the name changed, renames the
// corresponding ragflow chat. Both steps run inside one local transaction, so
// a failed remote rename rolls the local update back.
//
// It returns errors.ErrNotFound when the assistant does not exist.
func (a *ChatAssistant) Update(ctx context.Context, recordID string, item schema.ChatAssistant) error {
	oldItem, err := a.ChatAssistantModel.Get(ctx, recordID)
	if err != nil {
		return err
	}
	if oldItem == nil {
		return errors.ErrNotFound
	}

	// Address the remote chat by the stored id (as Delete does) rather than by
	// whatever the caller put in the payload; fall back to the payload only
	// when nothing is stored.
	ragChatID := oldItem.RagChatId
	if ragChatID == "" {
		ragChatID = item.RagChatId
	}

	return ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		if err := a.ChatAssistantModel.Update(ctx, recordID, item); err != nil {
			return err
		}
		if item.Name == oldItem.Name {
			return nil
		}
		_, err := ragflow.GetHttpClient().UpdateChat(ctx, ragChatID, &ragflow.UpdateChatReq{
			Name: item.Name,
		})
		return err
	})
}

// Delete removes the assistant locally and deletes its ragflow chat. Both
// steps run inside one local transaction, so a failed remote delete rolls the
// local delete back.
//
// It returns errors.ErrNotFound when the assistant does not exist.
func (a *ChatAssistant) Delete(ctx context.Context, recordID string) error {
	oldItem, err := a.ChatAssistantModel.Get(ctx, recordID)
	if err != nil {
		return err
	}
	if oldItem == nil {
		return errors.ErrNotFound
	}

	return ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		if err := a.ChatAssistantModel.Delete(ctx, recordID); err != nil {
			return err
		}
		// An assistant without a remote chat id has nothing to delete in
		// ragflow; the other helpers in this file treat "" the same way.
		if oldItem.RagChatId == "" {
			return nil
		}
		_, err := ragflow.GetHttpClient().DeleteChats(ctx, []string{oldItem.RagChatId})
		return err
	})
}

// UpdateStatus changes the status of an existing assistant. It returns
// errors.ErrNotFound when the assistant does not exist.
func (a *ChatAssistant) UpdateStatus(ctx context.Context, recordID string, status int) error {
	oldItem, err := a.ChatAssistantModel.Get(ctx, recordID)
	if err != nil {
		return err
	}
	if oldItem == nil {
		return errors.ErrNotFound
	}
	return a.ChatAssistantModel.UpdateStatus(ctx, recordID, status)
}

// loadCurrentUserDatasets returns every dataset related to the user or to the
// user's organisation. It returns an empty (non-nil) list when no relation
// exists, without querying the dataset model.
func (a *ChatAssistant) loadCurrentUserDatasets(ctx context.Context, user *schema.User) (schema.Datasets, error) {
	// Use the same business ids as Create so that permission checks compare
	// like with like.
	bizIds := []string{user.RecordID}
	if user.OrgId != "" {
		bizIds = append(bizIds, user.OrgId)
	}

	rel, err := a.relationModel.Query(ctx, schema.DatasetRelationQueryParam{BizIds: bizIds})
	if err != nil {
		return nil, err
	}
	if len(rel.Data) == 0 {
		return schema.Datasets{}, nil
	}

	ds, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: rel.Data.ToDatasetIds()})
	if err != nil {
		return nil, err
	}
	return ds.Data, nil
}

// diffIds returns the elements of a that do not occur in b, sorted
// ascending. The result is never nil; duplicates in a are kept.
func diffIds(a, b []string) []string {
	exclude := make(map[string]struct{}, len(b))
	for _, id := range b {
		exclude[id] = struct{}{}
	}

	// Deliberately non-nil so that an empty result serialises as [] rather
	// than null.
	out := make([]string, 0)
	for _, id := range a {
		if _, found := exclude[id]; !found {
			out = append(out, id)
		}
	}
	sort.Strings(out)
	return out
}

// idsEqual reports whether a and b contain the same ids with the same
// multiplicities, ignoring order. Neither input is modified.
func idsEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}

	// Sort copies so the callers' slices keep their order.
	left := append([]string(nil), a...)
	right := append([]string(nil), b...)
	sort.Strings(left)
	sort.Strings(right)

	for i := range left {
		if left[i] != right[i] {
			return false
		}
	}
	return true
}

// CheckPermissionSync compares the datasets the current user may access with
// the datasets bound to the user's H5 chat assistant.
//
// Root users are rejected with a 400 response. When the user has no H5
// assistant, or the referenced assistant no longer exists, the status has
// NeedCreate set, an empty AssistantId and every current dataset id listed in
// Added.
func (a *ChatAssistant) CheckPermissionSync(ctx context.Context) (*schema.AssistantPermissionStatus, error) {
	if CheckIsRootUser(ctx) {
		return nil, errors.New400Response("permission denied")
	}

	user, err := a.userModel.Get(ctx, GetUserID(ctx))
	if err != nil {
		return nil, err
	}
	if user == nil {
		return nil, errors.ErrNotFound
	}

	currentDatasets, err := a.loadCurrentUserDatasets(ctx, user)
	if err != nil {
		return nil, err
	}
	currentIds := currentDatasets.ToRecordIds()

	status := &schema.AssistantPermissionStatus{
		AssistantId:       user.H5AssistantId,
		CurrentDatasetIds: currentIds,
		BoundDatasetIds:   []string{},
		Added:             []string{},
		Removed:           []string{},
	}

	// Resolve the assistant; it may never have been created or may have been
	// deleted since.
	var assistant *schema.ChatAssistant
	if user.H5AssistantId != "" {
		assistant, err = a.ChatAssistantModel.Get(ctx, user.H5AssistantId)
		if err != nil {
			return nil, err
		}
	}
	if assistant == nil {
		status.Synced = false
		status.NeedCreate = true
		status.AssistantId = ""
		status.Added = append([]string{}, currentIds...)
		sort.Strings(status.Added)
		return status, nil
	}

	cd, err := a.chatDatasetModel.Query(ctx, schema.ChatDatasetQueryParam{ChatAssistantId: assistant.RecordID})
	if err != nil {
		return nil, err
	}
	boundIds := cd.Data.ToDatasetIds()

	status.BoundDatasetIds = boundIds
	status.Synced = idsEqual(currentIds, boundIds)
	status.Added = diffIds(currentIds, boundIds)
	status.Removed = diffIds(boundIds, currentIds)
	return status, nil
}

// SyncPermission brings the current user's H5 chat assistant in line with the
// datasets the user may access and returns the resulting assistant.
//
//  1. No assistant (or the referenced one is gone): create the ragflow chat,
//     the local assistant, its dataset links, a new session and a welcome
//     message.
//  2. Assistant exists and its datasets already match: return it unchanged.
//  3. Assistant exists but its datasets differ: drop the old sessions, rebuild
//     the dataset links, update the ragflow chat and create a new session with
//     a welcome message.
//
// Root users and users without any accessible dataset are rejected with a 400
// response; CheckFiles must also pass.
func (a *ChatAssistant) SyncPermission(ctx context.Context) (*schema.ChatAssistant, error) {
	if CheckIsRootUser(ctx) {
		return nil, errors.New400Response("permission denied")
	}

	user, err := a.userModel.Get(ctx, GetUserID(ctx))
	if err != nil {
		return nil, err
	}
	if user == nil {
		return nil, errors.ErrNotFound
	}

	currentDatasets, err := a.loadCurrentUserDatasets(ctx, user)
	if err != nil {
		return nil, err
	}
	if len(currentDatasets) == 0 {
		return nil, errors.New400Response("当前用户没有可用的知识库,无法创建或更新对话助手")
	}
	currentIds := currentDatasets.ToRecordIds()
	if err := a.CheckFiles(ctx, currentIds); err != nil {
		return nil, err
	}
	ragDatasetIds := currentDatasets.ToRagDataIds()

	// Case 1: no assistant yet.
	if user.H5AssistantId == "" {
		return a.createH5Assistant(ctx, user, currentDatasets, ragDatasetIds)
	}

	assistant, err := a.ChatAssistantModel.Get(ctx, user.H5AssistantId)
	if err != nil {
		return nil, err
	}
	// Case 1 again: the referenced assistant was deleted, so start over.
	if assistant == nil {
		user.H5AssistantId = ""
		return a.createH5Assistant(ctx, user, currentDatasets, ragDatasetIds)
	}

	// Case 2: already consistent.
	cd, err := a.chatDatasetModel.Query(ctx, schema.ChatDatasetQueryParam{ChatAssistantId: assistant.RecordID})
	if err != nil {
		return nil, err
	}
	if idsEqual(currentIds, cd.Data.ToDatasetIds()) {
		return a.Get(ctx, assistant.RecordID)
	}

	// Case 3: rebuild to match the current permissions.
	if err := a.rebuildAssistant(ctx, assistant, currentDatasets, ragDatasetIds); err != nil {
		return nil, err
	}
	return a.Get(ctx, assistant.RecordID)
}

// createH5Assistant creates an H5 chat assistant (Source 1) for user, bound to
// datasets, records it on the user and opens a first session with a welcome
// message. All local writes run inside one transaction.
//
// NOTE(review): the ragflow chat is created inside the transaction; if a later
// local write fails the remote chat is not removed.
func (a *ChatAssistant) createH5Assistant(ctx context.Context, user *schema.User, datasets schema.Datasets, ragDatasetIds []string) (*schema.ChatAssistant, error) {
	assistant := schema.ChatAssistant{
		RecordID:  guid.S(),
		OrgId:     user.OrgId,
		UserId:    user.RecordID,
		Source:    1,
		CreatorId: user.RecordID,
		Name:      fmt.Sprintf("%s%s:%s", user.RealName, "对话助手", time.Now().Format("2006-01-02 15:04:05")),
	}

	err := ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		resp, err := ragflow.GetHttpClient().CreateChat(ctx, &ragflow.CreateChatReq{
			Name:       assistant.Name,
			DatasetIDs: ragDatasetIds,
		})
		if err != nil {
			return err
		}
		assistant.RagChatId = resp.Data.ID

		for _, d := range datasets {
			if err := a.chatDatasetModel.Create(ctx, schema.ChatDataset{
				RecordID:        guid.S(),
				ChatAssistantId: assistant.RecordID,
				DatasetId:       d.RecordID,
				RagDataId:       d.RagDataId,
			}); err != nil {
				return err
			}
		}

		if err := a.ChatAssistantModel.Create(ctx, assistant); err != nil {
			return err
		}

		user.H5AssistantId = assistant.RecordID
		if err := a.userModel.Update(ctx, user.RecordID, *user); err != nil {
			return err
		}

		return a.createWelcomeSession(ctx, &assistant, datasets, user.RecordID)
	})
	if err != nil {
		return nil, err
	}
	return a.Get(ctx, assistant.RecordID)
}

// rebuildAssistant discards the assistant's sessions and dataset links,
// recreates the links from datasets, pushes the new dataset list to the
// ragflow chat and opens a fresh session with a welcome message. All local
// writes run inside one transaction.
func (a *ChatAssistant) rebuildAssistant(ctx context.Context, assistant *schema.ChatAssistant, datasets schema.Datasets, ragDatasetIds []string) error {
	return ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		// Remove the old sessions: remote side first, then local rows.
		oldSessions, err := a.sessionModel.Query(ctx, schema.ChatSessionQueryParam{AssistantId: assistant.RecordID})
		if err != nil {
			return err
		}
		if len(oldSessions.Data) > 0 && assistant.RagChatId != "" {
			ragSessionIds := make([]string, 0, len(oldSessions.Data))
			for _, s := range oldSessions.Data {
				if s.RagSessionId != "" {
					ragSessionIds = append(ragSessionIds, s.RagSessionId)
				}
			}
			if len(ragSessionIds) > 0 {
				// Best effort: a failed remote cleanup is logged and must not
				// block the rebuild.
				if _, err := ragflow.GetHttpClient().DeleteSessions(ctx, assistant.RagChatId, ragSessionIds); err != nil {
					glog.Errorf(ctx, "删除 ragflow 会话失败: %s", err.Error())
				}
			}
		}
		if err := a.sessionModel.DeleteByAssistantId(ctx, assistant.RecordID); err != nil {
			return err
		}

		// Recreate the assistant-to-dataset links.
		if err := a.chatDatasetModel.DeleteByAssistantId(ctx, assistant.RecordID); err != nil {
			return err
		}
		for _, d := range datasets {
			if err := a.chatDatasetModel.Create(ctx, schema.ChatDataset{
				RecordID:        guid.S(),
				ChatAssistantId: assistant.RecordID,
				DatasetId:       d.RecordID,
				RagDataId:       d.RagDataId,
			}); err != nil {
				return err
			}
		}

		// Push the new dataset list to the ragflow chat.
		if assistant.RagChatId != "" {
			if _, err := ragflow.GetHttpClient().UpdateChat(ctx, assistant.RagChatId, &ragflow.UpdateChatReq{
				DatasetIDs: ragDatasetIds,
			}); err != nil {
				return err
			}
		}

		return a.createWelcomeSession(ctx, assistant, datasets, assistant.CreatorId)
	})
}

// createWelcomeSession creates a new ragflow session for the assistant, stores
// it locally and writes one welcome message that names the given datasets.
// It does nothing when the assistant has no ragflow chat id.
func (a *ChatAssistant) createWelcomeSession(ctx context.Context, assistant *schema.ChatAssistant, datasets schema.Datasets, userId string) error {
	if assistant.RagChatId == "" {
		return nil
	}

	sessionName := fmt.Sprintf("%s%s", time.Now().Format("2006-01-02 15:04:05 "), "会话")
	resp, err := ragflow.GetHttpClient().CreateSession(ctx, assistant.RagChatId, &ragflow.CreateSessionReq{Name: sessionName})
	if err != nil {
		return err
	}

	session := schema.ChatSession{
		RecordID:     guid.S(),
		Name:         sessionName,
		AssistantId:  assistant.RecordID,
		RagChatId:    assistant.RagChatId,
		RagSessionId: resp.Data.Id,
		Source:       assistant.Source,
		CreatorId:    userId,
	}
	if err := a.sessionModel.Create(ctx, session); err != nil {
		return err
	}

	datasetNames := make([]string, 0, len(datasets))
	for _, d := range datasets {
		datasetNames = append(datasetNames, fmt.Sprintf("《%s》", d.Name))
	}
	answer := fmt.Sprintf("你好,欢迎使用知识库问答助手!关于%s的疑问,我都会尽力为你解答,快来提问吧!", strings.Join(datasetNames, ","))

	message := schema.ChatMessage{
		RecordID:     guid.S(),
		UserId:       userId,
		AssistantId:  assistant.RecordID,
		SessionId:    session.RecordID,
		RagSessionId: resp.Data.Id,
		Question:     "",
		// Backslashes (which can only come from dataset names) are stripped
		// from the stored answer.
		Answer:    strings.ReplaceAll(answer, `\`, ``),
		CreatorId: userId,
	}
	return a.messageModel.Create(ctx, message)
}