package dingtalk

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"
	dtcard "github.com/open-dingtalk/dingtalk-stream-sdk-go/card"
	"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
	"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"

	"yx-dataset-server/library/ragflow"
)

// Backoff bounds for the outer retry loop that establishes the first stream
// connection (see runInitialConnect).
const (
	streamReconnectInitialDelay = 3 * time.Second
	streamReconnectMaxDelay     = 60 * time.Second
)

// Limits applied by SendReplyMsg when talking to the session webhook.
const (
	// sendReplyHTTPTimeout bounds the whole webhook round trip so that a stalled
	// endpoint cannot block the caller forever.
	sendReplyHTTPTimeout = 10 * time.Second
	// sendReplyMaxBodyBytes caps how much of the webhook response is read for logging.
	sendReplyMaxBodyBytes = 1 << 20
)

// sendReplyHTTPClient is the shared HTTP client used by SendReplyMsg.
var sendReplyHTTPClient = &http.Client{Timeout: sendReplyHTTPTimeout}

// referenceIDRegex matches the "[ID:n]" citation markers that are stripped
// from RAGFlow answers before they are shown to the user.
var referenceIDRegex = regexp.MustCompile(`\[ID:\d+\]`)

// Robot is a single bot instance; every robot owns its own stream connection.
//
// mu guards streamCli and isRunning. ctx/cancel are created once in NewRobot
// and cancelled by Stop, so a stopped Robot cannot be started again.
type Robot struct {
	config    *BotConfig
	dtClient  *Client
	cardSvc   *CardService
	streamCli *client.StreamClient
	ctx       context.Context
	cancel    context.CancelFunc
	mu        sync.Mutex
	isRunning bool
}

// RobotManager is the process-wide registry of running robots.
type RobotManager struct {
	robots map[string]*Robot // keyed by RobotConfigID
	mu     sync.RWMutex
}

var (
	manager     *RobotManager
	managerOnce sync.Once
)

// GetRobotManager returns the RobotManager singleton, creating it on first use.
func GetRobotManager() *RobotManager {
	managerOnce.Do(func() {
		manager = &RobotManager{
			robots: make(map[string]*Robot),
		}
	})
	return manager
}

// NewRobot builds a Robot for cfg together with its DingTalk client, card
// service and a cancellable context derived from context.Background.
//
// cfg must be non-nil. Note that an empty cfg.RobotCode is filled in with
// cfg.ClientID directly on the caller's struct (the Robot keeps the same
// pointer, which is also what Config and ListRobots hand out).
func NewRobot(cfg *BotConfig) *Robot {
	ctx, cancel := context.WithCancel(context.Background())
	dtClient := NewDingTalkClient(cfg.ClientID, cfg.ClientSecret)

	// RobotCode falls back to ClientID (they are usually identical for
	// enterprise-internal app robots).
	if cfg.RobotCode == "" {
		cfg.RobotCode = cfg.ClientID
	}

	return &Robot{
		config:   cfg,
		dtClient: dtClient,
		cardSvc:  NewCardService(dtClient),
		ctx:      ctx,
		cancel:   cancel,
	}
}

// Start creates the stream client, registers the callbacks and kicks off the
// initial connection in the background. It returns an error when the robot is
// already running or has already been stopped.
//
// SDK behaviour notes (v0.8.0):
//   - cli.Start(ctx) returns nil as soon as the connection is established
//     (non-blocking); the real read loop runs in an SDK goroutine (processLoop).
//   - The SDK defaults to AutoReconnect=true: when the read loop dies it spawns
//     its own reconnect goroutine and retries forever. The outer layer neither
//     needs to nor should run another reconnect loop, otherwise it would Close
//     the connection the SDK has just built and end up in a
//     "connect - close" loop.
//   - The only thing left for the outer layer is to retry with backoff when the
//     very first connection fails, because a GetConnectionEndpoint/Dial failure
//     is returned directly as an error.
func (r *Robot) Start() error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.isRunning {
		return fmt.Errorf("机器人[%s]已在运行中", r.config.Name)
	}
	// Stop cancels r.ctx for good. Starting again on a cancelled context would
	// mark the robot as running while runInitialConnect exits immediately, so
	// refuse explicitly instead of silently doing nothing.
	if err := r.ctx.Err(); err != nil {
		return fmt.Errorf("机器人[%s]已停止,无法再次启动: %w", r.config.Name, err)
	}

	cli := client.NewStreamClient(
		client.WithAppCredential(client.NewAppCredentialConfig(r.config.ClientID, r.config.ClientSecret)),
		client.WithAutoReconnect(true),
	)
	cli.RegisterChatBotCallbackRouter(r.onMessage)
	// Register a card callback too, so that card events pushed onto this stream
	// are not dropped as an "unknown topic" with noisy log output.
	cli.RegisterCardCallbackRouter(r.onCardCallback)

	r.streamCli = cli
	r.isRunning = true

	go r.runInitialConnect(cli)
	return nil
}

