# rag_mcp_server.py
from typing import Annotated, Optional
from pydantic import Field
from fastmcp.tools.tool import ToolResult
from fastmcp.exceptions import ToolError
import json
from utils.rag_interface import knowledge_base_retrieval_interface
from fastmcp import FastMCP, Context
from fastapi.responses import JSONResponse
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.server.dependencies import get_http_headers
from utils.jwt_util import verify_server_token
from utils.azure_storage import AzureStorageManager
from utils.file_processor import FileProcessor
from rag.system import TutoringRAGSystem
class AuthHeaderMiddleware(Middleware):
    """Middleware run before every tool call: verifies the Bearer JWT and
    sanitizes known-bad arguments.

    The verification outcome is stored in FastMCP context state under
    "auth_verified" / "auth_payload" so individual tools can check it.
    A list-valued `query` argument sent to knowledge_base_retrieval is
    coerced into a comma-separated string before dispatch.
    """

    async def on_call_tool(self, context: MiddlewareContext, call_next):
        print("=" * 80)
        print("๐ง MIDDLEWARE: Processing tool call")
        headers = get_http_headers()
        auth = headers.get("authorization")
        print(f"๐ Headers received: {dict(headers)}")
        print(f"๐ Auth header present: {bool(auth)}")
        is_verified = False
        payload = None
        if auth and auth.startswith("Bearer "):
            # BUGFIX: split at most once on the first space. The previous
            # auth.split("Bearer ")[1] split on *every* occurrence of the
            # substring "Bearer ", truncating any token that happened to
            # contain it.
            token = auth.split(" ", 1)[1].strip()
            try:
                payload = verify_server_token(token)
                is_verified = True
                # BUGFIX: this print was split across two source lines
                # (a syntax error); re-joined onto one line.
                print("โ Token verified successfully")
            except Exception as e:
                is_verified = False
                print(f"โ Token verification failed: {e}")
        else:
            print("โ No valid Bearer token found")
        # Expose the auth result to the downstream tool via context state.
        context.fastmcp_context.set_state("auth_verified", is_verified)
        context.fastmcp_context.set_state("auth_payload", payload)
        # Log incoming tool call details
        tool_name = context.message.name
        tool_args = context.message.arguments or {}
        print(f"๐ง Tool name: {tool_name}")
        print(f"๐ฆ Tool arguments: {json.dumps(tool_args, indent=2)}")
        print(f"๐ Auth verified: {is_verified}")
        # CRITICAL FIX: Preprocess query parameter for knowledge_base_retrieval
        # Convert array to comma-separated string if the agent sends an array
        if tool_name == "knowledge_base_retrieval" and "query" in tool_args:
            query_value = tool_args["query"]
            # If query is a list/array, convert it to a comma-separated string
            if isinstance(query_value, list):
                converted_query = ", ".join(str(item) for item in query_value)
                # Create a new arguments dict with the converted query
                new_args = {**tool_args, "query": converted_query}
                # Create a new message with the updated arguments
                new_message = context.message.model_copy(update={"arguments": new_args})
                # Replace the middleware context so downstream sees the fixed args.
                context = context.copy(message=new_message)
                print("โ ๏ธ MIDDLEWARE: Converted query array to string:")
                print(f" Original: {query_value}")
                print(f" Converted: {converted_query}")
        print("=" * 80)
        try:
            result = await call_next(context)
            # BUGFIX: re-joined a print split mid-string (syntax error).
            print("โ Tool execution completed successfully")
            return result
        except Exception as e:
            print(f"โ Tool execution failed: {e}")
            print(f" Exception type: {type(e).__name__}")
            import traceback
            print(f" Traceback: {traceback.format_exc()}")
            raise
# MCP server definition. The `instructions` text is surfaced to connecting
# MCP clients/agents as the server's usage guide.
mcp = FastMCP(
    name="TutoringRAGSystemMCPServer",
    instructions="""
This TutoringRAGSystem MCP server manages user-scoped knowledge bases for personalized tutoring and learning.
It exposes tools for:
1. Knowledge retrieval from stored learning interactions with citations
2. File uploads with automatic text extraction and storage
3. File management and listing
=== Tool Descriptions ===
- knowledge_base_retrieval:
Retrieve stored knowledge for a specific user with contextual search and document citations.
- upload_student_file:
Upload PDF or DOCX files, automatically extract text, and store in the RAG system.
Supports both file storage in Azure Blob Storage and content indexing for retrieval.
Accepts optional document_title parameter for better citation tracking.
All tools require valid JWT authentication via Bearer token.
""",
)
# Every tool call passes through JWT verification / argument sanitizing first.
mcp.add_middleware(AuthHeaderMiddleware())
# Initialize services
# Module-level singletons shared across all requests; FileProcessor indexes
# extracted text into the shared RAG system.
azure_storage = AzureStorageManager()
rag_system = TutoringRAGSystem()
file_processor = FileProcessor(rag_system)
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request):
    """Liveness probe: report service status and the tools this server exposes."""
    payload = {
        "status": "healthy",
        "service": "rag_mcp_server",
        "tools_available": ["knowledge_base_retrieval", "upload_student_file"],
    }
    return JSONResponse(payload)
