Files
Bubbles/ai_providers/chatglm/tool_registry.py
2025-04-23 13:30:10 +08:00

168 lines
5.3 KiB
Python

import inspect
import json
import random
import re
import traceback
from copy import deepcopy
from datetime import datetime
from types import GenericAlias
from typing import Annotated, get_origin
from ai_providers.chatglm.comfyUI_api import ComfyUIApi
from function.func_news import News
from zhdate import ZhDate
_TOOL_HOOKS = {}
_TOOL_DESCRIPTIONS = {}
def extract_code(text: str) -> str:
pattern = r'```([^\n]*)\n(.*?)```'
matches = re.findall(pattern, text, re.DOTALL)
return matches[-1][1]
def register_tool(func: callable):
tool_name = func.__name__
tool_description = inspect.getdoc(func).strip()
python_params = inspect.signature(func).parameters
tool_params = []
for name, param in python_params.items():
annotation = param.annotation
if annotation is inspect.Parameter.empty:
raise TypeError(f"Parameter `{name}` missing type annotation")
if get_origin(annotation) != Annotated:
raise TypeError(
f"Annotation type for `{name}` must be typing.Annotated")
typ, (description, required) = annotation.__origin__, annotation.__metadata__
typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__
if not isinstance(description, str):
raise TypeError(f"Description for `{name}` must be a string")
if not isinstance(required, bool):
raise TypeError(f"Required for `{name}` must be a bool")
tool_params.append({
"name": name,
"description": description,
"type": typ,
"required": required
})
tool_def = {
"name": tool_name,
"description": tool_description,
"parameters": tool_params
}
# print("[registered tool] " + pformat(tool_def))
_TOOL_HOOKS[tool_name] = func
_TOOL_DESCRIPTIONS[tool_name] = tool_def
return func
def dispatch_tool(tool_name: str, tool_params: dict) -> str:
if tool_name not in _TOOL_HOOKS:
return f"Tool `{tool_name}` not found. Please use a provided tool."
tool_call = _TOOL_HOOKS[tool_name]
try:
ret = tool_call(**tool_params)
except BaseException:
ret = traceback.format_exc()
return ret
def get_tools() -> dict:
return deepcopy(_TOOL_DESCRIPTIONS)
# Tool Definitions
# @register_tool
# def random_number_generator(
# seed: Annotated[int, 'The random seed used by the generator', True],
# range: Annotated[tuple[int, int], 'The range of the generated numbers', True],
# ) -> int:
# """
# Generates a random number x, s.t. range[0] <= x < range[1]
# """
# if not isinstance(seed, int):
# raise TypeError("Seed must be an integer")
# if not isinstance(range, tuple):
# raise TypeError("Range must be a tuple")
# if not isinstance(range[0], int) or not isinstance(range[1], int):
# raise TypeError("Range must be a tuple of integers")
# import random
# return random.Random(seed).randint(*range)
@register_tool
def get_weather(
city_name: Annotated[str, 'The name of the city to be queried', True],
) -> str:
"""
Get the current weather for `city_name`
"""
if not isinstance(city_name, str):
raise TypeError("City name must be a string")
key_selection = {
"current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"],
}
import requests
try:
resp = requests.get(f"https://wttr.in/{city_name}?format=j1")
resp.raise_for_status()
resp = resp.json()
ret = {k: {_v: resp[k][0][_v] for _v in v}
for k, v in key_selection.items()}
except BaseException:
import traceback
ret = "Error encountered while fetching weather data!\n" + traceback.format_exc()
return str(ret)
@register_tool
def get_confyui_image(prompt: Annotated[str, '要生成图片的提示词,注意必须是英文', True]) -> dict:
'''
生成图片
'''
with open("ai_providers/chatglm/base.json", "r", encoding="utf-8") as f:
data2 = json.load(f)
data2['prompt']['3']['inputs']['seed'] = ''.join(
random.sample('123456789012345678901234567890', 14))
# 模型名称
data2['prompt']['4']['inputs']['ckpt_name'] = 'chilloutmix_NiPrunedFp32Fix.safetensors'
data2['prompt']['6']['inputs']['text'] = prompt # 正向提示词
# data2['prompt']['7']['inputs']['text']='' #反向提示词
cfui = ComfyUIApi(server_address="127.0.0.1:8188") # 根据自己comfyUI地址修改
images = cfui.get_images(data2['prompt'])
return {'res': images[0]['image'], 'res_type': 'image', 'filename': images[0]['filename']}
@register_tool
def get_news() -> str:
'''
获取最新新闻
'''
news = News()
return news.get_important_news()
@register_tool
def get_time() -> str:
'''
获取当前日期,时间,农历日期,星期几
'''
time = datetime.now()
date2 = ZhDate.from_datetime(time)
week_list = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
return '{} {} {}'.format(time.strftime("%Y年%m月%d%H:%M:%S"), week_list[time.weekday()], '农历:' + date2.chinese())
if __name__ == "__main__":
print(dispatch_tool("get_weather", {"city_name": "beijing"}))
print(get_tools())