robot.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package dingtalk
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net/http"
  10. "regexp"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/google/uuid"
  15. dtcard "github.com/open-dingtalk/dingtalk-stream-sdk-go/card"
  16. "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
  17. "github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
  18. "yx-dataset-server/library/ragflow"
  19. )
// Back-off bounds used by the outer retry loop while the first Stream
// connection attempt keeps failing.
const (
	// streamReconnectInitialDelay is the wait before the first retry.
	streamReconnectInitialDelay = 3 * time.Second
	// streamReconnectMaxDelay is the cap applied to the doubled retry delay.
	streamReconnectMaxDelay = 60 * time.Second
)
// referenceIDRegex matches "[ID:n]" reference markers (n is one or more
// digits) so they can be filtered out of RAGFlow answers.
var referenceIDRegex = regexp.MustCompile(`\[ID:\d+\]`)
// Robot is one bot instance; every robot owns an independent Stream
// long-lived connection.
type Robot struct {
	config    *BotConfig           // bot settings handed to NewRobot
	dtClient  *Client              // DingTalk client built from the bot credentials
	cardSvc   *CardService         // card service built on top of dtClient
	streamCli *client.StreamClient // current Stream client; set by Start, cleared by Stop
	ctx       context.Context      // cancelled by Stop; passed to the SDK, RAGFlow and QA hook calls
	cancel    context.CancelFunc   // cancels ctx
	mu        sync.Mutex           // guards isRunning and streamCli
	isRunning bool                 // true between a successful Start and the next Stop
}
// RobotManager is the registry of robots (used as a global singleton via
// GetRobotManager).
type RobotManager struct {
	robots map[string]*Robot // key: RobotConfigID
	mu     sync.RWMutex      // guards robots
}
// Package-level singleton state behind GetRobotManager.
var (
	manager     *RobotManager // the shared instance, assigned inside managerOnce
	managerOnce sync.Once     // ensures manager is initialised exactly once
)
  47. // GetRobotManager 获取机器人管理器单例
  48. func GetRobotManager() *RobotManager {
  49. managerOnce.Do(func() {
  50. manager = &RobotManager{
  51. robots: make(map[string]*Robot),
  52. }
  53. })
  54. return manager
  55. }
  56. // NewRobot 创建机器人实例
  57. func NewRobot(cfg *BotConfig) *Robot {
  58. ctx, cancel := context.WithCancel(context.Background())
  59. dtClient := NewDingTalkClient(cfg.ClientID, cfg.ClientSecret)
  60. // robotCode 缺省回退到 ClientID(企业内部应用机器人通常一致)
  61. if cfg.RobotCode == "" {
  62. cfg.RobotCode = cfg.ClientID
  63. }
  64. return &Robot{
  65. config: cfg,
  66. dtClient: dtClient,
  67. cardSvc: NewCardService(dtClient),
  68. ctx: ctx,
  69. cancel: cancel,
  70. }
  71. }
  72. // Start 启动 Stream 监听
  73. //
  74. // SDK 行为说明(v0.8.0):
  75. // - cli.Start(ctx) 建连成功后会立刻返回 nil(非阻塞),真正的读循环在 SDK 内部 goroutine (processLoop) 里跑。
  76. // - SDK 默认 AutoReconnect=true,读循环挂掉后会自己起一个 reconnect goroutine 无限重连,外层不需要也不应该再做重连循环,
  77. // 否则会把 SDK 内部已建好的连接主动 Close 掉,形成"建连-关连接"的死循环。
  78. // - 我们外层唯一要做的是"首次建连失败时退避重试"——因为 GetConnectionEndpoint/Dial 失败会直接返回 err。
  79. func (r *Robot) Start() error {
  80. r.mu.Lock()
  81. if r.isRunning {
  82. r.mu.Unlock()
  83. return fmt.Errorf("机器人[%s]已在运行中", r.config.Name)
  84. }
  85. cli := client.NewStreamClient(
  86. client.WithAppCredential(client.NewAppCredentialConfig(r.config.ClientID, r.config.ClientSecret)),
  87. client.WithAutoReconnect(true),
  88. )
  89. cli.RegisterChatBotCallbackRouter(r.onMessage)
  90. // 注册 card 回调(避免 DingTalk 向该 stream 推送卡片事件时被当作"未知 topic"丢弃并产生噪音日志)
  91. cli.RegisterCardCallbackRouter(r.onCardCallback)
  92. r.streamCli = cli
  93. r.isRunning = true
  94. r.mu.Unlock()
  95. go r.runInitialConnect(cli)
  96. return nil
  97. }
  98. // runInitialConnect 仅负责"首次建连"的重试;一旦 Start 返回 nil(连接建立成功),
  99. // 后续断线/重连完全交给 SDK 内部的 AutoReconnect 机制处理。
  100. func (r *Robot) runInitialConnect(cli *client.StreamClient) {
  101. defer func() {
  102. if rec := recover(); rec != nil {
  103. log.Printf("机器人[%s] 建连循环 panic: %v", r.config.Name, rec)
  104. }
  105. }()
  106. delay := streamReconnectInitialDelay
  107. for {
  108. select {
  109. case <-r.ctx.Done():
  110. return
  111. default:
  112. }
  113. err := cli.Start(r.ctx)
  114. if err == nil {
  115. return
  116. }
  117. select {
  118. case <-r.ctx.Done():
  119. return
  120. case <-time.After(delay):
  121. }
  122. if delay < streamReconnectMaxDelay {
  123. delay *= 2
  124. if delay > streamReconnectMaxDelay {
  125. delay = streamReconnectMaxDelay
  126. }
  127. }
  128. }
  129. }
  130. // Stop 停止机器人
  131. //
  132. // 注意:SDK 默认 AutoReconnect=true,单纯 Close 会触发 SDK 内部的 reconnect goroutine 立刻重建连接。
  133. // 因此这里必须先把 cli.AutoReconnect 置为 false,再 Close,才能真正停掉。
  134. func (r *Robot) Stop() {
  135. r.mu.Lock()
  136. if !r.isRunning {
  137. r.mu.Unlock()
  138. return
  139. }
  140. r.isRunning = false
  141. cli := r.streamCli
  142. r.streamCli = nil
  143. r.mu.Unlock()
  144. r.cancel()
  145. if cli != nil {
  146. cli.AutoReconnect = false
  147. cli.Close()
  148. }
  149. }
  150. // IsRunning 是否运行中
  151. func (r *Robot) IsRunning() bool {
  152. r.mu.Lock()
  153. defer r.mu.Unlock()
  154. return r.isRunning
  155. }
  156. // Config 当前配置
  157. func (r *Robot) Config() *BotConfig { return r.config }
  158. // StartRobot 启动并注册到管理器(按 RobotConfigID 唯一)
  159. func (m *RobotManager) StartRobot(cfg *BotConfig) error {
  160. if cfg == nil {
  161. return fmt.Errorf("BotConfig 不能为空")
  162. }
  163. if cfg.RobotConfigID == "" {
  164. return fmt.Errorf("BotConfig.RobotConfigID 不能为空")
  165. }
  166. m.mu.Lock()
  167. defer m.mu.Unlock()
  168. if _, exists := m.robots[cfg.RobotConfigID]; exists {
  169. return fmt.Errorf("机器人[%s]已存在", cfg.RobotConfigID)
  170. }
  171. robot := NewRobot(cfg)
  172. if err := robot.Start(); err != nil {
  173. return err
  174. }
  175. m.robots[cfg.RobotConfigID] = robot
  176. return nil
  177. }
  178. // StopRobot 按 RobotConfigID 停止并移除机器人
  179. func (m *RobotManager) StopRobot(robotConfigID string) error {
  180. m.mu.Lock()
  181. defer m.mu.Unlock()
  182. robot, exists := m.robots[robotConfigID]
  183. if !exists {
  184. return nil // 不存在视为已停止,幂等处理
  185. }
  186. robot.Stop()
  187. delete(m.robots, robotConfigID)
  188. return nil
  189. }
  190. // RestartRobot 按 RobotConfigID 重启(更新配置场景)
  191. func (m *RobotManager) RestartRobot(cfg *BotConfig) error {
  192. if err := m.StopRobot(cfg.RobotConfigID); err != nil {
  193. return err
  194. }
  195. return m.StartRobot(cfg)
  196. }
  197. // GetRobot 获取指定机器人
  198. func (m *RobotManager) GetRobot(robotConfigID string) (*Robot, bool) {
  199. m.mu.RLock()
  200. defer m.mu.RUnlock()
  201. robot, exists := m.robots[robotConfigID]
  202. return robot, exists
  203. }
  204. // ListRobots 列出所有机器人
  205. func (m *RobotManager) ListRobots() []*BotConfig {
  206. m.mu.RLock()
  207. defer m.mu.RUnlock()
  208. configs := make([]*BotConfig, 0, len(m.robots))
  209. for _, robot := range m.robots {
  210. configs = append(configs, robot.config)
  211. }
  212. return configs
  213. }
  214. // onMessage 钉钉消息回调:钉钉要求回调在有限时间内(通常 5s)返回 ACK,
  215. // 因此此处只做最小校验 + 异步处理,立刻返回 ACK。
  216. func (r *Robot) onMessage(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
  217. if data == nil {
  218. return []byte(`{"msg":"empty"}`), nil
  219. }
  220. question := strings.TrimSpace(data.Text.Content)
  221. if question == "" {
  222. return []byte(`{"msg":"empty question"}`), nil
  223. }
  224. outTrackId := uuid.NewString()
  225. senderStaffId := data.SenderStaffId
  226. conversationId := data.ConversationId
  227. conversationType := data.ConversationType
  228. // 异步:创建并投放卡片 + RAGFlow 全量问答 + 一次更新卡片 + 落库,脱离回调 ACK 时限
  229. go func() {
  230. defer func() { recover() }()
  231. deliver := &DeliverParams{
  232. OutTrackId: outTrackId,
  233. CardTemplateId: r.config.CardTemplateId,
  234. RobotCode: r.config.RobotCode,
  235. UserID: senderStaffId,
  236. ConversationID: conversationId,
  237. ConversationType: conversationType,
  238. }
  239. if err := r.cardSvc.CreateAndDeliverCard(deliver); err != nil {
  240. return
  241. }
  242. r.handleRAGStream(outTrackId, question, senderStaffId)
  243. }()
  244. return []byte(`{"msg":true}`), nil
  245. }
  246. // onCardCallback 卡片回调占位:目前我们只做主动推送,不依赖卡片内按钮/表单事件,
  247. // 但仍需注册 handler,否则 SDK 会打印 "no handler for topic" 之类的噪音日志。
  248. // 后续若支持按钮动作(例如「重新生成」「停止」),可在此处扩展。
  249. func (r *Robot) onCardCallback(_ context.Context, req *dtcard.CardRequest) (*dtcard.CardResponse, error) {
  250. _ = req
  251. return &dtcard.CardResponse{}, nil
  252. }
  253. // handleRAGStream RAGFlow 非流式拉全量答复,再一次性 StreamingUpdate(finalize);成功后可选落库 chat_message。
  254. func (r *Robot) handleRAGStream(outTrackId, question, senderStaffId string) {
  255. defer func() {
  256. if rec := recover(); rec != nil {
  257. _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, "**服务异常,请稍后重试**", true, true)
  258. }
  259. }()
  260. resp, err := ragflow.GetHttpClient().ChatCompletions(r.ctx, r.config.RagChatId, &ragflow.ChatCompletionReq{
  261. Question: question,
  262. Stream: false,
  263. SessionID: r.config.RagSessionId,
  264. })
  265. if err != nil {
  266. _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, "**服务异常,请稍后重试**", true, true)
  267. return
  268. }
  269. if resp.Code != 0 {
  270. hint := "**服务异常,请稍后重试**"
  271. if resp.Message != "" {
  272. hint = fmt.Sprintf("**RAGFlow 错误(%d)**:%s", resp.Code, resp.Message)
  273. }
  274. _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, hint, true, true)
  275. return
  276. }
  277. answer := strings.TrimSpace(resp.Data.Answer)
  278. if answer == "" {
  279. _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, "**未获取到回复内容**", true, true)
  280. return
  281. }
  282. cleaned := referenceIDRegex.ReplaceAllString(answer, "")
  283. if err := r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, cleaned, true, false); err != nil {
  284. return
  285. }
  286. if r.config.OnDingtalkQA != nil {
  287. if err := r.config.OnDingtalkQA(r.ctx, DingtalkQAParams{
  288. Question: question,
  289. Answer: cleaned,
  290. SenderStaffId: senderStaffId,
  291. }); err != nil {
  292. log.Printf("写入 chat_message 失败: %v", err)
  293. }
  294. }
  295. }
  296. // StartRobotWithDelay 带延迟的机器人启动
  297. func StartRobotWithDelay(cfg *BotConfig, delay time.Duration) {
  298. time.Sleep(delay)
  299. _ = GetRobotManager().StartRobot(cfg)
  300. }
