This commit is contained in:
philschmid
2025-05-29 15:46:39 -07:00
parent abd4403858
commit 09971ff55e
48 changed files with 9638 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
from agent.graph import graph
# Public package surface: re-export only the compiled LangGraph graph.
__all__ = ["graph"]

61
backend/src/agent/app.py Normal file
View File

@@ -0,0 +1,61 @@
# mypy: disable - error - code = "no-untyped-def,misc"
import pathlib
from fastapi import FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles
import fastapi.exceptions
# Define the FastAPI app (the LangGraph server mounts its API routes onto it).
app = FastAPI()
def create_frontend_router(build_dir="../frontend/dist"):
    """Creates a router to serve the React frontend.

    Args:
        build_dir: Path to the React build directory relative to this file.

    Returns:
        A Starlette application serving the frontend, or a single 503 route
        when the build output is missing.
    """
    build_path = pathlib.Path(__file__).parent.parent.parent / build_dir
    static_files_path = build_path / "assets"  # Vite uses 'assets' subdir
    if not build_path.is_dir() or not (build_path / "index.html").is_file():
        print(
            f"WARN: Frontend build directory not found or incomplete at {build_path}. Serving frontend will likely fail."
        )
        # Return a dummy router if build isn't ready
        from starlette.routing import Route

        async def dummy_frontend(request):
            return Response(
                "Frontend not built. Run 'npm run build' in the frontend directory.",
                media_type="text/plain",
                status_code=503,
            )

        return Route("/{path:path}", endpoint=dummy_frontend)
    # NOTE: removed dead assignment `build_dir = pathlib.Path(build_dir)` —
    # the rebound value was never read (all paths below use build_path).
    react = FastAPI(openapi_url="")
    # Hashed static assets are served directly; everything else falls through
    # to the catch-all below so client-side (SPA) routing works.
    react.mount(
        "/assets", StaticFiles(directory=static_files_path), name="static_assets"
    )

    @react.get("/{path:path}")
    async def handle_catch_all(request: Request, path: str):
        fp = build_path / path
        if not fp.exists() or not fp.is_file():
            fp = build_path / "index.html"  # SPA fallback
        return fastapi.responses.FileResponse(fp)

    return react
# Mount the frontend under /app to not conflict with the LangGraph API routes
# (the API owns the root-level paths; the SPA is reachable at /app/...).
app.mount(
    "/app",
    create_frontend_router(),
    name="frontend",
)

View File

@@ -0,0 +1,60 @@
import os
from pydantic import BaseModel, Field
from typing import Any, Optional
from langchain_core.runnables import RunnableConfig
class Configuration(BaseModel):
    """The configuration for the agent.

    Each field is resolved per-run by :meth:`from_runnable_config`: an
    environment variable named after the upper-cased field name wins, then the
    RunnableConfig's ``configurable`` mapping, then the declared default.
    """

    # NOTE(review): `metadata` is not a standard pydantic `Field` argument;
    # it is carried as an extra kwarg — confirm it surfaces where intended.
    query_generator_model: str = Field(
        default="gemini-2.0-flash",
        metadata={
            "description": "The name of the language model to use for the agent's query generation."
        },
    )
    # Model used by the reflection node to find knowledge gaps.
    reflection_model: str = Field(
        default="gemini-2.5-flash-preview-04-17",
        metadata={
            "description": "The name of the language model to use for the agent's reflection."
        },
    )
    # Model used to compose the final answer.
    answer_model: str = Field(
        default="gemini-2.5-pro-preview-05-06",
        metadata={
            "description": "The name of the language model to use for the agent's answer."
        },
    )
    number_of_initial_queries: int = Field(
        default=3,
        metadata={"description": "The number of initial search queries to generate."},
    )
    max_research_loops: int = Field(
        default=2,
        metadata={"description": "The maximum number of research loops to perform."},
    )

    @classmethod
    def from_runnable_config(
        cls, config: Optional[RunnableConfig] = None
    ) -> "Configuration":
        """Create a Configuration instance from a RunnableConfig.

        Precedence per field: environment variable (upper-cased field name),
        then ``config["configurable"]``, then the field default.
        """
        configurable = (
            config["configurable"] if config and "configurable" in config else {}
        )
        # Get raw values from environment or config
        raw_values: dict[str, Any] = {
            name: os.environ.get(name.upper(), configurable.get(name))
            for name in cls.model_fields.keys()
        }
        # Filter out None values
        values = {k: v for k, v in raw_values.items() if v is not None}
        return cls(**values)

