| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- package dingtalk
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "regexp"
- "strings"
- "sync"
- "time"
- "github.com/google/uuid"
- dtcard "github.com/open-dingtalk/dingtalk-stream-sdk-go/card"
- "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
- "github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
- "yx-dataset-server/library/ragflow"
- )
// Backoff bounds for retrying the initial Stream connection: the wait starts
// at streamReconnectInitialDelay and is doubled after every failed attempt
// until it reaches streamReconnectMaxDelay.
const (
	streamReconnectInitialDelay time.Duration = 3 * time.Second
	streamReconnectMaxDelay     time.Duration = time.Minute
)
// referenceIDRegex matches the "[ID:n]" reference markers (n being one or more
// digits) so that they can be stripped from RAGFlow answers before display.
var (
	referenceIDRegex = regexp.MustCompile(`\[ID:\d+\]`)
)
- // Robot 机器人实例(每个机器人独立 stream 长连接)
- type Robot struct {
- config *BotConfig
- dtClient *Client
- cardSvc *CardService
- streamCli *client.StreamClient
- ctx context.Context
- cancel context.CancelFunc
- mu sync.Mutex
- isRunning bool
- }
- // RobotManager 机器人管理器(全局单例)
- type RobotManager struct {
- robots map[string]*Robot // key: RobotConfigID
- mu sync.RWMutex
- }
- var (
- manager *RobotManager
- managerOnce sync.Once
- )
- // GetRobotManager 获取机器人管理器单例
- func GetRobotManager() *RobotManager {
- managerOnce.Do(func() {
- manager = &RobotManager{
- robots: make(map[string]*Robot),
- }
- })
- return manager
- }
- // NewRobot 创建机器人实例
- func NewRobot(cfg *BotConfig) *Robot {
- ctx, cancel := context.WithCancel(context.Background())
- dtClient := NewDingTalkClient(cfg.ClientID, cfg.ClientSecret)
- // robotCode 缺省回退到 ClientID(企业内部应用机器人通常一致)
- if cfg.RobotCode == "" {
- cfg.RobotCode = cfg.ClientID
- }
- return &Robot{
- config: cfg,
- dtClient: dtClient,
- cardSvc: NewCardService(dtClient),
- ctx: ctx,
- cancel: cancel,
- }
- }
- // Start 启动 Stream 监听
- //
- // SDK 行为说明(v0.8.0):
- // - cli.Start(ctx) 建连成功后会立刻返回 nil(非阻塞),真正的读循环在 SDK 内部 goroutine (processLoop) 里跑。
- // - SDK 默认 AutoReconnect=true,读循环挂掉后会自己起一个 reconnect goroutine 无限重连,外层不需要也不应该再做重连循环,
- // 否则会把 SDK 内部已建好的连接主动 Close 掉,形成"建连-关连接"的死循环。
- // - 我们外层唯一要做的是"首次建连失败时退避重试"——因为 GetConnectionEndpoint/Dial 失败会直接返回 err。
- func (r *Robot) Start() error {
- r.mu.Lock()
- if r.isRunning {
- r.mu.Unlock()
- return fmt.Errorf("机器人[%s]已在运行中", r.config.Name)
- }
- cli := client.NewStreamClient(
- client.WithAppCredential(client.NewAppCredentialConfig(r.config.ClientID, r.config.ClientSecret)),
- client.WithAutoReconnect(true),
- )
- cli.RegisterChatBotCallbackRouter(r.onMessage)
- // 注册 card 回调(避免 DingTalk 向该 stream 推送卡片事件时被当作"未知 topic"丢弃并产生噪音日志)
- cli.RegisterCardCallbackRouter(r.onCardCallback)
- r.streamCli = cli
- r.isRunning = true
- r.mu.Unlock()
- go r.runInitialConnect(cli)
- return nil
- }
- // runInitialConnect 仅负责"首次建连"的重试;一旦 Start 返回 nil(连接建立成功),
- // 后续断线/重连完全交给 SDK 内部的 AutoReconnect 机制处理。
- func (r *Robot) runInitialConnect(cli *client.StreamClient) {
- defer func() {
- if rec := recover(); rec != nil {
- log.Printf("机器人[%s] 建连循环 panic: %v", r.config.Name, rec)
- }
- }()
- delay := streamReconnectInitialDelay
- for {
- select {
- case <-r.ctx.Done():
- return
- default:
- }
- err := cli.Start(r.ctx)
- if err == nil {
- return
- }
- select {
- case <-r.ctx.Done():
- return
- case <-time.After(delay):
- }
- if delay < streamReconnectMaxDelay {
- delay *= 2
- if delay > streamReconnectMaxDelay {
- delay = streamReconnectMaxDelay
- }
- }
- }
- }
- // Stop 停止机器人
- //
- // 注意:SDK 默认 AutoReconnect=true,单纯 Close 会触发 SDK 内部的 reconnect goroutine 立刻重建连接。
- // 因此这里必须先把 cli.AutoReconnect 置为 false,再 Close,才能真正停掉。
- func (r *Robot) Stop() {
- r.mu.Lock()
- if !r.isRunning {
- r.mu.Unlock()
- return
- }
- r.isRunning = false
- cli := r.streamCli
- r.streamCli = nil
- r.mu.Unlock()
- r.cancel()
- if cli != nil {
- cli.AutoReconnect = false
- cli.Close()
- }
- }
- // IsRunning 是否运行中
- func (r *Robot) IsRunning() bool {
- r.mu.Lock()
- defer r.mu.Unlock()
- return r.isRunning
- }
- // Config 当前配置
- func (r *Robot) Config() *BotConfig { return r.config }
- // StartRobot 启动并注册到管理器(按 RobotConfigID 唯一)
- func (m *RobotManager) StartRobot(cfg *BotConfig) error {
- if cfg == nil {
- return fmt.Errorf("BotConfig 不能为空")
- }
- if cfg.RobotConfigID == "" {
- return fmt.Errorf("BotConfig.RobotConfigID 不能为空")
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- if _, exists := m.robots[cfg.RobotConfigID]; exists {
- return fmt.Errorf("机器人[%s]已存在", cfg.RobotConfigID)
- }
- robot := NewRobot(cfg)
- if err := robot.Start(); err != nil {
- return err
- }
- m.robots[cfg.RobotConfigID] = robot
- return nil
- }
- // StopRobot 按 RobotConfigID 停止并移除机器人
- func (m *RobotManager) StopRobot(robotConfigID string) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- robot, exists := m.robots[robotConfigID]
- if !exists {
- return nil // 不存在视为已停止,幂等处理
- }
- robot.Stop()
- delete(m.robots, robotConfigID)
- return nil
- }
- // RestartRobot 按 RobotConfigID 重启(更新配置场景)
- func (m *RobotManager) RestartRobot(cfg *BotConfig) error {
- if err := m.StopRobot(cfg.RobotConfigID); err != nil {
- return err
- }
- return m.StartRobot(cfg)
- }
- // GetRobot 获取指定机器人
- func (m *RobotManager) GetRobot(robotConfigID string) (*Robot, bool) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- robot, exists := m.robots[robotConfigID]
- return robot, exists
- }
- // ListRobots 列出所有机器人
- func (m *RobotManager) ListRobots() []*BotConfig {
- m.mu.RLock()
- defer m.mu.RUnlock()
- configs := make([]*BotConfig, 0, len(m.robots))
- for _, robot := range m.robots {
- configs = append(configs, robot.config)
- }
- return configs
- }
- // onMessage 钉钉消息回调:钉钉要求回调在有限时间内(通常 5s)返回 ACK,
- // 因此此处只做最小校验 + 异步处理,立刻返回 ACK。
- func (r *Robot) onMessage(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
- if data == nil {
- return []byte(`{"msg":"empty"}`), nil
- }
- question := strings.TrimSpace(data.Text.Content)
- if question == "" {
- return []byte(`{"msg":"empty question"}`), nil
- }
- outTrackId := uuid.NewString()
- senderStaffId := data.SenderStaffId
- conversationId := data.ConversationId
- conversationType := data.ConversationType
- // 异步:创建并投放卡片 + RAGFlow 全量问答 + 一次更新卡片 + 落库,脱离回调 ACK 时限
- go func() {
- defer func() { recover() }()
- deliver := &DeliverParams{
- OutTrackId: outTrackId,
- CardTemplateId: r.config.CardTemplateId,
- RobotCode: r.config.RobotCode,
- UserID: senderStaffId,
- ConversationID: conversationId,
- ConversationType: conversationType,
- }
- if err := r.cardSvc.CreateAndDeliverCard(deliver); err != nil {
- return
- }
- r.handleRAGStream(outTrackId, question, senderStaffId)
- }()
- return []byte(`{"msg":true}`), nil
- }
- // onCardCallback 卡片回调占位:目前我们只做主动推送,不依赖卡片内按钮/表单事件,
- // 但仍需注册 handler,否则 SDK 会打印 "no handler for topic" 之类的噪音日志。
- // 后续若支持按钮动作(例如「重新生成」「停止」),可在此处扩展。
- func (r *Robot) onCardCallback(_ context.Context, req *dtcard.CardRequest) (*dtcard.CardResponse, error) {
- _ = req
- return &dtcard.CardResponse{}, nil
- }
- // handleRAGStream RAGFlow 非流式拉全量答复,再一次性 StreamingUpdate(finalize);成功后可选落库 chat_message。
- func (r *Robot) handleRAGStream(outTrackId, question, senderStaffId string) {
- defer func() {
- if rec := recover(); rec != nil {
- _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, "**服务异常,请稍后重试**", true, true)
- }
- }()
- resp, err := ragflow.GetHttpClient().ChatCompletions(r.ctx, r.config.RagChatId, &ragflow.ChatCompletionReq{
- Question: question,
- Stream: false,
- SessionID: r.config.RagSessionId,
- })
- if err != nil {
- _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, "**服务异常,请稍后重试**", true, true)
- return
- }
- if resp.Code != 0 {
- hint := "**服务异常,请稍后重试**"
- if resp.Message != "" {
- hint = fmt.Sprintf("**RAGFlow 错误(%d)**:%s", resp.Code, resp.Message)
- }
- _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, hint, true, true)
- return
- }
- answer := strings.TrimSpace(resp.Data.Answer)
- if answer == "" {
- _ = r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, "**未获取到回复内容**", true, true)
- return
- }
- cleaned := referenceIDRegex.ReplaceAllString(answer, "")
- if err := r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, cleaned, true, false); err != nil {
- return
- }
- if r.config.OnDingtalkQA != nil {
- if err := r.config.OnDingtalkQA(r.ctx, DingtalkQAParams{
- Question: question,
- Answer: cleaned,
- SenderStaffId: senderStaffId,
- }); err != nil {
- log.Printf("写入 chat_message 失败: %v", err)
- }
- }
- }
- // StartRobotWithDelay 带延迟的机器人启动
- func StartRobotWithDelay(cfg *BotConfig, delay time.Duration) {
- time.Sleep(delay)
- _ = GetRobotManager().StartRobot(cfg)
- }
- // ============ 以下为 HTTP Webhook 文本回复模式(ReplyType=1)所用的结构体 ============
- // Message 钉钉 HTTP Webhook 消息结构体
- type Message struct {
- ConversationID string `json:"conversationId"`
- AtUsers []AtUser `json:"atUsers"`
- ChatbotCorpID string `json:"chatbotCorpId"`
- ChatbotUserID string `json:"chatbotUserId"`
- MsgID string `json:"msgId"`
- SenderNick string `json:"senderNick"`
- IsAdmin bool `json:"isAdmin"`
- SenderStaffID string `json:"senderStaffId"`
- SessionWebhookExpiredTime int64 `json:"sessionWebhookExpiredTime"`
- CreateAt int64 `json:"createAt"`
- SenderCorpID string `json:"senderCorpId"`
- ConversationType string `json:"conversationType"`
- SenderID string `json:"senderId"`
- ConversationTitle string `json:"conversationTitle"`
- IsInAtList bool `json:"isInAtList"`
- SessionWebhook string `json:"sessionWebhook"`
- Text Text `json:"text"`
- MsgType string `json:"msgtype"`
- }
// AtUser identifies one user that was @-mentioned in a message.
type AtUser struct {
	// DingtalkID is decoded from the "dingtalkId" key.
	DingtalkID string `json:"dingtalkId"`
	// StaffID is decoded from the "staffId" key.
	StaffID string `json:"staffId"`
}
// Text is the body of a plain-text message.
type Text struct {
	// Content is the message text ("content" in JSON).
	Content string `json:"content"`
}
// Markdown is the body of a markdown message.
type Markdown struct {
	// Title is encoded as "title".
	Title string `json:"title"`
	// Text is the markdown source, encoded as "text".
	Text string `json:"text"`
}
- // DingReplyMsg 钉钉 webhook 回复消息体
- type DingReplyMsg struct {
- MsgType string `json:"msgtype"`
- Text Text `json:"text"`
- Markdown Markdown `json:"markdown"`
- }
- // SendReplyMsg 通过 sessionWebhook 发送回复(HTTP webhook 文本模式)
- func SendReplyMsg(webhookUrl string, reply DingReplyMsg) error {
- jsonData, err := json.Marshal(reply)
- if err != nil {
- return fmt.Errorf("序列化消息失败:%v", err)
- }
- resp, err := http.Post(webhookUrl, "application/json", bytes.NewBuffer(jsonData))
- if err != nil {
- return fmt.Errorf("发送请求失败:%v", err)
- }
- defer resp.Body.Close()
- respBody, _ := io.ReadAll(resp.Body)
- fmt.Printf("钉钉回复接口响应:%s\n", string(respBody))
- return nil
- }
|