mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-02-13 10:59:17 +08:00
92 lines
3.4 KiB
Python
92 lines
3.4 KiB
Python
import requests
|
|
from config import conf
|
|
from common.log import logger
|
|
import os
|
|
|
|
|
|
class LinkSummary:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def summary_file(self, file_path: str):
|
|
file_body = {
|
|
"file": open(file_path, "rb"),
|
|
"name": file_path.split("/")[-1],
|
|
}
|
|
res = requests.post(url=self.base_url() + "/v1/summary/file", headers=self.headers(), files=file_body, timeout=(5, 180))
|
|
return self._parse_summary_res(res)
|
|
|
|
def summary_url(self, url: str):
|
|
body = {
|
|
"url": url
|
|
}
|
|
res = requests.post(url=self.base_url() + "/v1/summary/url", headers=self.headers(), json=body, timeout=(5, 180))
|
|
return self._parse_summary_res(res)
|
|
|
|
def summary_chat(self, summary_id: str):
|
|
body = {
|
|
"summary_id": summary_id
|
|
}
|
|
res = requests.post(url=self.base_url() + "/v1/summary/chat", headers=self.headers(), json=body, timeout=(5, 180))
|
|
if res.status_code == 200:
|
|
res = res.json()
|
|
logger.debug(f"[LinkSum] chat open, res={res}")
|
|
if res.get("code") == 200:
|
|
data = res.get("data")
|
|
return {
|
|
"questions": data.get("questions"),
|
|
"file_id": data.get("file_id")
|
|
}
|
|
else:
|
|
res_json = res.json()
|
|
logger.error(f"[LinkSum] summary error, status_code={res.status_code}, msg={res_json.get('message')}")
|
|
return None
|
|
|
|
def _parse_summary_res(self, res):
|
|
if res.status_code == 200:
|
|
res = res.json()
|
|
logger.debug(f"[LinkSum] url summary, res={res}")
|
|
if res.get("code") == 200:
|
|
data = res.get("data")
|
|
return {
|
|
"summary": data.get("summary"),
|
|
"summary_id": data.get("summary_id")
|
|
}
|
|
else:
|
|
res_json = res.json()
|
|
logger.error(f"[LinkSum] summary error, status_code={res.status_code}, msg={res_json.get('message')}")
|
|
return None
|
|
|
|
def base_url(self):
|
|
return conf().get("linkai_api_base", "https://api.link-ai.chat")
|
|
|
|
def headers(self):
|
|
return {"Authorization": "Bearer " + conf().get("linkai_api_key")}
|
|
|
|
def check_file(self, file_path: str, sum_config: dict) -> bool:
|
|
file_size = os.path.getsize(file_path) // 1000
|
|
with open(file_path, 'r') as f:
|
|
content = f.read()
|
|
word_count = len(content)
|
|
|
|
if (sum_config.get("max_file_size") and file_size > sum_config.get("max_file_size")) or file_size > 15000\
|
|
or (sum_config.get("max_summary_words") and word_count > sum_config.get("max_summary_words")):
|
|
logger.warn(f"[LinkSum] file size exceeds limit, No processing, file_size={file_size}KB, word_count={word_count}")
|
|
return True
|
|
|
|
suffix = file_path.split(".")[-1]
|
|
support_list = ["txt", "csv", "docx", "pdf", "md"]
|
|
if suffix not in support_list:
|
|
logger.warn(f"[LinkSum] unsupported file, suffix={suffix}, support_list={support_list}")
|
|
return False
|
|
|
|
return True
|
|
|
|
def check_url(self, url: str):
|
|
support_list = ["mp.weixin.qq.com"]
|
|
for support_url in support_list:
|
|
if support_url in url:
|
|
return True
|
|
logger.warn("[LinkSum] unsupported url")
|
|
return False
|