Merge pull request #2595 from zhayujie/feat-agent-plugin

feat: add agent plugin and optimize web channel
This commit is contained in:
zhayujie
2025-05-23 11:59:54 +08:00
committed by GitHub
14 changed files with 1410 additions and 333 deletions

3
.gitignore vendored
View File

@@ -15,6 +15,8 @@ plugins.json
itchat.pkl
*.log
logs/
workspace
config.yaml
user_datas.pkl
chatgpt_tool_hub/
plugins/**/
@@ -31,4 +33,5 @@ plugins/banwords/lib/__pycache__
!plugins/role
!plugins/keyword
!plugins/linkai
!plugins/agent
client_config.json

View File

@@ -1,7 +1,10 @@
# Web channel
使用SSEServer-Sent Events服务器推送事件实现提供了一个默认的网页。也可以自己实现加入api
# Web Channel
#使用方法
- 在配置文件中channel_type填入web即可
- 访问地址 http://localhost:9899/chat
- port可以在配置项 web_port中设置
提供了一个默认的AI对话页面可展示文本、图片等消息交互支持markdown语法渲染兼容插件执行。
# 使用说明
-`config.json` 配置文件中的 `channel_type` 字段填入 `web`
- 程序运行后将监听9899端口浏览器访问 http://localhost:9899/chat 即可使用
- 监听端口可以在配置文件 `web_port` 中自定义
- 对于Docker运行方式如果需要外部访问需要在 `docker-compose.yml` 中通过 ports配置将端口监听映射到宿主机

File diff suppressed because it is too large Load Diff

2
channel/web/static/axios.min.js vendored Normal file

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

View File

