package internal

import (
	"context"
	"fmt"
	"time"

	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/util/guid"

	"yx-dataset-server/app/errors"
	"yx-dataset-server/app/model"
	"yx-dataset-server/app/schema"
	"yx-dataset-server/library/dingtalk"
	"yx-dataset-server/library/ragflow"
)

// Robot types.
const (
	robotTypeDingtalk = 1 // DingTalk robot
)

// DingTalk robot reply types.
const (
	replyTypeText = 1 // text reply (HTTP webhook)
	replyTypeCard = 2 // card reply (Stream + streaming AI card updates)
)

// robotWebhookURLPrefix is the fixed prefix of the webhook URL generated for
// every robot; the robot's record ID is appended to it.
const robotWebhookURLPrefix = "https://dataset.yongxulvjian.com/robot/v1/robots/receive/"

// detachedContext exposes the values of its parent context but never reports
// cancellation or a deadline. It is used for background work that must keep
// running after the calling request has finished.
type detachedContext struct {
	context.Context
}

// Deadline reports that no deadline is set.
func (detachedContext) Deadline() (time.Time, bool) { return time.Time{}, false }

// Done returns nil, i.e. a channel that is never closed.
func (detachedContext) Done() <-chan struct{} { return nil }

// Err always returns nil because the context is never cancelled.
func (detachedContext) Err() error { return nil }

// detachContext returns a context that carries the values of ctx (so
// value-based features such as log correlation keep working) but ignores its
// cancellation and deadline.
func detachContext(ctx context.Context) context.Context {
	return detachedContext{Context: ctx}
}

// NewRobotConfig creates a RobotConfig wired with the given model
// dependencies.
func NewRobotConfig(
	mRobotConfig model.IRobotConfig,
	mRobotDataset model.IRobotDataset,
	mTrans model.ITrans,
	mChatMessage model.IChatMessage,
	mChatAssistant model.IChatAssistant,
	mChatSession model.IChatSession,
	mChatDataset model.IChatDataset,
	mUser model.IUser,
	mDataset model.IDataset,
	mRole model.IRole,
) *RobotConfig {
	return &RobotConfig{
		RobotConfigModel:   mRobotConfig,
		robotDatasetModel:  mRobotDataset,
		transModel:         mTrans,
		chatMessageModel:   mChatMessage,
		chatAssistantModel: mChatAssistant,
		chatSessionModel:   mChatSession,
		chatDatasetModel:   mChatDataset,
		userModel:          mUser,
		datasetModel:       mDataset,
		roleModel:          mRole,
	}
}

// RobotConfig implements the robot configuration business logic on top of the
// injected models.
type RobotConfig struct {
	RobotConfigModel   model.IRobotConfig
	robotDatasetModel  model.IRobotDataset
	transModel         model.ITrans
	chatMessageModel   model.IChatMessage
	chatAssistantModel model.IChatAssistant
	chatSessionModel   model.IChatSession
	chatDatasetModel   model.IChatDataset
	userModel          model.IUser
	datasetModel       model.IDataset
	roleModel          model.IRole
}

// requireConfig loads the robot config with the given ID and converts a
// missing record (nil result without error) into errors.ErrNotFound.
func (a *RobotConfig) requireConfig(ctx context.Context, recordID string, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfig, error) {
	cfg, err := a.RobotConfigModel.Get(ctx, recordID, opts...)
	if err != nil {
		return nil, err
	}
	if cfg == nil {
		return nil, errors.ErrNotFound
	}
	return cfg, nil
}

// Query lists robot configs. For non-root callers the query is restricted to
// the caller's organisation, and additionally to the caller's own records
// when the caller's role code is "99". Creator information is filled in on
// the returned records.
func (a *RobotConfig) Query(ctx context.Context, params schema.RobotConfigQueryParam, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfigQueryResult, error) {
	if !CheckIsRootUser(ctx) {
		userID := GetUserID(ctx)
		user, err := a.userModel.Get(ctx, userID)
		if err != nil {
			return nil, err
		}
		if user == nil {
			// Fail closed: without the user record the visibility scope
			// cannot be determined.
			return nil, errors.ErrNotFound
		}
		params.OrgId = user.OrgId

		role, err := a.roleModel.Get(ctx, user.RoleId)
		if err != nil {
			return nil, err
		}
		if role == nil {
			return nil, errors.ErrNotFound
		}
		// NOTE(review): role code "99" presumably identifies an ordinary
		// user who may only see their own robots; confirm against the role
		// definitions.
		if role.Code == "99" {
			params.UserId = userID
		}
	}

	users, err := a.userModel.Query(ctx, schema.UserQueryParam{})
	if err != nil {
		return nil, err
	}
	result, err := a.RobotConfigModel.Query(ctx, params, opts...)
	if err != nil {
		return nil, err
	}
	result.Data.FillCreator(users.Data)
	return result, nil
}

