Skip to main content
Glama
rag_mcp_server.py • 14.1 kB
"""MCP server exposing user-scoped RAG retrieval and file-indexing tools.

The module wires together:

* ``AuthHeaderMiddleware`` - verifies the ``Authorization: Bearer`` token on
  every tool call and records the outcome in the FastMCP context state.
* ``knowledge_base_retrieval`` - tool wrapper around
  ``knowledge_base_retrieval_interface``.
* ``upload_student_file`` - tool that downloads an already-uploaded blob from
  Azure storage and indexes it through ``FileProcessor``.
* ``/health`` - a plain HTTP health-check route.
"""

import json
import os
import traceback
from typing import Annotated, Optional

from fastapi.responses import JSONResponse
from fastmcp import Context, FastMCP
from fastmcp.exceptions import ToolError
from fastmcp.server.dependencies import get_http_headers
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.tools.tool import ToolResult
from pydantic import Field

# Project-local imports are kept in their original relative order in case any
# of these modules relies on import-time side effects of an earlier one.
from utils.rag_interface import knowledge_base_retrieval_interface
from utils.jwt_util import verify_server_token
from utils.azure_storage import AzureStorageManager
from utils.file_processor import FileProcessor
from rag.system import TutoringRAGSystem

# Header names whose values must never be written to logs.
_SENSITIVE_HEADERS = frozenset(
    {"authorization", "proxy-authorization", "cookie", "x-api-key"}
)

# File extensions accepted by the upload tool.
# NOTE(review): "doc" is accepted here although the tool description only
# mentions PDF and DOCX - confirm FileProcessor really handles legacy .doc.
_SUPPORTED_EXTENSIONS = frozenset({"pdf", "docx", "doc"})


def _redact_headers(headers):
    """Return a copy of *headers* with credential-bearing values masked."""
    return {
        name: "<redacted>" if name.lower() in _SENSITIVE_HEADERS else value
        for name, value in dict(headers).items()
    }


def _extract_bearer_token(auth_header):
    """Return the token from a ``Bearer <token>`` header value, else ``None``.

    The scheme name is compared case-insensitively; an empty token is treated
    the same as a missing one.
    """
    if not auth_header:
        return None
    scheme, _, token = auth_header.partition(" ")
    if scheme.lower() != "bearer":
        return None
    return token.strip() or None


def _file_extension(filename):
    """Return the lower-cased extension of *filename* without the dot.

    Returns an empty string when the name contains no dot, so a bare name such
    as ``"pdf"`` is not mistaken for a PDF file.
    """
    _, dot, extension = filename.rpartition(".")
    return extension.lower() if dot else ""


def _unauthorized_result():
    """Build the error payload returned when token verification failed."""
    return ToolResult(
        content=json.dumps(
            {
                "status": "error",
                "message": "Unauthorized: token verification failed.",
            }
        )
    )