293
backend/src/agent/graph.py Normal file
View File

@@ -0,0 +1,293 @@
import os
from agent.tools_and_schemas import SearchQueryList, Reflection
from dotenv import load_dotenv
from langchain_core.messages import AIMessage
from langgraph.types import Send
from langgraph.graph import StateGraph
from langgraph.graph import START, END
from langchain_core.runnables import RunnableConfig
from google.genai import Client
from agent.state import (
OverallState,
QueryGenerationState,
ReflectionState,
WebSearchState,
)
from agent.configuration import Configuration
from agent.prompts import (
get_current_date,
query_writer_instructions,
web_searcher_instructions,
reflection_instructions,
answer_instructions,
)
from langchain_google_genai import ChatGoogleGenerativeAI
from agent.utils import (
get_citations,
get_research_topic,
insert_citation_markers,
resolve_urls,
)
# Load environment variables from a local .env file (no-op when absent).
load_dotenv()
# Fail fast at import time: every node in this graph needs Gemini access.
if os.getenv("GEMINI_API_KEY") is None:
    raise ValueError("GEMINI_API_KEY is not set")
# Used for Google Search API
genai_client = Client(api_key=os.getenv("GEMINI_API_KEY"))
# Nodes
def generate_query(state: OverallState, config: RunnableConfig) -> QueryGenerationState:
    """LangGraph node that generates search queries from the user's question.

    Uses the configured query-generator model (Gemini 2.0 Flash by default)
    with structured output to produce optimized web-search queries.

    Args:
        state: Current graph state containing the user's question.
        config: Configuration for the runnable, including LLM provider settings.

    Returns:
        Dictionary with state update: ``query_list`` holds the generated queries.
    """
    configurable = Configuration.from_runnable_config(config)

    # Honor a caller-supplied query count; otherwise fall back to the
    # configured default.
    if state.get("initial_search_query_count") is None:
        state["initial_search_query_count"] = configurable.number_of_initial_queries

    # Query-generation model constrained to the SearchQueryList schema.
    query_llm = ChatGoogleGenerativeAI(
        model=configurable.query_generator_model,
        temperature=1.0,
        max_retries=2,
        api_key=os.getenv("GEMINI_API_KEY"),
    ).with_structured_output(SearchQueryList)

    prompt = query_writer_instructions.format(
        current_date=get_current_date(),
        research_topic=get_research_topic(state["messages"]),
        number_queries=state["initial_search_query_count"],
    )

    generated = query_llm.invoke(prompt)
    return {"query_list": generated.query}
def continue_to_web_research(state: QueryGenerationState):
    """LangGraph edge that fans the generated queries out to web research.

    Spawns one ``web_research`` branch per search query via ``Send``, tagging
    each branch with its query index as an id.
    """
    branches = []
    for idx, search_query in enumerate(state["query_list"]):
        payload = {"search_query": search_query, "id": int(idx)}
        branches.append(Send("web_research", payload))
    return branches
