diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py
index 530bc88..8730fd3 100644
--- a/agent/protocol/agent_stream.py
+++ b/agent/protocol/agent_stream.py
@@ -105,8 +105,8 @@ class AgentStreamExecutor:
                 else:
                     break  # Different tool or args, stop counting
 
-        # Stop at 3 consecutive calls with same args (whether success or failure)
-        if same_args_calls >= 3:
+        # Stop at 5 consecutive calls with same args (whether success or failure)
+        if same_args_calls >= 5:
             return True, f"工具 '{tool_name}' 使用相同参数已被调用 {same_args_calls} 次,停止执行以防止无限循环。如果需要查看配置,结果已在之前的调用中返回。", False
 
         # Count consecutive failures for same tool + args
@@ -370,8 +370,8 @@ class AgentStreamExecutor:
             if name == tool_name and ahash == args_hash and success:
                 recent_success_count += 1
 
-        # If tool was called successfully 2+ times, add hint to stop loop
-        if recent_success_count >= 2:
+        # If tool was called successfully 3+ times with same args, add hint to stop loop
+        if recent_success_count >= 3:
             logger.warning(
                 f"⚠️ Detected potential loop: '{tool_name}' called {recent_success_count} times "
                 f"with same args. Adding hint to LLM to provide final response."
diff --git a/bridge/bridge.py b/bridge/bridge.py
index 9978cbe..43616d2 100644
--- a/bridge/bridge.py
+++ b/bridge/bridge.py
@@ -55,7 +55,8 @@ class Bridge(object):
         if model_type in [const.MODELSCOPE]:
             self.btype["chat"] = const.MODELSCOPE
 
-        if model_type in ["abab6.5-chat"]:
+        # MiniMax models
+        if model_type and (model_type in ["abab6.5-chat", "abab6.5"] or model_type.lower().startswith("minimax")):
             self.btype["chat"] = const.MiniMax
 
         if conf().get("use_linkai") and conf().get("linkai_api_key"):
diff --git a/config-template.json b/config-template.json
index 066f0bf..0c77d2b 100644
--- a/config-template.json
+++ b/config-template.json
@@ -8,6 +8,7 @@
   "gemini_api_key": "",
   "gemini_api_base": "https://generativelanguage.googleapis.com",
   "zhipu_ai_api_key": "",
+  "minimax_api_key": "",
   "dashscope_api_key": "",
   "voice_to_text": "openai",
   "text_to_voice": "openai",
@@ -17,7 +18,7 @@
   "use_linkai": false,
   "linkai_api_key": "",
   "linkai_app_code": "",
-  "feishu_bot_name": "CowAgent",
+  "feishu_bot_name": "",
   "feishu_app_id": "",
   "feishu_app_secret": "",
   "dingtalk_client_id": "",
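
Note: with the bridge change above, any model name that starts with "minimax"
(case-insensitive) now routes to the MiniMax bot, alongside the legacy abab6.5
names. A minimal config sketch for the new bot follows; the model value is
illustrative, and "minimax_api_base" only needs to be set to override the
default mainland-China endpoint (e.g. for the international API):

    {
      "model": "MiniMax-M2.1",
      "minimax_api_key": "<your MiniMax API key>",
      "minimax_api_base": "https://api.minimax.io/v1"
    }
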
"model": self.args["model"], - "tokens_to_generate": 2048, - "reply_constraints": {"sender_type": "BOT", "sender_name": "MM智能助理"}, - "messages": [], - "bot_setting": [ - { - "bot_name": "MM智能助理", - "content": "MM智能助理是一款由MiniMax自研的,没有调用其他产品的接口的大型语言模型。MiniMax是一家中国科技公司,一直致力于进行大模型相关的研究。", - } - ], + "model": conf().get("model") or "MiniMax-M2.1", + "temperature": conf().get("temperature", 0.3), + "top_p": conf().get("top_p", 0.95), } + # Use unified key name: minimax_api_key + self.api_key = conf().get("minimax_api_key") + if not self.api_key: + # Fallback to old key name for backward compatibility + self.api_key = conf().get("Minimax_api_key") + if self.api_key: + logger.warning("[MINIMAX] 'Minimax_api_key' is deprecated, please use 'minimax_api_key' instead") + + # REST API endpoint + # Use Chinese endpoint by default, users can override in config + # International users should set: "minimax_api_base": "https://api.minimax.io/v1" + self.api_base = conf().get("minimax_api_base", "https://api.minimaxi.com/v1") + self.sessions = SessionManager(MinimaxSession, model=const.MiniMax) def reply(self, query, context: Context = None) -> Reply: # acquire reply content - logger.info("[Minimax_AI] query={}".format(query)) + logger.info("[MINIMAX] query={}".format(query)) if context.type == ContextType.TEXT: session_id = context["session_id"] reply = None @@ -62,19 +57,16 @@ class MinimaxBot(Bot): if reply: return reply session = self.sessions.session_query(query, session_id) - logger.debug("[Minimax_AI] session query={}".format(session)) + logger.debug("[MINIMAX] session query={}".format(session)) model = context.get("Minimax_model") new_args = self.args.copy() if model: new_args["model"] = model - # if context.get('stream'): - # # reply in stream - # return self.reply_text_stream(query, new_query, session_id) reply_content = self.reply_text(session, args=new_args) logger.debug( - "[Minimax_AI] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format( + "[MINIMAX] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format( session.messages, session_id, reply_content["content"], @@ -88,7 +80,7 @@ class MinimaxBot(Bot): reply = Reply(ReplyType.TEXT, reply_content["content"]) else: reply = Reply(ReplyType.ERROR, reply_content["content"]) - logger.debug("[Minimax_AI] reply {} used 0 tokens.".format(reply_content)) + logger.debug("[MINIMAX] reply {} used 0 tokens.".format(reply_content)) return reply else: reply = Reply(ReplyType.ERROR, "Bot不支持处理{}类型的消息".format(context.type)) @@ -96,41 +88,62 @@ class MinimaxBot(Bot): def reply_text(self, session: MinimaxSession, args=None, retry_count=0) -> dict: """ - call openai's ChatCompletion to get the answer + Call MiniMax API to get the answer using REST API :param session: a conversation session - :param session_id: session id + :param args: request arguments :param retry_count: retry count :return: {} """ try: - headers = {"Content-Type": "application/json", "Authorization": "Bearer " + self.api_key} - self.request_body["messages"].extend(session.messages) - logger.info("[Minimax_AI] request_body={}".format(self.request_body)) - # logger.info("[Minimax_AI] reply={}, total_tokens={}".format(response.choices[0]['message']['content'], response["usage"]["total_tokens"])) - res = requests.post(self.base_url, headers=headers, json=self.request_body) + if args is None: + args = self.args + + # Build request + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}" + } + + request_body = { + "model": 
args.get("model", self.args["model"]), + "messages": session.messages, + "temperature": args.get("temperature", self.args["temperature"]), + "top_p": args.get("top_p", self.args["top_p"]), + } + + url = f"{self.api_base}/chat/completions" + logger.debug(f"[MINIMAX] Calling {url} with model={request_body['model']}") + + response = requests.post(url, headers=headers, json=request_body, timeout=60) + + if response.status_code == 200: + result = response.json() + content = result["choices"][0]["message"]["content"] + total_tokens = result["usage"]["total_tokens"] + completion_tokens = result["usage"]["completion_tokens"] + + logger.debug(f"[MINIMAX] reply_text: content_length={len(content)}, tokens={total_tokens}") - # self.request_body["messages"].extend(response.json()["choices"][0]["messages"]) - if res.status_code == 200: - response = res.json() return { - "total_tokens": response["usage"]["total_tokens"], - "completion_tokens": response["usage"]["total_tokens"], - "content": response["reply"], + "total_tokens": total_tokens, + "completion_tokens": completion_tokens, + "content": content, } else: - response = res.json() - error = response.get("error") - logger.error(f"[Minimax_AI] chat failed, status_code={res.status_code}, " f"msg={error.get('message')}, type={error.get('type')}") + error_msg = response.text + logger.error(f"[MINIMAX] API error: status={response.status_code}, msg={error_msg}") - result = {"completion_tokens": 0, "content": "提问太快啦,请休息一下再问我吧"} + # Parse error for better messages + result = {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"} need_retry = False - if res.status_code >= 500: - # server error, need retry - logger.warn(f"[Minimax_AI] do retry, times={retry_count}") + + if response.status_code >= 500: + logger.warning(f"[MINIMAX] Server error, retry={retry_count}") need_retry = retry_count < 2 - elif res.status_code == 401: + elif response.status_code == 401: result["content"] = "授权失败,请检查API Key是否正确" - elif res.status_code == 429: + need_retry = False + elif response.status_code == 429: result["content"] = "请求过于频繁,请稍后再试" need_retry = retry_count < 2 else: @@ -141,11 +154,489 @@ class MinimaxBot(Bot): return self.reply_text(session, args, retry_count + 1) else: return result - except Exception as e: - logger.exception(e) + + except requests.exceptions.Timeout: + logger.error("[MINIMAX] Request timeout") need_retry = retry_count < 2 - result = {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"} + result = {"completion_tokens": 0, "content": "请求超时,请稍后再试"} if need_retry: + time.sleep(3) return self.reply_text(session, args, retry_count + 1) else: return result + except Exception as e: + logger.error(f"[MINIMAX] reply_text error: {e}") + import traceback + logger.error(traceback.format_exc()) + need_retry = retry_count < 2 + result = {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"} + if need_retry: + time.sleep(3) + return self.reply_text(session, args, retry_count + 1) + else: + return result + + def call_with_tools(self, messages, tools=None, stream=False, **kwargs): + """ + Call MiniMax API with tool support for agent integration + + This method handles: + 1. Format conversion (Claude format → OpenAI format) + 2. System prompt injection + 3. API calling with REST API + 4. 
+    def _convert_messages_to_openai_format(self, messages):
+        """
+        Convert messages from Claude format to OpenAI format
+
+        Claude format:
+        - role: "user" | "assistant"
+        - content: string | list of content blocks
+
+        OpenAI format:
+        - role: "user" | "assistant" | "tool"
+        - content: string
+        - tool_calls: list (for assistant)
+        - tool_call_id: string (for tool results)
+        """
+        converted = []
+
+        for msg in messages:
+            role = msg.get("role")
+            content = msg.get("content")
+
+            if role == "user":
+                # Handle user message
+                if isinstance(content, list):
+                    # Extract text from content blocks
+                    text_parts = []
+                    tool_results = []
+
+                    for block in content:
+                        if isinstance(block, dict):
+                            if block.get("type") == "text":
+                                text_parts.append(block.get("text", ""))
+                            elif block.get("type") == "tool_result":
+                                # Each tool result becomes a separate message with role="tool"
+                                tool_results.append({
+                                    "role": "tool",
+                                    "tool_call_id": block.get("tool_use_id"),
+                                    "content": str(block.get("content", ""))
+                                })
+
+                    if text_parts:
+                        converted.append({
+                            "role": "user",
+                            "content": "\n".join(text_parts)
+                        })
+
+                    # Keep every tool result, not just the last one
+                    converted.extend(tool_results)
+                else:
+                    # Simple text content
+                    converted.append({
+                        "role": "user",
+                        "content": str(content)
+                    })
+
+            elif role == "assistant":
+                # Handle assistant message
+                openai_msg = {"role": "assistant"}
+
+                if isinstance(content, list):
+                    # Parse content blocks
+                    text_parts = []
+                    tool_calls = []
+
+                    for block in content:
+                        if isinstance(block, dict):
+                            if block.get("type") == "text":
+                                text_parts.append(block.get("text", ""))
+                            elif block.get("type") == "tool_use":
+                                # Convert to OpenAI tool_calls format
+                                tool_calls.append({
+                                    "id": block.get("id"),
+                                    "type": "function",
+                                    "function": {
+                                        "name": block.get("name"),
+                                        "arguments": json.dumps(block.get("input", {}))
+                                    }
+                                })
+
+                    # Set content (can be empty if there are only tool calls)
+                    if text_parts:
+                        openai_msg["content"] = "\n".join(text_parts)
+                    elif not tool_calls:
+                        openai_msg["content"] = ""
+
+                    # Set tool_calls
+                    if tool_calls:
+                        openai_msg["tool_calls"] = tool_calls
+                        # When tool_calls exist and there is no text, content must be None
+                        if not text_parts:
+                            openai_msg["content"] = None
+
+                else:
+                    # Simple text content
+                    openai_msg["content"] = str(content) if content else ""
+
+                converted.append(openai_msg)
+
+        return converted
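+
+    # Note: the OpenAI-style API expects each role="tool" message to directly
+    # follow the assistant message whose tool_calls entry it answers; the agent
+    # layer is assumed to send tool_result blocks in that order.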
= {"role": "assistant"} + + if isinstance(content, list): + # Parse content blocks + text_parts = [] + tool_calls = [] + + for block in content: + if isinstance(block, dict): + if block.get("type") == "text": + text_parts.append(block.get("text", "")) + elif block.get("type") == "tool_use": + # Convert to OpenAI tool_calls format + tool_calls.append({ + "id": block.get("id"), + "type": "function", + "function": { + "name": block.get("name"), + "arguments": json.dumps(block.get("input", {})) + } + }) + + # Set content (can be empty if only tool calls) + if text_parts: + openai_msg["content"] = "\n".join(text_parts) + elif not tool_calls: + openai_msg["content"] = "" + + # Set tool_calls + if tool_calls: + openai_msg["tool_calls"] = tool_calls + # When tool_calls exist and content is empty, set to None + if not text_parts: + openai_msg["content"] = None + + else: + # Simple text content + openai_msg["content"] = str(content) if content else "" + + converted.append(openai_msg) + + return converted + + def _convert_tools_to_openai_format(self, tools): + """ + Convert tools from Claude format to OpenAI format + + Claude format: + { + "name": "tool_name", + "description": "description", + "input_schema": {...} + } + + OpenAI format: + { + "type": "function", + "function": { + "name": "tool_name", + "description": "description", + "parameters": {...} + } + } + """ + converted = [] + + for tool in tools: + converted.append({ + "type": "function", + "function": { + "name": tool.get("name"), + "description": tool.get("description"), + "parameters": tool.get("input_schema", {}) + } + }) + + return converted + + def _handle_sync_response(self, request_body): + """Handle synchronous API response""" + try: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}" + } + + # Remove stream from body for sync request + request_body.pop("stream", None) + + url = f"{self.api_base}/chat/completions" + response = requests.post(url, headers=headers, json=request_body, timeout=60) + + if response.status_code != 200: + error_msg = response.text + logger.error(f"[MINIMAX] API error: status={response.status_code}, msg={error_msg}") + yield {"error": True, "message": error_msg, "status_code": response.status_code} + return + + result = response.json() + message = result["choices"][0]["message"] + finish_reason = result["choices"][0]["finish_reason"] + + # Build response in Claude-like format + response_data = { + "role": "assistant", + "content": [] + } + + # Add reasoning_details (thinking) if present + if "reasoning_details" in message: + for reasoning in message["reasoning_details"]: + if "text" in reasoning: + response_data["content"].append({ + "type": "thinking", + "thinking": reasoning["text"] + }) + + # Add text content if present + if message.get("content"): + response_data["content"].append({ + "type": "text", + "text": message["content"] + }) + + # Add tool calls if present + if message.get("tool_calls"): + for tool_call in message["tool_calls"]: + response_data["content"].append({ + "type": "tool_use", + "id": tool_call["id"], + "name": tool_call["function"]["name"], + "input": json.loads(tool_call["function"]["arguments"]) + }) + + # Set stop_reason + if finish_reason == "tool_calls": + response_data["stop_reason"] = "tool_use" + elif finish_reason == "stop": + response_data["stop_reason"] = "end_turn" + else: + response_data["stop_reason"] = finish_reason + + yield response_data + + except requests.exceptions.Timeout: + logger.error("[MINIMAX] Request timeout") + 
yield {"error": True, "message": "Request timeout", "status_code": 500} + except Exception as e: + logger.error(f"[MINIMAX] sync response error: {e}") + import traceback + logger.error(traceback.format_exc()) + yield {"error": True, "message": str(e), "status_code": 500} + + def _handle_stream_response(self, request_body, show_thinking=False): + """Handle streaming API response + + Args: + request_body: API request parameters + show_thinking: Whether to show thinking/reasoning process to users + """ + try: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}" + } + + url = f"{self.api_base}/chat/completions" + response = requests.post(url, headers=headers, json=request_body, stream=True, timeout=60) + + if response.status_code != 200: + error_msg = response.text + logger.error(f"[MINIMAX] API error: status={response.status_code}, msg={error_msg}") + yield {"error": True, "message": error_msg, "status_code": response.status_code} + return + + current_content = [] + current_tool_calls = {} + current_reasoning = [] + finish_reason = None + chunk_count = 0 + + # Process SSE stream + for line in response.iter_lines(): + if not line: + continue + + line = line.decode('utf-8') + if not line.startswith('data: '): + continue + + data_str = line[6:] # Remove 'data: ' prefix + if data_str.strip() == '[DONE]': + break + + try: + chunk = json.loads(data_str) + chunk_count += 1 + except json.JSONDecodeError as e: + logger.warning(f"[MINIMAX] JSON decode error: {e}, data: {data_str[:100]}") + continue + + # Check for error response (MiniMax format) + if chunk.get("type") == "error" or "error" in chunk: + error_data = chunk.get("error", {}) + error_msg = error_data.get("message", "Unknown error") + error_type = error_data.get("type", "") + http_code = error_data.get("http_code", "") + + logger.error(f"[MINIMAX] API error: {error_msg} (type: {error_type}, code: {http_code})") + + yield { + "error": True, + "message": error_msg, + "status_code": int(http_code) if http_code.isdigit() else 500 + } + return + + if not chunk.get("choices"): + continue + + choice = chunk["choices"][0] + delta = choice.get("delta", {}) + + # Handle reasoning_details (thinking) + if "reasoning_details" in delta: + for reasoning in delta["reasoning_details"]: + if "text" in reasoning: + reasoning_id = reasoning.get("id", "reasoning-text-1") + reasoning_index = reasoning.get("index", 0) + reasoning_text = reasoning["text"] + + # Accumulate reasoning text + if reasoning_index >= len(current_reasoning): + current_reasoning.append({"id": reasoning_id, "text": ""}) + + current_reasoning[reasoning_index]["text"] += reasoning_text + + # Optionally yield thinking as visible content + if show_thinking: + # Format thinking text for display + formatted_thinking = f"💭 {reasoning_text}" + + # Yield as OpenAI-format content delta + yield { + "choices": [{ + "index": 0, + "delta": { + "role": "assistant", + "content": formatted_thinking + } + }] + } + + # Handle text content + if "content" in delta and delta["content"]: + # Start new content block if needed + if not any(block.get("type") == "text" for block in current_content): + current_content.append({"type": "text", "text": ""}) + + # Accumulate text + for block in current_content: + if block.get("type") == "text": + block["text"] += delta["content"] + break + + # Yield OpenAI-format delta (for agent_stream.py compatibility) + yield { + "choices": [{ + "index": 0, + "delta": { + "role": "assistant", + "content": delta["content"] + } + }] + } + + # 
+                # Handle tool calls
+                if "tool_calls" in delta:
+                    for tool_call_chunk in delta["tool_calls"]:
+                        index = tool_call_chunk.get("index", 0)
+                        if index not in current_tool_calls:
+                            # Start a new tool call
+                            current_tool_calls[index] = {
+                                "id": tool_call_chunk.get("id", ""),
+                                "type": "tool_use",
+                                "name": tool_call_chunk.get("function", {}).get("name", ""),
+                                "input": ""
+                            }
+
+                        # Accumulate tool call arguments
+                        if "function" in tool_call_chunk and "arguments" in tool_call_chunk["function"]:
+                            current_tool_calls[index]["input"] += tool_call_chunk["function"]["arguments"]
+
+                        # Yield the OpenAI-format tool call delta
+                        yield {
+                            "choices": [{
+                                "index": 0,
+                                "delta": {
+                                    "tool_calls": [tool_call_chunk]
+                                }
+                            }]
+                        }
+
+                # Handle finish_reason
+                if choice.get("finish_reason"):
+                    finish_reason = choice["finish_reason"]
+
+            # Log the complete reasoning_details for debugging
+            if current_reasoning:
+                logger.debug("[MINIMAX] ===== Complete Reasoning Details =====")
+                for i, reasoning in enumerate(current_reasoning):
+                    reasoning_text = reasoning.get("text", "")
+                    logger.debug(f"[MINIMAX] Reasoning {i+1} (length={len(reasoning_text)}):")
+                    logger.debug(f"[MINIMAX] {reasoning_text}")
+                logger.debug("[MINIMAX] ===== End Reasoning Details =====")
+
+            # Yield the final chunk with finish_reason (OpenAI format)
+            yield {
+                "choices": [{
+                    "index": 0,
+                    "delta": {},
+                    "finish_reason": finish_reason
+                }]
+            }
+
+        except requests.exceptions.Timeout:
+            logger.error("[MINIMAX] Request timeout")
+            yield {"error": True, "message": "Request timeout", "status_code": 500}
+        except Exception as e:
+            logger.error(f"[MINIMAX] stream response error: {e}")
+            logger.error(traceback.format_exc())
+            yield {"error": True, "message": str(e), "status_code": 500}
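
A minimal sketch of how the streaming generator above is consumed, assuming
the bot is configured as in config-template.json (the prompt text and print
handling are illustrative; the real consumer is the agent loop in
agent/protocol/agent_stream.py):

    bot = MinimaxBot()
    for chunk in bot.call_with_tools(
        messages=[{"role": "user", "content": "hello"}],
        stream=True,
    ):
        if chunk.get("error"):
            print("error:", chunk["message"])
            break
        choice = chunk["choices"][0]
        delta = choice.get("delta", {})
        if delta.get("content"):
            print(delta["content"], end="")
        if choice.get("finish_reason"):
            break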