class AuthHeaderMiddleware(Middleware):
    """Verify the bearer token of each tool call and normalise its arguments.

    The verification outcome is stored in the FastMCP context state under
    ``auth_verified`` (bool) and ``auth_payload`` (whatever
    ``verify_server_token`` returned, or ``None``); the tools read
    ``auth_verified`` to decide whether to serve the request.
    """

    async def on_call_tool(self, context: MiddlewareContext, call_next):
        """Authenticate the call, log it, fix up ``query`` and run the tool.

        Exceptions raised by the downstream tool are logged and re-raised
        unchanged.
        """
        print("=" * 80)
        print("🔧 MIDDLEWARE: Processing tool call")

        headers = get_http_headers()
        auth = headers.get("authorization")

        # Never log raw credentials: the Authorization header carries the JWT.
        print(f"📋 Headers received: {_redact_headers(headers)}")
        print(f"🔑 Auth header present: {bool(auth)}")

        is_verified = False
        payload = None

        token = _extract_bearer_token(auth)
        if token is not None:
            try:
                payload = verify_server_token(token)
                is_verified = True
                print("✅ Token verified successfully")
            except Exception as e:
                # Any verification failure simply leaves the call unverified;
                # the tools themselves reject unverified calls.
                print(f"❌ Token verification failed: {e}")
        else:
            print("❌ No valid Bearer token found")

        context.fastmcp_context.set_state("auth_verified", is_verified)
        context.fastmcp_context.set_state("auth_payload", payload)

        # Log incoming tool call details.
        tool_name = context.message.name
        tool_args = context.message.arguments or {}

        print(f"🔧 Tool name: {tool_name}")
        # default=str keeps this purely diagnostic dump from raising on
        # arguments that are not JSON-serialisable.
        print(f"📦 Tool arguments: {json.dumps(tool_args, indent=2, default=str)}")
        print(f"🔒 Auth verified: {is_verified}")

        # Agents sometimes send ``query`` as an array although the tool schema
        # requires a string; join the items so validation does not reject it.
        if tool_name == "knowledge_base_retrieval" and "query" in tool_args:
            query_value = tool_args["query"]
            if isinstance(query_value, list):
                converted_query = ", ".join(str(item) for item in query_value)
                new_args = {**tool_args, "query": converted_query}
                new_message = context.message.model_copy(
                    update={"arguments": new_args}
                )
                context = context.copy(message=new_message)

                print("⚠️ MIDDLEWARE: Converted query array to string:")
                print(f"   Original: {query_value}")
                print(f"   Converted: {converted_query}")

        print("=" * 80)

        try:
            result = await call_next(context)
            print("✅ Tool execution completed successfully")
            return result
        except Exception as e:
            print(f"❌ Tool execution failed: {e}")
            print(f"   Exception type: {type(e).__name__}")
            print(f"   Traceback: {traceback.format_exc()}")
            raise


mcp = FastMCP(
    name="TutoringRAGSystemMCPServer",
    instructions="""
    This TutoringRAGSystem MCP server manages user-scoped knowledge bases for personalized tutoring and learning.

    It exposes tools for:
    1. Knowledge retrieval from stored learning interactions with citations
    2. File uploads with automatic text extraction and storage
    3. File management and listing

    === Tool Descriptions ===
    - knowledge_base_retrieval: Retrieve stored knowledge for a specific user with contextual search and document citations.
    - upload_student_file: Upload PDF or DOCX files, automatically extract text, and store in the RAG system.
      Supports both file storage in Azure Blob Storage and content indexing for retrieval.
      Accepts optional document_title parameter for better citation tracking.

    All tools require valid JWT authentication via Bearer token.
    """,
)

mcp.add_middleware(AuthHeaderMiddleware())

# Initialize services
azure_storage = AzureStorageManager()
rag_system = TutoringRAGSystem()
file_processor = FileProcessor(rag_system)


@mcp.custom_route("/health", methods=["GET"])
async def health_check(request):
    """Health check endpoint."""
    return JSONResponse(
        {
            "status": "healthy",
            "service": "rag_mcp_server",
            "tools_available": ["knowledge_base_retrieval", "upload_student_file"],
        }
    )


