Skip to main content
Glama

AI Tutoring RAG System

rag_mcp_server.py • 10.7 kB
"""FastMCP server exposing the tutoring RAG system as MCP tools.

Two tools are registered: ``knowledge_base_retrieval`` and
``upload_student_file``. Both check the ``auth_verified`` flag that
``AuthHeaderMiddleware`` stores on the request context before doing any work.
"""

import json
import traceback
from typing import Annotated, Optional

from fastapi.responses import JSONResponse
from fastmcp import Context, FastMCP
from fastmcp.exceptions import ToolError
from fastmcp.server.dependencies import get_http_headers
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.tools.tool import ToolResult
from pydantic import Field

from rag.system import TutoringRAGSystem
from utils.azure_storage import AzureStorageManager
from utils.file_processor import FileProcessor
from utils.jwt_util import verify_server_token
from utils.rag_interface import knowledge_base_retrieval_interface

# Extensions accepted by the upload tool.
# NOTE(review): "doc" is accepted here although the user-facing error message
# only mentions PDF and DOCX - confirm FileProcessor really handles legacy .doc.
_SUPPORTED_EXTENSIONS = frozenset({"pdf", "docx", "doc"})


class AuthHeaderMiddleware(Middleware):
    """Verify the Bearer token of each tool call and record the outcome.

    The middleware never rejects a request itself. It stores two state values
    on the FastMCP context and lets the tools decide what to do:

    * ``auth_verified`` - ``True`` only if ``verify_server_token`` returned
      without raising.
    * ``auth_payload`` - whatever ``verify_server_token`` returned, else
      ``None``.
    """

    async def on_call_tool(self, context: MiddlewareContext, call_next):
        """Inspect the ``Authorization`` header, then continue the chain."""
        headers = get_http_headers()
        auth = headers.get("authorization") or ""

        # Split once on the first space instead of str.split("Bearer "), which
        # mis-parses tokens that themselves contain "Bearer ". The scheme is
        # compared case-insensitively, as HTTP auth schemes are.
        scheme, _, credentials = auth.partition(" ")
        token = credentials.strip()

        is_verified = False
        payload = None
        if scheme.lower() == "bearer" and token:
            try:
                payload = verify_server_token(token)
                is_verified = True
            except Exception:
                # Deliberate best-effort: any verification failure simply
                # leaves the request unauthenticated.
                payload = None

        context.fastmcp_context.set_state("auth_verified", is_verified)
        context.fastmcp_context.set_state("auth_payload", payload)
        return await call_next(context)


mcp = FastMCP(
    name="TutoringRAGSystemMCPServer",
    instructions="""
    This TutoringRAGSystem MCP server manages user-scoped knowledge bases for personalized tutoring and learning.

    It exposes tools for:
    1. Knowledge retrieval from stored learning interactions with citations
    2. File uploads with automatic text extraction and storage
    3. File management and listing

    === Tool Descriptions ===
    - knowledge_base_retrieval: Retrieve stored knowledge for a specific user with contextual search and document citations.
    - upload_student_file: Upload PDF or DOCX files, automatically extract text, and store in the RAG system.
      Supports both file storage in Azure Blob Storage and content indexing for retrieval.
      Accepts optional document_title parameter for better citation tracking.

    All tools require valid JWT authentication via Bearer token.
    """,
)
mcp.add_middleware(AuthHeaderMiddleware())

# Initialize services
azure_storage = AzureStorageManager()
rag_system = TutoringRAGSystem()
file_processor = FileProcessor(rag_system)


@mcp.custom_route("/health", methods=["GET"])
async def health_check(request):
    """Health check endpoint."""
    return JSONResponse(
        {
            "status": "healthy",
            "service": "rag_mcp_server",
            "tools_available": ["knowledge_base_retrieval", "upload_student_file"],
        }
    )