// runInitialConnect only retries the FIRST connection. Once cli.Start returns
// nil (connection established) all later disconnect/reconnect handling is left
// to the SDK's AutoReconnect mechanism. The loop ends when the connection
// succeeds or r.ctx is cancelled; the wait between attempts doubles up to
// streamReconnectMaxDelay.
func (r *Robot) runInitialConnect(cli *client.StreamClient) {
	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("机器人[%s] 建连循环 panic: %v", r.config.Name, rec)
		}
	}()

	delay := streamReconnectInitialDelay
	for {
		if r.ctx.Err() != nil {
			return
		}

		err := cli.Start(r.ctx)
		if err == nil {
			return
		}
		log.Printf("机器人[%s] 首次建连失败,%v 后重试: %v", r.config.Name, delay, err)

		timer := time.NewTimer(delay)
		select {
		case <-r.ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}

		// Exponential backoff, clamped to the configured maximum.
		delay *= 2
		if delay > streamReconnectMaxDelay {
			delay = streamReconnectMaxDelay
		}
	}
}

// Stop shuts the robot down. Calling it on a robot that is not running is a no-op.
//
// Note: the SDK defaults to AutoReconnect=true, and a plain Close makes the
// SDK's reconnect goroutine rebuild the connection right away. AutoReconnect
// therefore has to be switched off BEFORE Close for the stop to be effective.
func (r *Robot) Stop() {
	r.mu.Lock()
	if !r.isRunning {
		r.mu.Unlock()
		return
	}
	r.isRunning = false
	cli := r.streamCli
	r.streamCli = nil
	// Cancel while still holding the lock so that a concurrent Start reliably
	// observes the cancelled context.
	r.cancel()
	r.mu.Unlock()

	if cli != nil {
		cli.AutoReconnect = false
		cli.Close()
	}
}

// IsRunning reports whether Start has been called without a subsequent Stop.
func (r *Robot) IsRunning() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.isRunning
}

// Config returns the configuration pointer the robot was created with.
func (r *Robot) Config() *BotConfig {
	return r.config
}

