Files
Snap-Solver/app.py
2025-03-06 14:45:52 +08:00

421 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, jsonify, render_template, request
from flask_socketio import SocketIO
import pyautogui
import base64
from io import BytesIO
import socket
from threading import Thread
import pystray
from PIL import Image, ImageDraw
import pyperclip
from models import ModelFactory
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*", ping_timeout=30, ping_interval=5, max_http_buffer_size=50 * 1024 * 1024)
# Commented out due to model file issues
# from pix2text import Pix2Text
def get_local_ip():
try:
# Get local IP address
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def create_tray_icon():
# Create a simple icon (a colored circle)
icon_size = 64
icon_image = Image.new('RGB', (icon_size, icon_size), color='white')
draw = ImageDraw.Draw(icon_image)
draw.ellipse([4, 4, icon_size-4, icon_size-4], fill='#2196F3') # Using the primary color from our CSS
# Get server URL
ip_address = get_local_ip()
server_url = f"http://{ip_address}:5000"
# Create menu
menu = pystray.Menu(
pystray.MenuItem(server_url, lambda icon, item: None, enabled=False),
pystray.MenuItem("Exit", lambda icon, item: icon.stop())
)
# Create icon
icon = pystray.Icon(
"SnapSolver",
icon_image,
"Snap Solver",
menu
)
return icon
@app.route('/')
def index():
local_ip = get_local_ip()
return render_template('index.html', local_ip=local_ip)
@socketio.on('connect')
def handle_connect():
print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
# Commented out due to model file issues
# p2t = Pix2Text()
def stream_model_response(response_generator, sid):
"""Stream model responses to the client"""
try:
print("Starting response streaming...")
# 初始化:发送开始状态
socketio.emit('claude_response', {
'status': 'started',
'content': ''
}, room=sid)
print("Sent initial status to client")
# 维护服务端缓冲区以累积完整内容
response_buffer = ""
thinking_buffer = ""
# 流式处理响应
for response in response_generator:
# 处理Mathpix响应
if isinstance(response.get('content', ''), str) and 'mathpix' in response.get('model', ''):
socketio.emit('text_extracted', {
'content': response['content']
}, room=sid)
continue
# 获取状态和内容
status = response.get('status', '')
content = response.get('content', '')
# 根据不同的状态进行处理
if status == 'thinking':
# 累积思考内容到缓冲区
thinking_buffer += content
# 发送完整的思考内容
socketio.emit('claude_response', {
'status': 'thinking',
'content': thinking_buffer
}, room=sid)
elif status == 'thinking_complete':
# 直接使用完整的思考内容
thinking_buffer = content # 使用服务器提供的完整内容
print(f"Thinking complete, total length: {len(thinking_buffer)} chars")
socketio.emit('claude_response', {
'status': 'thinking_complete',
'content': thinking_buffer
}, room=sid)
elif status == 'streaming':
# 流式输出正常内容
if content:
# 累积到服务端缓冲区
response_buffer += content
# 发送完整的内容
# print(f"Streaming response content: {len(response_buffer)} chars")
socketio.emit('claude_response', {
'status': 'streaming',
'content': response_buffer
}, room=sid)
else:
# 其他状态直接转发
socketio.emit('claude_response', response, room=sid)
# 调试信息
if status == 'completed':
print("Response completed")
elif status == 'error':
print(f"Error: {response.get('error', 'Unknown error')}")
except Exception as e:
error_msg = f"Streaming error: {str(e)}"
print(error_msg)
socketio.emit('claude_response', {
'status': 'error',
'error': error_msg
}, room=sid)
@socketio.on('request_screenshot')
def handle_screenshot_request():
try:
# Capture the screen
screenshot = pyautogui.screenshot()
# Convert the image to base64 string
buffered = BytesIO()
screenshot.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
# Emit the screenshot back to the client
socketio.emit('screenshot_response', {
'success': True,
'image': img_str
})
except Exception as e:
socketio.emit('screenshot_response', {
'success': False,
'error': str(e)
})
@socketio.on('extract_text')
def handle_text_extraction(data):
try:
print("Starting text extraction...")
# Validate input data
if not data or not isinstance(data, dict):
raise ValueError("Invalid request data")
if 'image' not in data:
raise ValueError("No image data provided")
image_data = data['image']
if not isinstance(image_data, str):
raise ValueError("Invalid image data format")
# 检查图像大小,避免处理过大的图像导致断开连接
image_size_bytes = len(image_data) * 3 / 4 # 估算base64的实际大小
if image_size_bytes > 10 * 1024 * 1024: # 10MB
raise ValueError("Image too large, please crop to a smaller area")
settings = data.get('settings', {})
if not isinstance(settings, dict):
raise ValueError("Invalid settings format")
mathpix_key = settings.get('mathpixApiKey')
if not mathpix_key:
raise ValueError("Mathpix API key is required")
# 先回复客户端,确认已收到请求,防止超时断开
# 注意这里不能使用return否则后续代码不会执行
socketio.emit('request_acknowledged', {
'status': 'received',
'message': 'Image received, text extraction in progress'
}, room=request.sid)
try:
app_id, app_key = mathpix_key.split(':')
if not app_id.strip() or not app_key.strip():
raise ValueError()
except ValueError:
raise ValueError("Invalid Mathpix API key format. Expected format: 'app_id:app_key'")
print("Creating Mathpix model instance...")
model = ModelFactory.create_model(
model_name='mathpix',
api_key=mathpix_key
)
print("Starting text extraction...")
# 使用新的extract_full_text方法直接提取完整文本
extracted_text = model.extract_full_text(image_data)
# 直接返回文本结果
socketio.emit('text_extracted', {
'content': extracted_text
}, room=request.sid)
except ValueError as e:
error_msg = str(e)
print(f"Validation error: {error_msg}")
socketio.emit('text_extracted', {
'error': error_msg
}, room=request.sid)
except Exception as e:
error_msg = f"Text extraction error: {str(e)}"
print(f"Unexpected error: {error_msg}")
print(f"Error details: {type(e).__name__}")
socketio.emit('text_extracted', {
'error': error_msg
}, room=request.sid)
@socketio.on('analyze_text')
def handle_analyze_text(data):
try:
text = data.get('text')
settings = data.get('settings', {})
sid = request.sid
print("Selected model:", settings.get('model', 'claude-3-7-sonnet-20250219'))
# Get API key and create model
model_name = settings.get('model', 'claude-3-7-sonnet-20250219')
api_key = settings.get('api_keys', {}).get(model_name)
# Validate required settings
if not api_key:
raise ValueError("API key is required for the selected model")
# Configure proxy settings if enabled
proxies = None
if settings.get('proxyEnabled', False):
proxy_host = settings.get('proxyHost', '127.0.0.1')
proxy_port = settings.get('proxyPort', '4780')
proxies = {
'http': f'http://{proxy_host}:{proxy_port}',
'https': f'http://{proxy_host}:{proxy_port}'
}
try:
# Create model instance using factory
model = ModelFactory.create_model(
model_name=model_name,
api_key=api_key,
temperature=float(settings.get('temperature', 0.7)),
system_prompt=settings.get('systemPrompt')
)
# Start streaming in a separate thread
Thread(
target=stream_model_response,
args=(model.analyze_text(text, proxies), sid)
).start()
except Exception as e:
socketio.emit('claude_response', {
'status': 'error',
'error': f'API error: {str(e)}'
}, room=sid)
except Exception as e:
print(f"Analysis error: {str(e)}")
socketio.emit('claude_response', {
'status': 'error',
'error': f'Analysis error: {str(e)}'
}, room=request.sid)
@socketio.on('analyze_image')
def handle_analyze_image(data):
try:
# 检查数据是否有效
if not data or not isinstance(data, dict):
raise ValueError("Invalid request data")
image_data = data.get('image')
if not image_data:
raise ValueError("No image data provided")
# 检查图像大小,避免处理过大的图像导致断开连接
image_size_bytes = len(image_data) * 3 / 4 # 估算base64的实际大小
if image_size_bytes > 10 * 1024 * 1024: # 10MB
raise ValueError("Image too large, please crop to a smaller area or use text extraction")
settings = data.get('settings', {})
# 不需要分割了,因为前端已经做了分割
# _, base64_data = image_data_url.split(',', 1)
base64_data = image_data
# Get API key and create model
model_name = settings.get('model', 'claude-3-7-sonnet-20250219')
api_key = settings.get('api_keys', {}).get(model_name)
# Validate required settings
if not api_key:
raise ValueError(f"API key is required for the selected model: {model_name}")
# Log with model name for better debugging
print(f"Using API key for {model_name}: {api_key[:6]}...")
print("Selected model:", model_name)
# Configure proxy settings if enabled
proxies = None
if settings.get('proxyEnabled', False):
proxy_host = settings.get('proxyHost', '127.0.0.1')
proxy_port = settings.get('proxyPort', '4780')
proxies = {
'http': f'http://{proxy_host}:{proxy_port}',
'https': f'http://{proxy_host}:{proxy_port}'
}
try:
# 先回复客户端,确认已收到请求,防止超时断开
# 注意这里不能使用return否则后续代码不会执行
socketio.emit('request_acknowledged', {
'status': 'received',
'message': 'Image received, analysis in progress'
}, room=request.sid)
# Create model instance using factory
model = ModelFactory.create_model(
model_name=model_name,
api_key=api_key,
temperature=float(settings.get('temperature', 0.7)),
system_prompt=settings.get('systemPrompt')
)
# Start streaming in a separate thread
Thread(
target=stream_model_response,
args=(model.analyze_image(base64_data, proxies), request.sid)
).start()
except Exception as e:
socketio.emit('claude_response', {
'status': 'error',
'error': f'API error: {str(e)}'
}, room=request.sid)
except Exception as e:
print(f"Analysis error: {str(e)}")
socketio.emit('claude_response', {
'status': 'error',
'error': f'Analysis error: {str(e)}'
}, room=request.sid)
@socketio.on('capture_screenshot')
def handle_capture_screenshot(data):
try:
# Capture the screen
screenshot = pyautogui.screenshot()
# Convert the image to base64 string
buffered = BytesIO()
screenshot.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
# Emit the screenshot back to the client
socketio.emit('screenshot_complete', {
'success': True,
'image': img_str
}, room=request.sid)
except Exception as e:
error_msg = f"Screenshot error: {str(e)}"
print(f"Error capturing screenshot: {error_msg}")
socketio.emit('screenshot_complete', {
'success': False,
'error': error_msg
}, room=request.sid)
def run_tray():
icon = create_tray_icon()
icon.run()
if __name__ == '__main__':
local_ip = get_local_ip()
print(f"Local IP Address: {local_ip}")
print(f"Connect from your mobile device using: {local_ip}:5000")
# Run system tray icon in a separate thread
tray_thread = Thread(target=run_tray)
tray_thread.daemon = True
tray_thread.start()
# Run Flask in the main thread without debug mode
socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True)