@mcp.tool(
    name="knowledge_base_retrieval",
    description="""
    Retrieve personalized knowledge for a specific user from their stored content.
    This tool performs a contextual search across a user's private knowledge base
    using semantic and keyword-based retrieval. Returns responses with document citations.

    IMPORTANT: The 'query' parameter must be a single string, NOT a list or array.
    Use comma or space separated keywords in one string.
    Example: "calculus derivatives, power rule, differentiation"
    NOT: ["calculus", "derivatives", "power rule"]
    """,
)
def knowledge_base_retrieval(
    ctx: Context,
    user_id: Annotated[
        str, Field(description="Unique ID of the user whose content is being searched.")
    ],
    query: Annotated[
        str | list[str],  # Accept both string and list, we'll convert list to string
        Field(
            description="A SINGLE STRING containing natural language search query or comma/space-separated keywords. DO NOT pass an array or list. Example: 'photosynthesis chlorophyll light reaction' NOT ['photosynthesis', 'chlorophyll']"
        ),
    ],
    subject: Annotated[
        str, Field(description="Subject of the query (e.g., 'Mathematics', 'History').")
    ],
    topic: Annotated[
        str,
        Field(
            description="Topic within the subject (e.g., 'Algebra', 'World War II')."
        ),
    ],
    top_k: Annotated[
        int, Field(description="Number of results to return. Default=3")
    ] = 3,
) -> ToolResult:
    """MCP tool wrapper around the RAG retrieval system for user-specific context.

    Returns:
        A ``ToolResult`` whose content is a JSON string. On success it holds
        ``status``, ``user_id``, ``query`` and ``response``; when the request
        is not authenticated it holds ``status="error"`` and a message.

    Raises:
        ToolError: If retrieval fails or the RAG interface returns a
            non-string result. The full traceback is printed server-side only
            and is not included in the message sent to the client.
    """
    try:
        print("=" * 80)
        print("🔍 knowledge_base_retrieval called")
        print(f"   user_id: {user_id}")
        print(f"   query (before conversion): {query} (type: {type(query).__name__})")

        # Auto-convert list to string if needed (for AI models that ignore type hints)
        if isinstance(query, list):
            query = " ".join(query)
            print(f"   ⚠️ Auto-converted query from list to string: {query}")

        print(f"   query (after conversion): {query}")
        print(f"   subject: {subject}")
        print(f"   topic: {topic}")
        print(f"   top_k: {top_k}")
        print("=" * 80)

        # Check authentication
        if not ctx.get_state("auth_verified"):
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Unauthorized: token verification failed.",
                    }
                )
            )

        print("Authentication verified")
        print("Calling knowledge_base_retrieval_interface...")

        # Call the RAG interface
        result = knowledge_base_retrieval_interface(
            student_id=user_id,
            current_question=query,
            subject=subject,
            topic=topic,
            context_limit=top_k,
        )

        print("RAG interface returned result")
        print(f"   Result type: {type(result)}")
        print(f"   Result preview: {str(result)[:200]}...")

        # Result should be a string
        if not isinstance(result, str):
            raise ValueError(f"Expected string result, got {type(result)}")

        return ToolResult(
            content=json.dumps(
                {
                    "status": "success",
                    "user_id": user_id,
                    "query": query,
                    "response": result,
                }
            )
        )
    except Exception as e:
        # Keep the traceback in the server log; sending it to the client would
        # expose internal file paths and code structure.
        print("❌ ERROR in knowledge_base_retrieval:")
        print(traceback.format_exc())
        raise ToolError(f"Failed to retrieve knowledge: {str(e)}") from e


@mcp.tool(
    name="upload_student_file",
    description="""
    Upload a PDF or DOCX file, extract text content, and store in both
    Azure Blob Storage and the RAG system for future retrieval.
    Accepts optional document_title for citations.
    """,
)
def process_existing_file(
    ctx: Context,
    user_id: Annotated[str, Field(description="Unique ID of the student.")],
    filename: Annotated[
        str,
        Field(
            description="Original filename with extension (e.g., 'notes.pdf', 'chapter1.docx')"
        ),
    ],
    file_id: Annotated[
        str, Field(description="The blob name (file_id) returned from upload endpoint")
    ],
    subject: Annotated[
        str, Field(description="Subject category (e.g., 'Mathematics', 'History')")
    ],
    topic: Annotated[
        Optional[str], Field(description="Optional specific topic within the subject")
    ] = None,
    difficulty_level: Annotated[
        int, Field(description="Difficulty level 1-10 (default: 5)")
    ] = 5,
    description: Annotated[
        Optional[str], Field(description="Optional description of the file content")
    ] = None,
    document_title: Annotated[
        Optional[str],
        Field(
            description="Optional custom document title for citations (defaults to filename)"
        ),
    ] = None,
) -> ToolResult:
    """Download the file from Azure and index it into the RAG system with citation metadata.

    Returns:
        A ``ToolResult`` whose content is a JSON string with ``status`` set to
        ``"success"`` (plus processing info) or ``"error"`` (unauthenticated
        request, or the file processor reported a non-success status).

    Raises:
        ToolError: If the file type is unsupported, the blob cannot be found,
            or any other exception occurs while processing.
    """
    try:
        if not ctx.get_state("auth_verified"):
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Unauthorized: token verification failed.",
                    }
                )
            )

        # rpartition yields an empty separator when there is no dot, so a
        # dot-less name such as "pdf" is not mistaken for a valid extension.
        _, dot, suffix = filename.lower().rpartition(".")
        file_extension = suffix if dot else ""
        if file_extension not in _SUPPORTED_EXTENSIONS:
            raise ToolError(
                f"Unsupported file type: {file_extension or 'none'}. Only PDF and DOCX files are supported."
            )

        file_content = azure_storage.download_file(file_id)
        if file_content is None:
            raise ToolError(f"File not found in storage: {file_id}")

        processing_result = file_processor.process_and_store_file(
            file_content=file_content,
            filename=filename,
            student_id=user_id,
            subject=subject,
            topic=topic,
            difficulty_level=difficulty_level,
            document_title=document_title,
            additional_metadata={
                "blob_name": file_id,
                "description": description,
            },
        )

        if processing_result["status"] != "success":
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Text extraction and indexing failed",
                        "error": processing_result.get("message"),
                    }
                )
            )

        return ToolResult(
            content=json.dumps(
                {
                    "status": "success",
                    "message": f"File '{filename}' processed and indexed successfully",
                    "file_id": file_id,
                    "document_title": processing_result.get(
                        "document_title", filename
                    ),
                    "processing_info": {
                        "total_characters": processing_result["total_characters"],
                        "chunks_stored": processing_result["chunks_stored"],
                        "extraction_metadata": processing_result.get("metadata", {}),
                    },
                }
            )
        )
    except ToolError:
        # Already a client-facing error raised above; do not wrap it again.
        raise
    except Exception as e:
        raise ToolError(f"Failed to process file: {str(e)}") from e


if __name__ == "__main__":
    mcp.run(
        transport="http",
        host="0.0.0.0",
        port=9000,
        log_level="DEBUG",
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Chukwuebuka-2003/ebuka_mcps'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.