import os
from typing import Generator

from openai import OpenAI

from .base import BaseModel


class DeepSeekModel(BaseModel):
    def __init__(self, api_key: str, temperature: float = 0.7, system_prompt: str = None,
                 language: str = None, model_name: str = "deepseek-reasoner", api_base_url: str = None):
        super().__init__(api_key, temperature, system_prompt, language)
        self.model_name = model_name
        self.api_base_url = api_base_url  # Custom API base URL (falls back to the official endpoint if None)

    def get_default_system_prompt(self) -> str:
        return """You are an expert at analyzing questions and providing detailed solutions.
When presented with an image of a question:
1. First read and understand the question carefully
2. Break down the key components of the question
3. Provide a clear, step-by-step solution
4. If relevant, explain any concepts or theories involved
5. If there are multiple approaches, explain the most efficient one first"""

    def get_model_identifier(self) -> str:
        """Return the API identifier that matches the configured model name."""
        # Model names containing "reasoner" map to the reasoning model
        if "reasoner" in self.model_name.lower():
            return "deepseek-reasoner"
        # Model names containing "chat" map to the chat model
        if "chat" in self.model_name.lower():
            return "deepseek-chat"
        # Fall back to deepseek-chat for unrecognized model names
        print(f"Unknown DeepSeek model name: {self.model_name}, defaulting to deepseek-chat")
        return "deepseek-chat"

    def analyze_text(self, text: str, proxies: dict = None) -> Generator[dict, None, None]:
        """Stream DeepSeek's response for text analysis"""
        try:
            # Initial status
            yield {"status": "started", "content": ""}

            # Save the original proxy environment variables
            original_env = {
                'http_proxy': os.environ.get('http_proxy'),
                'https_proxy': os.environ.get('https_proxy')
            }

            try:
                # If proxy settings were provided, apply them via environment variables
                if proxies:
                    if 'http' in proxies:
                        os.environ['http_proxy'] = proxies['http']
                    if 'https' in proxies:
                        os.environ['https_proxy'] = proxies['https']

                # Initialize the DeepSeek client (no session object is used anymore)
                client = OpenAI(
                    api_key=self.api_key,
                    base_url=self.api_base_url or "https://api.deepseek.com"
                )

                # Use the configured system prompt as-is; no language instruction is appended automatically
                system_prompt = self.system_prompt

                # Build the request parameters
                params = {
                    "model": self.get_model_identifier(),
                    "messages": [
                        {'role': 'system', 'content': system_prompt},
                        {'role': 'user', 'content': text}
                    ],
                    "stream": True
                }

                # Only non-reasoner models accept the temperature parameter
                is_reasoner = self.get_model_identifier().endswith('reasoner')
                if not is_reasoner and self.temperature is not None:
                    params["temperature"] = self.temperature

                print(f"Calling DeepSeek API: {self.get_model_identifier()}, "
                      f"temperature set: {not is_reasoner}, "
                      f"temperature: {self.temperature if not is_reasoner else 'N/A'}")

                response = client.chat.completions.create(**params)

                # Keep two buffers: one for regular content, one for reasoning content
                response_buffer = ""
                thinking_buffer = ""

                for chunk in response:
                    # Print the chunk for debugging
                    try:
                        print(f"DeepSeek API returned chunk: {chunk}")
                    except Exception:
                        print("Unable to print chunk")

                    try:
                        # Handle both kinds of content so reasoning and final output stay separate
                        delta = chunk.choices[0].delta

                        # Reasoning content from the reasoner model
                        if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
                            content = delta.reasoning_content
                            thinking_buffer += content

                            # Send a reasoning update
                            if len(content) >= 20 or content.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "thinking",
                                    "content": thinking_buffer
                                }

                        # Final answer content - present even for the reasoner model
                        if hasattr(delta, 'content') and delta.content:
                            content = delta.content
                            response_buffer += content
                            print(f"Accumulated response content: '{content}', current buffer: '{response_buffer}'")

                            # Send a result update
                            if len(content) >= 10 or content.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "streaming",
                                    "content": response_buffer
                                }

                        # Handle end of message
                        if hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason:
                            print(f"Generation finished, reason: {chunk.choices[0].finish_reason}")
                            # Note: do not promote the reasoning content to the answer here,
                            # as that could duplicate content

                    except Exception as e:
                        print(f"Error parsing response chunk: {str(e)}")
                        continue

                # Make sure the buffered reasoning content is flushed
                if thinking_buffer:
                    yield {
                        "status": "thinking_complete",
                        "content": thinking_buffer
                    }

                # Send the final response content
                if response_buffer:
                    yield {
                        "status": "completed",
                        "content": response_buffer
                    }
                # If there is no regular response but there is reasoning content,
                # use the reasoning content as the final result
                elif thinking_buffer:
                    yield {
                        "status": "completed",
                        "content": thinking_buffer
                    }
                else:
                    # If neither exists, return an empty result
                    yield {
                        "status": "completed",
                        "content": "No content was returned"
                    }

            except Exception as e:
                error_msg = str(e)
                print(f"DeepSeek API call failed: {error_msg}")

                # Provide more specific error messages
                if "invalid_api_key" in error_msg.lower():
                    error_msg = "Invalid DeepSeek API key, please check your API key"
                elif "rate_limit" in error_msg.lower():
                    error_msg = "DeepSeek API rate limit exceeded, please try again later"
                elif "quota_exceeded" in error_msg.lower():
                    error_msg = "DeepSeek API quota exhausted, please top up or wait for the next billing cycle"

                yield {
                    "status": "error",
                    "error": f"DeepSeek API error: {error_msg}"
                }

            finally:
                # Restore the original environment variables
                for key, value in original_env.items():
                    if value is None:
                        if key in os.environ:
                            del os.environ[key]
                    else:
                        os.environ[key] = value

        except Exception as e:
            error_msg = str(e)
            print(f"Error while calling the DeepSeek model: {error_msg}")

            if "invalid_api_key" in error_msg.lower():
                error_msg = "Invalid API key, please check your settings"
            elif "rate_limit" in error_msg.lower():
                error_msg = "API rate limit exceeded, please try again later"

            yield {
                "status": "error",
                "error": f"DeepSeek API error: {error_msg}"
            }

    def analyze_image(self, image_data: str, proxies: dict = None) -> Generator[dict, None, None]:
        """Stream DeepSeek's response for image analysis"""
        try:
            # Check whether the configured model supports images
            if self.model_name in ("deepseek-chat", "deepseek-reasoner"):
                yield {
                    "status": "error",
                    "error": "The current DeepSeek model does not support image analysis; "
                             "please use a multimodal model from Anthropic or OpenAI"
                }
                return

            # Initial status
            yield {"status": "started", "content": ""}

            # Save the original proxy environment variables
            original_env = {
                'http_proxy': os.environ.get('http_proxy'),
                'https_proxy': os.environ.get('https_proxy')
            }

            try:
                # If proxy settings were provided, apply them via environment variables
                if proxies:
                    if 'http' in proxies:
                        os.environ['http_proxy'] = proxies['http']
                    if 'https' in proxies:
                        os.environ['https_proxy'] = proxies['https']

                # Initialize the DeepSeek client (no session object is used anymore)
                client = OpenAI(
                    api_key=self.api_key,
                    base_url=self.api_base_url or "https://api.deepseek.com"
                )

                # Use the configured system prompt as-is; no language instruction is appended automatically
                system_prompt = self.system_prompt

                # Build the request parameters
                params = {
                    "model": self.get_model_identifier(),
                    "messages": [
                        {'role': 'system', 'content': system_prompt},
                        {
                            'role': 'user',
                            'content': f"Here's an image of a question to analyze: data:image/png;base64,{image_data}"
                        }
                    ],
                    "stream": True
                }

                # Only non-reasoner models accept the temperature parameter
                if not self.get_model_identifier().endswith('reasoner') and self.temperature is not None:
                    params["temperature"] = self.temperature

                response = client.chat.completions.create(**params)

                # Keep two buffers: one for regular content, one for reasoning content
                response_buffer = ""
                thinking_buffer = ""

                for chunk in response:
                    # Print the chunk for debugging
                    try:
                        print(f"DeepSeek image API returned chunk: {chunk}")
                    except Exception:
                        print("Unable to print chunk")

                    try:
                        # Handle both kinds of content so reasoning and final output stay separate
                        delta = chunk.choices[0].delta

                        # Reasoning content from the reasoner model
                        if hasattr(delta, 'reasoning_content') and delta.reasoning_content:
                            content = delta.reasoning_content
                            thinking_buffer += content

                            # Send a reasoning update
                            if len(content) >= 20 or content.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "thinking",
                                    "content": thinking_buffer
                                }

                        # Final answer content - present even for the reasoner model
                        if hasattr(delta, 'content') and delta.content:
                            content = delta.content
                            response_buffer += content
                            print(f"Accumulated image response content: '{content}', current buffer: '{response_buffer}'")

                            # Send a result update
                            if len(content) >= 10 or content.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "streaming",
                                    "content": response_buffer
                                }

                        # Handle end of message
                        if hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason:
                            print(f"Image generation finished, reason: {chunk.choices[0].finish_reason}")

                    except Exception as e:
                        print(f"Error parsing image response chunk: {str(e)}")
                        continue

                # Make sure the buffered reasoning content is flushed
                if thinking_buffer:
                    yield {
                        "status": "thinking_complete",
                        "content": thinking_buffer
                    }

                # Send the final response content
                if response_buffer:
                    yield {
                        "status": "completed",
                        "content": response_buffer
                    }

            except Exception as e:
                error_msg = str(e)
                print(f"DeepSeek API call failed: {error_msg}")

                # Provide more specific error messages
                if "invalid_api_key" in error_msg.lower():
                    error_msg = "Invalid DeepSeek API key, please check your API key"
                elif "rate_limit" in error_msg.lower():
                    error_msg = "DeepSeek API rate limit exceeded, please try again later"

                yield {
                    "status": "error",
                    "error": f"DeepSeek API error: {error_msg}"
                }

            finally:
                # Restore the original environment variables
                for key, value in original_env.items():
                    if value is None:
                        if key in os.environ:
                            del os.environ[key]
                    else:
                        os.environ[key] = value

        except Exception as e:
            error_msg = str(e)
            if "invalid_api_key" in error_msg.lower():
                error_msg = "Invalid API key, please check your settings"
            elif "rate_limit" in error_msg.lower():
                error_msg = "API rate limit exceeded, please try again later"

            yield {
                "status": "error",
                "error": f"DeepSeek API error: {error_msg}"
            }
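
# --- Hypothetical usage sketch (illustrative only, not part of the adapter) ---
# A minimal example of how the streaming generator above might be consumed.
# Assumptions not present in the source: the DEEPSEEK_API_KEY environment
# variable and the sample prompt below are made up for illustration, and the
# module must be run with `python -m <package>.<module>` for the relative
# import of BaseModel to resolve.
if __name__ == "__main__":
    model = DeepSeekModel(
        api_key=os.environ.get("DEEPSEEK_API_KEY", ""),  # hypothetical env var
        temperature=0.7,
        system_prompt="You are a concise assistant.",
        model_name="deepseek-reasoner",
    )
    for event in model.analyze_text("What is the derivative of x^2?"):
        status = event["status"]
        if status == "thinking":
            # Reasoning stream: print only the tail of the growing buffer
            print(f"[thinking] ...{event['content'][-80:]}")
        elif status in ("streaming", "completed", "thinking_complete"):
            print(f"[{status}] ...{event['content'][-80:]}")
        elif status == "error":
            print(f"[error] {event['error']}")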