// ============ Types below are used by the HTTP Webhook text-reply mode (ReplyType=1) ============

// Message is the payload of a DingTalk HTTP webhook message; the JSON tags
// give the field names used on the wire.
type Message struct {
	ConversationID            string   `json:"conversationId"`
	AtUsers                   []AtUser `json:"atUsers"`
	ChatbotCorpID             string   `json:"chatbotCorpId"`
	ChatbotUserID             string   `json:"chatbotUserId"`
	MsgID                     string   `json:"msgId"`
	SenderNick                string   `json:"senderNick"`
	IsAdmin                   bool     `json:"isAdmin"`
	SenderStaffID             string   `json:"senderStaffId"`
	SessionWebhookExpiredTime int64    `json:"sessionWebhookExpiredTime"`
	CreateAt                  int64    `json:"createAt"`
	SenderCorpID              string   `json:"senderCorpId"`
	ConversationType          string   `json:"conversationType"`
	SenderID                  string   `json:"senderId"`
	ConversationTitle         string   `json:"conversationTitle"`
	IsInAtList                bool     `json:"isInAtList"`
	SessionWebhook            string   `json:"sessionWebhook"`
	Text                      Text     `json:"text"`
	MsgType                   string   `json:"msgtype"`
}
// AtUser describes one user that was @-mentioned in a message.
type AtUser struct {
	DingtalkID string `json:"dingtalkId"`
	StaffID    string `json:"staffId"`
}
// Text is the body of a plain-text message.
type Text struct {
	Content string `json:"content"`
}
// Markdown is the body of a markdown message.
type Markdown struct {
	Title string `json:"title"`
	Text  string `json:"text"`
}
// DingReplyMsg is the reply body posted to a DingTalk webhook. Both the Text
// and the Markdown section are always serialised (no omitempty).
type DingReplyMsg struct {
	MsgType  string   `json:"msgtype"`
	Text     Text     `json:"text"`
	Markdown Markdown `json:"markdown"`
}
  343. // SendReplyMsg 通过 sessionWebhook 发送回复(HTTP webhook 文本模式)
  344. func SendReplyMsg(webhookUrl string, reply DingReplyMsg) error {
  345. jsonData, err := json.Marshal(reply)
  346. if err != nil {
  347. return fmt.Errorf("序列化消息失败:%v", err)
  348. }
  349. resp, err := http.Post(webhookUrl, "application/json", bytes.NewBuffer(jsonData))
  350. if err != nil {
  351. return fmt.Errorf("发送请求失败:%v", err)
  352. }
  353. defer resp.Body.Close()
  354. respBody, _ := io.ReadAll(resp.Body)
  355. fmt.Printf("钉钉回复接口响应:%s\n", string(respBody))
  356. return nil
  357. }