Files
Bubbles/ai_providers/ai_perplexity.py
2025-10-28 14:37:47 +08:00

458 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import re
import time
from typing import Optional, Dict, Callable, List
import os
from threading import Thread, Lock
from openai import OpenAI
class PerplexityThread(Thread):
    """Worker thread that runs one Perplexity query and delivers the reply.

    The thread calls ``perplexity_instance.get_answer``, post-processes the
    reply (removes ``<think>`` blocks for reasoning models, strips common
    Markdown markers) and sends it through ``send_text_func``.
    """

    # <think>...</think> pairs; non-greedy so multiple blocks are matched separately.
    _THINK_PAIR_RE = re.compile(r'<think>(.*?)</think>', re.DOTALL)
    # ATX heading line. [ \t] (not \s) so that, under MULTILINE, the match can
    # never reach back across a preceding blank line or swallow the next line.
    _HEADING_RE = re.compile(r'^[ \t]*#{1,6}[ \t]+(.+)$', re.MULTILINE)
    _BOLD_RE = re.compile(r'\*\*(.*?)\*\*')
    # Emphasis requires non-whitespace right inside both asterisks, so lone
    # '*' such as in "2 * 3 * 4" or "* bullet" is left alone.
    _ITALIC_RE = re.compile(r'\*(?!\s)(.+?)(?<!\s)\*')

    def __init__(
        self,
        perplexity_instance,
        prompt,
        chat_id,
        send_text_func,
        receiver,
        at_user=None,
        on_finish: Optional[Callable[[], None]] = None,
        enable_full_research: bool = False,
    ):
        """Initialise the worker thread (daemon).

        Args:
            perplexity_instance: Object exposing ``get_answer(prompt, chat_id,
                deep_research=...)`` and optionally ``has_reasoning_model``.
            prompt: Query text.
            chat_id: Chat/session ID forwarded to ``get_answer``.
            send_text_func: Callable used to deliver messages; invoked as
                ``send_text_func(text, at_list=at_user)``.
            receiver: ID of the message receiver (stored on the instance).
            at_user: ID of the user to @-mention, or None.
            on_finish: Optional callback invoked once when ``run`` ends,
                whether it succeeded or failed.
            enable_full_research: Request deep-research mode. The reasoning
                post-processing is only enabled if the instance also reports
                ``has_reasoning_model``.
        """
        super().__init__(daemon=True)
        self.perplexity = perplexity_instance
        self.prompt = prompt
        self.chat_id = chat_id
        self.send_text_func = send_text_func
        self.receiver = receiver
        self.at_user = at_user
        self.LOG = logging.getLogger("PerplexityThread")
        self.on_finish = on_finish
        self.enable_full_research = enable_full_research
        # A reasoning model is used only when both requested and configured.
        self.is_reasoning_model = bool(self.enable_full_research and getattr(self.perplexity, 'has_reasoning_model', False))
        if self.is_reasoning_model:
            self.LOG.info("Perplexity将启用推理模型处理此次请求")

    def run(self):
        """Fetch the answer, clean it up, send it, then fire ``on_finish``."""
        try:
            self.LOG.info(f"开始处理Perplexity请求: {self.prompt[:30]}...")
            response = self.perplexity.get_answer(
                self.prompt,
                self.chat_id,
                deep_research=self.enable_full_research
            )
            if response:
                # Only reasoning-model replies are expected to carry <think> blocks.
                if self.is_reasoning_model:
                    response = self.remove_thinking_content(response)
                response = self.remove_markdown_formatting(response)
                self.send_text_func(response, at_list=self.at_user)
            else:
                self.send_text_func("无法从Perplexity获取回答", at_list=self.at_user)
            self.LOG.info(f"Perplexity请求处理完成: {self.prompt[:30]}...")
        except Exception as e:
            # exception() keeps the traceback, which error() discarded.
            self.LOG.exception(f"处理Perplexity请求时出错: {e}")
            self.send_text_func(f"处理请求时出错: {e}", at_list=self.at_user)
        finally:
            if self.on_finish:
                try:
                    self.on_finish()
                except Exception as cleanup_error:
                    self.LOG.error(f"清理Perplexity线程时出错: {cleanup_error}")

    def remove_thinking_content(self, text):
        """Strip ``<think>...</think>`` reasoning blocks from a model reply.

        Complete tag pairs are removed first. Then an unclosed ``<think>``
        drops everything after it, and an orphan ``</think>`` drops everything
        before it. Runs of 3+ newlines are collapsed to 2, and surrounding
        whitespace is stripped.

        Args:
            text: Raw reply text.

        Returns:
            str: The cleaned text. If the text contains no think tags, or if
            cleaning raised, the original ``text`` is returned. If nothing is
            left after cleaning, a fixed notice is returned.
        """
        try:
            open_count = text.count('<think>')
            close_count = text.count('</think>')
            if not (open_count or close_count):
                return text  # no think tags at all: return unchanged
            self.LOG.info("检测到思考内容标签,准备移除...")
            if open_count != close_count:
                self.LOG.warning(f"检测到不匹配的思考标签: <think>数量={open_count}, </think>数量={close_count}")
            # Log a truncated preview of each reasoning block at DEBUG level.
            for i, thinking in enumerate(self._THINK_PAIR_RE.findall(text)):
                short_thinking = thinking[:100] + '...' if len(thinking) > 100 else thinking
                self.LOG.debug(f"思考内容 #{i+1}: {short_thinking}")
            cleaned_text = self._THINK_PAIR_RE.sub('', text)
            # Unclosed opening tag: drop it and everything after it.
            cleaned_text = re.sub(r'<think>.*?$', '', cleaned_text, flags=re.DOTALL)
            # Orphan closing tag: drop it and everything before it.
            cleaned_text = re.sub(r'^.*?</think>', '', cleaned_text, flags=re.DOTALL)
            cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text).strip()
            self.LOG.info(f"思考内容已移除,原文本长度: {len(text)} -> 清理后: {len(cleaned_text)}")
            if not cleaned_text:
                return "回答内容为空,可能是模型仅返回了思考过程。请重新提问。"
            return cleaned_text
        except Exception as e:
            self.LOG.error(f"清理思考内容时出错: {e}")
            return text  # best effort: fall back to the raw text

    def remove_markdown_formatting(self, text):
        """Remove Markdown heading markers (``#``) and ``*``/``**`` emphasis.

        Heading lines keep their text. ``**bold**`` and ``*italic*`` keep
        their inner text. Asterisks with whitespace on their inner side
        (e.g. multiplication ``2 * 3``) are preserved. Runs of 3+ newlines
        are collapsed to 2, and surrounding whitespace is stripped.

        Args:
            text: Text that may contain Markdown markers.

        Returns:
            str: The cleaned text, or the original ``text`` if cleaning raised.
        """
        try:
            self.LOG.info("开始移除Markdown格式符号...")
            original_length = len(text)
            cleaned_text = self._HEADING_RE.sub(r'\1', text)
            # Bold must go first so '**' is not consumed as two italic markers.
            cleaned_text = self._BOLD_RE.sub(r'\1', cleaned_text)
            cleaned_text = self._ITALIC_RE.sub(r'\1', cleaned_text)
            cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text).strip()
            self.LOG.info(f"Markdown格式符号已移除原文本长度: {original_length} -> 清理后: {len(cleaned_text)}")
            return cleaned_text
        except Exception as e:
            self.LOG.error(f"移除Markdown格式符号时出错: {e}")
            return text  # best effort: fall back to the raw text