// Get returns the robot config with the given ID together with its creator
// name and bound datasets. It returns errors.ErrNotFound when the record does
// not exist.
func (a *RobotConfig) Get(ctx context.Context, recordID string, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfig, error) {
	item, err := a.requireConfig(ctx, recordID, opts...)
	if err != nil {
		return nil, err
	}

	creator, err := a.userModel.Get(ctx, item.CreatorId)
	if err != nil {
		return nil, err
	}
	// The creator may no longer exist; leave the name empty in that case
	// instead of dereferencing a nil user.
	if creator != nil {
		item.CreatorName = creator.RealName
	}

	robotDatasets, err := a.robotDatasetModel.Query(ctx, schema.RobotDatasetQueryParam{
		RobotId: item.RecordID,
	})
	if err != nil {
		return nil, err
	}
	// Only look up datasets when the robot has bindings.
	if len(robotDatasets.Data) > 0 {
		datasets, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: robotDatasets.Data.ToDatasetIds()})
		if err != nil {
			return nil, err
		}
		item.Datasets = datasets.Data
	}
	return item, nil
}

// getUpdate re-reads a record after a write so callers receive the persisted
// state.
func (a *RobotConfig) getUpdate(ctx context.Context, recordID string) (*schema.RobotConfig, error) {
	return a.Get(ctx, recordID)
}

// Create creates a robot config together with its chat assistant, chat
// session and dataset bindings, and returns the persisted record. For a
// DingTalk robot in card reply mode the stream listener is started in the
// background.
func (a *RobotConfig) Create(ctx context.Context, item schema.RobotConfig) (*schema.RobotConfig, error) {
	item.RecordID = guid.S()
	item.CreatorId = GetUserID(ctx)
	item.AssistantId = guid.S()
	item.SessionId = guid.S()
	item.Webhook = fmt.Sprintf("%s%s", robotWebhookURLPrefix, item.RecordID)

	user := GetRootUser()
	if !CheckIsRootUser(ctx) {
		var err error
		user, err = a.userModel.Get(ctx, GetUserID(ctx))
		if err != nil {
			return nil, err
		}
		if user == nil {
			return nil, errors.ErrNotFound
		}
		item.OrgId = user.OrgId
	}
	if user == nil {
		// Guards the dereference below should the root user be unavailable.
		return nil, errors.ErrNotFound
	}

	// Create the chat assistant and its session.
	if err := a.createAssistant(ctx, item.AssistantId, item.SessionId, *user, item); err != nil {
		return nil, err
	}

	// Persist robot_config unconditionally inside one transaction, together
	// with one robot_dataset row per bound dataset.
	err := ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		for _, dataset := range item.Datasets {
			binding := schema.RobotDataset{
				RecordID:  guid.S(),
				RobotId:   item.RecordID,
				DatasetId: dataset.RecordID,
			}
			if err := a.robotDatasetModel.Create(ctx, binding); err != nil {
				return err
			}
		}
		return a.RobotConfigModel.Create(ctx, item)
	})
	if err != nil {
		return nil, err
	}

	// DingTalk robot in card reply mode: start the stream listener. The
	// goroutine may outlive the caller, so it gets a detached context that is
	// not cancelled when the caller's (typically request-scoped) ctx ends.
	if item.Type == robotTypeDingtalk && item.ReplyType == replyTypeCard {
		go a.startDingtalkRobot(detachContext(ctx), item.RecordID)
	}
	return a.getUpdate(ctx, item.RecordID)
}

