perplexity 线程清理

This commit is contained in:
zihanjian
2025-09-25 15:23:16 +08:00
parent cc68c16e9d
commit 93c4f0c53f

View File

@@ -14,7 +14,7 @@ from openai import OpenAI
class PerplexityThread(Thread):
"""处理Perplexity请求的线程"""
def __init__(self, perplexity_instance, prompt, chat_id, send_text_func, receiver, at_user=None):
def __init__(self, perplexity_instance, prompt, chat_id, send_text_func, receiver, at_user=None, on_finish: Optional[Callable[[], None]] = None):
"""初始化Perplexity处理线程
Args:
@@ -33,6 +33,7 @@ class PerplexityThread(Thread):
self.receiver = receiver
self.at_user = at_user
self.LOG = logging.getLogger("PerplexityThread")
self.on_finish = on_finish
# 检查是否使用reasoning模型
self.is_reasoning_model = False
@@ -67,6 +68,12 @@ class PerplexityThread(Thread):
except Exception as e:
self.LOG.error(f"处理Perplexity请求时出错: {e}")
self.send_text_func(f"处理请求时出错: {e}", at_list=self.at_user)
finally:
if self.on_finish:
try:
self.on_finish()
except Exception as cleanup_error:
self.LOG.error(f"清理Perplexity线程时出错: {cleanup_error}")
def remove_thinking_content(self, text):
"""移除<think></think>标签之间的思考内容
@@ -200,8 +207,15 @@ class PerplexityManager:
return False
# 发送等待消息
send_text_func("正在使用Perplexity进行深度研究,请稍候...", at_list=at_user)
send_text_func("正在联网查询,请稍候...", at_list=at_user)
# 添加线程完成回调,自动清理线程
def thread_finished_callback():
with self.lock:
thread = self.threads.pop(thread_key, None)
if thread is not None:
self.LOG.info(f"已清理Perplexity线程: {thread_key}")
# 创建并启动新线程处理请求
perplexity_thread = PerplexityThread(
perplexity_instance=perplexity_instance,
@@ -209,16 +223,10 @@ class PerplexityManager:
chat_id=chat_id,
send_text_func=send_text_func,
receiver=receiver,
at_user=at_user
at_user=at_user,
on_finish=thread_finished_callback
)
# 添加线程完成回调,自动清理线程
def thread_finished_callback():
with self.lock:
if thread_key in self.threads:
del self.threads[thread_key]
self.LOG.info(f"已清理Perplexity线程: {thread_key}")
# 保存线程引用
self.threads[thread_key] = perplexity_thread
@@ -231,34 +239,31 @@ class PerplexityManager:
def cleanup_threads(self):
"""清理所有Perplexity线程"""
with self.lock:
active_threads = []
for thread_key, thread in self.threads.items():
if thread.is_alive():
active_threads.append(thread_key)
if active_threads:
self.LOG.info(f"等待{len(active_threads)}个Perplexity线程结束: {active_threads}")
# 等待所有线程结束但最多等待10秒
for i in range(10):
active_count = 0
for thread_key, thread in self.threads.items():
if thread.is_alive():
active_count += 1
if active_count == 0:
break
time.sleep(1)
# 记录未能结束的线程
active_threads = [thread_key for thread_key, thread in self.threads.items() if thread.is_alive()]
if active_threads:
self.LOG.info(f"等待{len(active_threads)}个Perplexity线程结束: {active_threads}")
# 等待所有线程结束但最多等待10秒
for _ in range(10):
with self.lock:
active_count = sum(1 for thread in self.threads.values() if thread.is_alive())
if active_count == 0:
break
time.sleep(1)
with self.lock:
still_active = [thread_key for thread_key, thread in self.threads.items() if thread.is_alive()]
if still_active:
self.LOG.warning(f"以下Perplexity线程在退出时仍在运行: {still_active}")
# 清空线程字典
self.threads.clear()
self.LOG.info("Perplexity线程管理已清理")
self.threads.clear()
else:
with self.lock:
self.threads.clear()
self.LOG.info("Perplexity线程管理已清理")
class Perplexity:
@@ -445,4 +450,4 @@ class Perplexity:
self.thread_manager.cleanup_threads()
def __str__(self):
return "Perplexity"
return "Perplexity"