class PerplexityManager:
    """Run Perplexity requests on worker threads, one in flight per receiver/chat pair."""

    def __init__(self):
        # thread_key -> worker thread; each worker removes its own entry via on_finish.
        self.threads = {}
        # Guards self.threads.
        self.lock = Lock()
        self.LOG = logging.getLogger("PerplexityManager")

    def start_request(
        self,
        perplexity_instance,
        prompt,
        chat_id,
        send_text_func,
        receiver,
        at_user=None,
        enable_full_research: bool = False,
    ):
        """Start a worker thread for a Perplexity request.

        A request is rejected (with a warning message sent to the user) while
        another request for the same ``receiver``/``chat_id`` pair is alive.
        Otherwise a waiting notice is sent (without @-mention) and a worker
        thread is started.

        Args:
            perplexity_instance: Perplexity instance passed to the worker.
            prompt: Query text.
            chat_id: Chat ID.
            send_text_func: Callable used to deliver messages.
            receiver: ID of the message receiver.
            at_user: ID of the user to @-mention, or None.
            enable_full_research: Request deep-research mode. It only takes
                effect if the instance reports ``has_reasoning_model``.

        Returns:
            bool: True if a worker thread was started, False if rejected.
        """
        thread_key = f"{receiver}_{chat_id}"
        full_research_available = enable_full_research and getattr(perplexity_instance, 'has_reasoning_model', False)
        with self.lock:
            existing = self.threads.get(thread_key)
            if existing is not None and existing.is_alive():
                send_text_func("⚠️ 已有一个Perplexity请求正在处理中请稍后再试", at_list=at_user)
                return False

            wait_msg = "正在启用满血模式研究中...." if full_research_available else "正在联网查询,请稍候..."
            if enable_full_research and not full_research_available:
                self.LOG.warning("收到满血模式请求,但未配置推理模型,退回普通模式。")
            # The waiting notice is sent without @-mention to avoid pinging the user twice.
            send_text_func(wait_msg, at_list="", record_message=False)

            def thread_finished_callback():
                # Runs on the worker thread (from its finally block) to deregister it.
                with self.lock:
                    thread = self.threads.pop(thread_key, None)
                    if thread is not None:
                        self.LOG.info(f"已清理Perplexity线程: {thread_key}")

            perplexity_thread = PerplexityThread(
                perplexity_instance=perplexity_instance,
                prompt=prompt,
                chat_id=chat_id,
                send_text_func=send_text_func,
                receiver=receiver,
                at_user=at_user,
                on_finish=thread_finished_callback,
                enable_full_research=full_research_available
            )
            # Register before starting so the callback always finds its entry.
            self.threads[thread_key] = perplexity_thread
            perplexity_thread.start()
            self.LOG.info(f"已启动Perplexity请求线程: {thread_key}")
            return True

    def cleanup_threads(self, timeout: float = 10.0):
        """Wait (bounded) for running request threads, then forget all of them.

        Args:
            timeout: Maximum total number of seconds to wait for running
                threads (default 10, matching the previous fixed budget).
        """
        from threading import current_thread

        with self.lock:
            # Snapshot under the lock, then join outside it: each worker's
            # on_finish callback needs the lock to deregister itself.
            active = {key: thread for key, thread in self.threads.items() if thread.is_alive()}
        if active:
            self.LOG.info(f"等待{len(active)}个Perplexity线程结束: {list(active)}")
            deadline = time.monotonic() + timeout
            me = current_thread()
            for thread in active.values():
                if thread is me:
                    continue  # a thread cannot join itself (RuntimeError)
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                thread.join(remaining)
        with self.lock:
            still_active = [key for key, thread in self.threads.items() if thread.is_alive()]
            if still_active:
                self.LOG.warning(f"以下Perplexity线程在退出时仍在运行: {still_active}")
            self.threads.clear()
        self.LOG.info("Perplexity线程管理已清理")