// Update updates the robot config with the given ID and returns the persisted
// record. It returns errors.ErrNotFound when the record does not exist.
func (a *RobotConfig) Update(ctx context.Context, recordID string, item schema.RobotConfig) (*schema.RobotConfig, error) {
	if _, err := a.requireConfig(ctx, recordID); err != nil {
		return nil, err
	}
	if err := a.RobotConfigModel.Update(ctx, recordID, item); err != nil {
		return nil, err
	}

	// Sync the DingTalk listener in the background: stop the old instance,
	// then restart it if the new config requires one. A detached context is
	// used because the goroutine may outlive the caller's ctx.
	go a.syncDingtalkRobot(detachContext(ctx), recordID)
	return a.getUpdate(ctx, recordID)
}

// Delete stops the robot's listener and deletes the robot config. It returns
// errors.ErrNotFound when the record does not exist.
func (a *RobotConfig) Delete(ctx context.Context, recordID string) error {
	if _, err := a.requireConfig(ctx, recordID); err != nil {
		return err
	}

	// Stop the robot first, then delete the DB record; a failure to stop is
	// logged but does not block the deletion.
	if err := dingtalk.GetRobotManager().StopRobot(recordID); err != nil {
		glog.Warningf(ctx, "停止钉钉机器人[%s]失败:%v", recordID, err)
	}
	return a.RobotConfigModel.Delete(ctx, recordID)
}

// UpdateStatus updates the status of the robot config with the given ID and
// re-syncs its listener in the background. It returns errors.ErrNotFound when
// the record does not exist.
func (a *RobotConfig) UpdateStatus(ctx context.Context, recordID string, status int) error {
	if _, err := a.requireConfig(ctx, recordID); err != nil {
		return err
	}
	if err := a.RobotConfigModel.UpdateStatus(ctx, recordID, status); err != nil {
		return err
	}

	// Sync the listener after the status change, on a detached context
	// because the goroutine may outlive the caller's ctx.
	go a.syncDingtalkRobot(detachContext(ctx), recordID)
	return nil
}

// ReceiveMessage handles a message delivered through the HTTP webhook (text
// reply mode, ReplyType=1): it forwards the question to RAGFlow, stores the
// question/answer pair and sends the answer back through the message's
// session webhook as markdown.
func (a *RobotConfig) ReceiveMessage(ctx context.Context, id string, message dingtalk.Message) error {
	robotConfig, err := a.RobotConfigModel.Get(ctx, id)
	if err == nil && robotConfig == nil {
		// A missing record is reported as nil without an error.
		err = errors.ErrNotFound
	}
	if err != nil {
		glog.Errorf(ctx, "获取机器人配置失败:%s", err.Error())
		return err
	}

	assistant, err := a.chatAssistantModel.Get(ctx, robotConfig.AssistantId)
	if err == nil && assistant == nil {
		err = errors.ErrNotFound
	}
	if err != nil {
		glog.Errorf(ctx, "获取对话助手失败:%s", err.Error())
		return err
	}

	session, err := a.chatSessionModel.Get(ctx, robotConfig.SessionId)
	if err == nil && session == nil {
		err = errors.ErrNotFound
	}
	if err != nil {
		glog.Errorf(ctx, "获取对话失败:%s", err.Error())
		return err
	}

	question := message.Text.Content
	stream, err := ragflow.GetHttpClient().ChatCompletionsStream(ctx, assistant.RagChatId, &ragflow.ChatCompletionReq{
		Question:  question,
		SessionID: session.RagSessionId,
	})
	if err != nil {
		return err
	}
	defer stream.Close()

	m := &schema.ChatMessage{
		RecordID:    guid.S(),
		Question:    question,
		AssistantId: assistant.RecordID,
		SessionId:   session.RecordID,
		UserId:      GetUserID(ctx),
	}
	m, err = CollectStreamDataAndCreate(ctx, stream, m)
	if err != nil {
		return err
	}
	if err := a.chatMessageModel.Create(ctx, *m); err != nil {
		return err
	}

	replyMsg := dingtalk.DingReplyMsg{
		MsgType: "markdown",
		Markdown: dingtalk.Markdown{
			Title: m.Question,
			Text:  m.Answer,
		},
	}
	if err := dingtalk.SendReplyMsg(message.SessionWebhook, replyMsg); err != nil {
		glog.Errorf(ctx, "发送钉钉回复消息失败:%s", err.Error())
		return err
	}
	return nil
}

