import json
from typing import Generator, Optional

import requests

from .base import BaseModel


class AnthropicModel(BaseModel):
    def __init__(self, api_key, temperature=0.7, system_prompt=None, language=None,
                 api_base_url=None, model_identifier=None):
        super().__init__(api_key, temperature,
                         system_prompt or self.get_default_system_prompt(),
                         language or "en")
        # API base URL; defaults to the official Anthropic endpoint
        self.api_base_url = api_base_url or "https://api.anthropic.com/v1"
        # Model identifier; can be overridden for dynamic model selection
        self.model_identifier = model_identifier or "claude-3-7-sonnet-20250219"
        # Reasoning configuration (set externally when needed)
        self.reasoning_config = None
        # Maximum output tokens (set externally when needed)
        self.max_tokens = None

    def get_default_system_prompt(self) -> str:
        return """You are an expert at analyzing questions and providing detailed solutions.
When presented with an image of a question:
1. First read and understand the question carefully
2. Break down the key components of the question
3. Provide a clear, step-by-step solution
4. If relevant, explain any concepts or theories involved
5. If there are multiple approaches, explain the most efficient one first"""

    def get_model_identifier(self) -> str:
        return self.model_identifier

    def analyze_text(self, text: str, proxies: Optional[dict] = None) -> Generator[dict, None, None]:
        """Stream Claude's response for text analysis."""
        try:
            yield {"status": "started"}

            api_key = self.api_key
            if api_key.startswith('Bearer '):
                api_key = api_key[7:]

            headers = {
                'x-api-key': api_key,
                'anthropic-version': '2023-06-01',
                'content-type': 'application/json',
                'accept': 'application/json',
            }

            # Maximum output tokens; default to 8192 unless configured
            max_tokens = 8192
            if hasattr(self, 'max_tokens') and self.max_tokens:
                max_tokens = self.max_tokens

            payload = {
                'model': self.get_model_identifier(),
                'stream': True,
                'max_tokens': max_tokens,
                'temperature': 1,  # fixed at 1 (required when thinking is enabled)
                'system': self.system_prompt,
                'messages': [{
                    'role': 'user',
                    'content': [
                        {
                            'type': 'text',
                            'text': text
                        }
                    ]
                }]
            }

            # Apply the reasoning configuration
            if hasattr(self, 'reasoning_config') and self.reasoning_config:
                # Extended reasoning: reserve a larger thinking budget
                if self.reasoning_config.get('reasoning_depth') == 'extended':
                    think_budget = self.reasoning_config.get('think_budget', max_tokens // 2)
                    payload['thinking'] = {
                        'type': 'enabled',
                        'budget_tokens': think_budget
                    }
                # Instant mode: make sure no thinking parameter is sent
                elif self.reasoning_config.get('speed_mode') == 'instant':
                    if 'thinking' in payload:
                        del payload['thinking']
                # Otherwise enable thinking with a smaller budget
                else:
                    payload['thinking'] = {
                        'type': 'enabled',
                        'budget_tokens': min(4096, max_tokens // 4)
                    }
            # Default: thinking enabled with a small budget
            else:
                payload['thinking'] = {
                    'type': 'enabled',
                    'budget_tokens': min(4096, max_tokens // 4)
                }

            print(f"Debug - reasoning config: max_tokens={max_tokens}, "
                  f"thinking={payload.get('thinking', 'disabled')}")

            # Use the configured API base URL
            api_endpoint = f"{self.api_base_url}/messages"
            response = requests.post(
                api_endpoint,
                headers=headers,
                json=payload,
                stream=True,
                proxies=proxies,
                timeout=60
            )

            if response.status_code != 200:
                error_msg = f'API error: {response.status_code}'
                try:
                    error_data = response.json()
                    if 'error' in error_data:
                        error_msg += f" - {error_data['error']['message']}"
                except Exception:
                    error_msg += f" - {response.text}"
                yield {"status": "error", "error": error_msg}
                return

            thinking_content = ""
            response_buffer = ""

            for chunk in response.iter_lines():
                if not chunk:
                    continue
                try:
                    chunk_str = chunk.decode('utf-8')
                    if not chunk_str.startswith('data: '):
                        continue
                    chunk_str = chunk_str[6:]
                    data = json.loads(chunk_str)

                    if data.get('type') == 'content_block_delta':
                        if 'delta' in data:
                            if 'text' in data['delta']:
                                text_chunk = data['delta']['text']
                                response_buffer += text_chunk
                                # Only emit when the chunk is large enough or ends a
                                # sentence, to reduce UI flicker
                                if len(text_chunk) >= 10 or text_chunk.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                    yield {
                                        "status": "streaming",
                                        "content": response_buffer
                                    }
                            elif 'thinking' in data['delta']:
                                thinking_chunk = data['delta']['thinking']
                                thinking_content += thinking_chunk
                                # Only emit when the chunk is large enough or ends a
                                # sentence, to reduce UI flicker
                                if len(thinking_chunk) >= 20 or thinking_chunk.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                    yield {
                                        "status": "thinking",
                                        "content": thinking_content
                                    }
                    # Handle the newer extended_thinking event format
                    elif data.get('type') == 'extended_thinking_delta':
                        if 'delta' in data and 'text' in data['delta']:
                            thinking_chunk = data['delta']['text']
                            thinking_content += thinking_chunk
                            # Only emit when the chunk is large enough or ends a
                            # sentence, to reduce UI flicker
                            if len(thinking_chunk) >= 20 or thinking_chunk.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "thinking",
                                    "content": thinking_content
                                }
                    elif data.get('type') == 'message_stop':
                        # Flush the complete thinking content
                        if thinking_content:
                            yield {
                                "status": "thinking_complete",
                                "content": thinking_content
                            }
                        # Flush the complete response content
                        yield {
                            "status": "completed",
                            "content": response_buffer
                        }
                    elif data.get('type') == 'error':
                        error_msg = data.get('error', {}).get('message', 'Unknown error')
                        yield {
                            "status": "error",
                            "error": error_msg
                        }
                        break
                except json.JSONDecodeError as e:
                    print(f"JSON decode error: {str(e)}")
                    continue
        except Exception as e:
            yield {
                "status": "error",
                "error": f"Streaming error: {str(e)}"
            }

    def analyze_image(self, image_data, proxies: Optional[dict] = None) -> Generator[dict, None, None]:
        """Stream Claude's response for image analysis."""
        yield {"status": "started"}

        api_key = self.api_key
        if api_key.startswith('Bearer '):
            api_key = api_key[7:]

        headers = {
            'x-api-key': api_key,
            'anthropic-version': '2023-06-01',
            'content-type': 'application/json'
        }

        # Use the configured system prompt as-is; no language directive is appended
        system_prompt = self.system_prompt

        # Maximum output tokens; default to 8192 unless configured
        max_tokens = 8192
        if hasattr(self, 'max_tokens') and self.max_tokens:
            max_tokens = self.max_tokens

        payload = {
            'model': self.get_model_identifier(),
            'stream': True,
            'max_tokens': max_tokens,
            'temperature': 1,  # fixed at 1 (required when thinking is enabled)
            'system': system_prompt,
            'messages': [{
                'role': 'user',
                'content': [
                    {
                        'type': 'image',
                        'source': {
                            'type': 'base64',
                            'media_type': 'image/png',
                            'data': image_data
                        }
                    },
                    {
                        'type': 'text',
                        'text': "Please analyze this question and provide a detailed solution. "
                                "If you see multiple questions, solve them one by one."
                    }
                ]
            }]
        }

        # Apply the reasoning configuration
        if hasattr(self, 'reasoning_config') and self.reasoning_config:
            # Extended reasoning: reserve a larger thinking budget
            if self.reasoning_config.get('reasoning_depth') == 'extended':
                think_budget = self.reasoning_config.get('think_budget', max_tokens // 2)
                payload['thinking'] = {
                    'type': 'enabled',
                    'budget_tokens': think_budget
                }
            # Instant mode: just omit the thinking parameter; no speed_mode field is sent
            elif self.reasoning_config.get('speed_mode') == 'instant':
                if 'thinking' in payload:
                    del payload['thinking']
            # Otherwise enable thinking with a smaller budget
            else:
                payload['thinking'] = {
                    'type': 'enabled',
                    'budget_tokens': min(4096, max_tokens // 4)
                }
        # Default: thinking enabled with a small budget
        else:
            payload['thinking'] = {
                'type': 'enabled',
                'budget_tokens': min(4096, max_tokens // 4)
            }

        print(f"Debug - image analysis reasoning config: max_tokens={max_tokens}, "
              f"thinking={payload.get('thinking', 'disabled')}")

        # Use the configured API base URL
        api_endpoint = f"{self.api_base_url}/messages"
        response = requests.post(
            api_endpoint,
            headers=headers,
            json=payload,
            stream=True,
            proxies=proxies,
            timeout=60
        )

        if response.status_code != 200:
            error_msg = f'API error: {response.status_code}'
            try:
                error_data = response.json()
                if 'error' in error_data:
                    error_msg += f" - {error_data['error']['message']}"
            except Exception:
                error_msg += f" - {response.text}"
            yield {"status": "error", "error": error_msg}
            return

        thinking_content = ""
        response_buffer = ""

        for chunk in response.iter_lines():
            if not chunk:
                continue
            try:
                chunk_str = chunk.decode('utf-8')
                if not chunk_str.startswith('data: '):
                    continue
                chunk_str = chunk_str[6:]
                data = json.loads(chunk_str)

                if data.get('type') == 'content_block_delta':
                    if 'delta' in data:
                        if 'text' in data['delta']:
                            text_chunk = data['delta']['text']
                            response_buffer += text_chunk
                            # Only emit when the chunk is large enough or ends a
                            # sentence, to reduce UI flicker
                            if len(text_chunk) >= 10 or text_chunk.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "streaming",
                                    "content": response_buffer
                                }
                        elif 'thinking' in data['delta']:
                            thinking_chunk = data['delta']['thinking']
                            thinking_content += thinking_chunk
                            # Only emit when the chunk is large enough or ends a
                            # sentence, to reduce UI flicker
                            if len(thinking_chunk) >= 20 or thinking_chunk.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                                yield {
                                    "status": "thinking",
                                    "content": thinking_content
                                }
                # Handle the newer extended_thinking event format
                elif data.get('type') == 'extended_thinking_delta':
                    if 'delta' in data and 'text' in data['delta']:
                        thinking_chunk = data['delta']['text']
                        thinking_content += thinking_chunk
                        # Only emit when the chunk is large enough or ends a
                        # sentence, to reduce UI flicker
                        if len(thinking_chunk) >= 20 or thinking_chunk.endswith(('.', '!', '?', '。', '!', '?', '\n')):
                            yield {
                                "status": "thinking",
                                "content": thinking_content
                            }
                elif data.get('type') == 'message_stop':
                    # Flush the complete thinking content
                    if thinking_content:
                        yield {
                            "status": "thinking_complete",
                            "content": thinking_content
                        }
                    # Flush the complete response content
                    yield {
                        "status": "completed",
                        "content": response_buffer
                    }
                elif data.get('type') == 'error':
                    error_message = data.get('error', {}).get('message', 'Unknown error')
                    yield {
                        "status": "error",
                        "error": error_message
                    }
            except Exception as e:
                yield {
                    "status": "error",
                    "error": f"Error processing response: {str(e)}"
                }
                break
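

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module): shows one
# way the streaming generators above might be consumed. It assumes a valid key
# in the ANTHROPIC_API_KEY environment variable, that BaseModel accepts the
# constructor arguments used above, and that the module is run inside its
# package (the relative import of BaseModel requires `python -m ...`). All
# prompt text and reasoning_config values below are placeholders.
if __name__ == "__main__":
    import os

    model = AnthropicModel(api_key=os.environ.get("ANTHROPIC_API_KEY", ""))
    model.max_tokens = 4096
    # Ask for extended reasoning with an explicit thinking budget
    model.reasoning_config = {"reasoning_depth": "extended", "think_budget": 2048}

    for event in model.analyze_text("Explain why the sum of two even numbers is even."):
        status = event.get("status")
        if status == "thinking":
            # Partial chain-of-thought accumulated so far
            print(f"[thinking] {len(event['content'])} chars", end="\r")
        elif status == "streaming":
            # Partial answer accumulated so far
            print(f"[answer] {len(event['content'])} chars", end="\r")
        elif status == "completed":
            print("\n--- final answer ---")
            print(event["content"])
        elif status == "error":
            print(f"\nerror: {event['error']}")
            break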