def web_research(state: WebSearchState, config: RunnableConfig) -> OverallState:
    """LangGraph node that performs web research using the native Google Search API tool.

    Executes a web search using the native Google Search API tool in combination with Gemini 2.0 Flash.

    Args:
        state: Current graph state containing the search query and research loop count
        config: Configuration for the runnable, including search API settings

    Returns:
        Dictionary with state update, including sources_gathered, research_loop_count, and web_research_results
    """
    # Configure
    configurable = Configuration.from_runnable_config(config)
    formatted_prompt = web_searcher_instructions.format(
        current_date=get_current_date(),
        research_topic=state["search_query"],
    )
    # Uses the google genai client as the langchain client doesn't return grounding metadata
    # NOTE(review): reuses query_generator_model for the search call rather
    # than a dedicated search model — confirm this is intentional.
    response = genai_client.models.generate_content(
        model=configurable.query_generator_model,
        contents=formatted_prompt,
        config={
            "tools": [{"google_search": {}}],
            "temperature": 0,
        },
    )
    # resolve the urls to short urls for saving tokens and time
    resolved_urls = resolve_urls(
        response.candidates[0].grounding_metadata.grounding_chunks, state["id"]
    )
    # Gets the citations and adds them to the generated text
    citations = get_citations(response, resolved_urls)
    modified_text = insert_citation_markers(response.text, citations)
    # Flatten every citation's segment dicts into one list of sources.
    sources_gathered = [item for citation in citations for item in citation["segments"]]
    # Lists here are merged into OverallState via operator.add reducers.
    return {
        "sources_gathered": sources_gathered,
        "search_query": [state["search_query"]],
        "web_research_result": [modified_text],
    }
def reflection(state: OverallState, config: RunnableConfig) -> ReflectionState:
    """LangGraph node that identifies knowledge gaps and generates potential follow-up queries.

    Analyzes the current summaries to identify areas for further research and
    generates potential follow-up queries. Uses structured output to extract
    the follow-up query in JSON format.

    Args:
        state: Current graph state containing the running summary and research topic
        config: Configuration for the runnable, including LLM provider settings

    Returns:
        Dictionary with state update: sufficiency flag, knowledge gap,
        follow-up queries, research loop count, and queries-ran count.
    """
    configurable = Configuration.from_runnable_config(config)
    # Increment the research loop count and get the reasoning model
    state["research_loop_count"] = state.get("research_loop_count", 0) + 1
    # BUGFIX: Configuration declares no `reasoning_model` field, so the old
    # `configurable.reasoning_model` raised AttributeError whenever the state
    # didn't override it. The reflection step's configured default is
    # `reflection_model`.
    reasoning_model = state.get("reasoning_model") or configurable.reflection_model
    # Format the prompt
    current_date = get_current_date()
    formatted_prompt = reflection_instructions.format(
        current_date=current_date,
        research_topic=get_research_topic(state["messages"]),
        summaries="\n\n---\n\n".join(state["web_research_result"]),
    )
    # init Reasoning Model
    llm = ChatGoogleGenerativeAI(
        model=reasoning_model,
        temperature=1.0,
        max_retries=2,
        api_key=os.getenv("GEMINI_API_KEY"),
    )
    result = llm.with_structured_output(Reflection).invoke(formatted_prompt)
    return {
        "is_sufficient": result.is_sufficient,
        "knowledge_gap": result.knowledge_gap,
        "follow_up_queries": result.follow_up_queries,
        "research_loop_count": state["research_loop_count"],
        "number_of_ran_queries": len(state["search_query"]),
    }
def evaluate_research(
    state: ReflectionState,
    config: RunnableConfig,
):
    """LangGraph routing function that determines the next step in the research flow.

    Controls the research loop: finalize once reflection judged the summaries
    sufficient or the configured maximum number of loops is reached; otherwise
    fan out one web_research branch per follow-up query.

    Args:
        state: Current graph state containing the research loop count
        config: Configuration for the runnable, including max_research_loops setting

    Returns:
        The string "finalize_answer" when research is done, otherwise a list
        of ``Send`` objects targeting "web_research" (one per follow-up query).
    """
    # NOTE: the previous `-> OverallState` annotation was wrong — this router
    # returns a node name or a list of Send objects, never a state dict.
    configurable = Configuration.from_runnable_config(config)
    # A per-run override in state takes precedence over the configured default.
    max_research_loops = (
        state.get("max_research_loops")
        if state.get("max_research_loops") is not None
        else configurable.max_research_loops
    )
    if state["is_sufficient"] or state["research_loop_count"] >= max_research_loops:
        return "finalize_answer"
    else:
        # Offset branch ids past the queries already ran so the short-url ids
        # produced by resolve_urls stay unique across loops.
        return [
            Send(
                "web_research",
                {
                    "search_query": follow_up_query,
                    "id": state["number_of_ran_queries"] + int(idx),
                },
            )
            for idx, follow_up_query in enumerate(state["follow_up_queries"])
        ]
