Files
Bubbles/commands/message_forwarder.py

166 lines
5.6 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Sequence
from commands.context import MessageContext
@dataclass
class ForwardRule:
source_room_id: str
target_room_ids: List[str]
keywords: List[str]
def matches(self, haystacks: Sequence[str]) -> bool:
"""Check whether any keyword is contained in the provided text pool."""
if not self.keywords:
return False
for keyword in self.keywords:
if not keyword:
continue
for item in haystacks:
if keyword in item:
return True
return False
class MessageForwarder:
"""Forward specific group messages to other groups based on keywords."""
def __init__(self, robot: Any, config: Dict[str, Any], logger: Any) -> None:
self.robot = robot
self.logger = logger
config = config if isinstance(config, dict) else {}
self.enabled = bool(config.get("enable"))
self.rules: List[ForwardRule] = (
self._build_rules(config.get("rules", [])) if self.enabled else []
)
self.rules_by_room: Dict[str, List[ForwardRule]] = {}
for rule in self.rules:
self.rules_by_room.setdefault(rule.source_room_id, []).append(rule)
if self.enabled and not self.rules:
# 没有有效规则就视为未启用,避免无意义的检查
self.enabled = False
if self.logger:
self.logger.warning("消息转发已启用,但未检测到有效规则,功能自动关闭。")
def forward_if_needed(self, ctx: MessageContext) -> bool:
"""Forward the message when it matches a rule."""
if (
not self.enabled
or not ctx.is_group
or not ctx.msg
or ctx.msg.from_self()
or getattr(ctx, "has_quote_reference", False)
):
return False
room_id = ctx.msg.roomid
candidate_rules = self.rules_by_room.get(room_id)
if not candidate_rules:
return False
haystacks = self._build_haystacks(ctx)
if not haystacks:
return False
payload = self._extract_forward_payload(ctx)
if not payload:
return False
triggered = False
for rule in candidate_rules:
if not rule.matches(haystacks):
continue
triggered = True
self._forward(rule, ctx, payload)
return triggered
def _build_rules(self, raw_rules: Sequence[Dict[str, Any]]) -> List[ForwardRule]:
rules: List[ForwardRule] = []
for raw in raw_rules or []:
if not isinstance(raw, dict):
continue
source = raw.get("source_room_id") or raw.get("source")
targets = raw.get("target_room_ids") or raw.get("target_room_id") or raw.get(
"target"
)
keywords = raw.get("keywords") or raw.get("keyword")
normalized_targets = self._normalize_str_list(targets)
normalized_keywords = self._normalize_str_list(keywords)
if not source or not normalized_targets or not normalized_keywords:
if self.logger:
self.logger.warning(
f"忽略无效的消息转发配置: source={source}, targets={targets}, keywords={keywords}"
)
continue
rules.append(
ForwardRule(
source_room_id=str(source).strip(),
target_room_ids=normalized_targets,
keywords=normalized_keywords,
)
)
return rules
@staticmethod
def _normalize_str_list(value: Any) -> List[str]:
if isinstance(value, str):
cleaned = value.strip()
return [cleaned] if cleaned else []
if isinstance(value, (list, tuple, set)):
result = []
for item in value:
if not isinstance(item, str):
continue
cleaned = item.strip()
if cleaned:
result.append(cleaned)
return result
return []
def _build_haystacks(self, ctx: MessageContext) -> List[str]:
haystacks: List[str] = []
raw_content = getattr(ctx.msg, "content", None)
if isinstance(raw_content, str) and raw_content:
haystacks.append(raw_content)
if isinstance(ctx.text, str) and ctx.text:
haystacks.append(ctx.text)
return haystacks
def _extract_forward_payload(self, ctx: MessageContext) -> str:
msg_type = getattr(ctx.msg, "type", None)
if msg_type == 49 and ctx.text:
return ctx.text
raw_content = getattr(ctx.msg, "content", "")
if isinstance(raw_content, str) and raw_content:
return raw_content
if isinstance(ctx.text, str):
return ctx.text
return ""
def _forward(self, rule: ForwardRule, ctx: MessageContext, payload: str) -> None:
for target_id in rule.target_room_ids:
try:
self.robot.sendTextMsg(payload, target_id)
if self.logger:
self.logger.info(
f"已将群 {rule.source_room_id} 的关键词消息转发至 {target_id}"
)
except Exception as exc:
if self.logger:
self.logger.error(
f"转发消息到 {target_id} 失败: {exc}",
exc_info=True,
)