// RestartAllRobots is meant to be called at service start-up. It restarts
// every DingTalk robot stored in the DB that uses card reply mode. Failures
// of individual robots are logged and skipped; only a failure of the initial
// query is returned.
func (a *RobotConfig) RestartAllRobots(ctx context.Context) error {
	result, err := a.RobotConfigModel.Query(ctx, schema.RobotConfigQueryParam{
		ReplyType: replyTypeCard,
	})
	if err != nil {
		return err
	}

	manager := dingtalk.GetRobotManager()
	for _, cfg := range result.Data {
		// Re-check type and reply mode on each returned record.
		if cfg.Type != robotTypeDingtalk || cfg.ReplyType != replyTypeCard {
			continue
		}
		botCfg, err := a.buildBotConfig(ctx, cfg)
		if err != nil {
			glog.Errorf(ctx, "组装钉钉机器人配置失败[%s]:%v", cfg.RecordID, err)
			continue
		}
		if err := manager.StartRobot(botCfg); err != nil {
			glog.Errorf(ctx, "启动钉钉机器人[%s]失败:%v", cfg.Name, err)
			continue
		}
		glog.Infof(ctx, "钉钉机器人[%s]启动成功", cfg.Name)
	}
	return nil
}

// startDingtalkRobot loads the robot config with the given ID and starts its
// stream robot. Errors are logged, not returned, because the method is run in
// a background goroutine.
func (a *RobotConfig) startDingtalkRobot(ctx context.Context, recordID string) {
	cfg, err := a.RobotConfigModel.Get(ctx, recordID)
	if err != nil || cfg == nil {
		glog.Errorf(ctx, "启动钉钉机器人时获取配置失败[%s]:%v", recordID, err)
		return
	}
	botCfg, err := a.buildBotConfig(ctx, cfg)
	if err != nil {
		glog.Errorf(ctx, "组装钉钉机器人配置失败[%s]:%v", recordID, err)
		return
	}
	if err := dingtalk.GetRobotManager().StartRobot(botCfg); err != nil {
		glog.Errorf(ctx, "启动钉钉机器人失败[%s]:%v", cfg.Name, err)
		return
	}
	glog.Infof(ctx, "钉钉机器人[%s]启动成功", cfg.Name)
}

// syncDingtalkRobot brings the robot manager in line with the config
// currently stored in the DB: the running instance is always stopped, and a
// new one is started only when the config is a DingTalk robot in card reply
// mode. Errors are logged, not returned.
func (a *RobotConfig) syncDingtalkRobot(ctx context.Context, recordID string) {
	manager := dingtalk.GetRobotManager()

	// Always stop the old instance first; a stop failure is only logged.
	if err := manager.StopRobot(recordID); err != nil {
		glog.Warningf(ctx, "停止钉钉机器人[%s]失败:%v", recordID, err)
	}

	cfg, err := a.RobotConfigModel.Get(ctx, recordID)
	if err != nil {
		glog.Warningf(ctx, "同步钉钉机器人时获取配置失败[%s]:%v", recordID, err)
		return
	}
	if cfg == nil {
		// The record no longer exists; nothing to restart.
		return
	}
	if cfg.Type != robotTypeDingtalk || cfg.ReplyType != replyTypeCard {
		return
	}

	botCfg, err := a.buildBotConfig(ctx, cfg)
	if err != nil {
		glog.Errorf(ctx, "同步钉钉机器人配置失败[%s]:%v", recordID, err)
		return
	}
	if err := manager.StartRobot(botCfg); err != nil {
		glog.Errorf(ctx, "重启钉钉机器人失败[%s]:%v", cfg.Name, err)
		return
	}
	glog.Infof(ctx, "钉钉机器人[%s]重启成功", cfg.Name)
}