def finalize_answer(state: OverallState, config: RunnableConfig):
    """LangGraph node that finalizes the research summary.

    Prepares the final output by deduplicating and formatting sources, then
    combining them with the running summary to create a well-structured
    research report with proper citations.

    Args:
        state: Current graph state containing the running summary and sources gathered
        config: Configuration for the runnable, including LLM provider settings

    Returns:
        Dictionary with state update: the final AIMessage plus only the
        sources actually cited in the answer.
    """
    configurable = Configuration.from_runnable_config(config)
    # BUGFIX: Configuration declares no `reasoning_model` field, so the old
    # `configurable.reasoning_model` raised AttributeError whenever the state
    # didn't override it. The answer step's configured default is `answer_model`.
    reasoning_model = state.get("reasoning_model") or configurable.answer_model
    # Format the prompt
    current_date = get_current_date()
    formatted_prompt = answer_instructions.format(
        current_date=current_date,
        research_topic=get_research_topic(state["messages"]),
        summaries="\n---\n\n".join(state["web_research_result"]),
    )
    # init Reasoning Model (defaults to the configured answer model,
    # Gemini 2.5 Pro preview)
    llm = ChatGoogleGenerativeAI(
        model=reasoning_model,
        temperature=0,
        max_retries=2,
        api_key=os.getenv("GEMINI_API_KEY"),
    )
    result = llm.invoke(formatted_prompt)
    # Replace the short urls with the original urls and add all used urls to the sources_gathered
    unique_sources = []
    for source in state["sources_gathered"]:
        if source["short_url"] in result.content:
            result.content = result.content.replace(
                source["short_url"], source["value"]
            )
            unique_sources.append(source)
    return {
        "messages": [AIMessage(content=result.content)],
        "sources_gathered": unique_sources,
    }
# Create our Agent Graph
builder = StateGraph(OverallState, config_schema=Configuration)
# Define the nodes we will cycle between
builder.add_node("generate_query", generate_query)
builder.add_node("web_research", web_research)
builder.add_node("reflection", reflection)
builder.add_node("finalize_answer", finalize_answer)
# Set the entrypoint as `generate_query`
# This means that this node is the first one called
builder.add_edge(START, "generate_query")
# Add conditional edge to continue with search queries in a parallel branch
# (the router returns Send objects, one web_research branch per query)
builder.add_conditional_edges(
    "generate_query", continue_to_web_research, ["web_research"]
)
# Reflect on the web research
builder.add_edge("web_research", "reflection")
# Evaluate the research: loop back into web_research with follow-up queries
# or proceed to finalize_answer
builder.add_conditional_edges(
    "reflection", evaluate_research, ["web_research", "finalize_answer"]
)
# Finalize the answer
builder.add_edge("finalize_answer", END)
graph = builder.compile(name="pro-search-agent")

View File

@@ -0,0 +1,96 @@
from datetime import datetime
# Get current date in a readable format
def get_current_date():
    """Return today's date formatted like 'May 29, 2025'."""
    return f"{datetime.now():%B %d, %Y}"
