diff --git a/channel/chat_channel.py b/channel/chat_channel.py index 27f3af0..7e2df3a 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -337,24 +337,27 @@ class ChatChannel(Channel): while True: with self.lock: session_ids = list(self.sessions.keys()) - for session_id in session_ids: + for session_id in session_ids: + with self.lock: context_queue, semaphore = self.sessions[session_id] - if semaphore.acquire(blocking=False): # 等线程处理完毕才能删除 - if not context_queue.empty(): - context = context_queue.get() - logger.debug("[chat_channel] consume context: {}".format(context)) - future: Future = handler_pool.submit(self._handle, context) - future.add_done_callback(self._thread_pool_callback(session_id, context=context)) + if semaphore.acquire(blocking=False): # 等线程处理完毕才能删除 + if not context_queue.empty(): + context = context_queue.get() + logger.debug("[chat_channel] consume context: {}".format(context)) + future: Future = handler_pool.submit(self._handle, context) + future.add_done_callback(self._thread_pool_callback(session_id, context=context)) + with self.lock: if session_id not in self.futures: self.futures[session_id] = [] self.futures[session_id].append(future) - elif semaphore._initial_value == semaphore._value + 1: # 除了当前,没有任务再申请到信号量,说明所有任务都处理完毕 + elif semaphore._initial_value == semaphore._value + 1: # 除了当前,没有任务再申请到信号量,说明所有任务都处理完毕 + with self.lock: self.futures[session_id] = [t for t in self.futures[session_id] if not t.done()] assert len(self.futures[session_id]) == 0, "thread pool error" del self.sessions[session_id] - else: - semaphore.release() - time.sleep(0.1) + else: + semaphore.release() + time.sleep(0.2) # 取消session_id对应的所有任务,只能取消排队的消息和已提交线程池但未执行的任务 def cancel_session(self, session_id):