@mcp.tool(
    name="knowledge_base_retrieval",
    description="""
    Retrieve personalized knowledge for a specific user from their stored content.
    This tool performs a contextual search across a user's private knowledge base
    using semantic and keyword-based retrieval. Returns responses with document citations.

    IMPORTANT: The 'query' parameter must be a single string, NOT a list or array.
    Use comma or space separated keywords in one string.
    Example: "calculus derivatives, power rule, differentiation"
    NOT: ["calculus", "derivatives", "power rule"]
    """,
)
def knowledge_base_retrieval(
    ctx: Context,
    user_id: Annotated[
        str, Field(description="Unique ID of the user whose content is being searched.")
    ],
    query: Annotated[
        str,  # MUST be a string - arrays/lists will be rejected
        Field(
            description=(
                "A SINGLE STRING containing natural language search query or "
                "comma/space-separated keywords. CRITICAL: This MUST be a string, "
                "NOT an array or list. Example: 'photosynthesis chlorophyll light "
                "reaction' NOT ['photosynthesis', 'chlorophyll']. Use comma or "
                "space separated keywords in a single string."
            ),
            json_schema_extra={
                "type": "string",
                "examples": [
                    "quadratic equation, quadratic formula, solving polynomials",
                    "photosynthesis process light reaction chlorophyll",
                    "World War II causes treaty of versailles",
                ],
            },
        ),
    ],
    subject: Annotated[
        str, Field(description="Subject of the query (e.g., 'Mathematics', 'History').")
    ],
    topic: Annotated[
        str,
        Field(
            description="Topic within the subject (e.g., 'Algebra', 'World War II')."
        ),
    ],
    top_k: Annotated[
        int, Field(description="Number of results to return. Default=3")
    ] = 3,
) -> ToolResult:
    """
    MCP tool wrapper around the RAG retrieval system for user-specific context.

    Returns a ``ToolResult`` whose content is a JSON document: either
    ``{"status": "success", "user_id", "query", "response"}`` or, when the
    middleware did not verify the caller's token, an ``"error"`` payload.

    Raises:
        ToolError: if the query is not a string, the RAG interface fails, or
            it returns something other than a string.
    """
    # NOTE(review): ``user_id`` comes from the tool arguments, not from the
    # verified token payload, so any authenticated caller can query any user's
    # knowledge base - confirm whether it should be checked against
    # ``auth_payload``.
    try:
        print("=" * 80)
        print("🔍 knowledge_base_retrieval called")
        print(f"   user_id: {user_id}")
        print(f"   query: {query} (type: {type(query).__name__})")
        print(f"   subject: {subject}")
        print(f"   topic: {topic}")
        print(f"   top_k: {top_k}")
        print("=" * 80)

        # Check authentication before doing any other work.
        if not ctx.get_state("auth_verified"):
            return _unauthorized_result()

        print("Authentication verified")

        # Validate query is a string
        if not isinstance(query, str):
            raise ValueError(
                f"Query must be a string, not {type(query).__name__}. "
                "Use comma or space separated keywords in a single string."
            )

        print("Calling knowledge_base_retrieval_interface...")

        # Call the RAG interface
        result = knowledge_base_retrieval_interface(
            student_id=user_id,
            current_question=query,
            subject=subject,
            topic=topic,
            context_limit=top_k,
        )

        print("RAG interface returned result")
        print(f"   Result type: {type(result)}")
        print(f"   Result preview: {str(result)[:200]}...")

        # Result should be a string
        if not isinstance(result, str):
            raise ValueError(f"Expected string result, got {type(result)}")

        return ToolResult(
            content=json.dumps(
                {
                    "status": "success",
                    "user_id": user_id,
                    "query": query,
                    "response": result,
                }
            )
        )

    except Exception as e:
        # Keep the traceback in the server log only; sending it to the client
        # would disclose internal paths and implementation details.
        print("❌ ERROR in knowledge_base_retrieval:")
        print(traceback.format_exc())
        raise ToolError(f"Failed to retrieve knowledge: {e}") from e