# Prompt for the generate_query node. Placeholders: {number_queries},
# {current_date}, {research_topic}. FIX: the format section said "ALL three
# of these exact keys" while listing only two ("rationale", "query").
query_writer_instructions = """Your goal is to generate sophisticated and diverse web search queries. These queries are intended for an advanced automated web research tool capable of analyzing complex results, following links, and synthesizing information.
Instructions:
- Always prefer a single search query, only add another query if the original question requests multiple aspects or elements and one query is not enough.
- Each query should focus on one specific aspect of the original question.
- Don't produce more than {number_queries} queries.
- Queries should be diverse, if the topic is broad, generate more than 1 query.
- Don't generate multiple similar queries, 1 is enough.
- Query should ensure that the most current information is gathered. The current date is {current_date}.
Format:
- Format your response as a JSON object with both of these exact keys:
   - "rationale": Brief explanation of why these queries are relevant
   - "query": A list of search queries
Example:
Topic: What revenue grew more last year apple stock or the number of people buying an iphone
```json
{{
    "rationale": "To answer this comparative growth question accurately, we need specific data points on Apple's stock performance and iPhone sales metrics. These queries target the precise financial information needed: company revenue trends, product-specific unit sales figures, and stock price movement over the same fiscal period for direct comparison.",
    "query": ["Apple total revenue growth fiscal year 2024", "iPhone unit sales growth fiscal year 2024", "Apple stock price growth fiscal year 2024"],
}}
```
Context: {research_topic}"""
# Prompt for the web_research node. Placeholders: {research_topic}, {current_date}.
web_searcher_instructions = """Conduct targeted Google Searches to gather the most recent, credible information on "{research_topic}" and synthesize it into a verifiable text artifact.
Instructions:
- Query should ensure that the most current information is gathered. The current date is {current_date}.
- Conduct multiple, diverse searches to gather comprehensive information.
- Consolidate key findings while meticulously tracking the source(s) for each specific piece of information.
- The output should be a well-written summary or report based on your search findings.
- Only include the information found in the search results, don't make up any information.
Research Topic:
{research_topic}
"""
reflection_instructions = """You are an expert research assistant analyzing summaries about "{research_topic}".
Instructions:
- Identify knowledge gaps or areas that need deeper exploration and generate a follow-up query. (1 or multiple).
- If provided summaries are sufficient to answer the user's question, don't generate a follow-up query.
- If there is a knowledge gap, generate a follow-up query that would help expand your understanding.
- Focus on technical details, implementation specifics, or emerging trends that weren't fully covered.
Requirements:
- Ensure the follow-up query is self-contained and includes necessary context for web search.
Output Format:
- Format your response as a JSON object with these exact keys:
- "is_sufficient": true or false
- "knowledge_gap": Describe what information is missing or needs clarification
- "follow_up_queries": Write a specific question to address this gap
Example:
```json
{{
"is_sufficient": true, // or false
"knowledge_gap": "The summary lacks information about performance metrics and benchmarks", // "" if is_sufficient is true
"follow_up_queries": ["What are typical performance benchmarks and metrics used to evaluate [specific technology]?"] // [] if is_sufficient is true
}}
```
Reflect carefully on the Summaries to identify knowledge gaps and produce a follow-up query. Then, produce your output following this JSON format:
Summaries:
{summaries}
"""
# Prompt for the finalize_answer node. Placeholders: {current_date},
# {research_topic}, {summaries}. FIX: corrected typo "finaly" -> "final".
answer_instructions = """Generate a high-quality answer to the user's question based on the provided summaries.
Instructions:
- The current date is {current_date}.
- You are the final step of a multi-step research process, don't mention that you are the final step.
- You have access to all the information gathered from the previous steps.
- You have access to the user's question.
- Generate a high-quality answer to the user's question based on the provided summaries and the user's question.
- you MUST include all the citations from the summaries in the answer correctly.
User Context:
- {research_topic}
Summaries:
{summaries}"""

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TypedDict
from langgraph.graph import add_messages
from typing_extensions import Annotated
import operator
from dataclasses import dataclass, field
from typing_extensions import Annotated
class OverallState(TypedDict):
    """Aggregate state threaded through the whole research graph."""

    # Conversation history; add_messages merges updates instead of replacing.
    messages: Annotated[list, add_messages]
    # Queries run so far; operator.add concatenates parallel-branch updates.
    search_query: Annotated[list, operator.add]
    # One synthesized summary per completed web search.
    web_research_result: Annotated[list, operator.add]
    # Citation segment dicts accumulated across searches.
    sources_gathered: Annotated[list, operator.add]
    initial_search_query_count: int
    max_research_loops: int
    research_loop_count: int
    # Optional per-run override for the reflection/answer model name.
    reasoning_model: str