// StartRobot creates a robot for cfg, starts it and registers it under
// cfg.RobotConfigID. It fails when cfg is nil, the ID is empty, a robot with
// the same ID is already registered, or the robot cannot be started.
func (m *RobotManager) StartRobot(cfg *BotConfig) error {
	if cfg == nil {
		return fmt.Errorf("BotConfig 不能为空")
	}
	if cfg.RobotConfigID == "" {
		return fmt.Errorf("BotConfig.RobotConfigID 不能为空")
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.robots[cfg.RobotConfigID]; exists {
		return fmt.Errorf("机器人[%s]已存在", cfg.RobotConfigID)
	}

	robot := NewRobot(cfg)
	if err := robot.Start(); err != nil {
		return err
	}
	m.robots[cfg.RobotConfigID] = robot
	return nil
}

// StopRobot stops and unregisters the robot with the given RobotConfigID.
// An unknown ID is treated as "already stopped" (idempotent), so the returned
// error is currently always nil.
func (m *RobotManager) StopRobot(robotConfigID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	robot, exists := m.robots[robotConfigID]
	if !exists {
		return nil
	}
	robot.Stop()
	delete(m.robots, robotConfigID)
	return nil
}

// RestartRobot stops the robot registered under cfg.RobotConfigID (if any) and
// starts a fresh one from cfg; intended for configuration updates. The stop
// and the start are two separate critical sections.
func (m *RobotManager) RestartRobot(cfg *BotConfig) error {
	// Guard here as well: cfg is dereferenced before StartRobot gets a chance
	// to validate it.
	if cfg == nil {
		return fmt.Errorf("BotConfig 不能为空")
	}
	if err := m.StopRobot(cfg.RobotConfigID); err != nil {
		return err
	}
	return m.StartRobot(cfg)
}

// GetRobot looks up a robot by RobotConfigID; the boolean reports whether it exists.
func (m *RobotManager) GetRobot(robotConfigID string) (*Robot, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	robot, exists := m.robots[robotConfigID]
	return robot, exists
}

// ListRobots returns the configuration pointers of all registered robots, in
// map iteration (i.e. unspecified) order.
func (m *RobotManager) ListRobots() []*BotConfig {
	m.mu.RLock()
	defer m.mu.RUnlock()

	configs := make([]*BotConfig, 0, len(m.robots))
	for _, robot := range m.robots {
		configs = append(configs, robot.config)
	}
	return configs
}

// onMessage is the DingTalk chat message callback. DingTalk expects the
// callback to be acknowledged within a limited time (usually 5s), so this only
// performs minimal validation, hands the real work to a goroutine and returns
// the ACK payload immediately. The returned error is always nil.
func (r *Robot) onMessage(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
	if data == nil {
		return []byte(`{"msg":"empty"}`), nil
	}
	question := strings.TrimSpace(data.Text.Content)
	if question == "" {
		return []byte(`{"msg":"empty question"}`), nil
	}

	outTrackId := uuid.NewString()
	senderStaffId := data.SenderStaffId
	conversationId := data.ConversationId
	conversationType := data.ConversationType

	// Asynchronous part: create and deliver the card, run the full RAGFlow
	// question/answer round, update the card once and persist the exchange,
	// all detached from the callback ACK deadline.
	go func() {
		defer func() {
			if rec := recover(); rec != nil {
				log.Printf("机器人[%s] 消息处理 panic: %v", r.config.Name, rec)
			}
		}()

		deliver := &DeliverParams{
			OutTrackId:       outTrackId,
			CardTemplateId:   r.config.CardTemplateId,
			RobotCode:        r.config.RobotCode,
			UserID:           senderStaffId,
			ConversationID:   conversationId,
			ConversationType: conversationType,
		}
		if err := r.cardSvc.CreateAndDeliverCard(deliver); err != nil {
			log.Printf("机器人[%s] 创建并投放卡片失败(outTrackId=%s): %v", r.config.Name, outTrackId, err)
			return
		}
		r.handleRAGStream(outTrackId, question, senderStaffId)
	}()

	return []byte(`{"msg":true}`), nil
}

// onCardCallback is a placeholder card callback. Cards are currently only
// pushed proactively and no in-card button/form events are consumed, but a
// handler still has to be registered, otherwise the SDK logs noise such as
// "no handler for topic". Button actions (e.g. "regenerate", "stop") can be
// added here later.
func (r *Robot) onCardCallback(_ context.Context, req *dtcard.CardRequest) (*dtcard.CardResponse, error) {
	_ = req
	return &dtcard.CardResponse{}, nil
}

// handleRAGStream fetches the complete (non-streaming) RAGFlow answer for
// question and writes it to the card identified by outTrackId with a single
// StreamingUpdate call. On success the exchange is additionally passed to the
// optional r.config.OnDingtalkQA hook.
//
// Every failure path (panic, request error, non-zero response code, empty
// answer) replaces the card content with a hint instead.
//
// NOTE(review): the two trailing booleans of StreamingUpdate are passed as
// (true, true) on the failure paths and (true, false) on success; presumably
// "finalize" and "is error" - confirm against CardService.StreamingUpdate.
func (r *Robot) handleRAGStream(outTrackId, question, senderStaffId string) {
	const serviceErrorHint = "**服务异常,请稍后重试**"

	// showHint puts a failure hint on the card; a failing update can only be logged.
	showHint := func(hint string) {
		if err := r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, hint, true, true); err != nil {
			log.Printf("机器人[%s] 更新卡片失败(outTrackId=%s): %v", r.config.Name, outTrackId, err)
		}
	}

	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("机器人[%s] 问答处理 panic: %v", r.config.Name, rec)
			showHint(serviceErrorHint)
		}
	}()

	resp, err := ragflow.GetHttpClient().ChatCompletions(r.ctx, r.config.RagChatId, &ragflow.ChatCompletionReq{
		Question:  question,
		Stream:    false,
		SessionID: r.config.RagSessionId,
	})
	if err != nil {
		log.Printf("机器人[%s] RAGFlow 请求失败: %v", r.config.Name, err)
		showHint(serviceErrorHint)
		return
	}
	if resp.Code != 0 {
		hint := serviceErrorHint
		if resp.Message != "" {
			hint = fmt.Sprintf("**RAGFlow 错误(%d)**:%s", resp.Code, resp.Message)
		}
		showHint(hint)
		return
	}

	answer := strings.TrimSpace(resp.Data.Answer)
	if answer == "" {
		showHint("**未获取到回复内容**")
		return
	}

	// Strip the "[ID:n]" citation markers before showing the answer.
	cleaned := referenceIDRegex.ReplaceAllString(answer, "")
	if err := r.cardSvc.StreamingUpdate(outTrackId, CardContentKey, cleaned, true, false); err != nil {
		log.Printf("机器人[%s] 更新卡片失败(outTrackId=%s): %v", r.config.Name, outTrackId, err)
		return
	}

	if r.config.OnDingtalkQA == nil {
		return
	}
	params := DingtalkQAParams{
		Question:      question,
		Answer:        cleaned,
		SenderStaffId: senderStaffId,
	}
	if err := r.config.OnDingtalkQA(r.ctx, params); err != nil {
		log.Printf("写入 chat_message 失败: %v", err)
	}
}

