package internal import ( "context" "fmt" "github.com/gogf/gf/util/guid" "yx-dataset-server/app/errors" "yx-dataset-server/app/model" "yx-dataset-server/app/schema" "yx-dataset-server/library/ragflow" "yx-dataset-server/library/robot" ) // NewRobotConfig 创建RobotConfig func NewRobotConfig( mRobotConfig model.IRobotConfig, mRobotDataset model.IRobotDataset, mTrans model.ITrans, mChatMessage model.IChatMessage, mChatAssistant model.IChatAssistant, mChatSession model.IChatSession, mChatDataset model.IChatDataset, mUser model.IUser, mDataset model.IDataset, ) *RobotConfig { return &RobotConfig{ RobotConfigModel: mRobotConfig, robotDatasetModel: mRobotDataset, transModel: mTrans, chatMessageModel: mChatMessage, chatAssistantModel: mChatAssistant, chatSessionModel: mChatSession, chatDatasetModel: mChatDataset, userModel: mUser, datasetModel: mDataset, } } // RobotConfig 创建RobotConfig对象 type RobotConfig struct { RobotConfigModel model.IRobotConfig robotDatasetModel model.IRobotDataset transModel model.ITrans chatMessageModel model.IChatMessage chatAssistantModel model.IChatAssistant chatSessionModel model.IChatSession chatDatasetModel model.IChatDataset userModel model.IUser datasetModel model.IDataset } // Query 查询数据 func (a *RobotConfig) Query(ctx context.Context, params schema.RobotConfigQueryParam, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfigQueryResult, error) { return a.RobotConfigModel.Query(ctx, params, opts...) } // Get 查询指定数据 func (a *RobotConfig) Get(ctx context.Context, recordID string, opts ...schema.RobotConfigQueryOptions) (*schema.RobotConfig, error) { item, err := a.RobotConfigModel.Get(ctx, recordID, opts...) if err != nil { return nil, err } else if item == nil { return nil, errors.ErrNotFound } return item, nil } func (a *RobotConfig) checkCode(ctx context.Context, code string) error { result, err := a.RobotConfigModel.Query(ctx, schema.RobotConfigQueryParam{ Code: code, }, schema.RobotConfigQueryOptions{ PageParam: &schema.PaginationParam{PageSize: -1}, }) if err != nil { return err } else if result.PageResult.Total > 0 { return errors.New400Response("编号已经存在") } return nil } func (a *RobotConfig) getUpdate(ctx context.Context, recordID string) (*schema.RobotConfig, error) { return a.Get(ctx, recordID) } // Create 创建数据 func (a *RobotConfig) Create(ctx context.Context, item schema.RobotConfig) (*schema.RobotConfig, error) { item.RecordID = guid.S() item.CreatorId = GetUserID(ctx) item.AssistantId = guid.S() item.SessionId = guid.S() item.Webhook = fmt.Sprintf("%s%s", "http://113.128.186.214:6666/robot/v1/robots/receive/", item.RecordID) user := GetRootUser() var err error if !CheckIsRootUser(ctx) { user, err = a.userModel.Get(ctx, GetUserID(ctx)) if err != nil { return nil, err } } // 创建对话助手及会话 err = a.createAssistant(ctx, item.AssistantId, item.SessionId, *user, item) if err != nil { return nil, err } err = ExecTrans(ctx, a.transModel, func(ctx context.Context) error { if len(item.Datasets) > 0 { for _, dataset := range item.Datasets { err := a.robotDatasetModel.Create(ctx, schema.RobotDataset{ RecordID: guid.S(), RobotId: item.RecordID, DatasetId: dataset.RecordID, }) if err != nil { return err } } return a.RobotConfigModel.Create(ctx, item) } return nil }) if err != nil { return nil, err } return a.getUpdate(ctx, item.RecordID) } // Update 更新数据 func (a *RobotConfig) Update(ctx context.Context, recordID string, item schema.RobotConfig) (*schema.RobotConfig, error) { oldItem, err := a.RobotConfigModel.Get(ctx, recordID) if err != nil { return nil, err } else if oldItem == nil { return nil, errors.ErrNotFound } err = a.RobotConfigModel.Update(ctx, recordID, item) if err != nil { return nil, err } return a.getUpdate(ctx, recordID) } // Delete 删除数据 func (a *RobotConfig) Delete(ctx context.Context, recordID string) error { oldItem, err := a.RobotConfigModel.Get(ctx, recordID) if err != nil { return err } else if oldItem == nil { return errors.ErrNotFound } return a.RobotConfigModel.Delete(ctx, recordID) } // UpdateStatus 更新状态 func (a *RobotConfig) UpdateStatus(ctx context.Context, recordID string, status int) error { oldItem, err := a.RobotConfigModel.Get(ctx, recordID) if err != nil { return err } else if oldItem == nil { return errors.ErrNotFound } return a.RobotConfigModel.UpdateStatus(ctx, recordID, status) } func (a *RobotConfig) ReceiveMessage(ctx context.Context, id string, message robot.DingTalkRobotMessage) error { robotConfig, err := a.RobotConfigModel.Get(ctx, id) if err != nil { return err } assistant, err := a.chatAssistantModel.Get(ctx, robotConfig.AssistantId) if err != nil { return err } session, err := a.chatSessionModel.Get(ctx, robotConfig.SessionId) if err != nil { return err } stream, err := ragflow.GetHttpClient().ChatCompletionsStream(ctx, assistant.RagChatId, &ragflow.ChatCompletionReq{ Question: message.Text.Content, SessionID: session.RagSessionId, }) if err != nil { return err } defer stream.Close() // 关闭流 m := new(schema.ChatMessage) m.RecordID = guid.S() m.Question = message.Text.Content m.AssistantId = assistant.RecordID m.SessionId = session.RecordID m.UserId = GetUserID(ctx) m, err = CollectStreamDataAndCreate(ctx, stream, m) if err != nil { return err } err = a.chatMessageModel.Create(ctx, *m) if err != nil { return err } replyMsg := robot.DingReplyMsg{ MsgType: "markdown", Markdown: robot.Markdown{ Title: m.Question, Text: m.Answer, }, } err = robot.SendReplyMsg(message.SessionWebhook, replyMsg) if err != nil { fmt.Println(err) return err } return nil } func (a *RobotConfig) createAssistant(ctx context.Context, assistantId, sessionId string, user schema.User, item schema.RobotConfig) error { err := ExecTrans(ctx, a.transModel, func(ctx context.Context) error { dataset, err := a.datasetModel.Query(ctx, schema.DatasetQueryParam{RecordIds: item.Datasets.ToRecordIds()}) if err != nil { return err } c, err := ragflow.GetHttpClient().CreateChat(ctx, &ragflow.CreateChatReq{ Name: fmt.Sprintf("%s%s", item.Name, "助手"), DatasetIDs: dataset.Data.ToRagDataIds(), }) if err != nil { return err } err = a.chatAssistantModel.Create(ctx, schema.ChatAssistant{ RecordID: assistantId, OrgId: user.OrgId, Name: fmt.Sprintf("%s%s", item.Name, "助手"), RagChatId: c.Data.ID, CreatorId: user.RecordID, }) if err != nil { return err } if len(item.Datasets) > 0 { for _, v := range item.Datasets { err = a.chatDatasetModel.Create(ctx, schema.ChatDataset{ RecordID: guid.S(), ChatAssistantId: assistantId, DatasetId: v.RecordID, }) if err != nil { return err } } } s, err := ragflow.GetHttpClient().CreateSession(ctx, c.Data.ID, &ragflow.CreateSessionReq{ Name: fmt.Sprintf("%s%s", item.Name, "会话"), }) if err != nil { return err } err = a.chatSessionModel.Create(ctx, schema.ChatSession{ RecordID: sessionId, Name: fmt.Sprintf("%s%s", item.Name, "会话"), AssistantId: assistantId, RagSessionId: s.Data.Id, CreatorId: user.RecordID, }) return nil }) return err }