class Perplexity:
    """Perplexity provider using an OpenAI-compatible chat completions client."""

    def __init__(self, config):
        """Read configuration and create the API client and thread manager.

        Args:
            config: Mapping read via ``config.get`` for the keys ``key``,
                ``api``, ``proxy``, ``prompt``, ``trigger_keyword``,
                ``fallback_prompt``, ``model_flash``, ``model`` and
                ``model_reasoning``.

        Side effects: sets ``PYTHONIOENCODING`` in ``os.environ``. When a key
        and ``proxy`` are configured, it also sets ``HTTPS_PROXY`` and
        ``HTTP_PROXY`` (process-wide).
        """
        self.config = config
        self.api_key = config.get('key')
        self.api_base = config.get('api', 'https://api.perplexity.ai')
        self.proxy = config.get('proxy')
        self.prompt = config.get('prompt', '你是智能助手Perplexity')
        self.trigger_keyword = config.get('trigger_keyword', 'ask')
        self.fallback_prompt = config.get('fallback_prompt', "请像 Perplexity 一样以专业、客观、信息丰富的方式回答问题。不要使用任何tex或者md格式,纯文本输出。")
        self.LOG = logging.getLogger('Perplexity')
        self.model_flash = config.get('model_flash') or config.get('model', 'sonar')
        self.model_reasoning = config.get('model_reasoning')
        # A "reasoning" model identical to the flash model adds nothing; disable it.
        if self.model_reasoning and self.model_reasoning.lower() == (self.model_flash or '').lower():
            self.model_reasoning = None
        self.has_reasoning_model = bool(self.model_reasoning)
        # NOTE(review): Python reads PYTHONIOENCODING only at interpreter
        # startup, so this mainly affects child processes — confirm it is needed.
        os.environ["PYTHONIOENCODING"] = "utf-8"
        self.thread_manager = PerplexityManager()
        self.client = None
        if self.api_key:
            try:
                # The client's HTTP layer (httpx) picks up HTTP(S)_PROXY from the
                # environment when it is constructed. The variables must therefore
                # be set BEFORE OpenAI(...); setting them afterwards left this
                # client unproxied.
                if self.proxy:
                    os.environ["HTTPS_PROXY"] = self.proxy
                    os.environ["HTTP_PROXY"] = self.proxy
                self.client = OpenAI(
                    api_key=self.api_key,
                    base_url=self.api_base
                )
                self.LOG.info("Perplexity 客户端已初始化")
            except Exception as e:
                self.LOG.error(f"初始化Perplexity客户端失败: {str(e)}")
        else:
            self.LOG.warning("未配置Perplexity API密钥")

    @staticmethod
    def value_check(args: dict) -> bool:
        """Return True if ``args`` is non-empty and every value except ``proxy`` is not None."""
        if args:
            return all(value is not None for key, value in args.items() if key != 'proxy')
        return False

    def get_answer(self, prompt, session_id=None, deep_research: bool = False):
        """Send ``prompt`` to Perplexity and return the reply text.

        Args:
            prompt: The user's question.
            session_id: Session identifier (accepted for interface
                compatibility; not used in the request).
            deep_research: Use the reasoning model if one is configured.

        Returns:
            str: The reply content. On a missing key/client, or on any API
            error, a human-readable error string is returned instead of
            raising.
        """
        try:
            if not self.api_key or not self.client:
                return "Perplexity API key 未配置或客户端初始化失败"
            messages = [
                {"role": "system", "content": self.prompt},
                {"role": "user", "content": prompt}
            ]
            use_reasoning = deep_research and self.has_reasoning_model
            model = self.model_reasoning if use_reasoning else self.model_flash or self.config.get('model', 'sonar')
            if use_reasoning:
                self.LOG.info(f"Perplexity启动深度研究模式使用模型: {model}")
            # ensure_ascii=False keeps non-ASCII text readable in the log.
            self.LOG.info(f"发送到Perplexity的消息: {json.dumps(messages, ensure_ascii=False)}")
            response = self.client.chat.completions.create(
                model=model,
                messages=messages
            )
            return response.choices[0].message.content
        except Exception as e:
            self.LOG.error(f"调用Perplexity API时发生错误: {str(e)}")
            return f"发生错误: {str(e)}"

    def process_message(
        self,
        content,
        chat_id,
        sender,
        roomid,
        from_group,
        send_text_func,
        enable_full_research: bool = False,
    ):
        """Handle a message that may start with the Perplexity trigger keyword.

        A leading trigger keyword (case-insensitive) is stripped. If nothing
        remains after stripping, the user is asked what to search. Otherwise
        the request is handed to the thread manager.

        Args:
            content: Message text.
            chat_id: Chat ID.
            sender: Sender ID.
            roomid: Group ID (used as receiver for group messages).
            from_group: Whether the message came from a group chat.
            send_text_func: Callable used to deliver messages.
            enable_full_research: Request deep-research mode.

        Returns:
            tuple[bool, Optional[str]]:
                - bool: whether the message was handled (for a started request,
                  whether the worker thread was started);
                - Optional[str]: always None here.
        """
        prompt = (content or "").strip()
        if not prompt:
            return False, None

        stripped_by_keyword = False
        trigger = (self.trigger_keyword or "").strip()
        if trigger and prompt.lower().startswith(trigger.lower()):
            stripped_by_keyword = True
            prompt = prompt[len(trigger):].strip()

        if not prompt:
            if stripped_by_keyword:
                # Bare trigger keyword: prompt the user for a query.
                send_text_func(
                    "请告诉我你想搜索什么内容",
                    at_list=sender if from_group else "",
                    record_message=False
                )
                return True, None
            return False, None

        receiver = roomid if from_group else sender
        at_user = sender if from_group else None
        request_started = self.thread_manager.start_request(
            perplexity_instance=self,
            prompt=prompt,
            chat_id=chat_id,
            send_text_func=send_text_func,
            receiver=receiver,
            at_user=at_user,
            enable_full_research=enable_full_research
        )
        return request_started, None

    def cleanup(self):
        """Wait for and release all request threads."""
        self.thread_manager.cleanup_threads()

    def __str__(self):
        return "Perplexity"