diff --git a/ai_providers/ai_perplexity.py b/ai_providers/ai_perplexity.py index 39d9d61..e7f2320 100644 --- a/ai_providers/ai_perplexity.py +++ b/ai_providers/ai_perplexity.py @@ -14,7 +14,7 @@ from openai import OpenAI class PerplexityThread(Thread): """处理Perplexity请求的线程""" - def __init__(self, perplexity_instance, prompt, chat_id, send_text_func, receiver, at_user=None): + def __init__(self, perplexity_instance, prompt, chat_id, send_text_func, receiver, at_user=None, on_finish: Optional[Callable[[], None]] = None): """初始化Perplexity处理线程 Args: @@ -33,6 +33,7 @@ class PerplexityThread(Thread): self.receiver = receiver self.at_user = at_user self.LOG = logging.getLogger("PerplexityThread") + self.on_finish = on_finish # 检查是否使用reasoning模型 self.is_reasoning_model = False @@ -67,6 +68,12 @@ class PerplexityThread(Thread): except Exception as e: self.LOG.error(f"处理Perplexity请求时出错: {e}") self.send_text_func(f"处理请求时出错: {e}", at_list=self.at_user) + finally: + if self.on_finish: + try: + self.on_finish() + except Exception as cleanup_error: + self.LOG.error(f"清理Perplexity线程时出错: {cleanup_error}") def remove_thinking_content(self, text): """移除标签之间的思考内容 @@ -200,8 +207,15 @@ class PerplexityManager: return False # 发送等待消息 - send_text_func("正在使用Perplexity进行深度研究,请稍候...", at_list=at_user) + send_text_func("正在联网查询,请稍候...", at_list=at_user) + # 添加线程完成回调,自动清理线程 + def thread_finished_callback(): + with self.lock: + thread = self.threads.pop(thread_key, None) + if thread is not None: + self.LOG.info(f"已清理Perplexity线程: {thread_key}") + # 创建并启动新线程处理请求 perplexity_thread = PerplexityThread( perplexity_instance=perplexity_instance, @@ -209,16 +223,10 @@ class PerplexityManager: chat_id=chat_id, send_text_func=send_text_func, receiver=receiver, - at_user=at_user + at_user=at_user, + on_finish=thread_finished_callback ) - # 添加线程完成回调,自动清理线程 - def thread_finished_callback(): - with self.lock: - if thread_key in self.threads: - del self.threads[thread_key] - self.LOG.info(f"已清理Perplexity线程: {thread_key}") - # 保存线程引用 self.threads[thread_key] = perplexity_thread @@ -231,34 +239,31 @@ class PerplexityManager: def cleanup_threads(self): """清理所有Perplexity线程""" with self.lock: - active_threads = [] - for thread_key, thread in self.threads.items(): - if thread.is_alive(): - active_threads.append(thread_key) - - if active_threads: - self.LOG.info(f"等待{len(active_threads)}个Perplexity线程结束: {active_threads}") - - # 等待所有线程结束,但最多等待10秒 - for i in range(10): - active_count = 0 - for thread_key, thread in self.threads.items(): - if thread.is_alive(): - active_count += 1 - - if active_count == 0: - break - - time.sleep(1) - - # 记录未能结束的线程 + active_threads = [thread_key for thread_key, thread in self.threads.items() if thread.is_alive()] + + if active_threads: + self.LOG.info(f"等待{len(active_threads)}个Perplexity线程结束: {active_threads}") + + # 等待所有线程结束,但最多等待10秒 + for _ in range(10): + with self.lock: + active_count = sum(1 for thread in self.threads.values() if thread.is_alive()) + + if active_count == 0: + break + + time.sleep(1) + + with self.lock: still_active = [thread_key for thread_key, thread in self.threads.items() if thread.is_alive()] if still_active: self.LOG.warning(f"以下Perplexity线程在退出时仍在运行: {still_active}") - - # 清空线程字典 - self.threads.clear() - self.LOG.info("Perplexity线程管理已清理") + self.threads.clear() + else: + with self.lock: + self.threads.clear() + + self.LOG.info("Perplexity线程管理已清理") class Perplexity: @@ -445,4 +450,4 @@ class Perplexity: self.thread_manager.cleanup_threads() def __str__(self): - return "Perplexity" \ No newline at end of file + return "Perplexity"