@mcp.tool(
    name="knowledge_base_retrieval",
    description="""
Retrieve personalized knowledge for a specific user from their stored content.
This tool performs a contextual search across a user's private knowledge base
using semantic and keyword-based retrieval. Returns responses with document citations.
IMPORTANT: The 'query' parameter must be a single string, NOT a list or array.
Use comma or space separated keywords in one string.
Example: "calculus derivatives, power rule, differentiation"
NOT: ["calculus", "derivatives", "power rule"]
""",
)
def knowledge_base_retrieval(
    ctx: Context,
    user_id: Annotated[
        str, Field(description="Unique ID of the user whose content is being searched.")
    ],
    query: Annotated[
        str,  # MUST be a string - arrays/lists will be rejected
        Field(
            description="A SINGLE STRING containing natural language search query or comma/space-separated keywords. CRITICAL: This MUST be a string, NOT an array or list. Example: 'photosynthesis chlorophyll light reaction' NOT ['photosynthesis', 'chlorophyll']. Use comma or space separated keywords in a single string.",
            json_schema_extra={
                "type": "string",
                "examples": [
                    "quadratic equation, quadratic formula, solving polynomials",
                    "photosynthesis process light reaction chlorophyll",
                    "World War II causes treaty of versailles",
                ],
            },
        ),
    ],
    subject: Annotated[
        str, Field(description="Subject of the query (e.g., 'Mathematics', 'History').")
    ],
    topic: Annotated[
        str,
        Field(
            description="Topic within the subject (e.g., 'Algebra', 'World War II')."
        ),
    ],
    top_k: Annotated[
        int, Field(description="Number of results to return. Default=3")
    ] = 3,
) -> ToolResult:
    """
    MCP tool wrapper around the RAG retrieval system for user-specific context.

    Returns a ToolResult whose content is a JSON object: on success,
    {"status": "success", "user_id", "query", "response"}; when the request
    failed JWT verification in the middleware, {"status": "error", ...}.
    Raises ToolError (with traceback details) on any retrieval failure.
    """
    import traceback

    try:
        # IDIOM FIX: separators and fixed messages used f-strings with no
        # placeholders (e.g. f"=" * 80); replaced with plain strings.
        print("=" * 80)
        print("๐ knowledge_base_retrieval called")
        print(f" user_id: {user_id}")
        print(f" query: {query} (type: {type(query).__name__})")
        print(f" subject: {subject}")
        print(f" topic: {topic}")
        print(f" top_k: {top_k}")
        print("=" * 80)
        # Defensive check: the middleware coerces list-valued queries, but
        # reject anything else that slipped past schema validation.
        if not isinstance(query, str):
            raise ValueError(f"Query must be a string, not {type(query).__name__}. Use comma or space separated keywords in a single string.")
        # Check authentication (state set by AuthHeaderMiddleware)
        if not ctx.get_state("auth_verified"):
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Unauthorized: token verification failed.",
                    }
                )
            )
        print("Authentication verified")
        print("Calling knowledge_base_retrieval_interface...")
        # Call the RAG interface
        result = knowledge_base_retrieval_interface(
            student_id=user_id,
            current_question=query,
            subject=subject,
            topic=topic,
            context_limit=top_k,
        )
        print("RAG interface returned result")
        print(f" Result type: {type(result)}")
        print(f" Result preview: {str(result)[:200]}...")
        # Result should be a string
        if not isinstance(result, str):
            raise ValueError(f"Expected string result, got {type(result)}")
        return ToolResult(
            content=json.dumps(
                {
                    "status": "success",
                    "user_id": user_id,
                    "query": query,
                    "response": result,
                }
            )
        )
    except Exception as e:
        error_details = traceback.format_exc()
        print("โ ERROR in knowledge_base_retrieval:")
        print(error_details)
        raise ToolError(
            f"Failed to retrieve knowledge: {str(e)}\n\nDetails:\n{error_details}"
        )
