mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-01-22 08:30:46 +08:00
171 lines
6.6 KiB
Python
171 lines
6.6 KiB
Python
# encoding:utf-8
|
|
|
|
import importlib
|
|
import json
|
|
import os
|
|
from common.singleton import singleton
|
|
from common.sorted_dict import SortedDict
|
|
from .event import *
|
|
from .plugin import *
|
|
from common.log import logger
|
|
|
|
|
|
@singleton
|
|
class PluginManager:
|
|
def __init__(self):
|
|
self.plugins = SortedDict(lambda k,v: v.priority,reverse=True)
|
|
self.listening_plugins = {}
|
|
self.instances = {}
|
|
self.pconf = {}
|
|
|
|
def register(self, name: str, desc: str, version: str, author: str, desire_priority: int = 0):
|
|
def wrapper(plugincls):
|
|
plugincls.name = name
|
|
plugincls.desc = desc
|
|
plugincls.version = version
|
|
plugincls.author = author
|
|
plugincls.priority = desire_priority
|
|
plugincls.enabled = True
|
|
self.plugins[name.upper()] = plugincls
|
|
logger.info("Plugin %s_v%s registered" % (name, version))
|
|
return plugincls
|
|
return wrapper
|
|
|
|
def save_config(self):
|
|
with open("plugins/plugins.json", "w", encoding="utf-8") as f:
|
|
json.dump(self.pconf, f, indent=4, ensure_ascii=False)
|
|
|
|
def load_config(self):
|
|
logger.info("Loading plugins config...")
|
|
|
|
modified = False
|
|
if os.path.exists("plugins/plugins.json"):
|
|
with open("plugins/plugins.json", "r", encoding="utf-8") as f:
|
|
pconf = json.load(f)
|
|
pconf['plugins'] = SortedDict(lambda k,v: v["priority"],pconf['plugins'],reverse=True)
|
|
else:
|
|
modified = True
|
|
pconf = {"plugins": SortedDict(lambda k,v: v["priority"],reverse=True)}
|
|
self.pconf = pconf
|
|
if modified:
|
|
self.save_config()
|
|
return pconf
|
|
|
|
def scan_plugins(self):
|
|
logger.info("Scaning plugins ...")
|
|
plugins_dir = "plugins"
|
|
for plugin_name in os.listdir(plugins_dir):
|
|
plugin_path = os.path.join(plugins_dir, plugin_name)
|
|
if os.path.isdir(plugin_path):
|
|
# 判断插件是否包含同名.py文件
|
|
main_module_path = os.path.join(plugin_path, plugin_name+".py")
|
|
if os.path.isfile(main_module_path):
|
|
# 导入插件
|
|
import_path = "{}.{}.{}".format(plugins_dir, plugin_name, plugin_name)
|
|
main_module = importlib.import_module(import_path)
|
|
pconf = self.pconf
|
|
new_plugins = []
|
|
modified = False
|
|
for name, plugincls in self.plugins.items():
|
|
rawname = plugincls.name
|
|
if rawname not in pconf["plugins"]:
|
|
new_plugins.append(plugincls)
|
|
modified = True
|
|
logger.info("Plugin %s not found in pconfig, adding to pconfig..." % name)
|
|
pconf["plugins"][rawname] = {"enabled": plugincls.enabled, "priority": plugincls.priority}
|
|
else:
|
|
self.plugins[name].enabled = pconf["plugins"][rawname]["enabled"]
|
|
self.plugins[name].priority = pconf["plugins"][rawname]["priority"]
|
|
self.plugins._update_heap(name) # 更新下plugins中的顺序
|
|
if modified:
|
|
self.save_config()
|
|
return new_plugins
|
|
|
|
def refresh_order(self):
|
|
for event in self.listening_plugins.keys():
|
|
self.listening_plugins[event].sort(key=lambda name: self.plugins[name].priority, reverse=True)
|
|
|
|
def activate_plugins(self): # 生成新开启的插件实例
|
|
for name, plugincls in self.plugins.items():
|
|
if plugincls.enabled:
|
|
if name not in self.instances:
|
|
instance = plugincls()
|
|
self.instances[name] = instance
|
|
for event in instance.handlers:
|
|
if event not in self.listening_plugins:
|
|
self.listening_plugins[event] = []
|
|
self.listening_plugins[event].append(name)
|
|
self.refresh_order()
|
|
|
|
def reload_plugin(self, name:str):
|
|
name = name.upper()
|
|
if name in self.instances:
|
|
for event in self.listening_plugins:
|
|
if name in self.listening_plugins[event]:
|
|
self.listening_plugins[event].remove(name)
|
|
del self.instances[name]
|
|
self.activate_plugins()
|
|
return True
|
|
return False
|
|
|
|
def load_plugins(self):
|
|
self.load_config()
|
|
self.scan_plugins()
|
|
pconf = self.pconf
|
|
logger.debug("plugins.json config={}".format(pconf))
|
|
for name,plugin in pconf["plugins"].items():
|
|
if name.upper() not in self.plugins:
|
|
logger.error("Plugin %s not found, but found in plugins.json" % name)
|
|
self.activate_plugins()
|
|
|
|
def emit_event(self, e_context: EventContext, *args, **kwargs):
|
|
if e_context.event in self.listening_plugins:
|
|
for name in self.listening_plugins[e_context.event]:
|
|
if self.plugins[name].enabled and e_context.action == EventAction.CONTINUE:
|
|
logger.debug("Plugin %s triggered by event %s" % (name,e_context.event))
|
|
instance = self.instances[name]
|
|
instance.handlers[e_context.event](e_context, *args, **kwargs)
|
|
return e_context
|
|
|
|
def set_plugin_priority(self, name:str, priority:int):
|
|
name = name.upper()
|
|
if name not in self.plugins:
|
|
return False
|
|
if self.plugins[name].priority == priority:
|
|
return True
|
|
self.plugins[name].priority = priority
|
|
self.plugins._update_heap(name)
|
|
rawname = self.plugins[name].name
|
|
self.pconf["plugins"][rawname]["priority"] = priority
|
|
self.pconf["plugins"]._update_heap(rawname)
|
|
self.save_config()
|
|
self.refresh_order()
|
|
return True
|
|
|
|
def enable_plugin(self, name:str):
|
|
name = name.upper()
|
|
if name not in self.plugins:
|
|
return False
|
|
if not self.plugins[name].enabled :
|
|
self.plugins[name].enabled = True
|
|
rawname = self.plugins[name].name
|
|
self.pconf["plugins"][rawname]["enabled"] = True
|
|
self.save_config()
|
|
self.activate_plugins()
|
|
return True
|
|
return True
|
|
|
|
def disable_plugin(self, name:str):
|
|
name = name.upper()
|
|
if name not in self.plugins:
|
|
return False
|
|
if self.plugins[name].enabled :
|
|
self.plugins[name].enabled = False
|
|
rawname = self.plugins[name].name
|
|
self.pconf["plugins"][rawname]["enabled"] = False
|
|
self.save_config()
|
|
return True
|
|
return True
|
|
|
|
def list_plugins(self):
|
|
return self.plugins |