server.py
# R2R FastMCP Server
# Install: mcp install server.py -v R2R_BASE_URL=http://localhost:7272

import json
import logging
import os
from enum import Enum
from typing import Any, Literal

from dotenv import load_dotenv
from r2r import R2RClient  # type: ignore[import-untyped]

# Load environment variables from .env file
load_dotenv()

# Configuration
R2R_BASE_URL = os.getenv("R2R_BASE_URL", "http://127.0.0.1:7272")
API_KEY = os.getenv("API_KEY", "")

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("r2r-mcp")


# Helper functions
def id_to_shorthand(id: str) -> str:
    return str(id)[:7]


def format_search_results_for_llm(results: Any) -> str:
    """
    Format R2R search results for LLM consumption.

    Aggregates 4 types of results:
    - Chunk search (vector search)
    - Graph search (knowledge graph entities/relationships)
    - Web search (internet results)
    - Document search (local documents with chunks)
    """
    lines = []

    # 1) Chunk search
    if results.chunk_search_results:
        lines.append("Vector Search Results:")
        for c in results.chunk_search_results:
            lines.append(f"Source ID [{id_to_shorthand(c.id)}]:")
            lines.append(c.text or "")

    # 2) Graph search
    if results.graph_search_results:
        lines.append("Graph Search Results:")
        for g in results.graph_search_results:
            lines.append(f"Source ID [{id_to_shorthand(g.id)}]:")
            if hasattr(g.content, "summary"):
                lines.append(f"Community Name: {g.content.name}")
                lines.append(f"ID: {g.content.id}")
                lines.append(f"Summary: {g.content.summary}")
            elif hasattr(g.content, "name") and hasattr(g.content, "description"):
                lines.append(f"Entity Name: {g.content.name}")
                lines.append(f"Description: {g.content.description}")
            elif (
                hasattr(g.content, "subject")
                and hasattr(g.content, "predicate")
                and hasattr(g.content, "object")
            ):
                rel = f"{g.content.subject}-{g.content.predicate}-{g.content.object}"
                lines.append(f"Relationship: {rel}")

    # 3) Web search
    if results.web_search_results:
        lines.append("Web Search Results:")
        for w in results.web_search_results:
            lines.append(f"Source ID [{id_to_shorthand(w.id)}]:")
            lines.append(f"Title: {w.title}")
            lines.append(f"Link: {w.link}")
            lines.append(f"Snippet: {w.snippet}")

    # 4) Local context docs
    if results.document_search_results:
        lines.append("Local Context Documents:")
        for doc_result in results.document_search_results:
            doc_title = doc_result.title or "Untitled Document"
            doc_id = doc_result.id
            summary = doc_result.summary

            lines.append(f"Full Document ID: {doc_id}")
            lines.append(f"Shortened Document ID: {id_to_shorthand(doc_id)}")
            lines.append(f"Document Title: {doc_title}")
            if summary:
                lines.append(f"Summary: {summary}")

            if doc_result.chunks:
                for chunk in doc_result.chunks:
                    lines.append(
                        f"\nChunk ID {id_to_shorthand(chunk['id'])}:\n{chunk['text']}"
                    )

    result = "\n".join(lines)
    return result
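

# Illustrative output shape for format_search_results_for_llm (the IDs and
# text below are made-up placeholders, not real results):
#
#   Vector Search Results:
#   Source ID [a1b2c3d]:
#   <chunk text>
#   Graph Search Results:
#   Source ID [e4f5a6b]:
#   Entity Name: <entity name>
#   Description: <entity description>
#
# Each section is emitted only when the corresponding result list is non-empty.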


# Preset configurations for different use cases
class SearchPreset(str, Enum):
    """Preset configurations for search operations."""

    DEFAULT = "default"
    DEVELOPMENT = "development"
    REFACTORING = "refactoring"
    DEBUG = "debug"
    RESEARCH = "research"
    PRODUCTION = "production"


class RAGPreset(str, Enum):
    """Preset configurations for RAG operations."""

    DEFAULT = "default"
    DEVELOPMENT = "development"
    REFACTORING = "refactoring"
    DEBUG = "debug"
    RESEARCH = "research"
    PRODUCTION = "production"


def get_search_preset_config(preset: str) -> dict[str, Any]:
    """
    Get search configuration for a preset.

    Args:
        preset: Preset name (default, development, refactoring, debug, research, production)

    Returns:
        Dictionary with search settings
    """
    presets = {
        "default": {
            "use_semantic_search": True,
            "use_hybrid_search": False,
            "use_graph_search": False,
            "limit": 10,
        },
        "development": {
            "use_semantic_search": True,
            "use_hybrid_search": True,
            "use_graph_search": False,
            "limit": 15,
            "hybrid_settings": {
                "semantic_weight": 5.0,
                "full_text_weight": 1.0,
                "full_text_limit": 200,
                "rrf_k": 50,
            },
        },
        "refactoring": {
            "use_semantic_search": True,
            "use_hybrid_search": True,
            "use_graph_search": True,
            "limit": 20,
            "kg_search_type": "local",
            "hybrid_settings": {
                "semantic_weight": 7.0,
                "full_text_weight": 3.0,
                "full_text_limit": 300,
                "rrf_k": 50,
            },
        },
        "debug": {
            "use_semantic_search": True,
            "use_hybrid_search": False,
            "use_graph_search": True,
            "limit": 5,
            "kg_search_type": "local",
        },
        "research": {
            "use_semantic_search": True,
            "use_hybrid_search": True,
            "use_graph_search": True,
            "limit": 30,
            "kg_search_type": "global",
            "hybrid_settings": {
                "semantic_weight": 6.0,
                "full_text_weight": 2.0,
                "full_text_limit": 400,
                "rrf_k": 60,
            },
        },
        "production": {
            "use_semantic_search": True,
            "use_hybrid_search": True,
            "use_graph_search": False,
            "limit": 10,
            "hybrid_settings": {
                "semantic_weight": 5.0,
                "full_text_weight": 1.0,
                "full_text_limit": 200,
                "rrf_k": 50,
            },
        },
    }
    return presets.get(preset.lower(), presets["default"]).copy()


def get_rag_preset_config(preset: str) -> dict[str, Any]:
    """
    Get RAG configuration for a preset.

    Args:
        preset: Preset name (default, development, refactoring, debug, research, production)

    Returns:
        Dictionary with RAG settings (search_settings and rag_generation_config)
    """
    presets = {
        "default": {
            "search_settings": {
                "use_semantic_search": True,
                "use_hybrid_search": False,
                "limit": 10,
            },
            "rag_generation_config": {
                "model": "vertex_ai/gemini-2.5-flash",
                "temperature": 0.7,
            },
        },
        "development": {
            "search_settings": {
                "use_semantic_search": True,
                "use_hybrid_search": True,
                "limit": 15,
                "hybrid_settings": {
                    "semantic_weight": 5.0,
                    "full_text_weight": 1.0,
                    "full_text_limit": 200,
                    "rrf_k": 50,
                },
            },
            "rag_generation_config": {
                "model": "vertex_ai/gemini-2.5-flash",
                "temperature": 0.8,
            },
        },
        "refactoring": {
            "search_settings": {
                "use_semantic_search": True,
                "use_hybrid_search": True,
                "use_graph_search": True,
                "limit": 20,
                "kg_search_type": "local",
                "hybrid_settings": {
                    "semantic_weight": 7.0,
                    "full_text_weight": 3.0,
                    "full_text_limit": 300,
                    "rrf_k": 50,
                },
            },
            "rag_generation_config": {
                "model": "vertex_ai/gemini-2.5-pro",
                "temperature": 0.5,
            },
        },
        "debug": {
            "search_settings": {
                "use_semantic_search": True,
                "use_hybrid_search": False,
                "use_graph_search": True,
                "limit": 5,
                "kg_search_type": "local",
            },
            "rag_generation_config": {
                "model": "vertex_ai/gemini-2.5-flash",
                "temperature": 0.3,
            },
        },
        "research": {
            "search_settings": {
                "use_semantic_search": True,
                "use_hybrid_search": True,
                "use_graph_search": True,
                "limit": 30,
                "kg_search_type": "global",
                "hybrid_settings": {
                    "semantic_weight": 6.0,
                    "full_text_weight": 2.0,
                    "full_text_limit": 400,
                    "rrf_k": 60,
                },
            },
            "rag_generation_config": {
                "model": "vertex_ai/gemini-2.5-pro",
                "temperature": 0.7,
            },
        },
        "production": {
            "search_settings": {
                "use_semantic_search": True,
                "use_hybrid_search": True,
                "limit": 10,
                "hybrid_settings": {
                    "semantic_weight": 5.0,
                    "full_text_weight": 1.0,
                    "full_text_limit": 200,
                    "rrf_k": 50,
                },
            },
            "rag_generation_config": {
                "model": "vertex_ai/gemini-2.5-flash",
                "temperature": 0.6,
            },
        },
    }
    config = presets.get(preset.lower(), presets["default"])
    return {
        "search_settings": config["search_settings"].copy(),
        "rag_generation_config": config["rag_generation_config"].copy(),
    }
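

# Usage sketch for the preset helpers (return values read off the tables above):
#
#   cfg = get_search_preset_config("refactoring")
#   cfg["limit"]                      # -> 20
#   cfg["hybrid_settings"]["rrf_k"]   # -> 50
#   get_search_preset_config("nope")  # unknown names fall back to "default"
#
# Note: .copy() is shallow, so nested dicts such as hybrid_settings are still
# shared with the preset table; treat the returned config as read-only.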


# Validation functions
def validate_limit(limit: int) -> None:
    """Validate limit parameter (1-100)."""
    if not 1 <= limit <= 100:
        raise ValueError("limit must be between 1 and 100")


def validate_temperature(temperature: float) -> None:
    """Validate temperature parameter (0.0-1.0)."""
    if not 0.0 <= temperature <= 1.0:
        raise ValueError("temperature must be between 0.0 and 1.0")


def validate_semantic_weight(weight: float) -> None:
    """Validate semantic weight parameter (0.0-10.0)."""
    if not 0.0 <= weight <= 10.0:
        raise ValueError("semantic_weight must be between 0.0 and 10.0")


def validate_full_text_weight(weight: float) -> None:
    """Validate full text weight parameter (0.0-10.0)."""
    if not 0.0 <= weight <= 10.0:
        raise ValueError("full_text_weight must be between 0.0 and 10.0")


def validate_full_text_limit(limit: int) -> None:
    """Validate full text limit parameter (1-1000)."""
    if not 1 <= limit <= 1000:
        raise ValueError("full_text_limit must be between 1 and 1000")


def validate_rrf_k(k: int) -> None:
    """Validate RRF k parameter (1-100)."""
    if not 1 <= k <= 100:
        raise ValueError("rrf_k must be between 1 and 100")


def validate_kg_search_type(kg_search_type: str) -> None:
    """Validate knowledge graph search type."""
    if kg_search_type not in ["local", "global"]:
        raise ValueError("kg_search_type must be 'local' or 'global'")
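

# Validator behavior sketch: each validator returns None on success and raises
# ValueError with a descriptive message on failure, which the error-handling
# middleware below logs and re-raises:
#
#   validate_limit(10)                 # OK
#   validate_limit(0)                  # ValueError: limit must be between 1 and 100
#   validate_kg_search_type("global")  # OK; anything else raises ValueError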


# Create FastMCP server
try:
    from fastmcp import Context, FastMCP  # type: ignore[import-untyped]
    from fastmcp.server.middleware import Middleware, MiddlewareContext
except Exception as e:
    raise ImportError(
        "FastMCP is not installed. Please run `pip install fastmcp`"
    ) from e


# Error Handling Middleware
class R2RErrorHandlingMiddleware(Middleware):
    """Custom error handling middleware for R2R operations."""

    def __init__(self):
        self.logger = logging.getLogger("r2r-errors")
        self.error_counts = {}

    async def on_message(self, context: MiddlewareContext, call_next):
        try:
            return await call_next(context)
        except Exception as error:
            # Track error statistics
            error_key = f"{type(error).__name__}:{context.method}"
            self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1

            self.logger.error(
                f"Error in {context.method}: {type(error).__name__}: {error}"
            )

            # Special handling for R2R connection errors
            if "connection" in str(error).lower() or "refused" in str(error).lower():
                raise ConnectionError(
                    f"Failed to connect to R2R server at {R2R_BASE_URL}. "
                    "Please check R2R_BASE_URL and ensure R2R is running."
                ) from error

            # Re-raise the original error
            raise


# Initialize FastMCP server
mcp = FastMCP("R2R Retrieval System")
mcp.add_middleware(R2RErrorHandlingMiddleware())


# Resources
@mcp.resource("r2r://config")
async def get_r2r_config(ctx: Context) -> str:
    """Get current R2R MCP server configuration."""
    await ctx.info("Retrieving R2R configuration")
    config = {
        "r2r_base_url": R2R_BASE_URL,
        "api_key_configured": bool(API_KEY),
        "request_id": ctx.request_id,
        "server_name": "R2R Retrieval System",
    }
    return json.dumps(config, indent=2)


@mcp.resource("r2r://health")
async def check_r2r_health(ctx: Context) -> str:
    """Check R2R server health and connectivity."""
    await ctx.info("Checking R2R server health")
    try:
        client = R2RClient(base_url=R2R_BASE_URL)
        if API_KEY:
            client.set_api_key(API_KEY)
        # Simple connectivity check - try to initialize client
        health_data = {
            "status": "healthy",
            "r2r_url": R2R_BASE_URL,
            "timestamp": ctx.request_id,
            "api_key_configured": bool(API_KEY),
        }
        return json.dumps(health_data, indent=2)
    except Exception as e:
        await ctx.error(f"Health check failed: {e!s}")
        error_data = {"status": "unhealthy", "error": str(e), "r2r_url": R2R_BASE_URL}
        return json.dumps(error_data, indent=2)
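

# Client-side sketch for reading these resources (assumes the FastMCP client
# API with its in-memory transport; adapt to whichever MCP client you use):
#
#   from fastmcp import Client
#
#   async with Client(mcp) as client:  # connect in-process to this server
#       cfg = await client.read_resource("r2r://config")
#       health = await client.read_resource("r2r://health")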


# Tools
@mcp.tool(
    annotations={
        "title": "R2R Search",
        "readOnlyHint": True,
        "idempotentHint": True,
        "openWorldHint": True,
    }
)
async def search(
    query: str,
    ctx: Context,
    preset: str = "default",
    use_semantic_search: bool = True,
    use_hybrid_search: bool = False,
    use_graph_search: bool = True,
    limit: int = 10,
    kg_search_type: Literal["local", "global"] = "local",
    semantic_weight: float = 5.0,
    full_text_weight: float = 1.0,
    full_text_limit: int = 200,
    rrf_k: int = 50,
    search_strategy: str | None = "rag_fusion",
    include_web_search: bool = False,
) -> str:
    """
    Perform comprehensive search on R2R knowledge base with full parameter control.

    This tool supports semantic search, hybrid search (semantic + full-text),
    knowledge graph search, and web search. Use presets for common scenarios
    or customize all parameters manually.

    Args:
        query: The search query to find relevant documents. Required.
        preset: Preset configuration for common use cases. Options:
            - "default": Basic semantic search, 10 results
            - "development": Hybrid search optimized for code development, 15 results
            - "refactoring": Hybrid + graph search for code refactoring, 20 results
            - "debug": Minimal graph search for debugging, 5 results
            - "research": Comprehensive search with global graph, 30 results
            - "production": Balanced hybrid search for production, 10 results
        use_semantic_search: Enable semantic/vector search (default: True)
        use_hybrid_search: Enable hybrid search combining semantic and full-text search (default: False)
        use_graph_search: Enable knowledge graph search for entity/relationship discovery (default: True)
        limit: Maximum number of results to return. Must be between 1 and 100 (default: 10)
        kg_search_type: Knowledge graph search type. "local" for local context, "global" for broader connections (default: "local")
        semantic_weight: Weight for semantic search in hybrid mode. Must be between 0.0 and 10.0 (default: 5.0)
        full_text_weight: Weight for full-text search in hybrid mode. Must be between 0.0 and 10.0 (default: 1.0)
        full_text_limit: Maximum full-text results to consider in hybrid search. Must be between 1 and 1000 (default: 200)
        rrf_k: Reciprocal Rank Fusion parameter for hybrid search. Must be between 1 and 100 (default: 50)
        search_strategy: Advanced search strategy (e.g., "hyde", "rag_fusion"). Optional (default: "rag_fusion").
        include_web_search: Include web search results from the internet (default: False)

    Returns:
        Formatted search results including:
        - Vector search results (chunks)
        - Graph search results (entities, relationships, communities)
        - Web search results (if enabled)
        - Document search results (local documents with chunks)

    Examples:
        # Simple search with default settings
        search("What is machine learning?")

        # Development preset for code search
        search("async function implementation", preset="development")

        # Custom hybrid search
        search(
            "API documentation",
            use_hybrid_search=True,
            semantic_weight=7.0,
            limit=20
        )

        # Research with knowledge graph
        search("neural network architectures", preset="research")
    """
    await ctx.info(f"Starting search query: {query}, preset: {preset}")

    try:
        # Validate parameters
        validate_limit(limit)
        validate_semantic_weight(semantic_weight)
        validate_full_text_weight(full_text_weight)
        validate_full_text_limit(full_text_limit)
        validate_rrf_k(rrf_k)
        if use_graph_search:
            validate_kg_search_type(kg_search_type)

        await ctx.report_progress(progress=10, total=100, message="Initializing client")

        client = R2RClient(base_url=R2R_BASE_URL)
        if API_KEY:
            client.set_api_key(API_KEY)

        # Get preset configuration and merge with explicit parameters
        preset_config = get_search_preset_config(preset)

        # Apply preset values, but allow explicit parameters to override
        # For boolean flags: if preset enables it, use it unless explicitly disabled
        # For numeric: use preset if value is default, otherwise use explicit value
        final_use_hybrid = (
            use_hybrid_search
            if preset == "default"
            else (use_hybrid_search or preset_config.get("use_hybrid_search", False))
        )
        final_use_graph = (
            use_graph_search
            if preset == "default"
            else (use_graph_search or preset_config.get("use_graph_search", False))
        )

        search_settings: dict[str, Any] = {
            "use_semantic_search": use_semantic_search,
            "limit": limit,
        }

        # Apply hybrid search settings
        if final_use_hybrid:
            search_settings["use_hybrid_search"] = True
            hybrid_config = preset_config.get("hybrid_settings", {})
            search_settings["hybrid_settings"] = {
                "semantic_weight": semantic_weight
                if semantic_weight != 5.0 or preset == "default"
                else hybrid_config.get("semantic_weight", 5.0),
                "full_text_weight": full_text_weight
                if full_text_weight != 1.0 or preset == "default"
                else hybrid_config.get("full_text_weight", 1.0),
                "full_text_limit": full_text_limit
                if full_text_limit != 200 or preset == "default"
                else hybrid_config.get("full_text_limit", 200),
                "rrf_k": rrf_k
                if rrf_k != 50 or preset == "default"
                else hybrid_config.get("rrf_k", 50),
            }
            await ctx.info("Hybrid search enabled")

        # Apply graph search settings
        if final_use_graph:
            kg_type = (
                kg_search_type
                if kg_search_type != "local" or preset == "default"
                else preset_config.get("kg_search_type", "local")
            )
            search_settings["graph_search_settings"] = {
                "use_graph_search": True,
                "kg_search_type": kg_type,
            }
            await ctx.info(f"Knowledge graph search enabled (type: {kg_type})")

        # Apply search strategy if provided
        if search_strategy:
            search_settings["search_strategy"] = search_strategy
            await ctx.info(f"Search strategy: {search_strategy}")

        await ctx.report_progress(progress=30, total=100, message="Executing search")

        search_response = client.retrieval.search(
            query=query, search_settings=search_settings
        )

        await ctx.report_progress(progress=80, total=100, message="Formatting results")

        formatted = format_search_results_for_llm(search_response.results)

        await ctx.report_progress(progress=100, total=100, message="Complete")
        await ctx.info(
            f"Search completed successfully, returned {len(formatted)} chars"
        )

        return formatted

    except ValueError as e:
        await ctx.error(f"Validation error: {e!s}")
        raise
    except Exception as e:
        await ctx.error(f"Search failed: {e!s}")
        raise
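

# Call sketch for the search tool from an MCP client (argument names mirror
# the signature above; the query string is illustrative):
#
#   result = await client.call_tool(
#       "search",
#       {"query": "async function implementation", "preset": "development"},
#   )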


@mcp.tool(
    annotations={
        "title": "R2R RAG",
        "readOnlyHint": False,
        "destructiveHint": False,
        "openWorldHint": True,
    }
)
async def rag(
    query: str,
    ctx: Context,
    preset: str = "default",
    model: str = "vertex_ai/gemini-2.5-flash",
    temperature: float = 0.7,
    max_tokens: int | None = 8000,
    use_semantic_search: bool = True,
    use_hybrid_search: bool = False,
    use_graph_search: bool = True,
    limit: int = 100,
    kg_search_type: Literal["local", "global"] = "global",
    semantic_weight: float = 5.0,
    full_text_weight: float = 1.0,
    full_text_limit: int = 200,
    rrf_k: int = 50,
    search_strategy: str | None = None,
    include_web_search: bool = False,
    task_prompt_override: str | None = None,
) -> str:
    """
    Perform Retrieval-Augmented Generation (RAG) query with full parameter control.

    This tool retrieves relevant context from the knowledge base and generates
    an answer using a language model. Supports all search modes (semantic,
    hybrid, graph) and customizable generation parameters.

    Args:
        query: The question to answer using the knowledge base. Required.
        preset: Preset configuration for common use cases. Options:
            - "default": Basic RAG with gemini-2.5-flash, temperature 0.7, 10 results
            - "development": Hybrid search with higher temperature for creative answers, 15 results
            - "refactoring": Hybrid + graph search with gemini-2.5-pro for code analysis, 20 results
            - "debug": Minimal graph search with low temperature for precise answers, 5 results
            - "research": Comprehensive search with gemini-2.5-pro for research questions, 30 results
            - "production": Balanced hybrid search optimized for production, 10 results
        model: LLM model to use for generation. Examples:
            - "vertex_ai/gemini-2.5-flash" (default, fast and cost-effective)
            - "vertex_ai/gemini-2.5-pro" (more capable, higher cost)
            - "openai/gpt-4-turbo" (high performance)
            - "anthropic/claude-3-haiku-20240307" (fast)
            - "anthropic/claude-3-sonnet-20240229" (balanced)
            - "anthropic/claude-3-opus-20240229" (most capable)
        temperature: Generation temperature controlling randomness. Must be between 0.0 and 1.0.
            Lower values (0.0-0.3) = more deterministic, precise answers
            Medium values (0.4-0.7) = balanced creativity and accuracy (default: 0.7)
            Higher values (0.8-1.0) = more creative, diverse answers
        max_tokens: Maximum number of tokens to generate. Optional, uses model default if not specified.
        use_semantic_search: Enable semantic/vector search for retrieval (default: True)
        use_hybrid_search: Enable hybrid search combining semantic and full-text search (default: False)
        use_graph_search: Enable knowledge graph search for entity/relationship context (default: True)
        limit: Maximum number of search results to retrieve. Must be between 1 and 100 (default: 100)
        kg_search_type: Knowledge graph search type. "local" for local context, "global" for broader connections (default: "global")
        semantic_weight: Weight for semantic search in hybrid mode. Must be between 0.0 and 10.0 (default: 5.0)
        full_text_weight: Weight for full-text search in hybrid mode. Must be between 0.0 and 10.0 (default: 1.0)
        full_text_limit: Maximum full-text results to consider in hybrid search. Must be between 1 and 1000 (default: 200)
        rrf_k: Reciprocal Rank Fusion parameter for hybrid search. Must be between 1 and 100 (default: 50)
        search_strategy: Advanced search strategy (e.g., "hyde", "rag_fusion"). Optional.
        include_web_search: Include web search results from the internet (default: False)
        task_prompt_override: Custom system prompt to override the default RAG task prompt.
            Useful for specializing AI behavior for specific domains or tasks. Optional.

    Returns:
        Generated answer based on relevant context from the knowledge base.

    Examples:
        # Simple RAG query
        rag("What is machine learning?")

        # Development preset for code questions
        rag("How to implement async/await in Python?", preset="development")

        # Custom RAG with specific model and temperature
        rag(
            "Explain neural networks",
            model="vertex_ai/gemini-2.5-pro",
            temperature=0.5
        )

        # Research preset with comprehensive search
        rag(
            "Latest developments in transformer architectures",
            preset="research"
        )

        # Debug preset for precise technical answers
        rag("What causes this error?", preset="debug")
    """
    await ctx.info(f"RAG query: {query}, preset: {preset}, model: {model}")

    try:
        # Validate parameters
        validate_limit(limit)
        validate_temperature(temperature)
        validate_semantic_weight(semantic_weight)
        validate_full_text_weight(full_text_weight)
        validate_full_text_limit(full_text_limit)
        validate_rrf_k(rrf_k)
        if use_graph_search:
            validate_kg_search_type(kg_search_type)

        await ctx.report_progress(progress=10, total=100, message="Initializing RAG")

        client = R2RClient(base_url=R2R_BASE_URL)
        if API_KEY:
            client.set_api_key(API_KEY)

        # Get preset configuration and merge with explicit parameters
        preset_config = get_rag_preset_config(preset)

        # Apply preset values, but allow explicit parameters to override
        final_use_hybrid = (
            use_hybrid_search
            if preset == "default"
            else (
                use_hybrid_search
                or preset_config["search_settings"].get("use_hybrid_search", False)
            )
        )
        final_use_graph = (
            use_graph_search
            if preset == "default"
            else (
                use_graph_search
                or preset_config["search_settings"].get("use_graph_search", False)
            )
        )

        search_settings: dict[str, Any] = {
            "use_semantic_search": use_semantic_search,
            "limit": limit,
        }

        # Apply hybrid search settings
        if final_use_hybrid:
            search_settings["use_hybrid_search"] = True
            hybrid_config = preset_config["search_settings"].get("hybrid_settings", {})
            search_settings["hybrid_settings"] = {
                "semantic_weight": semantic_weight
                if semantic_weight != 5.0 or preset == "default"
                else hybrid_config.get("semantic_weight", 5.0),
                "full_text_weight": full_text_weight
                if full_text_weight != 1.0 or preset == "default"
                else hybrid_config.get("full_text_weight", 1.0),
                "full_text_limit": full_text_limit
                if full_text_limit != 200 or preset == "default"
                else hybrid_config.get("full_text_limit", 200),
                "rrf_k": rrf_k
                if rrf_k != 50 or preset == "default"
                else hybrid_config.get("rrf_k", 50),
            }
            await ctx.info("Hybrid search enabled for RAG")

        # Apply graph search settings
        if final_use_graph:
            kg_type = (
                kg_search_type
                if kg_search_type != "local" or preset == "default"
                else preset_config["search_settings"].get("kg_search_type", "local")
            )
            search_settings["graph_search_settings"] = {
                "use_graph_search": True,
                "kg_search_type": kg_type,
            }
            await ctx.info(f"Knowledge graph search enabled (type: {kg_type})")

        # Apply search strategy if provided
        if search_strategy:
            search_settings["search_strategy"] = search_strategy
            await ctx.info(f"Search strategy: {search_strategy}")

        # Build RAG generation config
        rag_model = (
            model
            if model != "vertex_ai/gemini-2.5-flash"
            else preset_config["rag_generation_config"].get(
                "model", "vertex_ai/gemini-2.5-flash"
            )
        )
        rag_temp = (
            temperature
            if temperature != 0.7
            else preset_config["rag_generation_config"].get("temperature", 0.7)
        )

        rag_generation_config: dict[str, Any] = {
            "model": rag_model,
            "temperature": rag_temp,
            "stream": False,
        }
        if max_tokens is not None:
            rag_generation_config["max_tokens"] = max_tokens

        await ctx.report_progress(progress=30, total=100, message="Retrieving context")

        try:
            rag_kwargs: dict[str, Any] = {
                "query": query,
                "search_settings": search_settings if search_settings else None,
                "rag_generation_config": rag_generation_config,
                "include_web_search": include_web_search,
            }
            if task_prompt_override:
                rag_kwargs["task_prompt"] = task_prompt_override

            rag_response = client.retrieval.rag(**rag_kwargs)

            await ctx.report_progress(
                progress=90, total=100, message="Generating answer"
            )

            answer = rag_response.results.generated_answer  # type: ignore

            await ctx.report_progress(progress=100, total=100, message="Complete")
            await ctx.info("RAG completed successfully")

            return answer

        except Exception as e:
            await ctx.error(f"RAG generation failed: {e!s}")
            raise

    except ValueError as e:
        await ctx.error(f"Validation error: {e!s}")
        raise
    except Exception as e:
        await ctx.error(f"RAG failed: {e!s}")
        raise
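

# Call sketch for the rag tool (illustrative arguments; explicit model and
# temperature override the preset per the merge rules above):
#
#   answer = await client.call_tool(
#       "rag",
#       {
#           "query": "Explain neural networks",
#           "model": "vertex_ai/gemini-2.5-pro",
#           "temperature": 0.5,
#       },
#   )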


# Create ASGI application for production deployment (Uvicorn, ChatMCP, etc.)
app = mcp.http_app()
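
# Deployment sketch: `app` is a standard ASGI application, so any ASGI server
# can host it (host/port values are illustrative):
#
#   uvicorn server:app --host 0.0.0.0 --port 8000
#
# FastMCP serves the MCP endpoint at /mcp by default, i.e.
# http://localhost:8000/mcp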

# Run the server if executed directly (for local testing)
if __name__ == "__main__":
    # mcp.run() uses FastMCP's default (stdio) transport; for HTTP, serve the
    # ASGI `app` above instead, which exposes http://localhost:8000/mcp
    mcp.run()
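
# Local run sketch (mirrors the install hint at the top of this file):
#
#   R2R_BASE_URL=http://localhost:7272 python server.py
#
#   # or register with the MCP CLI:
#   mcp install server.py -v R2R_BASE_URL=http://localhost:7272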