@mcp.tool(
    name="upload_student_file",
    description="""
Upload a PDF or DOCX file, extract text content, and store in both Azure Blob Storage
and the RAG system for future retrieval. Accepts optional document_title for citations.
""",
)
def process_existing_file(
    ctx: Context,
    user_id: Annotated[str, Field(description="Unique ID of the student.")],
    filename: Annotated[
        str,
        Field(
            description="Original filename with extension (e.g., 'notes.pdf', 'chapter1.docx')"
        ),
    ],
    file_id: Annotated[
        str, Field(description="The blob name (file_id) returned from upload endpoint")
    ],
    subject: Annotated[
        str, Field(description="Subject category (e.g., 'Mathematics', 'History')")
    ],
    topic: Annotated[
        Optional[str], Field(description="Optional specific topic within the subject")
    ] = None,
    difficulty_level: Annotated[
        int, Field(description="Difficulty level 1-10 (default: 5)")
    ] = 5,
    description: Annotated[
        Optional[str], Field(description="Optional description of the file content")
    ] = None,
    document_title: Annotated[
        Optional[str],
        Field(
            description="Optional custom document title for citations (defaults to filename)"
        ),
    ] = None,
) -> ToolResult:
    """
    Download the file from Azure and index it into the RAG system with citation metadata.

    Raises ToolError for unsupported extensions, missing blobs, or any
    processing failure; returns a JSON ToolResult describing success,
    an auth failure, or an extraction/indexing failure.
    """
    print("=" * 80)
    print("๐ UPLOAD_STUDENT_FILE: Starting file processing")
    print(f" user_id: {user_id}")
    # BUGFIX: the {filename} placeholder had been flattened to the literal
    # text "(unknown)"; restored so the actual filename is logged.
    print(f" filename: {filename}")
    print(f" file_id: {file_id}")
    print(f" subject: {subject}")
    print(f" topic: {topic}")
    print(f" difficulty_level: {difficulty_level}")
    print(f" document_title: {document_title}")
    print(f" description: {description}")
    print("=" * 80)
    try:
        # Auth result was stashed in context state by AuthHeaderMiddleware.
        if not ctx.get_state("auth_verified"):
            print("โ Auth verification failed in tool")
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Unauthorized: token verification failed.",
                    }
                )
            )
        # BUGFIX: re-joined a print statement split mid-string (syntax error).
        print("โ Auth verified in tool")
        file_extension = filename.lower().split(".")[-1]
        print(f"๐ File extension: {file_extension}")
        if file_extension not in ["pdf", "docx", "doc"]:
            error_msg = f"Unsupported file type: {file_extension}. Only PDF and DOCX files are supported."
            print(f"โ {error_msg}")
            raise ToolError(error_msg)
        print(f"๐ Downloading file from Azure: {file_id}")
        file_content = azure_storage.download_file(file_id)
        if file_content is None:
            error_msg = f"File not found in storage: {file_id}"
            print(f"โ {error_msg}")
            raise ToolError(error_msg)
        # BUGFIX: re-joined a print statement split mid-string (syntax error).
        print(f"โ File downloaded successfully ({len(file_content)} bytes)")
        metadata = {}
        if description:
            metadata["description"] = description
        processing_result = file_processor.process_and_store_file(
            file_content=file_content,
            filename=filename,
            student_id=user_id,
            subject=subject,
            topic=topic,
            difficulty_level=difficulty_level,
            document_title=document_title,
            additional_metadata={
                "blob_name": file_id,
                "description": description,
                **metadata,
            },
        )
        if processing_result["status"] != "success":
            return ToolResult(
                content=json.dumps(
                    {
                        "status": "error",
                        "message": "Text extraction and indexing failed",
                        "error": processing_result.get("message"),
                    }
                )
            )
        return ToolResult(
            content=json.dumps(
                {
                    "status": "success",
                    # BUGFIX: restored the {filename} placeholder that had
                    # been flattened to the literal "(unknown)".
                    "message": f"File '{filename}' processed and indexed successfully",
                    "file_id": file_id,
                    "document_title": processing_result.get("document_title", filename),
                    "detected_subject": processing_result.get("detected_subject", subject),
                    "processing_info": {
                        "total_characters": processing_result["total_characters"],
                        "chunks_stored": processing_result["chunks_stored"],
                        "extraction_metadata": processing_result.get("metadata", {}),
                    },
                }
            )
        )
    except ToolError:
        # BUGFIX: let ToolErrors raised above propagate unchanged. The old
        # blanket handler re-wrapped them, producing double-prefixed
        # messages like "Failed to process file: Unsupported file type: ...".
        raise
    except Exception as e:
        raise ToolError(f"Failed to process file: {str(e)}")
if __name__ == "__main__":
    # Run the MCP server over HTTP; port is configurable via the PORT env var.
    import os

    mcp.run(
        transport="http",
        host="0.0.0.0",
        port=int(os.getenv("PORT", 9000)),
        log_level="DEBUG",
    )