@@ -2,6 +2,7 @@ import sys
import time
import web
import json
import uuid
from queue import Queue, Empty
from bridge.context import *
from bridge.reply import Reply, ReplyType
@@ -12,6 +13,8 @@ from common.singleton import singleton
from config import conf
import os
import mimetypes # 添加这行来处理MIME类型
import threading
import logging
class WebMessage(ChatMessage):
def __init__(
@@ -43,39 +46,54 @@ class WebChannel(ChatChannel):
def __init__(self):
super().__init__()
self.message_queues = {} # 为每个用户存储一个消息队列
self.msg_id_counter = 0 # 添加消息ID计数器
self.session_queues = {} # 存储session_id到队列的映射
self.request_to_session = {} # 存储request_id到session_id的映射
def _generate_msg_id(self):
"""生成唯一的消息ID"""
self.msg_id_counter += 1
return str(int(time.time())) + str(self.msg_id_counter)
def _generate_request_id(self):
"""生成唯一的请求ID"""
return str(uuid.uuid4())
def send(self, reply: Reply, context: Context):
try:
if reply.type in self.NOT_SUPPORT_REPLYTYPE:
logger.warning(f"Web channel doesn't support {reply.type} yet")
return
if reply.type == ReplyType.IMAGE_URL:
time.sleep(0.5)
# 获取请求ID和会话ID
request_id = context.get("request_id", None)
if not request_id:
logger.error("No request_id found in context, cannot send message")
return
# 获取用户ID
user_id = context.get("receiver", None)
if not user_id:
logger.error("No receiver found in context, cannot send message")
# 通过request_id获取session_id
session_id = self.request_to_session.get(request_id)
if not session_id:
logger.error(f"No session_id found for request {request_id}")
return
# 检查是否有响应队列
response_queue = context.get("response_queue", None)
if response_queue:
# 直接将响应放入队列
# 检查是否有会话队列
if session_id in self.session_queues:
# 创建响应数据包含请求ID以区分不同请求的响应
response_data = {
"type": str(reply.type),
"content": reply.content,
"timestamp": time.time()
"timestamp": time.time(),
"request_id": request_id
}
response_queue.put(response_data)
logger.debug(f"Response sent to queue for user {user_id}")
self.session_queues[session_id].put(response_data)
logger.debug(f"Response sent to queue for session {session_id}, request {request_id}")
else:
logger.warning(f"No response queue found for user {user_id}, response dropped")
logger.warning(f"No response queue found for session {session_id}, response dropped")
except Exception as e:
logger.error(f"Error in send method: {e}")
@@ -83,57 +101,83 @@ class WebChannel(ChatChannel):
def post_message(self):
"""
Handle incoming messages from users via POST request.
Returns a request_id for tracking this specific request.
"""
try:
data = web.data() # 获取原始POST数据
json_data = json.loads(data)
user_id = json_data.get('user_id', 'default_user')
prompt = json_data.get('message', '')
session_id = json_data.get('session_id', f'session_{int(time.time())}')
except json.JSONDecodeError:
return json.dumps({"status": "error", "message": "Invalid JSON"})
except Exception as e:
return json.dumps({"status": "error", "message": str(e)})
if not prompt:
return json.dumps({"status": "error", "message": "No message provided"})
prompt = json_data.get('message', '')
try:
msg_id = self._generate_msg_id()
web_message = WebMessage(
msg_id=msg_id,
content=prompt,
from_user_id=user_id,
to_user_id="Chatgpt",
other_user_id=user_id
)
# 生成请求ID
request_id = self._generate_request_id()
context = self._compose_context(ContextType.TEXT, prompt, msg=web_message)
if not context:
return json.dumps({"status": "error", "message": "Failed to process message"})
# 创建一个响应队列
response_queue = Queue()
# 将请求ID与会话ID关联
self.request_to_session[request_id] = session_id
# 确保上下文包含必要的信息
context["isgroup"] = False
context["receiver"] = user_id
context["session_id"] = user_id
context["response_queue"] = response_queue
# 发送消息到处理队列
self.produce(context)
# 确保会话队列存在
if session_id not in self.session_queues:
self.session_queues[session_id] = Queue()
# 等待响应最多等待30秒
try:
response = response_queue.get(timeout=30)
return json.dumps({"status": "success", "reply": response["content"]})
except Empty:
return json.dumps({"status": "error", "message": "Response timeout"})
# 创建消息对象
msg = WebMessage(self._generate_msg_id(), prompt)
msg.from_user_id = session_id # 使用会话ID作为用户ID
# 创建上下文
context = self._compose_context(ContextType.TEXT, prompt, msg=msg)
# 添加必要的字段
context["session_id"] = session_id
context["request_id"] = request_id
context["isgroup"] = False # 添加 isgroup 字段
context["receiver"] = session_id # 添加 receiver 字段
# 异步处理消息 - 只传递上下文
threading.Thread(target=self.produce, args=(context,)).start()
# 返回请求ID
return json.dumps({"status": "success", "request_id": request_id})
except Exception as e:
logger.error(f"Error processing message: {e}")
return json.dumps({"status": "error", "message": "Internal server error"})
return json.dumps({"status": "error", "message": str(e)})
def poll_response(self):
"""
Poll for responses using the session_id.
"""
try:
# 不记录轮询请求的日志
web.ctx.log_request = False
data = web.data()
json_data = json.loads(data)
session_id = json_data.get('session_id')
if not session_id or session_id not in self.session_queues:
return json.dumps({"status": "error", "message": "Invalid session ID"})
# 尝试从队列获取响应,不等待
try:
# 使用peek而不是get这样如果前端没有成功处理下次还能获取到
response = self.session_queues[session_id].get(block=False)
# 返回响应包含请求ID以区分不同请求
return json.dumps({
"status": "success",
"has_content": True,
"content": response["content"],
"request_id": response["request_id"],
"timestamp": response["timestamp"]
})
except Empty:
# 没有新响应
return json.dumps({"status": "success", "has_content": False})
except Exception as e:
logger.error(f"Error polling response: {e}")
return json.dumps({"status": "error", "message": str(e)})
def chat_page(self):
"""Serve the chat HTML page."""
@@ -151,13 +195,34 @@ class WebChannel(ChatChannel):
logger.info(f"Created static directory: {static_dir}")
urls = (
'/', 'RootHandler', # 添加根路径处理器
'/message', 'MessageHandler',
'/poll', 'PollHandler', # 添加轮询处理器
'/chat', 'ChatHandler',
'/assets/(.*)', 'AssetsHandler', # 匹配 /assets/任何路径
)
port = conf().get("web_port", 9899)
app = web.application(urls, globals(), autoreload=False)
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
# 禁用web.py的默认日志输出
import io
from contextlib import redirect_stdout
# 配置web.py的日志级别为ERROR只显示错误
logging.getLogger("web").setLevel(logging.ERROR)
# 禁用web.httpserver的日志
logging.getLogger("web.httpserver").setLevel(logging.ERROR)
# 临时重定向标准输出捕获web.py的启动消息
with redirect_stdout(io.StringIO()):
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
class RootHandler:
def GET(self):
# 重定向到/chat
raise web.seeother('/chat')
class MessageHandler:
@@ -165,6 +230,11 @@ class MessageHandler:
return WebChannel().post_message()
class PollHandler:
def POST(self):
return WebChannel().poll_response()
class ChatHandler:
def GET(self):
# 正常返回聊天页面
@@ -185,11 +255,6 @@ class AssetsHandler:
current_dir = os.path.dirname(os.path.abspath(__file__))
static_dir = os.path.join(current_dir, 'static')
# 打印调试信息
logger.info(f"Current directory: {current_dir}")
logger.info(f"Static directory: {static_dir}")
logger.info(f"Requested file: {file_path}")
full_path = os.path.normpath(os.path.join(static_dir, file_path))
# 安全检查确保请求的文件在static目录内

View File

@@ -1,5 +1,5 @@
{
"channel_type": "wx",
"channel_type": "web",
"model": "",
"open_ai_api_key": "YOUR API KEY",
"claude_api_key": "YOUR API KEY",

66
plugins/agent/README.md Normal file
View File

@@ -0,0 +1,66 @@
# Agent插件
## 插件说明
基于 [AgentMesh](https://github.com/MinimalFuture/AgentMesh) 多智能体框架实现的Agent插件可以让机器人快速获得Agent能力通过自然语言对话来访问 **终端、浏览器、文件系统、搜索引擎** 等各类工具。
同时还支持通过 **多智能体协作** 来完成复杂任务,例如多智能体任务分发、多智能体问题讨论、协同处理等。
AgentMesh项目地址https://github.com/MinimalFuture/AgentMesh
## 安装
1. 确保已安装依赖:
```bash
pip install agentmesh-sdk>=0.1.2
```
2. 如需使用浏览器工具,还需安装:
```bash
pip install browser-use>=0.1.40
playwright install
```
## 配置
插件配置文件是 `plugins/agent`目录下的 `config.yaml`,包含智能体团队的配置以及工具的配置,可以从模板文件 `config-template.yaml`中复制:
```bash
cp config-template.yaml config.yaml
```
说明:
- `team`配置是默认选中的 agent team
- `teams` 下是Agent团队配置团队的model默认为`gpt-4.1-mini`,可根据需要进行修改,模型对应的 `api_key` 需要在项目根目录的 `config.json` 全局配置中进行配置。例如openai模型需要配置 `open_ai_api_key`
- 支持为 `agents` 下面的每个agent添加model字段来设置不同的模型
## 使用方法
在对机器人发送的消息中使用 `$agent` 前缀来触发插件,支持以下命令:
- `$agent [task]`: 使用默认团队执行任务 (默认团队可通 config.yaml 中的team配置修改)
- `$agent teams`: 列出可用的团队
- `$agent use [team_name] [task]`: 使用指定的团队执行任务
### 示例
```bash
$agent 帮我查看当前目录下有哪些文件夹
$agent teams
$agent use software_team 帮我写一个产品预约体验的表单页面
```
## 工具支持
目前支持多种内置工具,包括但不限于:
- `calculator`: 数学计算工具
- `current_time`: 获取当前时间
- `browser`: 浏览器操作工具,注意需安装额外依赖
- `google_search`: 搜索引擎,注意需在`config.yaml`中配置api_key
- `file_save`: 文件保存工具
- `terminal`: 终端命令执行工具

View File

@@ -0,0 +1,3 @@
from .agent import AgentPlugin
__all__ = ["AgentPlugin"]

282
plugins/agent/agent.py Normal file
View File

@@ -0,0 +1,282 @@
import os
import yaml
from typing import Dict, List, Optional
from agentmesh import AgentTeam, Agent, LLMModel
from agentmesh.models import ClaudeModel
from agentmesh.tools import ToolManager
from config import conf
import plugins
from plugins import Plugin, Event, EventContext, EventAction
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
@plugins.register(
name="agent",
desc="Use AgentMesh framework to process tasks with multi-agent teams",
version="0.1.0",
author="Saboteur7",
desire_priority=1,
)
class AgentPlugin(Plugin):
"""Plugin for integrating AgentMesh framework."""
def __init__(self):
super().__init__()
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
self.name = "agent"
self.description = "Use AgentMesh framework to process tasks with multi-agent teams"
self.config = self._load_config()
self.tool_manager = ToolManager()
self.tool_manager.load_tools(config_dict=self.config.get("tools"))
logger.info("[agent] inited")
def _load_config(self) -> Dict:
"""Load configuration from config.yaml file."""
config_path = os.path.join(self.path, "config.yaml")
if not os.path.exists(config_path):
logger.warning(f"Config file not found at {config_path}")
return {}
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def get_help_text(self, verbose=False, **kwargs):
"""Return help message for the agent plugin."""
help_text = "通过AgentMesh实现对终端、浏览器、文件系统、搜索引擎等工具的执行并支持多智能体协作。"
trigger_prefix = conf().get("plugin_trigger_prefix", "$")
if not verbose:
return help_text
teams = self.get_available_teams()
teams_str = ", ".join(teams) if teams else "未配置任何团队"
help_text += "\n\n使用说明:\n"
help_text += f"{trigger_prefix}agent [task] - 使用默认团队执行任务\n"
help_text += f"{trigger_prefix}agent teams - 列出可用的团队\n"
help_text += f"{trigger_prefix}agent use [team_name] [task] - 使用特定团队执行任务\n\n"
help_text += f"可用团队: \n{teams_str}\n\n"
help_text += f"示例:\n"
help_text += f"{trigger_prefix}agent 帮我查看当前文件夹路径\n"
help_text += f"{trigger_prefix}agent use software_team 帮我写一个产品预约体验的表单页面"
return help_text
def get_available_teams(self) -> List[str]:
"""Get list of available teams from configuration."""
teams_config = self.config.get("teams", {})
return list(teams_config.keys())
def create_team_from_config(self, team_name: str) -> Optional[AgentTeam]:
"""Create a team from configuration."""
# Get teams configuration
teams_config = self.config.get("teams", {})
# Check if the specified team exists
if team_name not in teams_config:
logger.error(f"Team '{team_name}' not found in configuration.")
available_teams = list(teams_config.keys())
logger.info(f"Available teams: {', '.join(available_teams)}")
return None
# Get team configuration
team_config = teams_config[team_name]
# Get team's model
team_model_name = team_config.get("model", "gpt-4.1-mini")
team_model = self.create_llm_model(team_model_name)
# Get team's max_steps (default to 20 if not specified)
team_max_steps = team_config.get("max_steps", 20)
# Create team with the model
team = AgentTeam(
name=team_name,
description=team_config.get("description", ""),
rule=team_config.get("rule", ""),
model=team_model,
max_steps=team_max_steps
)
# Create and add agents to the team
agents_config = team_config.get("agents", [])
for agent_config in agents_config:
# Check if agent has a specific model
if agent_config.get("model"):
agent_model = self.create_llm_model(agent_config.get("model"))
else:
agent_model = team_model
# Get agent's max_steps
agent_max_steps = agent_config.get("max_steps")
agent = Agent(
name=agent_config.get("name", ""),
system_prompt=agent_config.get("system_prompt", ""),
model=agent_model, # Use agent's model if specified, otherwise will use team's model
description=agent_config.get("description", ""),
max_steps=agent_max_steps
)
# Add tools to the agent if specified
tool_names = agent_config.get("tools", [])
for tool_name in tool_names:
tool = self.tool_manager.create_tool(tool_name)
if tool:
agent.add_tool(tool)
else:
if tool_name == "browser":
logger.warning(
"Tool 'Browser' loaded failed, "
"please install the required dependency with: \n"
"'pip install browser-use>=0.1.40' or 'pip install agentmesh-sdk[full]'\n"
)
else:
logger.warning(f"Tool '{tool_name}' not found for agent '{agent.name}'\n")
# Add agent to team
team.add(agent)
return team
def on_handle_context(self, e_context: EventContext):
"""Handle the message context."""
if e_context['context'].type != ContextType.TEXT:
return
content = e_context['context'].content
trigger_prefix = conf().get("plugin_trigger_prefix", "$")
if not content.startswith(f"{trigger_prefix}agent "):
e_context.action = EventAction.CONTINUE
return
if not self.config:
reply = Reply()
reply.type = ReplyType.ERROR
reply.content = "未找到插件配置,请在 plugins/agent 目录下创建 config.yaml 配置文件,可根据 config-template.yml 模板文件复制"
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
# Extract the actual task
task = content[len(f"{trigger_prefix}agent "):].strip()
# If task is empty, return help message
if not task:
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = self.get_help_text(verbose=True)
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
# Check if task is asking for available teams
if task.lower() in ["teams", "list teams", "show teams"]:
teams = self.get_available_teams()
reply = Reply()
reply.type = ReplyType.TEXT
if not teams:
reply.content = "未配置任何团队。请检查 config.yaml 文件。"
else:
reply.content = f"可用团队: {', '.join(teams)}"
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
# Check if task specifies a team
team_name = None
if task.startswith("use "):
parts = task[4:].split(" ", 1)
if len(parts) > 0:
team_name = parts[0]
if len(parts) > 1:
task = parts[1].strip()
else:
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = f"已选择团队 '{team_name}'。请输入您想执行的任务。"
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
if not team_name:
team_name = self.config.get("team")
# If no team specified, use default or first available
if not team_name:
teams = self.configself.get_available_teams()
if not teams:
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = "未配置任何团队。请检查 config.yaml 文件。"
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
team_name = teams[0]
# Create team
team = self.create_team_from_config(team_name)
if not team:
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = f"创建团队 '{team_name}' 失败。请检查配置。"
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
# Run the task
try:
logger.info(f"[agent] Running task '{task}' with team '{team_name}', team_model={team.model.model}")
result = team.run_async(task=task)
for agent_result in result:
res_text = f"🤖 {agent_result.get('agent_name')}\n\n{agent_result.get('final_answer')}"
_send_text(e_context, content=res_text)
reply = Reply()
reply.type = ReplyType.TEXT
reply.content = ""
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
except Exception as e:
logger.exception(f"Error running task with team '{team_name}'")
reply = Reply()
reply.type = ReplyType.ERROR
reply.content = f"执行任务时出错: {str(e)}"
e_context['reply'] = reply
e_context.action = EventAction.BREAK_PASS
return
def create_llm_model(self, model_name) -> LLMModel:
if conf().get("use_linkai"):
api_base = "https://api.link-ai.tech/v1"
api_key = conf().get("linkai_api_key")
elif model_name.startswith(("gpt", "text-davinci", "o1", "o3")):
api_base = conf().get("open_ai_api_base") or "https://api.openai.com/v1"
api_key = conf().get("open_ai_api_key")
elif model_name.startswith("claude"):
return ClaudeModel(model=model_name, api_key=conf().get("claude_api_key"))
elif model_name.startswith("moonshot"):
api_base = "https://api.moonshot.cn/v1"
api_key = conf().get("moonshot_api_key")
elif model_name.startswith("qwen"):
api_base = "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_key = conf().get("dashscope_api_key")
else:
api_base = conf().get("open_ai_api_base") or "https://api.openai.com/v1"
api_key = conf().get("open_ai_api_key")
llm_model = LLMModel(model=model_name, api_key=api_key, api_base=api_base)
return llm_model
def _send_text(e_context: EventContext, content: str):
reply = Reply(ReplyType.TEXT, content)
channel = e_context["channel"]
channel.send(reply, e_context["context"])

View File

@@ -0,0 +1,50 @@
# 默认选中的Agent Team名称
team: general_team
tools:
google_search:
# get your apikey from https://serper.dev/
api_key: "YOUR API KEY"
# Team config
teams:
general_team:
model: "gpt-4.1-mini" # 团队使用的模型
description: "A versatile research and information agent team"
max_steps: 5
agents:
- name: "通用智能助手"
description: "Universal assistant specializing in research, information synthesis, and task execution"
system_prompt: "You are a versatile assistant who answers questions and completes tasks using available tools. Reply in a clearly structured, attractive and easy to read format."
# Agent 支持使用的工具
tools:
- time
- calculator
- google_search
- browser
- terminal
software_team:
model: "gpt-4.1-mini"
description: "A software development team with product manager, developer and tester."
rule: "A normal R&D process should be that Product Manager writes PRD, Developer writes code based on PRD, and Finally, Tester performs testing."
max_steps: 10
agents:
- name: "Product-Manager"
description: "Responsible for product requirements and documentation"
system_prompt: "You are an experienced product manager who creates concise PRDs, focusing on user needs and feature specifications. You always format your responses in Markdown."
tools:
- time
- file_save
- name: "Developer"
description: "Implements code based on PRD"
system_prompt: "You are a skilled developer. When developing web application, you creates single-page website based on user needs, you deliver HTML files with embedded JavaScript and CSS that are visually appealing, responsive, and user-friendly, featuring a grand layout and beautiful background. The HTML, CSS, and JavaScript code should be well-structured and effectively organized."
tools:
- file_save
- name: "Tester"
description: "Tests code and verifies functionality"
system_prompt: "You are a tester who validates code against requirements. For HTML applications, use browser tools to test functionality. For Python or other client-side applications, use the terminal tool to run and test. You only need to test a few core cases."
tools:
- file_save
- browser
- terminal

View File

@@ -155,7 +155,7 @@ def get_help_text(isadmin, isgroup):
for plugin in plugins:
if plugins[plugin].enabled and not plugins[plugin].hidden:
namecn = plugins[plugin].namecn
help_text += "\n%s:" % namecn
help_text += "\n%s: " % namecn
help_text += PluginManager().instances[plugin].get_help_text(verbose=False).strip()
if ADMIN_COMMANDS and isadmin:

View File

@@ -8,4 +8,4 @@ Pillow
pre-commit
web.py
linkai>=0.0.6.0
agentmesh-sdk>=0.1.2

278
simple_login_form_test.html Normal file
View File

@@ -0,0 +1,278 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>登录</title>
<style>
/* Reset and base */
* {
box-sizing: border-box;
}
body, html {
margin: 0; padding: 0; height: 100%;
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #667eea, #764ba2);
display: flex;
justify-content: center;
align-items: center;
}
.login-container {
background: rgba(255, 255, 255, 0.95);
padding: 2.5rem 3rem;
border-radius: 12px;
box-shadow: 0 8px 24px rgba(0,0,0,0.15);
width: 100%;
max-width: 400px;
}
h2 {
margin-bottom: 1.5rem;
color: #333;
text-align: center;
}
form {
display: flex;
flex-direction: column;
}
label {
font-weight: 600;
margin-bottom: 0.4rem;
color: #444;
}
input[type="text"],
input[type="email"],
input[type="password"] {
padding: 0.6rem 0.8rem;
font-size: 1rem;
border: 1.8px solid #ccc;
border-radius: 6px;
transition: border-color 0.3s ease;
outline-offset: 2px;
}
input[type="text"]:focus,
input[type="email"]:focus,
input[type="password"]:focus {
border-color: #667eea;
}
.password-wrapper {
position: relative;
display: flex;
align-items: center;
}
.toggle-password {
position: absolute;
right: 0.8rem;
background: none;
border: none;
cursor: pointer;
font-size: 1rem;
color: #667eea;
user-select: none;
}
.login-button {
margin-top: 1.5rem;
padding: 0.75rem;
font-size: 1.1rem;
font-weight: 700;
background-color: #667eea;
color: white;
border: none;
border-radius: 8px;
cursor: pointer;
transition: background-color 0.3s ease;
}
.login-button:disabled {
background-color: #a3a9f7;
cursor: not-allowed;
}
.forgot-password {
margin-top: 1rem;
text-align: right;
}
.forgot-password a {
color: #667eea;
text-decoration: none;
font-size: 0.9rem;
}
.forgot-password a:hover {
text-decoration: underline;
}
.error-message {
margin-top: 1rem;
color: #d93025;
font-weight: 600;
text-align: center;
}
.loading-spinner {
border: 3px solid #f3f3f3;
border-top: 3px solid #667eea;
border-radius: 50%;
width: 20px;
height: 20px;
animation: spin 1s linear infinite;
display: inline-block;
vertical-align: middle;
margin-left: 8px;
}
@keyframes spin {
0% { transform: rotate(0deg);}
100% { transform: rotate(360deg);}
}
/* Responsive */
@media (max-width: 480px) {
.login-container {
margin: 1rem;
padding: 2rem 1.5rem;
}
}
</style>
</head>
<body>
<div class="login-container" role="main" aria-label="登录表单">
<h2>用户登录</h2>
<form id="loginForm" novalidate>
<label for="usernameEmail">用户名或邮箱</label>
<input type="text" id="usernameEmail" name="usernameEmail" autocomplete="username" placeholder="请输入用户名或邮箱" required aria-describedby="usernameEmailError" />
<div id="usernameEmailError" class="error-message" aria-live="polite"></div>
<label for="password" style="margin-top:1rem;">密码</label>
<div class="password-wrapper">
<input type="password" id="password" name="password" autocomplete="current-password" placeholder="请输入密码" required minlength="6" aria-describedby="passwordError" />
<button type="button" class="toggle-password" aria-label="切换密码可见性" title="切换密码可见性">👁️</button>
</div>
<div id="passwordError" class="error-message" aria-live="polite"></div>
<button type="submit" id="loginButton" class="login-button" disabled>登录</button>
<div class="forgot-password">
<a href="/forgot-password.html" target="_blank" rel="noopener noreferrer">忘记密码?</a>
</div>
<div id="submitError" class="error-message" aria-live="polite"></div>
</form>
</div>
<script>
(function(){
const usernameEmailInput = document.getElementById('usernameEmail');
const passwordInput = document.getElementById('password');
const loginButton = document.getElementById('loginButton');
const usernameEmailError = document.getElementById('usernameEmailError');
const passwordError = document.getElementById('passwordError');
const submitError = document.getElementById('submitError');
const togglePasswordBtn = document.querySelector('.toggle-password');
const form = document.getElementById('loginForm');
// 校验用户名或邮箱格式
function validateUsernameEmail(value) {
if (!value.trim()) {
return "用户名或邮箱不能为空";
}
// 简单邮箱正则
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
// 用户名规则允许字母数字下划线长度3-20
const usernameRegex = /^[a-zA-Z0-9_]{3,20}$/;
if (emailRegex.test(value)) {
return "";
} else if (usernameRegex.test(value)) {
return "";
} else {
return "请输入有效的用户名或邮箱格式";
}
}
// 校验密码格式
function validatePassword(value) {
if (!value) {
return "密码不能为空";
}
if (value.length < 6) {
return "密码长度不能少于6位";
}
return "";
}
// 实时校验并更新错误提示和按钮状态
function validateForm() {
const usernameEmailVal = usernameEmailInput.value;
const passwordVal = passwordInput.value;
const usernameEmailErrMsg = validateUsernameEmail(usernameEmailVal);
const passwordErrMsg = validatePassword(passwordVal);
usernameEmailError.textContent = usernameEmailErrMsg;
passwordError.textContent = passwordErrMsg;
submitError.textContent = "";
const isValid = !usernameEmailErrMsg && !passwordErrMsg;
loginButton.disabled = !isValid;
return isValid;
}
// 密码可见切换
togglePasswordBtn.addEventListener('click', () => {
if (passwordInput.type === 'password') {
passwordInput.type = 'text';
togglePasswordBtn.textContent = '🙈';
togglePasswordBtn.setAttribute('aria-label', '隐藏密码');
togglePasswordBtn.setAttribute('title', '隐藏密码');
} else {
passwordInput.type = 'password';
togglePasswordBtn.textContent = '👁️';
togglePasswordBtn.setAttribute('aria-label', '显示密码');
togglePasswordBtn.setAttribute('title', '显示密码');
}
});
// 监听输入事件实时校验
usernameEmailInput.addEventListener('input', validateForm);
passwordInput.addEventListener('input', validateForm);
// 模拟登录请求
function fakeLoginRequest(data) {
return new Promise((resolve, reject) => {
setTimeout(() => {
// 模拟用户名/邮箱为 "user" 或 "user@example.com" 且密码为 "password123" 才成功
const validUsers = ["user", "user@example.com"];
if (validUsers.includes(data.usernameEmail.toLowerCase()) && data.password === "password123") {
resolve();
} else {
reject(new Error("用户名或密码错误"));
}
}, 1500);
});
}
// 表单提交处理
form.addEventListener('submit', async (e) => {
e.preventDefault();
if (!validateForm()) return;
loginButton.disabled = true;
const originalText = loginButton.textContent;
loginButton.textContent = "登录中";
const spinner = document.createElement('span');
spinner.className = 'loading-spinner';
loginButton.appendChild(spinner);
submitError.textContent = "";
try {
await fakeLoginRequest({
usernameEmail: usernameEmailInput.value.trim(),
password: passwordInput.value
});
// 登录成功跳转此处用alert模拟
alert("登录成功,跳转到用户主页");
// window.location.href = "/user-home.html"; // 实际跳转
} catch (err) {
submitError.textContent = err.message;
} finally {
loginButton.disabled = false;
loginButton.textContent = originalText;
}
});
// 页面加载时校验一次,防止缓存值导致按钮状态异常
validateForm();
})();
<\/script>
<\/body>
<\/html>