@mcp.tool(
    name="upload_student_file",
    description="""
    Upload a PDF or DOCX file, extract text content, and store in both Azure Blob Storage
    and the RAG system for future retrieval. Accepts optional document_title for citations.
    """,
)
def process_existing_file(
    ctx: Context,
    user_id: Annotated[str, Field(description="Unique ID of the student.")],
    filename: Annotated[
        str,
        Field(
            description="Original filename with extension (e.g., 'notes.pdf', 'chapter1.docx')"
        ),
    ],
    file_id: Annotated[
        str, Field(description="The blob name (file_id) returned from upload endpoint")
    ],
    subject: Annotated[
        str, Field(description="Subject category (e.g., 'Mathematics', 'History')")
    ],
    topic: Annotated[
        Optional[str], Field(description="Optional specific topic within the subject")
    ] = None,
    difficulty_level: Annotated[
        int, Field(description="Difficulty level 1-10 (default: 5)")
    ] = 5,
    description: Annotated[
        Optional[str], Field(description="Optional description of the file content")
    ] = None,
    document_title: Annotated[
        Optional[str],
        Field(
            description="Optional custom document title for citations (defaults to filename)"
        ),
    ] = None,
) -> ToolResult:
    """
    Download the file from Azure and index it into the RAG system with citation metadata.

    Returns a ``ToolResult`` whose content is a JSON document with
    ``"status"`` set to ``"success"`` (plus processing statistics) or
    ``"error"`` (unauthorized caller, or extraction/indexing reported failure).

    Raises:
        ToolError: for an unsupported file extension, a blob that cannot be
            found in storage, or any unexpected failure while processing.
    """
    print("=" * 80)
    print("📁 UPLOAD_STUDENT_FILE: Starting file processing")
    print(f"   user_id: {user_id}")
    print(f"   filename: {filename}")
    print(f"   file_id: {file_id}")
    print(f"   subject: {subject}")
    print(f"   topic: {topic}")
    print(f"   difficulty_level: {difficulty_level}")
    print(f"   document_title: {document_title}")
    print(f"   description: {description}")
    print("=" * 80)

    try:
        if not ctx.get_state("auth_verified"):
            print("❌ Auth verification failed in tool")
            return _unauthorized_result()

        print("✅ Auth verified in tool")

        file_extension = _file_extension(filename)
        print(f"📄 File extension: {file_extension}")

        if file_extension not in _SUPPORTED_EXTENSIONS:
            # The message lists what the allow-list really accepts.
            supported = ", ".join(sorted(_SUPPORTED_EXTENSIONS)).upper()
            error_msg = (
                f"Unsupported file type: {file_extension}. "
                f"Supported file types: {supported}."
            )
            print(f"❌ {error_msg}")
            raise ToolError(error_msg)

        print(f"🔄 Downloading file from Azure: {file_id}")
        file_content = azure_storage.download_file(file_id)

        if file_content is None:
            error_msg = f"File not found in storage: {file_id}"
            print(f"❌ {error_msg}")
            raise ToolError(error_msg)

        print(f"✅ File downloaded successfully ({len(file_content)} bytes)")

        processing_result = file_processor.process_and_store_file(
            file_content=file_content,
            filename=filename,
            student_id=user_id,
            subject=subject,
            topic=topic,
            difficulty_level=difficulty_level,
            document_title=document_title,
            # "description" is always present (possibly None), as before.
            additional_metadata={
                "blob_name": file_id,
                "description": description,
            },
        )

        if processing_result["status"] != "success":
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Text extraction and indexing failed",
                        "error": processing_result.get("message"),
                    }
                )
            )

        return ToolResult(
            content=json.dumps(
                {
                    "status": "success",
                    "message": f"File '{filename}' processed and indexed successfully",
                    "file_id": file_id,
                    "document_title": processing_result.get(
                        "document_title", filename
                    ),
                    "detected_subject": processing_result.get(
                        "detected_subject", subject
                    ),
                    "processing_info": {
                        "total_characters": processing_result["total_characters"],
                        "chunks_stored": processing_result["chunks_stored"],
                        "extraction_metadata": processing_result.get("metadata", {}),
                    },
                }
            )
        )

    except ToolError:
        # Already a client-facing error raised above; do not wrap it again.
        raise
    except Exception as e:
        print("❌ ERROR in upload_student_file:")
        print(traceback.format_exc())
        raise ToolError(f"Failed to process file: {e}") from e


if __name__ == "__main__":
    port = int(os.getenv("PORT", "9000"))
    mcp.run(
        transport="http",
        host="0.0.0.0",
        port=port,
        log_level="DEBUG",
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Chukwuebuka-2003/ebuka_mcps'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.