class ReflectionState(TypedDict):
    """Output of the reflection node, used to route the research loop."""

    # True when gathered summaries can answer the user's question.
    is_sufficient: bool
    knowledge_gap: str
    # operator.add concatenates follow-up queries across updates.
    follow_up_queries: Annotated[list, operator.add]
    research_loop_count: int
    number_of_ran_queries: int
class Query(TypedDict):
    """A single generated search query plus the model's rationale."""

    query: str
    rationale: str
class QueryGenerationState(TypedDict):
    """State slice produced by the generate_query node."""

    query_list: list[Query]
class WebSearchState(TypedDict):
    """Per-branch input for one web_research invocation."""

    search_query: str
    # Branch identifier used to build unique short-url ids.
    # Annotation corrected from `str` to `int`: producers send int(idx)
    # and resolve_urls() declares `id: int`.
    id: int
@dataclass(kw_only=True)
class SearchStateOutput:
    """Output schema exposing only the final report."""

    # None until a run completes; annotation widened to reflect the default.
    running_summary: str | None = field(default=None)  # Final report

View File

@@ -0,0 +1,23 @@
from typing import List
from pydantic import BaseModel, Field
class SearchQueryList(BaseModel):
    """Structured-output schema for the query-generation step."""

    query: List[str] = Field(
        description="A list of search queries to be used for web research."
    )
    rationale: str = Field(
        description="A brief explanation of why these queries are relevant to the research topic."
    )
class Reflection(BaseModel):
    """Structured-output schema for the reflection step."""

    is_sufficient: bool = Field(
        description="Whether the provided summaries are sufficient to answer the user's question."
    )
    knowledge_gap: str = Field(
        description="A description of what information is missing or needs clarification."
    )
    follow_up_queries: List[str] = Field(
        description="A list of follow-up queries to address the knowledge gap."
    )

166
backend/src/agent/utils.py Normal file
View File

@@ -0,0 +1,166 @@
from typing import Any, Dict, List
from langchain_core.messages import AnyMessage, AIMessage, HumanMessage
def get_research_topic(messages: List[AnyMessage]) -> str:
    """
    Get the research topic from the messages.

    A single message is returned verbatim; a longer history is flattened into
    a role-prefixed transcript (User:/Assistant: lines).
    """
    if len(messages) == 1:
        return messages[-1].content
    parts = []
    for message in messages:
        if isinstance(message, HumanMessage):
            parts.append(f"User: {message.content}\n")
        elif isinstance(message, AIMessage):
            parts.append(f"Assistant: {message.content}\n")
    return "".join(parts)
def resolve_urls(urls_to_resolve: List[Any], id: int) -> Dict[str, str]:
    """
    Map long vertex AI search URLs to short, token-cheap URLs.

    Each unique original URL resolves to
    ``https://vertexaisearch.cloud.google.com/id/{id}-{idx}`` where ``idx``
    is the index of the URL's first occurrence, so repeats stay consistent.
    """
    prefix = "https://vertexaisearch.cloud.google.com/id/"
    resolved_map: Dict[str, str] = {}
    for position, site in enumerate(urls_to_resolve):
        original = site.web.uri
        if original not in resolved_map:
            resolved_map[original] = f"{prefix}{id}-{position}"
    return resolved_map