// buildBotConfig assembles a dingtalk.BotConfig from a schema.RobotConfig and
// its associated RAGFlow chat and session. It returns an error when the
// client credentials or card template ID are empty, or when the assistant or
// session is missing or not linked to RAGFlow.
func (a *RobotConfig) buildBotConfig(ctx context.Context, cfg *schema.RobotConfig) (*dingtalk.BotConfig, error) {
	if cfg.ClientId == "" || cfg.ClientSecret == "" {
		return nil, fmt.Errorf("机器人 ClientID/ClientSecret 不能为空")
	}
	if cfg.CardTemplateId == "" {
		return nil, fmt.Errorf("机器人卡片模板 ID 不能为空")
	}

	assistant, err := a.chatAssistantModel.Get(ctx, cfg.AssistantId)
	if err != nil {
		return nil, fmt.Errorf("获取对话助手失败:%w", err)
	}
	if assistant == nil || assistant.RagChatId == "" {
		return nil, fmt.Errorf("对话助手未关联 RAGFlow chat")
	}

	session, err := a.chatSessionModel.Get(ctx, cfg.SessionId)
	if err != nil {
		return nil, fmt.Errorf("获取会话失败:%w", err)
	}
	if session == nil || session.RagSessionId == "" {
		return nil, fmt.Errorf("会话未关联 RAGFlow session")
	}

	bot := &dingtalk.BotConfig{
		RobotConfigID: cfg.RecordID,
		Name:          cfg.Name,
		ClientID:      cfg.ClientId,
		ClientSecret:  cfg.ClientSecret,
		// For enterprise-internal app robots the robotCode usually equals
		// the ClientID/AppKey.
		RobotCode:      cfg.ClientId,
		CardTemplateId: cfg.CardTemplateId,
		RagChatId:      assistant.RagChatId,
		RagSessionId:   session.RagSessionId,
		AssistantId:    cfg.AssistantId,
		SessionId:      cfg.SessionId,
		CreatorId:      cfg.CreatorId,
	}

	// Copy the identifiers so the callback does not keep referencing cfg.
	assistantID := cfg.AssistantId
	sessionID := cfg.SessionId
	ragSessionID := session.RagSessionId
	creatorID := cfg.CreatorId

	// OnDingtalkQA persists one question/answer pair as a chat message.
	bot.OnDingtalkQA = func(ctx context.Context, p dingtalk.DingtalkQAParams) error {
		return a.chatMessageModel.Create(ctx, schema.ChatMessage{
			RecordID:     guid.S(),
			UserId:       p.SenderStaffId,
			AssistantId:  assistantID,
			SessionId:    sessionID,
			RagSessionId: ragSessionID,
			Question:     p.Question,
			Answer:       p.Answer,
			CreatorId:    creatorID,
		})
	}
	return bot, nil
}

// createAssistant creates, inside one transaction, the RAGFlow chat and
// session for a robot together with the local chat assistant, its dataset
// bindings and the chat session record.
//
// NOTE(review): the RAGFlow chat/session are created through remote calls;
// nothing in this function removes them again if a later step fails.
func (a *RobotConfig) createAssistant(ctx context.Context, assistantId, sessionId string, user schema.User, item schema.RobotConfig) error {
	assistantName := fmt.Sprintf("%s%s", item.Name, "助手")
	sessionName := fmt.Sprintf("%s%s", item.Name, "会话")

	return ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
		dataset, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: item.Datasets.ToRecordIds()})
		if err != nil {
			return err
		}

		// TODO: creating a chat assistant fails for datasets that contain no
		// parsed files.
		chat, err := ragflow.GetHttpClient().CreateChat(ctx, &ragflow.CreateChatReq{
			Name:       assistantName,
			DatasetIDs: dataset.Data.ToRagDataIds(),
		})
		if err != nil {
			return err
		}

		err = a.chatAssistantModel.Create(ctx, schema.ChatAssistant{
			RecordID:  assistantId,
			OrgId:     user.OrgId,
			Name:      assistantName,
			RagChatId: chat.Data.ID,
			// NOTE(review): Source 2 presumably marks assistants created for
			// robots; confirm against the ChatAssistant schema.
			Source:    2,
			CreatorId: user.RecordID,
		})
		if err != nil {
			return err
		}

		for _, ds := range item.Datasets {
			binding := schema.ChatDataset{
				RecordID:        guid.S(),
				ChatAssistantId: assistantId,
				DatasetId:       ds.RecordID,
			}
			if err := a.chatDatasetModel.Create(ctx, binding); err != nil {
				return err
			}
		}

		ragSession, err := ragflow.GetHttpClient().CreateSession(ctx, chat.Data.ID, &ragflow.CreateSessionReq{
			Name: sessionName,
		})
		if err != nil {
			return err
		}

		return a.chatSessionModel.Create(ctx, schema.ChatSession{
			RecordID:     sessionId,
			Name:         sessionName,
			AssistantId:  assistantId,
			RagSessionId: ragSession.Data.Id,
			CreatorId:    user.RecordID,
		})
	})
}