// StartRobotWithDelay blocks for delay and then starts cfg through the global
// RobotManager. Because nothing is returned, a start failure is logged.
func StartRobotWithDelay(cfg *BotConfig, delay time.Duration) {
	time.Sleep(delay)
	if err := GetRobotManager().StartRobot(cfg); err != nil {
		log.Printf("延迟启动机器人失败: %v", err)
	}
}

// ============ Types used by the HTTP webhook text reply mode (ReplyType=1) ============

// Message is the payload of a DingTalk HTTP webhook message.
type Message struct {
	ConversationID            string   `json:"conversationId"`
	AtUsers                   []AtUser `json:"atUsers"`
	ChatbotCorpID             string   `json:"chatbotCorpId"`
	ChatbotUserID             string   `json:"chatbotUserId"`
	MsgID                     string   `json:"msgId"`
	SenderNick                string   `json:"senderNick"`
	IsAdmin                   bool     `json:"isAdmin"`
	SenderStaffID             string   `json:"senderStaffId"`
	SessionWebhookExpiredTime int64    `json:"sessionWebhookExpiredTime"`
	CreateAt                  int64    `json:"createAt"`
	SenderCorpID              string   `json:"senderCorpId"`
	ConversationType          string   `json:"conversationType"`
	SenderID                  string   `json:"senderId"`
	ConversationTitle         string   `json:"conversationTitle"`
	IsInAtList                bool     `json:"isInAtList"`
	SessionWebhook            string   `json:"sessionWebhook"`
	Text                      Text     `json:"text"`
	MsgType                   string   `json:"msgtype"`
}

// AtUser describes one @-mentioned user.
type AtUser struct {
	DingtalkID string `json:"dingtalkId"`
	StaffID    string `json:"staffId"`
}

// Text is the body of a text message.
type Text struct {
	Content string `json:"content"`
}

// Markdown is the body of a markdown message.
type Markdown struct {
	Title string `json:"title"`
	Text  string `json:"text"`
}

// DingReplyMsg is the reply body posted to a DingTalk webhook. Both Text and
// Markdown are always serialized, whichever MsgType is used.
type DingReplyMsg struct {
	MsgType  string   `json:"msgtype"`
	Text     Text     `json:"text"`
	Markdown Markdown `json:"markdown"`
}

// SendReplyMsg posts reply as JSON to the given session webhook (HTTP webhook
// text mode).
//
// It returns an error when the message cannot be serialized, the request fails
// (including exceeding sendReplyHTTPTimeout) or the endpoint answers with a
// non-2xx status. The response body is only logged; a failure to read it is
// logged too but does not fail the call, because the request itself has
// already been delivered at that point.
func SendReplyMsg(webhookUrl string, reply DingReplyMsg) error {
	jsonData, err := json.Marshal(reply)
	if err != nil {
		return fmt.Errorf("序列化消息失败:%w", err)
	}

	resp, err := sendReplyHTTPClient.Post(webhookUrl, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("发送请求失败:%w", err)
	}
	defer resp.Body.Close()

	respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, sendReplyMaxBodyBytes))
	if readErr != nil {
		log.Printf("读取钉钉回复接口响应失败: %v", readErr)
	}

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("钉钉回复接口返回异常状态码 %d:%s", resp.StatusCode, respBody)
	}

	log.Printf("钉钉回复接口响应:%s\n", respBody)
	return nil
}