def insert_citation_markers(text, citations_list):
    """
    Insert markdown citation markers into a text string.

    Args:
        text (str): The original text string.
        citations_list (list): Dicts with 'start_index', 'end_index', and a
            'segments' list; each segment supplies 'label' and 'short_url'.
            Indices refer to positions in the original text.

    Returns:
        str: The text with one " [label](short_url)" marker appended per
        segment at each citation's end index.
    """
    # Process from the end of the string backwards so splices never shift the
    # indices of citations that haven't been handled yet.
    ordered = sorted(
        citations_list,
        key=lambda c: (c["end_index"], c["start_index"]),
        reverse=True,
    )
    result = text
    for citation in ordered:
        cut = citation["end_index"]
        marker = "".join(
            f" [{seg['label']}]({seg['short_url']})" for seg in citation["segments"]
        )
        result = result[:cut] + marker + result[cut:]
    return result
def get_citations(response, resolved_urls_map):
    """
    Extracts and formats citation information from a Gemini model's response.

    Processes the grounding metadata provided in the response to construct a
    list of citation objects, one per grounding support.

    Args:
        response: The response object from the Gemini model, expected to have
                  a structure including `candidates[0].grounding_metadata`.
        resolved_urls_map: Mapping from grounding-chunk URIs to shortened
                           URLs (as produced by `resolve_urls`).

    Returns:
        list: A list of dictionaries, where each dictionary represents a
              citation and has the following keys:
              - "start_index" (int): The starting character index of the cited
                                     segment in the original text. Defaults to 0
                                     if not specified.
              - "end_index" (int): The character index at the end of the
                                   cited segment, taken directly from
                                   `support.segment.end_index`.
              - "segments" (list[dict]): One dict per grounding chunk with
                                         "label", "short_url", and "value"
                                         (the original URI).
              Returns an empty list if no valid candidates or grounding supports
              are found, or if essential data is missing.
    """
    citations = []
    # Ensure response and necessary nested structures are present
    if not response or not response.candidates:
        return citations
    candidate = response.candidates[0]
    if (
        not hasattr(candidate, "grounding_metadata")
        or not candidate.grounding_metadata
        or not hasattr(candidate.grounding_metadata, "grounding_supports")
    ):
        return citations
    for support in candidate.grounding_metadata.grounding_supports:
        citation = {}
        # Ensure segment information is present
        if not hasattr(support, "segment") or support.segment is None:
            continue  # Skip this support if segment info is missing
        start_index = (
            support.segment.start_index
            if support.segment.start_index is not None
            else 0
        )
        # Ensure end_index is present to form a valid segment
        if support.segment.end_index is None:
            continue  # Skip if end_index is missing, as it's crucial
        # end_index is used as-is; no +1 adjustment is applied here.
        citation["start_index"] = start_index
        citation["end_index"] = support.segment.end_index
        citation["segments"] = []
        if (
            hasattr(support, "grounding_chunk_indices")
            and support.grounding_chunk_indices
        ):
            for ind in support.grounding_chunk_indices:
                try:
                    chunk = candidate.grounding_metadata.grounding_chunks[ind]
                    resolved_url = resolved_urls_map.get(chunk.web.uri, None)
                    # NOTE(review): `title.split(".")[:-1][0]` keeps the first
                    # dot-separated part but raises IndexError for titles
                    # without a "." — that citation segment is then silently
                    # dropped by the except below. Confirm this is intended.
                    citation["segments"].append(
                        {
                            "label": chunk.web.title.split(".")[:-1][0],
                            "short_url": resolved_url,
                            "value": chunk.web.uri,
                        }
                    )
                except (IndexError, AttributeError, NameError):
                    # Handle cases where chunk, web, uri, or resolved_map might be problematic
                    # For simplicity, we'll just skip adding this particular segment link
                    # In a production system, you might want to log this.
                    pass
        citations.append(citation)
    return citations