// b_robot_config.go (14 KB)

  1. package internal
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/os/glog"
  6. "github.com/gogf/gf/v2/util/guid"
  7. "yx-dataset-server/app/errors"
  8. "yx-dataset-server/app/model"
  9. "yx-dataset-server/app/schema"
  10. "yx-dataset-server/library/dingtalk"
  11. "yx-dataset-server/library/ragflow"
  12. )
  // robotTypeDingtalk is the robot type value that identifies a DingTalk robot.
  const robotTypeDingtalk = 1

  // Reply modes of a DingTalk robot.
  const (
  	// replyTypeText answers with text through the HTTP webhook.
  	replyTypeText = 1
  	// replyTypeCard answers through a stream connection with a streaming AI card.
  	replyTypeCard = 2
  )
  22. // NewRobotConfig 创建RobotConfig
  23. func NewRobotConfig(
  24. mRobotConfig model.IRobotConfig,
  25. mRobotDataset model.IRobotDataset,
  26. mTrans model.ITrans,
  27. mChatMessage model.IChatMessage,
  28. mChatAssistant model.IChatAssistant,
  29. mChatSession model.IChatSession,
  30. mChatDataset model.IChatDataset,
  31. mUser model.IUser,
  32. mDataset model.IDataset,
  33. mRole model.IRole,
  34. ) *RobotConfig {
  35. return &RobotConfig{
  36. RobotConfigModel: mRobotConfig,
  37. robotDatasetModel: mRobotDataset,
  38. transModel: mTrans,
  39. chatMessageModel: mChatMessage,
  40. chatAssistantModel: mChatAssistant,
  41. chatSessionModel: mChatSession,
  42. chatDatasetModel: mChatDataset,
  43. userModel: mUser,
  44. datasetModel: mDataset,
  45. roleModel: mRole,
  46. }
  47. }
  48. // RobotConfig 创建RobotConfig对象
  49. type RobotConfig struct {
  50. RobotConfigModel model.IRobotConfig
  51. robotDatasetModel model.IRobotDataset
  52. transModel model.ITrans
  53. chatMessageModel model.IChatMessage
  54. chatAssistantModel model.IChatAssistant
  55. chatSessionModel model.IChatSession
  56. chatDatasetModel model.IChatDataset
  57. userModel model.IUser
  58. datasetModel model.IDataset
  59. roleModel model.IRole
  60. }
  61. // Query 查询数据
  62. func (a *RobotConfig) Query(ctx context.Context, params schema.RobotConfigQueryParam, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfigQueryResult, error) {
  63. if !CheckIsRootUser(ctx) {
  64. user, err := a.userModel.Get(ctx, GetUserID(ctx))
  65. if err != nil {
  66. return nil, err
  67. }
  68. params.OrgId = user.OrgId
  69. role, err := a.roleModel.Get(ctx, user.RoleId)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if role.Code == "99" {
  74. params.UserId = GetUserID(ctx)
  75. }
  76. }
  77. users, err := a.userModel.Query(ctx, schema.UserQueryParam{})
  78. if err != nil {
  79. return nil, err
  80. }
  81. result, err := a.RobotConfigModel.Query(ctx, params, opts...)
  82. if err != nil {
  83. return nil, err
  84. }
  85. result.Data.FillCreator(users.Data)
  86. return result, nil
  87. }
  88. // Get 查询指定数据
  89. func (a *RobotConfig) Get(ctx context.Context, recordID string, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfig, error) {
  90. item, err := a.RobotConfigModel.Get(ctx, recordID, opts...)
  91. if err != nil {
  92. return nil, err
  93. } else if item == nil {
  94. return nil, errors.ErrNotFound
  95. }
  96. user, err := a.userModel.Get(ctx, item.CreatorId)
  97. if err != nil {
  98. return nil, err
  99. }
  100. item.CreatorName = user.RealName
  101. robotDatasets, err := a.robotDatasetModel.Query(ctx, schema.RobotDatasetQueryParam{
  102. RobotId: item.RecordID,
  103. })
  104. if err != nil {
  105. return nil, err
  106. }
  107. if len(robotDatasets.Data) > 0 {
  108. datasets, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: robotDatasets.Data.ToDatasetIds()})
  109. if err != nil {
  110. return nil, err
  111. }
  112. item.Datasets = datasets.Data
  113. }
  114. return item, nil
  115. }
  116. func (a *RobotConfig) getUpdate(ctx context.Context, recordID string) (*schema.RobotConfig, error) {
  117. return a.Get(ctx, recordID)
  118. }
  119. // Create 创建数据
  120. func (a *RobotConfig) Create(ctx context.Context, item schema.RobotConfig) (*schema.RobotConfig, error) {
  121. item.RecordID = guid.S()
  122. item.CreatorId = GetUserID(ctx)
  123. item.AssistantId = guid.S()
  124. item.SessionId = guid.S()
  125. item.Webhook = fmt.Sprintf("%s%s", "https://dataset.yongxulvjian.com/robot/v1/robots/receive/", item.RecordID)
  126. user := GetRootUser()
  127. var err error
  128. if !CheckIsRootUser(ctx) {
  129. user, err = a.userModel.Get(ctx, GetUserID(ctx))
  130. if err != nil {
  131. return nil, err
  132. }
  133. item.OrgId = user.OrgId
  134. }
  135. // 创建对话助手及会话
  136. if err := a.createAssistant(ctx, item.AssistantId, item.SessionId, *user, item); err != nil {
  137. return nil, err
  138. }
  139. // 在事务中无条件落库 robot_config,并按需创建 robot_dataset
  140. err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
  141. for _, dataset := range item.Datasets {
  142. if err := a.robotDatasetModel.Create(ctx, schema.RobotDataset{
  143. RecordID: guid.S(),
  144. RobotId: item.RecordID,
  145. DatasetId: dataset.RecordID,
  146. }); err != nil {
  147. return err
  148. }
  149. }
  150. return a.RobotConfigModel.Create(ctx, item)
  151. })
  152. if err != nil {
  153. return nil, err
  154. }
  155. // 钉钉机器人 + 卡片回复模式:拉起 stream 监听
  156. if item.Type == robotTypeDingtalk && item.ReplyType == replyTypeCard {
  157. go a.startDingtalkRobot(ctx, item.RecordID)
  158. }
  159. return a.getUpdate(ctx, item.RecordID)
  160. }
  161. // Update 更新数据
  162. func (a *RobotConfig) Update(ctx context.Context, recordID string, item schema.RobotConfig) (*schema.RobotConfig, error) {
  163. oldItem, err := a.RobotConfigModel.Get(ctx, recordID)
  164. if err != nil {
  165. return nil, err
  166. } else if oldItem == nil {
  167. return nil, errors.ErrNotFound
  168. }
  169. if err := a.RobotConfigModel.Update(ctx, recordID, item); err != nil {
  170. return nil, err
  171. }
  172. // 同步钉钉机器人监听状态:先按旧配置停掉,再按新配置决定是否拉起
  173. go a.syncDingtalkRobot(ctx, recordID)
  174. return a.getUpdate(ctx, recordID)
  175. }
  176. // Delete 删除数据
  177. func (a *RobotConfig) Delete(ctx context.Context, recordID string) error {
  178. oldItem, err := a.RobotConfigModel.Get(ctx, recordID)
  179. if err != nil {
  180. return err
  181. } else if oldItem == nil {
  182. return errors.ErrNotFound
  183. }
  184. // 先停机器人,再删 DB(即使停止失败也不阻塞 DB 操作)
  185. if err := dingtalk.GetRobotManager().StopRobot(recordID); err != nil {
  186. glog.Warningf(ctx, "停止钉钉机器人[%s]失败:%v", recordID, err)
  187. }
  188. return a.RobotConfigModel.Delete(ctx, recordID)
  189. }
  190. // UpdateStatus 更新状态
  191. func (a *RobotConfig) UpdateStatus(ctx context.Context, recordID string, status int) error {
  192. oldItem, err := a.RobotConfigModel.Get(ctx, recordID)
  193. if err != nil {
  194. return err
  195. } else if oldItem == nil {
  196. return errors.ErrNotFound
  197. }
  198. if err := a.RobotConfigModel.UpdateStatus(ctx, recordID, status); err != nil {
  199. return err
  200. }
  201. // 状态变更后同步监听
  202. go a.syncDingtalkRobot(ctx, recordID)
  203. return nil
  204. }
  205. // ReceiveMessage 接收HTTP Webhook消息(用于文本回复,ReplyType=1)
  206. func (a *RobotConfig) ReceiveMessage(ctx context.Context, id string, message dingtalk.Message) error {
  207. robotConfig, err := a.RobotConfigModel.Get(ctx, id)
  208. if err != nil {
  209. glog.Errorf(ctx, "获取机器人配置失败:%s", err.Error())
  210. return err
  211. }
  212. assistant, err := a.chatAssistantModel.Get(ctx, robotConfig.AssistantId)
  213. if err != nil {
  214. glog.Errorf(ctx, "获取对话助手失败:%s", err.Error())
  215. return err
  216. }
  217. session, err := a.chatSessionModel.Get(ctx, robotConfig.SessionId)
  218. if err != nil {
  219. glog.Errorf(ctx, "获取对话失败:%s", err.Error())
  220. return err
  221. }
  222. sessionId := session.RagSessionId
  223. stream, err := ragflow.GetHttpClient().ChatCompletionsStream(ctx, assistant.RagChatId, &ragflow.ChatCompletionReq{
  224. Question: message.Text.Content,
  225. SessionID: sessionId,
  226. })
  227. if err != nil {
  228. return err
  229. }
  230. defer stream.Close()
  231. m := new(schema.ChatMessage)
  232. m.RecordID = guid.S()
  233. m.Question = message.Text.Content
  234. m.AssistantId = assistant.RecordID
  235. m.SessionId = session.RecordID
  236. m.UserId = GetUserID(ctx)
  237. m, err = CollectStreamDataAndCreate(ctx, stream, m)
  238. if err != nil {
  239. return err
  240. }
  241. if err := a.chatMessageModel.Create(ctx, *m); err != nil {
  242. return err
  243. }
  244. replyMsg := dingtalk.DingReplyMsg{
  245. MsgType: "markdown",
  246. Markdown: dingtalk.Markdown{
  247. Title: m.Question,
  248. Text: m.Answer,
  249. },
  250. }
  251. if err := dingtalk.SendReplyMsg(message.SessionWebhook, replyMsg); err != nil {
  252. fmt.Println(err)
  253. return err
  254. }
  255. return nil
  256. }
  257. // RestartAllRobots 服务启动时调用,把 DB 里所有钉钉卡片回复模式的机器人重新拉起
  258. func (a *RobotConfig) RestartAllRobots(ctx context.Context) error {
  259. result, err := a.RobotConfigModel.Query(ctx, schema.RobotConfigQueryParam{
  260. ReplyType: replyTypeCard,
  261. })
  262. if err != nil {
  263. return err
  264. }
  265. for _, cfg := range result.Data {
  266. if cfg.Type != robotTypeDingtalk || cfg.ReplyType != replyTypeCard {
  267. continue
  268. }
  269. botCfg, err := a.buildBotConfig(ctx, cfg)
  270. if err != nil {
  271. glog.Errorf(ctx, "组装钉钉机器人配置失败[%s]:%v", cfg.RecordID, err)
  272. continue
  273. }
  274. if err := dingtalk.GetRobotManager().StartRobot(botCfg); err != nil {
  275. glog.Errorf(ctx, "启动钉钉机器人[%s]失败:%v", cfg.Name, err)
  276. continue
  277. }
  278. glog.Infof(ctx, "钉钉机器人[%s]启动成功", cfg.Name)
  279. }
  280. return nil
  281. }
  282. // startDingtalkRobot 按 RobotConfig 拉起 stream 机器人
  283. func (a *RobotConfig) startDingtalkRobot(ctx context.Context, recordID string) {
  284. cfg, err := a.RobotConfigModel.Get(ctx, recordID)
  285. if err != nil || cfg == nil {
  286. glog.Errorf(ctx, "启动钉钉机器人时获取配置失败[%s]:%v", recordID, err)
  287. return
  288. }
  289. botCfg, err := a.buildBotConfig(ctx, cfg)
  290. if err != nil {
  291. glog.Errorf(ctx, "组装钉钉机器人配置失败[%s]:%v", recordID, err)
  292. return
  293. }
  294. if err := dingtalk.GetRobotManager().StartRobot(botCfg); err != nil {
  295. glog.Errorf(ctx, "启动钉钉机器人失败[%s]:%v", cfg.Name, err)
  296. return
  297. }
  298. glog.Infof(ctx, "钉钉机器人[%s]启动成功", cfg.Name)
  299. }
  300. // syncDingtalkRobot 按当前 DB 中的 RobotConfig 同步 RobotManager(停 + 视情况重启)
  301. func (a *RobotConfig) syncDingtalkRobot(ctx context.Context, recordID string) {
  302. // 始终先停旧实例(幂等)
  303. if err := dingtalk.GetRobotManager().StopRobot(recordID); err != nil {
  304. glog.Warningf(ctx, "停止钉钉机器人[%s]失败:%v", recordID, err)
  305. }
  306. cfg, err := a.RobotConfigModel.Get(ctx, recordID)
  307. if err != nil || cfg == nil {
  308. return
  309. }
  310. if cfg.Type != robotTypeDingtalk || cfg.ReplyType != replyTypeCard {
  311. return
  312. }
  313. botCfg, err := a.buildBotConfig(ctx, cfg)
  314. if err != nil {
  315. glog.Errorf(ctx, "同步钉钉机器人配置失败[%s]:%v", recordID, err)
  316. return
  317. }
  318. if err := dingtalk.GetRobotManager().StartRobot(botCfg); err != nil {
  319. glog.Errorf(ctx, "重启钉钉机器人失败[%s]:%v", cfg.Name, err)
  320. return
  321. }
  322. glog.Infof(ctx, "钉钉机器人[%s]重启成功", cfg.Name)
  323. }
  324. // buildBotConfig 把 schema.RobotConfig + 关联的 RAGFlow chat/session 组装成 dingtalk.BotConfig
  325. func (a *RobotConfig) buildBotConfig(ctx context.Context, cfg *schema.RobotConfig) (*dingtalk.BotConfig, error) {
  326. if cfg.ClientId == "" || cfg.ClientSecret == "" {
  327. return nil, fmt.Errorf("机器人 ClientID/ClientSecret 不能为空")
  328. }
  329. if cfg.CardTemplateId == "" {
  330. return nil, fmt.Errorf("机器人卡片模板 ID 不能为空")
  331. }
  332. assistant, err := a.chatAssistantModel.Get(ctx, cfg.AssistantId)
  333. if err != nil {
  334. return nil, fmt.Errorf("获取对话助手失败:%w", err)
  335. }
  336. if assistant == nil || assistant.RagChatId == "" {
  337. return nil, fmt.Errorf("对话助手未关联 RAGFlow chat")
  338. }
  339. session, err := a.chatSessionModel.Get(ctx, cfg.SessionId)
  340. if err != nil {
  341. return nil, fmt.Errorf("获取会话失败:%w", err)
  342. }
  343. if session == nil || session.RagSessionId == "" {
  344. return nil, fmt.Errorf("会话未关联 RAGFlow session")
  345. }
  346. bot := &dingtalk.BotConfig{
  347. RobotConfigID: cfg.RecordID,
  348. Name: cfg.Name,
  349. ClientID: cfg.ClientId,
  350. ClientSecret: cfg.ClientSecret,
  351. RobotCode: cfg.ClientId, // 企业内部应用机器人 robotCode 通常等同于 ClientID/AppKey
  352. CardTemplateId: cfg.CardTemplateId,
  353. RagChatId: assistant.RagChatId,
  354. RagSessionId: session.RagSessionId,
  355. AssistantId: cfg.AssistantId,
  356. SessionId: cfg.SessionId,
  357. CreatorId: cfg.CreatorId,
  358. }
  359. assistantID := cfg.AssistantId
  360. sessionID := cfg.SessionId
  361. ragSessionID := session.RagSessionId
  362. creatorID := cfg.CreatorId
  363. bot.OnDingtalkQA = func(ctx context.Context, p dingtalk.DingtalkQAParams) error {
  364. return a.chatMessageModel.Create(ctx, schema.ChatMessage{
  365. RecordID: guid.S(),
  366. UserId: p.SenderStaffId,
  367. AssistantId: assistantID,
  368. SessionId: sessionID,
  369. RagSessionId: ragSessionID,
  370. Question: p.Question,
  371. Answer: p.Answer,
  372. CreatorId: creatorID,
  373. })
  374. }
  375. return bot, nil
  376. }
  377. func (a *RobotConfig) createAssistant(ctx context.Context, assistantId, sessionId string, user schema.User, item schema.RobotConfig) error {
  378. return ExecTrans(ctx, a.transModel, func(ctx context.Context) error {
  379. dataset, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: item.Datasets.ToRecordIds()})
  380. if err != nil {
  381. return err
  382. }
  383. // TODO:没有已解析文件的知识库创建对话助手会报错
  384. c, err := ragflow.GetHttpClient().CreateChat(ctx, &ragflow.CreateChatReq{
  385. Name: fmt.Sprintf("%s%s", item.Name, "助手"),
  386. DatasetIDs: dataset.Data.ToRagDataIds(),
  387. })
  388. if err != nil {
  389. return err
  390. }
  391. if err := a.chatAssistantModel.Create(ctx, schema.ChatAssistant{
  392. RecordID: assistantId,
  393. OrgId: user.OrgId,
  394. Name: fmt.Sprintf("%s%s", item.Name, "助手"),
  395. RagChatId: c.Data.ID,
  396. Source: 2,
  397. CreatorId: user.RecordID,
  398. }); err != nil {
  399. return err
  400. }
  401. for _, v := range item.Datasets {
  402. if err := a.chatDatasetModel.Create(ctx, schema.ChatDataset{
  403. RecordID: guid.S(),
  404. ChatAssistantId: assistantId,
  405. DatasetId: v.RecordID,
  406. }); err != nil {
  407. return err
  408. }
  409. }
  410. s, err := ragflow.GetHttpClient().CreateSession(ctx, c.Data.ID, &ragflow.CreateSessionReq{
  411. Name: fmt.Sprintf("%s%s", item.Name, "会话"),
  412. })
  413. if err != nil {
  414. return err
  415. }
  416. return a.chatSessionModel.Create(ctx, schema.ChatSession{
  417. RecordID: sessionId,
  418. Name: fmt.Sprintf("%s%s", item.Name, "会话"),
  419. AssistantId: assistantId,
  420. RagSessionId: s.Data.Id,
  421. CreatorId: user.RecordID,
  422. })
  423. })
  424. }