package ragflow import ( "bytes" "context" "encoding/json" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gclient" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/gconv" "io" "mime/multipart" "net/http" "regexp" "strings" "sync" ) var ( once sync.Once internalClient *Client // 内部私有客户端 ) // 32位小写十六进制正则校验 var pipelineIDRegex = regexp.MustCompile(`^[0-9a-f]{32}$`) // Client 包装 gclient.Client type Client struct { cli *gclient.Client // 原生GoFrame HTTP客户端 address string // RAGFlow服务地址 apiKey string // 认证密钥 } // Init 初始化客户端 func Init(ctx context.Context, addr, apiKey string) *Client { once.Do(func() { internalClient = newClient(ctx, addr, apiKey) }) return internalClient } // GetHttpClient 获取原生 gclient.Client func GetHttpClient() *Client { return internalClient } // newClient 创建客户端实例 func newClient(ctx context.Context, addr, apiKey string) *Client { // 初始化GoFrame HTTP客户端 cli := g.Client() // 设置全局默认请求头 //cli.SetHeader("Content-Permission", "application/json") cli.SetHeader("Content-Type", "application/json") cli.SetHeader("Authorization", "Bearer "+apiKey) // 初始化校验 if addr == "" || apiKey == "" { panic("ragflow client init failed: address or apiKey is empty") } return &Client{ cli: cli, address: addr, apiKey: apiKey, } } // -------------------------- 工具函数 -------------------------- func Log(ctx context.Context, v ...interface{}) { glog.Info(ctx, v...) } func NewCtx() context.Context { return gctx.New() } // -------------------------- 公共结构体 -------------------------- type CommonResp struct { Code int `json:"code"` Msg string `json:"message,omitempty"` } // -------------------------- 1. 数据集接口(原有) -------------------------- type CreateDatasetReq struct { Name string `json:"name"` ChunkMethod string `json:"chunk_method,omitempty"` Description string `json:"description,omitempty"` EmbeddingModel string `json:"embedding_model,omitempty"` //Avatar string `json:"avatar,omitempty"` //Permission string `json:"permission,omitempty"` //ParserConfig interface{} `json:"parser_config,omitempty"` //ParseType int `json:"parse_type,omitempty"` //PipelineID string `json:"pipeline_id,omitempty"` } type CreateDatasetResp struct { Code int `json:"code"` Data DatasetData `json:"data"` } type DatasetData struct { ID string `json:"id"` Name string `json:"name"` ChunkMethod string `json:"chunk_method"` EmbeddingModel string `json:"embedding_model"` Permission string `json:"permission"` ParserConfig interface{} `json:"parser_config"` CreateTime int64 `json:"create_time"` DocumentCount int `json:"document_count"` ChunkCount int `json:"chunk_count"` } type DeleteDatasetReq struct { IDs []string `json:"ids"` } type UpdateDatasetReq struct { Name string `json:"name,omitempty"` Language string `json:"language"` Avatar string `json:"avatar,omitempty"` Description string `json:"description,omitempty"` EmbeddingModel string `json:"embedding_model,omitempty"` Permission string `json:"permission,omitempty"` ChunkMethod string `json:"chunk_method,omitempty"` ParserConfig interface{} `json:"parser_config,omitempty"` Pagerank int `json:"pagerank,omitempty"` } // 创建知识库 func (r *Client) CreateDataset(ctx context.Context, req *CreateDatasetReq) (*CreateDatasetResp, error) { if req.Name == "" { return nil, fmt.Errorf("name 是必填参数") } url := fmt.Sprintf("%s/api/v1/datasets", r.address) var resp CreateDatasetResp err := r.cli.PostVar(ctx, url, req).Scan(&resp) if err != nil { return nil, fmt.Errorf("请求失败: %v", err) } if resp.Code != 0 { return nil, fmt.Errorf("接口错误 code=%d", resp.Code) } return &resp, nil } // DeleteDataset 删除知识库 func (r *Client) DeleteDataset(ctx context.Context, ids []string) (*CommonResp, error) { url := fmt.Sprintf("%s/api/v1/datasets", r.address) req := DeleteDatasetReq{IDs: ids} var resp CommonResp err := r.cli.DeleteVar(ctx, url, req).Scan(&resp) if err != nil { return nil, fmt.Errorf("请求失败: %v", err) } return &resp, nil } // UpdateDataset 更新知识库 func (r *Client) UpdateDataset(ctx context.Context, datasetID string, req *UpdateDatasetReq) (*CommonResp, error) { if datasetID == "" { return nil, fmt.Errorf("datasetID 不能为空") } url := fmt.Sprintf("%s/api/v1/datasets/%s", r.address, datasetID) var resp CommonResp err := r.cli.PutVar(ctx, url, req).Scan(&resp) if err != nil { return nil, fmt.Errorf("请求失败: %v", err) } return &resp, nil } // -------------------------- 2. 文档接口(原有) -------------------------- type UploadFileReq struct { File []*FileInfo `json:"file"` } type FileInfo struct { FileName string `json:"file_name"` // 文件名称 Url string `json:"url"` // 文件地址 } type UploadDocumentResp struct { Code int `json:"code"` Data []DocumentDetail `json:"data"` Message string `json:"message"` // 失败时返回 } // UploadDocumentV2Req V2版本上传文档请求(直接接收文件数据) type UploadDocumentV2Req struct { FileName string // 文件名 FileData []byte // 文件数据 ChunkMethod string `json:"chunk_method,omitempty"` // 分块方式 } // UploadDocumentV2Resp V2版本上传文档响应 type UploadDocumentV2Resp struct { Code int `json:"code"` Data []DocumentDetail `json:"data"` Message string `json:"message"` } type DocumentDetail struct { ID string `json:"id"` Name string `json:"name"` DatasetID string `json:"dataset_id"` Size int64 `json:"size"` Run string `json:"run"` ChunkMethod string `json:"chunk_method"` ParserConfig interface{} `json:"parser_config"` Status string `json:"status"` } type UpdateDocumentReq struct { Name string `json:"name,omitempty"` MetaFields interface{} `json:"meta_fields,omitempty"` ChunkMethod string `json:"chunk_method,omitempty"` ParserConfig interface{} `json:"parser_config,omitempty"` Enabled int `json:"enabled,omitempty"` } type UpdateDocumentResp struct { Code int `json:"code"` Message string `json:"message"` // 失败时返回 Data DocumentDetail `json:"data"` } type ListDocumentReq struct { Page int `json:"page,omitempty"` PageSize int `json:"page_size,omitempty"` OrderBy string `json:"orderby,omitempty"` Desc bool `json:"desc,omitempty"` Keywords string `json:"keywords,omitempty"` DocumentID string `json:"id,omitempty"` DocumentName string `json:"name,omitempty"` CreateTimeFrom int64 `json:"create_time_from,omitempty"` CreateTimeTo int64 `json:"create_time_to,omitempty"` Suffix string `json:"suffix,omitempty"` Run string `json:"run,omitempty"` MetadataCondition string `json:"metadata_condition,omitempty"` } type ListDocumentResp struct { Code int `json:"code"` Data struct { Docs []DocumentDetail `json:"docs"` Total int `json:"total_datasets"` } `json:"data"` } type DeleteDocumentsReq struct { IDs []string `json:"ids"` } type ParseDocumentReq struct { DocumentIDs []string `json:"document_ids"` } //func (r *Client) UploadDocument(ctx context.Context, datasetID, fileName string, url string) error { // // 参数校验 // remoteFileUrl := url // apiUrl := fmt.Sprintf("%s/api/v1/datasets/%s/documents", r.address, datasetID) // fieldName := "file" // // resp, err := http.Get(remoteFileUrl) // if err != nil { // panic("下载远程文件失败:" + err.Error()) // } // defer resp.Body.Close() // // body := &bytes.Buffer{} // writer := multipart.NewWriter(body) // // formFile, _ := writer.CreateFormFile(fieldName, "xinfengjinghua.pdf") // io.Copy(formFile, resp.Body) // _ = writer.Close() // // req, _ := http.NewRequest("POST", apiUrl, body) // req.Header.Set("Content-Permission", writer.FormDataContentType()) // req.Header.Set("Authorization", "Bearer "+"ragflow-nkGJCVuFSxgt7X6AkzuOXjx_9Q3hN3zAUP9LNpmJOFk") // // client := &http.Client{} // uploadResp, err := client.Do(req) // if err != nil { // panic("上传失败:" + err.Error()) // } // defer uploadResp.Body.Close() // // result, _ := io.ReadAll(uploadResp.Body) // println("上传状态:", uploadResp.Status) // println("接口返回:", string(result)) // // return nil //} func (r *Client) UploadDocument(ctx context.Context, datasetID string, req UploadFileReq) (*UploadDocumentResp, error) { // 参数校验 if datasetID == "" { return nil, fmt.Errorf("datasetID 不能为空") } body := &bytes.Buffer{} writer := multipart.NewWriter(body) for _, v := range req.File { //下载远程文件 f, err := r.cli.Get(ctx, v.Url) if err != nil { return nil, fmt.Errorf("下载文件失败: %v", err) } part, err := writer.CreateFormFile("file", v.FileName) if err != nil { return nil, fmt.Errorf("创建表单失败: %v", err) } _, err = io.Copy(part, f.Response.Body) if err != nil { return nil, fmt.Errorf("写入文件流失败: %v", err) } _ = f.Response.Body.Close() } err := writer.Close() if err != nil { return nil, fmt.Errorf("关闭表单失败: %v", err) } r.cli.SetHeader("Content-Type", writer.FormDataContentType()) // 拼接上传接口地址 apiUrl := fmt.Sprintf("%s/api/v1/datasets/%s/documents", r.address, datasetID) var resp UploadDocumentResp if err = r.cli.PostVar(ctx, apiUrl, body).Scan(&resp); err != nil { return nil, fmt.Errorf("解析响应失败: %v", err) } if resp.Code != 0 { return nil, fmt.Errorf("上传失败[%d]: %s", resp.Code, resp.Message) } r.cli.SetHeader("Content-Type", "application/json") return &resp, nil } // UploadDocumentV2 直接接收文件数据上传到RagFlow // 适用于本地文件直接上传场景,无需先上传到Minio func (r *Client) UploadDocumentV2(ctx context.Context, datasetID string, req *UploadDocumentV2Req) (*UploadDocumentV2Resp, error) { // 参数校验 if datasetID == "" { return nil, fmt.Errorf("datasetID 不能为空") } if req.FileName == "" { return nil, fmt.Errorf("fileName 不能为空") } if len(req.FileData) == 0 { return nil, fmt.Errorf("fileData 不能为空") } Log(ctx, fmt.Sprintf("[RagFlow V2] 开始上传文件: %s, 大小: %d bytes", req.FileName, len(req.FileData))) body := &bytes.Buffer{} writer := multipart.NewWriter(body) // 直接使用传入的文件数据创建表单文件 part, err := writer.CreateFormFile("file", req.FileName) if err != nil { return nil, fmt.Errorf("创建表单文件失败: %v", err) } _, err = part.Write(req.FileData) if err != nil { return nil, fmt.Errorf("写入文件数据失败: %v", err) } err = writer.Close() if err != nil { return nil, fmt.Errorf("关闭表单失败: %v", err) } r.cli.SetHeader("Content-Type", writer.FormDataContentType()) // 拼接上传接口地址 apiUrl := fmt.Sprintf("%s/api/v1/datasets/%s/documents", r.address, datasetID) Log(ctx, fmt.Sprintf("[RagFlow V2] 请求地址: %s", apiUrl)) var resp UploadDocumentV2Resp if err = r.cli.PostVar(ctx, apiUrl, body).Scan(&resp); err != nil { return nil, fmt.Errorf("上传请求失败: %v", err) } if resp.Code != 0 { Log(ctx, fmt.Sprintf("[RagFlow V2] 上传失败[%d]: %s", resp.Code, resp.Message)) return nil, fmt.Errorf("上传失败[%d]: %s", resp.Code, resp.Message) } Log(ctx, fmt.Sprintf("[RagFlow V2] 上传成功,文档ID: %s", resp.Data[0].ID)) r.cli.SetHeader("Content-Type", "application/json") return &resp, nil } // UpdateDocument 更新文档 func (r *Client) UpdateDocument(ctx context.Context, datasetID, documentID string, req *UpdateDocumentReq) (*UpdateDocumentResp, error) { url := fmt.Sprintf("%s/api/v1/datasets/%s/documents/%s", r.address, datasetID, documentID) var resp UpdateDocumentResp err := r.cli.PutVar(ctx, url, req).Scan(&resp) if err != nil { return nil, fmt.Errorf("更新失败: %v", err) } return &resp, nil } // ListDocuments 列出文档 func (r *Client) ListDocuments(ctx context.Context, datasetID string, req *ListDocumentReq) (*ListDocumentResp, error) { url := fmt.Sprintf("%s/api/v1/datasets/%s/documents", r.address, datasetID) params := gconv.Map(req) var resp ListDocumentResp err := r.cli.GetVar(ctx, url, params).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // DeleteDocuments 删除文档 func (r *Client) DeleteDocuments(ctx context.Context, datasetID string, ids []string) (*CommonResp, error) { url := fmt.Sprintf("%s/api/v1/datasets/%s/documents", r.address, datasetID) req := DeleteDocumentsReq{IDs: ids} var resp CommonResp err := r.cli.DeleteVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // ParseDocuments 解析文档 func (r *Client) ParseDocuments(ctx context.Context, datasetID string, ids []string) (*CommonResp, error) { if len(ids) == 0 { return nil, fmt.Errorf("document_ids 不能为空") } url := fmt.Sprintf("%s/api/v1/datasets/%s/chunks", r.address, datasetID) req := ParseDocumentReq{DocumentIDs: ids} var resp CommonResp err := r.cli.PostVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // -------------------------- 3. 对话助手接口(新增) -------------------------- // LLM配置 type ChatLLM struct { ModelName string `json:"model_name,omitempty"` ModelType string `json:"model_type,omitempty"` // chat/image2text Temperature float64 `json:"temperature,omitempty"` TopP float64 `json:"top_p,omitempty"` PresencePenalty float64 `json:"presence_penalty,omitempty"` FrequencyPenalty float64 `json:"frequency_penalty,omitempty"` } // Prompt配置 type ChatPrompt struct { SimilarityThreshold float64 `json:"similarity_threshold,omitempty"` KeywordsSimilarityWeight float64 `json:"keywords_similarity_weight,omitempty"` TopN int `json:"top_n,omitempty"` Variables []interface{} `json:"variables,omitempty"` RerankModel string `json:"rerank_model,omitempty"` EmptyResponse string `json:"empty_response,omitempty"` Opener string `json:"opener,omitempty"` ShowQuote bool `json:"show_quote,omitempty"` Prompt string `json:"prompt,omitempty"` } // 创建对话助手 type CreateChatReq struct { Name string `json:"name"` // 必填 Avatar string `json:"avatar,omitempty"` DatasetIDs []string `json:"dataset_ids,omitempty"` LLM *ChatLLM `json:"llm,omitempty"` Prompt *ChatPrompt `json:"prompt,omitempty"` } type CreateChatResp struct { Code int `json:"code"` Data ChatInfo `json:"data"` } type ChatInfo struct { ID string `json:"id"` Name string `json:"name"` } // 更新对话助手 type UpdateChatReq struct { Name string `json:"name,omitempty"` Avatar string `json:"avatar,omitempty"` DatasetIDs []string `json:"dataset_ids,omitempty"` LLM *ChatLLM `json:"llm,omitempty"` Prompt *ChatPrompt `json:"prompt,omitempty"` } // 列表助手 type ListChatReq struct { Page int `json:"page,omitempty"` PageSize int `json:"page_size,omitempty"` OrderBy string `json:"orderby,omitempty"` Desc bool `json:"desc,omitempty"` ID string `json:"id,omitempty"` Name string `json:"name,omitempty"` } type ListChatResp struct { Code int `json:"code"` Data []interface{} `json:"data"` } // 删除助手 type DeleteChatReq struct { IDs []string `json:"ids"` } // CreateChat 创建对话助手 func (r *Client) CreateChat(ctx context.Context, req *CreateChatReq) (*CreateChatResp, error) { if req.Name == "" { return nil, fmt.Errorf("name 是必填参数") } url := fmt.Sprintf("%s/api/v1/chats", r.address) r.cli.SetHeader("Content-Type", "application/json") var resp CreateChatResp err := r.cli.PostVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // UpdateChat 更新对话助手 func (r *Client) UpdateChat(ctx context.Context, chatID string, req *UpdateChatReq) (*CommonResp, error) { if chatID == "" { return nil, fmt.Errorf("chatID 不能为空") } url := fmt.Sprintf("%s/api/v1/chats/%s", r.address, chatID) r.cli.SetHeader("Content-Type", "application/json") var resp CommonResp err := r.cli.PutVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // ListChats 列出助手 func (r *Client) ListChats(ctx context.Context, req *ListChatReq) (*ListChatResp, error) { url := fmt.Sprintf("%s/api/v1/chats", r.address) params := gconv.Map(req) var resp ListChatResp err := r.cli.GetVar(ctx, url, params).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // DeleteChats 删除助手 func (r *Client) DeleteChats(ctx context.Context, ids []string) (*CommonResp, error) { url := fmt.Sprintf("%s/api/v1/chats", r.address) r.cli.SetHeader("Content-Type", "application/json") req := DeleteChatReq{IDs: ids} var resp CommonResp err := r.cli.DeleteVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // -------------------------- 4. 会话管理接口(新增) -------------------------- type CreateSessionReq struct { Name string `json:"name"` // 必填 UserID string `json:"user_id,omitempty"` } type CreateSessionResp struct { Code int `json:"code"` Data SessionInfo `json:"data"` } type SessionInfo struct { Id string `json:"id"` Message struct { Content string `json:"content"` Role string `json:"role"` } `json:"message"` Name string `json:"name"` } type UpdateSessionReq struct { Name string `json:"name,omitempty"` UserID string `json:"user_id,omitempty"` } type ListSessionReq struct { Page int `json:"page,omitempty"` PageSize int `json:"page_size,omitempty"` OrderBy string `json:"orderby,omitempty"` Desc bool `json:"desc,omitempty"` ID string `json:"id,omitempty"` Name string `json:"name,omitempty"` UserID string `json:"user_id,omitempty"` } type DeleteSessionReq struct { IDs []string `json:"ids"` } // CreateSession 创建会话 func (r *Client) CreateSession(ctx context.Context, chatID string, req *CreateSessionReq) (*CreateSessionResp, error) { if chatID == "" || req.Name == "" { return nil, fmt.Errorf("参数不能为空") } url := fmt.Sprintf("%s/api/v1/chats/%s/sessions", r.address, chatID) var resp CreateSessionResp err := r.cli.PostVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // UpdateSession 更新会话 func (r *Client) UpdateSession(ctx context.Context, chatID, sessionID string, req *UpdateSessionReq) (*CommonResp, error) { url := fmt.Sprintf("%s/api/v1/chats/%s/sessions/%s", r.address, chatID, sessionID) var resp CommonResp err := r.cli.PutVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // ListSessions 列出会话 func (r *Client) ListSessions(ctx context.Context, chatID string, req *ListSessionReq) (*CommonResp, error) { url := fmt.Sprintf("%s/api/v1/chats/%s/sessions", r.address, chatID) params := gconv.Map(req) var resp CommonResp err := r.cli.GetVar(ctx, url, params).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // DeleteSessions 删除会话 func (r *Client) DeleteSessions(ctx context.Context, chatID string, ids []string) (*CommonResp, error) { url := fmt.Sprintf("%s/api/v1/chats/%s/sessions", r.address, chatID) req := DeleteSessionReq{IDs: ids} var resp CommonResp err := r.cli.DeleteVar(ctx, url, req).Scan(&resp) if err != nil { return nil, err } return &resp, nil } // -------------------------- 5. AI对话接口(流式/非流式 新增) -------------------------- // 元数据过滤条件 type MetaCondition struct { Logic string `json:"logic,omitempty"` // and/or Conditions []Condition `json:"conditions,omitempty"` } type Condition struct { Name string `json:"name"` ComparisonOperator string `json:"comparison_operator"` Value interface{} `json:"value,omitempty"` } // 对话请求 type ChatCompletionReq struct { Question string `json:"question"` // 必填 Stream bool `json:"stream"` SessionID string `json:"session_id,omitempty"` UserID string `json:"user_id,omitempty"` MetadataCondition *MetaCondition `json:"metadata_condition,omitempty"` } type Completion struct { Answer string `json:"answer"` Reference interface{} `json:"reference"` AudioBinary interface{} `json:"audio_binary"` Id string `json:"id"` SessionId string `json:"session_id"` } // 对话请求 type ChatCompletionResp struct { Code int `json:"code"` Message string `json:"message"` Data Completion `json:"data"` } // ChatCompletions 对话 func (r *Client) ChatCompletions(ctx context.Context, chatID string, req *ChatCompletionReq) (*ChatCompletionResp, error) { if chatID == "" || req.Question == "" { return nil, fmt.Errorf("chatID 和 question 不能为空") } url := fmt.Sprintf("%s/api/v1/chats/%s/completions", r.address, chatID) req.Stream = false var resp ChatCompletionResp d, err := r.cli.Post(ctx, url, req) if err != nil { return nil, err } raw := d.ReadAll() if err = json.Unmarshal(raw, &resp); err != nil { return nil, fmt.Errorf("解析 RAGFlow completions 响应失败: %w body=%.500s", err, string(raw)) } return &resp, nil } type StreamData struct { Answer string `json:"answer"` // 流式文本片段 Final bool `json:"final"` // 是否为最后一条 StartToThink bool `json:"start_to_think"` // 思考状态(可忽略) ID string `json:"id"` SessionID string `json:"session_id"` } // StreamResponse 对应完整的流式响应结构 type StreamResponse struct { Code int `json:"code"` // 状态码,0为正常 Data StreamData `json:"data"` // 业务数据 } // ParseChatStreamEnvelope 解析 RAGFlow POST /api/v1/chats/{chat_id}/completions 流式中的单条 JSON。 // // 官方文档约定:流式最后一帧为 {"code":0,"data":true}(data 为布尔 true),表示流结束; // 若仍用 Data struct 反序列化,json.Unmarshal 会失败,客户端若忽略该帧且连接长期不关闭,会一直卡在「处理中」。 func ParseChatStreamEnvelope(raw []byte) (answer string, streamDone bool, code int, msg string, err error) { var envelope struct { Code int `json:"code"` Message string `json:"message"` Data json.RawMessage `json:"data"` } if err = json.Unmarshal(raw, &envelope); err != nil { return "", false, 0, "", err } code, msg = envelope.Code, envelope.Message if len(envelope.Data) == 0 { return "", false, code, msg, nil } var asBool bool if json.Unmarshal(envelope.Data, &asBool) == nil { if asBool { return "", true, code, msg, nil } return "", false, code, msg, nil } var inner struct { Answer string `json:"answer"` Final bool `json:"final"` } if err = json.Unmarshal(envelope.Data, &inner); err != nil { return "", false, code, msg, fmt.Errorf("解析 data 对象: %w", err) } if inner.Final { return inner.Answer, true, code, msg, nil } return inner.Answer, false, code, msg, nil } // SplitRAGFlowSSEDataPayloads 从一行 SSE 文本中拆出 0..n 条 JSON。 // 支持:标准前缀 "data:{...}"、同一行多个 "data:{...}data:{...}"、以及无前缀的裸 JSON(部分代理场景)。 func SplitRAGFlowSSEDataPayloads(line string) [][]byte { line = strings.TrimSpace(line) if line == "" || strings.HasPrefix(line, ":") { return nil } var blobs [][]byte parts := strings.Split(line, "data:") for _, part := range parts { part = strings.TrimSpace(part) if part == "" { continue } dec := json.NewDecoder(strings.NewReader(part)) for { var raw json.RawMessage if err := dec.Decode(&raw); err != nil { break } if len(raw) > 0 { blobs = append(blobs, append([]byte(nil), raw...)) } } } return blobs } // ChatCompletionsStream 获取流式消息 func (r *Client) ChatCompletionsStream(ctx context.Context, chatID string, req *ChatCompletionReq) (io.ReadCloser, error) { if chatID == "" || req.Question == "" { return nil, fmt.Errorf("chatID 和 question 不能为空") } // 强制开启流式 req.Stream = true url := fmt.Sprintf("%s/api/v1/chats/%s/completions", r.address, chatID) bodyBytes, err := json.Marshal(req) if err != nil { return nil, err } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyBytes)) if err != nil { return nil, err } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Authorization", "Bearer "+r.apiKey) // 独立 http.Client,避免与全局 gclient 并发写 Timeout/Transport 导致互相干扰 httpClient := &http.Client{ Timeout: 0, Transport: &http.Transport{ DisableCompression: true, }, } resp, err := httpClient.Do(httpReq) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { b, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() return nil, fmt.Errorf("ragflow 流式接口 HTTP %d: %s", resp.StatusCode, string(b)) } return resp.Body, nil }