From accb6a9de1e70ff000b635cf9a19caa8648141e5 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Wed, 22 Mar 2023 01:14:59 +0800 Subject: [PATCH] fix: multi channel startup --- app.py | 67 ++++++++++++++++++++++++++++++------------------------- config.py | 3 +-- 2 files changed, 37 insertions(+), 33 deletions(-) diff --git a/app.py b/app.py index 7b9f792..72764b0 100644 --- a/app.py +++ b/app.py @@ -2,57 +2,62 @@ import config from channel import channel_factory -from common import log -import os +from common import log, const from multiprocessing import Pool -def startProcess(channel_type): +# 启动通道 +def start_process(channel_type): + model_type = config.conf().get("model").get("type") # load config - config.load_config() + log.info("[INIT] Start up: {} on {}", model_type, channel_type) + # create channel channel = channel_factory.create_channel(channel_type) + # startup channel channel.startup() -def wrapper(channel_type): - startProcess(channel_type) - - if __name__ == '__main__': try: # load config config.load_config() - proxy = config.conf().get("model").get("openai").get("proxy") - if proxy: - os.environ['http_proxy'] = proxy - os.environ['https_proxy'] = proxy - model_type = config.conf().get("model").get("type") channel_type = config.conf().get("channel").get("type") - if isinstance(channel_type, list) == False: # 兼容字符串格式的type - channel_type = [channel_type] - # 使用主进程启动终端通道 - if "terminal" in channel_type: - index = channel_type.index("terminal") - terminal = channel_type.pop(index) + # 1.单个字符串格式配置时,直接启动 + if not isinstance(channel_type, list): + start_process(channel_type) + exit(0) + + # 2.单通道列表配置时,直接启动 + if len(channel_type) == 1: + start_process(channel_type[0]) + exit(0) + + # 3.多通道配置时,进程池启动 else: - terminal = None - # 使用进程池启动其他通道子进程 - pool = Pool(len(channel_type)) - for type in channel_type: - log.info("[INIT] Start up: {} on {}", model_type, type) - pool.apply_async(wrapper, args=[type]) + # 使用主进程启动终端通道 + if const.TERMINAL in channel_type: + index = channel_type.index(const.TERMINAL) + terminal = channel_type.pop(index) + else: + terminal = None - if terminal: - channel = channel_factory.create_channel(terminal) - channel.startup() - # 等待池中所有进程执行完毕 - pool.close() - pool.join() + # 使用进程池启动其他通道子进程 + pool = Pool(len(channel_type)) + for type_item in channel_type: + log.info("[INIT] Start up: {} on {}", model_type, type_item) + pool.apply_async(start_process, args=[type_item]) + + if terminal: + start_process(terminal) + + # 等待池中所有进程执行完毕 + pool.close() + pool.join() except Exception as e: log.error("App startup failed!") log.exception(e) diff --git a/config.py b/config.py index 9509d95..6ae2c95 100644 --- a/config.py +++ b/config.py @@ -15,8 +15,7 @@ def load_config(): config_str = read_file(config_path) # 将json字符串反序列化为dict类型 config = json.loads(config_str) - print("载入环节" ) - print(config) + print("Load config success") return config def get_root():