package dingtalk

import (
	"context"
	"fmt"
	"strings"
	"time"
	"unicode/utf8"

	dingtalkcard_1_0 "github.com/alibabacloud-go/dingtalk/card_1_0"
	util "github.com/alibabacloud-go/tea-utils/v2/service"
	"github.com/alibabacloud-go/tea/tea"
	"github.com/google/uuid"
)

// Conversation type values, taken from the conversationType field of DingTalk
// stream SDK callbacks.
const (
	ConvTypeSingle = "1" // one-to-one chat
	ConvTypeGroup  = "2" // group chat
)

// CardContentKey is the name of the streaming markdown variable bound in the
// card template; it must stay in sync with the template.
const CardContentKey = "content"

// streamingMarkdownMaxContentBytes caps the content sent by one StreamingUpdate
// call. DingTalk recommends keeping a single update under roughly 1K (see the
// official documentation).
const streamingMarkdownMaxContentBytes = 1000

// CardService creates and updates cards through the official DingTalk card_1.0
// SDK.
type CardService struct {
	dtClient *Client
}

// NewCardService returns a CardService that uses dtClient for access tokens and
// card API calls.
func NewCardService(dtClient *Client) *CardService {
	return &CardService{dtClient: dtClient}
}

// DeliverParams holds the parameters needed to deliver a card. Which fields are
// required depends on the conversation type.
type DeliverParams struct {
	OutTrackId       string // external card instance ID; the caller generates a new one per message
	CardTemplateId   string // card template ID
	RobotCode        string // robot code (usually the same as ClientID/AppKey)
	UserID           string // recipient staffId (required for one-to-one chats)
	ConversationID   string // DingTalk conversation ID (required for group chats)
	ConversationType string // "1" = one-to-one chat, "2" = group chat
	InitTitle        string // initial card title, optional
	InitContent      string // initial card content, optional
}

// CreateAndDeliverCard creates an AI card and delivers it to the conversation
// described by p.
//
// ConvTypeGroup delivers to the group identified by p.ConversationID; every
// other conversation type is treated as a one-to-one robot chat with p.UserID.
// Empty InitTitle/InitContent fall back to built-in placeholder texts.
//
// If the first call fails with an error recognised by isAuthError, the cached
// access token is invalidated and the call is retried once with a fresh token.
// An error is returned when parameters are missing, the token cannot be
// obtained, the API call fails, or the response does not report success.
func (c *CardService) CreateAndDeliverCard(p *DeliverParams) error {
	if p == nil {
		return fmt.Errorf("DeliverParams 不能为空")
	}
	if p.OutTrackId == "" || p.CardTemplateId == "" {
		return fmt.Errorf("OutTrackId/CardTemplateId 不能为空")
	}

	token, err := c.dtClient.GetAccessToken()
	if err != nil {
		return fmt.Errorf("获取钉钉 access_token 失败: %w", err)
	}
	headers := &dingtalkcard_1_0.CreateAndDeliverHeaders{
		XAcsDingtalkAccessToken: tea.String(token),
	}

	cardParamMap := map[string]*string{
		"title":        tea.String(orDefault(p.InitTitle, "AI助理回复中")),
		CardContentKey: tea.String(orDefault(p.InitContent, "正在思考中...")),
	}
	cardData := &dingtalkcard_1_0.CreateAndDeliverRequestCardData{
		CardParamMap: cardParamMap,
	}

	req := &dingtalkcard_1_0.CreateAndDeliverRequest{
		OutTrackId:     tea.String(p.OutTrackId),
		CardTemplateId: tea.String(p.CardTemplateId),
		CardData:       cardData,
		CallbackType:   tea.String("STREAM"),
		UserIdType:     tea.Int32(1), // 1: staffId
	}

	switch p.ConversationType {
	case ConvTypeGroup:
		if p.ConversationID == "" {
			return fmt.Errorf("群聊投放缺少 ConversationID")
		}
		req.SetOpenSpaceId(fmt.Sprintf("dtv1.card//IM_GROUP.%s", p.ConversationID))
		req.SetImGroupOpenDeliverModel(
			(&dingtalkcard_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{}).
				SetRobotCode(p.RobotCode),
		)
		req.SetImGroupOpenSpaceModel(
			(&dingtalkcard_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{}).
				SetSupportForward(true),
		)
	default:
		// Anything that is not a group chat is delivered as a one-to-one robot chat.
		if p.UserID == "" {
			return fmt.Errorf("单聊投放缺少 UserID")
		}
		req.SetUserId(p.UserID)
		req.SetOpenSpaceId(fmt.Sprintf("dtv1.card//IM_ROBOT.%s", p.UserID))
		req.SetImRobotOpenDeliverModel(
			(&dingtalkcard_1_0.CreateAndDeliverRequestImRobotOpenDeliverModel{}).
				SetRobotCode(p.RobotCode).
				SetSpaceType("IM_ROBOT"),
		)
		req.SetImRobotOpenSpaceModel(
			(&dingtalkcard_1_0.CreateAndDeliverRequestImRobotOpenSpaceModel{}).
				SetSupportForward(true),
		)
	}

	resp, err := c.dtClient.cardClient.CreateAndDeliverWithOptions(req, headers, &util.RuntimeOptions{})
	if err != nil {
		if !isAuthError(err) {
			return fmt.Errorf("CreateAndDeliver 失败: %w", err)
		}

		// The token looks stale: drop it, fetch a new one and retry exactly once.
		c.dtClient.InvalidateAccessToken(context.Background())
		freshToken, tokenErr := c.dtClient.GetAccessToken()
		if tokenErr != nil {
			return fmt.Errorf("刷新 access_token 失败: %w", tokenErr)
		}
		headers.XAcsDingtalkAccessToken = tea.String(freshToken)

		resp, err = c.dtClient.cardClient.CreateAndDeliverWithOptions(req, headers, &util.RuntimeOptions{})
		if err != nil {
			return fmt.Errorf("CreateAndDeliver 失败(重试): %w", err)
		}
	}

	if resp == nil || resp.Body == nil || !tea.BoolValue(resp.Body.Success) {
		return fmt.Errorf("CreateAndDeliver 返回失败: %+v", resp)
	}
	return nil
}

