func (f *FreeAskUsecase) freeAskHandle(ctx context.Context, req *FreeAskReq, output *FreeAskResp) { ctx, cancel := context.WithTimeout(ctx, time.Minute*2) defer cancel() modelCh := make(chan *ModelResponse, 10) interruptCh := make(chan struct{}, 1) var msgIdStr string var msg *Message msg, err := f.createInitialMessage(ctx, req) if err != nil { f.log.Errorf("创建初始消息失败: %v", err) output.Status = StatusFailed output.Message = "创建消息失败" return } msgIdStr = msg.Msg.Id output.MsgId = msgIdStr chatHistory, err := f.repo.GetRecentChatHistory(ctx, req.TalId, req.SubjectId, 2) if err != nil { f.log.Warnf("获取历史对话记录失败: %v", err) } messages := make([]Message, 0, len(chatHistory)*2+1) systemPrompt := f.buildSystemPrompt(req.SubjectId, intent, intentDetails) messages = append(messages, Message{ Role: "system", Content: systemPrompt, }) for _, chat := range chatHistory { messages = append(messages, Message{ Role: "user", Content: chat.Question, }) messages = append(messages, Message{ Role: "assistant", Content: chat.Answer, }) } messages = append(messages, Message{ Role: "user", Content: req.Question, }) intent, intentDetails := f.recognizeUserIntent(ctx, req.Question) f.log.Infof("用户意图识别结果: %s, 详情: %+v", intent, intentDetails) if f.handleSpecialIntent(ctx, intent, intentDetails, req, output) { return } safeResult, err := f.safetyCheck(ctx, req.Question) if err != nil || !safeResult.IsSafe { f.log.Errorf("内容安全审核未通过: %v", err) output.Status = StatusRejected output.Message = "内容包含不安全信息,请修改后重试" f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusRejected) return } go f.listenForInterruption(ctx, req.TalId, msgIdStr, interruptCh) promptOptions := &PromptOptions{ Intent: intent, IntentDetails: intentDetails, } prompt, err := f.buildPromptWithOptions(ctx, req, promptOptions) if err != nil { f.log.Errorf("构建提示词失败: %v", err) output.Status = StatusFailed output.Message = "系统处理异常" return } modelRequest := &DeepSeekModelRequest{ Model: "deepseek-r1", Messages: messages, MaxTokens: 2048, Temperature: 0.7, Stream: true, } modelCtx, modelCancel := context.WithCancel(ctx) defer modelCancel() go func() { select { case <-interruptCh: modelCancel() case <-ctx.Done(): return } }() modelCh := make(chan *DeepSeekResponse, 10) go func() { defer close(modelCh) err := f.deepSeekClient.GenerateStream(modelCtx, modelRequest, func(chunk *DeepSeekChunk) error { if chunk.Error != nil { modelCh <- &DeepSeekResponse{ Error: chunk.Error, } return chunk.Error } modelCh <- &DeepSeekResponse{ Content: chunk.Content, IsFinal: chunk.IsFinal, ToolCalls: chunk.ToolCalls, GeneratedText: chunk.GeneratedText, Usage: chunk.Usage, } return nil }) if err != nil && !errors.Is(err, context.Canceled) { f.log.Errorf("DeepSeek-R1模型调用失败: %v", err) modelCh <- &DeepSeekResponse{ Error: err, } } }() var fullContent strings.Builder isFirstChunk := true for { select { case <-ctx.Done(): f.log.Warnf("请求处理超时: %s", msgIdStr) output.Status = StatusTimeout output.Message = "处理超时,请稍后重试" sseWriter.WriteEvent(&SSEEvent{ Event: "timeout", Data: map[string]interface{}{ "msg_id": msgIdStr, "message": "处理超时,请稍后重试", }, }) f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusFailed) return case resp, ok := <-modelCh: if !ok { goto END } if resp.Error != nil { f.log.Errorf("模型返回错误: %v", resp.Error) output.Status = StatusFailed output.Message = "AI生成回答失败" sseWriter.WriteEvent(&SSEEvent{ Event: "error", Data: map[string]interface{}{ "msg_id": msgIdStr, "message": "AI生成回答失败", }, }) f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusFailed) return } content := resp.Content if len(content) > 0 { safeResult, _ := f.safetyCheck(ctx, content) if !safeResult.IsSafe { f.log.Warnf("模型回复内容存在安全风险: %s", content) content = "对不起,我无法提供这方面的回答。" } } fullContent.WriteString(content) if isFirstChunk { isFirstChunk = false f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusInProgress) output.Status = StatusSuccess output.AnswerBegin = content sseWriter.WriteEvent(&SSEEvent{ Event: "answer_begin", Data: map[string]interface{}{ "msg_id": msgIdStr, "content": content, }, }) } else { sseWriter.WriteEvent(&SSEEvent{ Event: "answer_chunk", Data: map[string]interface{}{ "msg_id": msgIdStr, "content": content, }, }) } if resp.HasSpecialFunction { f.handleSpecialFunction(ctx, resp.SpecialFunction, msgIdStr, req.TalId, sseWriter) } case _, ok := <-interruptCh: if !ok { continue } f.log.Infof("用户中断请求: %s", msgIdStr) msg.Msg.IsInterrupt = 1 sseWriter.WriteEvent(&SSEEvent{ Event: "interrupted", Data: map[string]interface{}{ "msg_id": msgIdStr, }, }) f.handelInterrupt(ctx, msgIdStr, msg.SubjectId) goto END } } END: finalContent := fullContent.String() err = f.repo.UpdateMessageContent(ctx, msgIdStr, finalContent) if err != nil { f.log.Errorf("更新消息内容失败: %v", err) } if msg.Msg.IsInterrupt == 0 { f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusCompleted) sseWriter.WriteEvent(&SSEEvent{ Event: "answer_complete", Data: map[string]interface{}{ "msg_id": msgIdStr, "content": finalContent, }, }) f.updateSessionHistory(ctx, req.TalId, req.Question, finalContent, msgIdStr) } else { f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusInterrupted) } output.FullAnswer = finalContent output.Status = StatusSuccess output.Suggestions = suggestions output.Knowledge = knowledge}