// StreamingUpdate pushes content into the card variable key of the card
// instance outTrackId as a full (non-incremental) update.
//
// content is truncated on a rune boundary to at most
// streamingMarkdownMaxContentBytes bytes. When a successful final update
// (isFinalize && !isError) had to be truncated, a short notice is appended and
// the content is shortened further so that content plus notice still fit in
// the limit.
//
// The call is attempted at most twice. A failure recognised by isAuthError
// invalidates the cached access token and retries immediately; other failures
// retry after a short pause. The last error is returned, wrapped, when every
// attempt fails.
func (c *CardService) StreamingUpdate(outTrackId, key, content string, isFinalize, isError bool) error {
	if outTrackId == "" || key == "" {
		return fmt.Errorf("outTrackId/key 不能为空")
	}

	contentToSend := truncateUTF8ToMaxBytes(content, streamingMarkdownMaxContentBytes)
	if isFinalize && !isError && len(contentToSend) != len(content) {
		suffix := "\n\n> *(钉钉单次展示约 1KB 限制,此处为截断;完整内容见 RAGFlow 会话)*"
		// Reserve room for the notice before truncating. Appending it to content
		// that already fills the limit and truncating again would cut the notice
		// itself off.
		budget := streamingMarkdownMaxContentBytes - len(suffix)
		contentToSend = truncateUTF8ToMaxBytes(
			truncateUTF8ToMaxBytes(content, budget)+suffix,
			streamingMarkdownMaxContentBytes,
		)
	}

	const maxAttempts = 2
	var (
		lastErr error
		backoff time.Duration // pause to apply before the next attempt, if any
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Sleep only when another attempt actually follows, so the final failure
		// is reported without a pointless delay.
		if backoff > 0 {
			time.Sleep(backoff)
			backoff = 0
		}

		token, err := c.dtClient.GetAccessToken()
		if err != nil {
			lastErr = fmt.Errorf("获取钉钉 access_token 失败: %w", err)
			backoff = 200 * time.Millisecond
			continue
		}

		headers := &dingtalkcard_1_0.StreamingUpdateHeaders{
			XAcsDingtalkAccessToken: tea.String(token),
		}
		req := &dingtalkcard_1_0.StreamingUpdateRequest{
			OutTrackId: tea.String(outTrackId),
			Guid:       tea.String(uuid.NewString()),
			Key:        tea.String(key),
			Content:    tea.String(contentToSend),
			IsFull:     tea.Bool(true),
			IsFinalize: tea.Bool(isFinalize),
			IsError:    tea.Bool(isError),
		}

		_, err = c.dtClient.cardClient.StreamingUpdateWithOptions(req, headers, &util.RuntimeOptions{})
		if err == nil {
			return nil
		}
		lastErr = err

		if isAuthError(err) {
			// Stale token: drop it so the next attempt (or next call) fetches a new one.
			c.dtClient.InvalidateAccessToken(context.Background())
			continue
		}
		backoff = 300 * time.Millisecond
	}
	return fmt.Errorf("StreamingUpdate 失败: %w", lastErr)
}

// isAuthError reports whether err's message contains one of the markers this
// package treats as an invalid or expired access token. A nil err is never an
// auth error.
//
// NOTE(review): 40014 and 42001 are presumably DingTalk token error codes;
// matching is done on the error text only.
func isAuthError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{
		"40014",
		"42001",
		"InvalidAuthentication",
		"AccessTokenExpired",
		"invalid access_token",
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}

// orDefault returns s, or def when s is empty.
func orDefault(s, def string) string {
	if s == "" {
		return def
	}
	return s
}

// truncateUTF8ToMaxBytes returns the longest prefix of s that is at most
// maxBytes bytes long and does not end in the middle of a UTF-8 encoded rune.
// It returns "" when maxBytes <= 0 and s itself when it already fits.
func truncateUTF8ToMaxBytes(s string, maxBytes int) string {
	if maxBytes <= 0 {
		return ""
	}
	if len(s) <= maxBytes {
		return s
	}
	// s[cut] is the first byte that would be dropped. The prefix s[:cut] ends on
	// a rune boundary exactly when that byte starts a rune, so back up until it
	// does. (Checking the last kept byte instead would strip complete runes and
	// leave a dangling lead byte.)
	cut := maxBytes
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}