Skip to main content
Glama
http_server.py (27.4 kB)
"""
FastAPI HTTP MCP Server for Personal Assistant

This implements a cloud-native HTTP MCP server with:
- Multi-tenancy support
- OAuth authentication
- Intelligent data retrieval with vector search
- PostgreSQL + pgvector integration
"""

import asyncio
import json
import logging
import os
import re
import uuid
from contextlib import asynccontextmanager
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import quote, unquote

import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

from mcp.server.models import InitializeRequest
from mcp.server.session import ServerSession
from mcp.server.stdio import stdio_server
from mcp.types import (
    CallToolRequest,
    CallToolResult,
    ListToolsRequest,
    ListToolsResult,
    Tool,
)

from .http_config import Config, get_config
from .database_interface import DatabaseInterface
from .database_factory import DatabaseFactory
from .models import Project, Todo, CalendarEvent, StatusEntry, PersonalData
from .document_manager import DocumentManager
from .auth_service import AuthService, UserContext, create_auth_service, AuthenticationError, AuthorizationError
from .embedding_service import get_embedding_service, generate_content_embedding
from .intelligent_retrieval import IntelligentRetrievalService

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Security
security = HTTPBearer()

# Paths served without authentication (health checks and API docs).
_PUBLIC_PATHS = frozenset({"/health", "/docs", "/openapi.json", "/metrics"})

# Tenant ids are interpolated into schema names and file names, so only a
# conservative character set is accepted (no separators, quotes or whitespace).
_TENANT_ID_PATTERN = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.@-]*")

# Single source of truth for the `semantic_search` result limit: advertised in
# the tool schema and applied when the caller omits `limit`.
DEFAULT_SEARCH_LIMIT = 5


class MCPRequest(BaseModel):
    """Request body for MCP calls: a method name plus optional parameters."""

    method: str
    params: Optional[Dict[str, Any]] = None


class MCPResponse(BaseModel):
    """Response envelope carrying either a result or an error payload."""

    result: Optional[Any] = None
    error: Optional[Dict[str, Any]] = None


class PersonalAssistantHTTPServer:
    """HTTP MCP Server for Personal Assistant with multi-tenancy and vector search"""

    def __init__(self, config: Config):
        """Build the FastAPI app and the per-tenant service registries.

        Args:
            config: Server configuration; `auth`, `vector_search` and the
                database settings are read from it.
        """
        self.config = config
        # Per-tenant caches, filled lazily by `_get_user_database`.
        self.db_interfaces: Dict[str, DatabaseInterface] = {}
        self.document_managers: Dict[str, DocumentManager] = {}
        self.intelligent_retrievers: Dict[str, IntelligentRetrievalService] = {}
        self.auth_service = create_auth_service(config.auth)
        self.embedding_service = get_embedding_service(
            provider=config.vector_search.provider,
            model=config.vector_search.model
        )

        # Create FastAPI app with lifespan
        self.app = FastAPI(
            title="MCP Personal Assistant Server",
            description="HTTP MCP Server with multi-tenancy and vector search",
            version="2.0.0",
            lifespan=self.lifespan
        )

        self._setup_middleware()
        self._setup_routes()

    @asynccontextmanager
    async def lifespan(self, app: FastAPI):
        """Manage application lifecycle: log start-up, close databases on exit."""
        logger.info("Starting HTTP MCP Server...")
        try:
            yield
        finally:
            logger.info("Shutting down HTTP MCP Server...")
            # Clean up database connections. One failing close must not
            # prevent the remaining tenants' connections from being closed.
            for tenant_id, db_interface in self.db_interfaces.items():
                try:
                    await db_interface.close()
                except Exception:
                    logger.exception("Failed to close database for tenant: %s", tenant_id)

    def _setup_middleware(self):
        """Setup FastAPI middleware (authentication, then CORS)."""

        def _unauthorized(detail: str) -> JSONResponse:
            # Same body shape FastAPI uses when rendering an HTTPException.
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={"detail": detail},
                headers={"WWW-Authenticate": "Bearer"},
            )

        @self.app.middleware("http")
        async def authenticate_request(request: Request, call_next):
            """Authentication middleware"""
            # Skip auth for health checks and docs
            if request.url.path in _PUBLIC_PATHS:
                return await call_next(request)

            # Skip auth if disabled
            if not self.config.auth.enabled:
                # Create a mock user context for development
                request.state.user = UserContext(
                    user_id="dev_user",
                    email="dev@example.com",
                    tenant_id="development"
                )
                return await call_next(request)

            # Extract and validate token
            auth_header = request.headers.get("Authorization")
            if not auth_header or not auth_header.startswith("Bearer "):
                return _unauthorized("Missing or invalid authorization header")

            token = auth_header[7:]  # Remove "Bearer " prefix

            # Responses are returned rather than raised: an HTTPException
            # raised inside an HTTP middleware bypasses FastAPI's exception
            # handlers and reaches the client as a bare 500.
            # Only the authenticate call is guarded, so failures in the
            # downstream route are not misreported as authentication errors.
            try:
                user_context = await self.auth_service.authenticate(token)
            except AuthenticationError as e:
                return _unauthorized(str(e))
            except Exception as e:
                logger.exception("Authentication error: %s", e)
                return JSONResponse(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    content={"detail": "Authentication service error"},
                )

            request.state.user = user_context
            return await call_next(request)

        # Registered last so it is the outermost middleware: CORS preflight
        # requests (which carry no Authorization header) are answered before
        # the authentication middleware can reject them.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],  # Configure based on your needs
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    async def _get_user_database(self, tenant_id: str) -> DatabaseInterface:
        """Get or create database interface for tenant with proper isolation.

        On first use for a tenant this also creates the tenant's document
        manager and intelligent retrieval service.

        Raises:
            ValueError: if `tenant_id` contains characters that are not safe
                to embed in a schema or file name.
        """
        if tenant_id not in self.db_interfaces:
            # Create tenant-specific database configuration
            tenant_config = self._create_tenant_config(tenant_id)
            db_interface = await DatabaseFactory.create_database(tenant_config)

            # Build every per-tenant service before registering any of them,
            # so a failure cannot leave a half-initialised tenant behind.
            document_manager = DocumentManager(db_interface, f"documents_{tenant_id}")
            retriever = IntelligentRetrievalService(db_interface, self.embedding_service)

            # NOTE(review): concurrent first requests for the same tenant can
            # still both reach this point; confirm whether that matters.
            self.db_interfaces[tenant_id] = db_interface
            self.document_managers[tenant_id] = document_manager
            self.intelligent_retrievers[tenant_id] = retriever

            logger.info(f"Initialized database for tenant: {tenant_id}")

        return self.db_interfaces[tenant_id]

    async def _get_intelligent_retriever(self, tenant_id: str) -> IntelligentRetrievalService:
        """Get intelligent retrieval service for tenant"""
        # Ensure database is initialized (which also initializes the retriever)
        await self._get_user_database(tenant_id)
        return self.intelligent_retrievers[tenant_id]

    @staticmethod
    def _validate_tenant_id(tenant_id: str) -> None:
        """Raise ValueError unless `tenant_id` is safe to embed in names."""
        if not isinstance(tenant_id, str) or not _TENANT_ID_PATTERN.fullmatch(tenant_id):
            raise ValueError(f"Invalid tenant id: {tenant_id!r}")

    @staticmethod
    def _with_tenant_search_path(connection_string: str, tenant_id: str) -> str:
        """Return the connection URI with the tenant schema on `search_path`.

        The setting is merged into the URI's `options` parameter: appended
        (space separated) to an existing `options` value, or added as a new
        query parameter. Other query parameters are left untouched. The value
        is percent-encoded because it contains a space, `=` and `,`.
        """
        search_path_option = f"-c search_path=tenant_{tenant_id},public"
        base, _, query = connection_string.partition("?")
        pairs = [pair for pair in query.split("&") if pair]

        for index, pair in enumerate(pairs):
            key, _, value = pair.partition("=")
            if key == "options":
                combined = f"{unquote(value)} {search_path_option}".strip()
                pairs[index] = "options=" + quote(combined, safe="")
                break
        else:
            pairs.append("options=" + quote(search_path_option, safe=""))

        return f"{base}?{'&'.join(pairs)}"

    @staticmethod
    def _tenant_database_path(base_path: str, tenant_id: str) -> str:
        """Return `base_path` with `_<tenant_id>` inserted before the extension."""
        # splitext only looks at the final path component, so dots in
        # directory names (e.g. "./data/db") are not mistaken for an extension.
        root, ext = os.path.splitext(base_path)
        return f"{root}_{tenant_id}{ext}"

    def _create_tenant_config(self, tenant_id: str) -> Config:
        """Create tenant-specific configuration with proper isolation.

        PostgreSQL tenants get a `tenant_<id>` schema placed first on the
        search path; any other database type gets its own file.

        Raises:
            ValueError: if `tenant_id` is not a safe identifier.
        """
        self._validate_tenant_id(tenant_id)

        if self.config.database_type == "postgresql":
            # Use schema-based isolation for PostgreSQL
            tenant_config = Config(
                database_type="postgresql",
                pgvector_connection_string=self._with_tenant_search_path(
                    self.config.pgvector_connection_string, tenant_id
                )
            )
        else:
            # File-based isolation for SQLite/TinyDB
            tenant_config = Config(
                database_type=self.config.database_type,
                database_path=self._tenant_database_path(
                    self.config.database_path, tenant_id
                )
            )

        # Copy other configuration
        tenant_config.vector_search = self.config.vector_search
        tenant_config.features = self.config.features

        return tenant_config

    def _setup_routes(self):
        """Setup FastAPI routes"""

        @self.app.get("/health")
        async def health_check():
            """Health check endpoint"""
            return {"status": "healthy", "timestamp": datetime.utcnow()}

        @self.app.post("/mcp/initialize")
        async def initialize_mcp(request: Request):
            """Initialize MCP session"""
            return {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {},
                    "resources": {},
                    "prompts": {}
                },
                "serverInfo": {
                    "name": "personal-assistant-http",
                    "version": "2.0.0"
                }
            }

        @self.app.post("/mcp/tools/list")
        async def list_tools(request: Request):
            """List available MCP tools"""
            return {
                "tools": [
                    {
                        "name": "get_dashboard",
                        "description": "Get intelligent dashboard with context-aware filtering",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "query": {"type": "string", "description": "Search query for context filtering"}
                            }
                        }
                    },
                    {
                        "name": "add_project",
                        "description": "Add a new project with vector embeddings",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "description": {"type": "string"},
                                "priority": {"type": "string", "enum": ["low", "medium", "high"]},
                                "tags": {"type": "array", "items": {"type": "string"}}
                            },
                            "required": ["name", "description"]
                        }
                    },
                    {
                        "name": "semantic_search",
                        "description": "Perform semantic search across projects, todos, and documents",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "query": {"type": "string"},
                                "limit": {"type": "integer", "default": DEFAULT_SEARCH_LIMIT},
                                "types": {"type": "array", "items": {"type": "string"}}
                            },
                            "required": ["query"]
                        }
                    }
                ]
            }

        @self.app.post("/mcp/tools/call")
        async def call_tool(request: Request, body: MCPRequest):
            """Call MCP tool"""
            user: UserContext = request.state.user

            params = body.params or {}
            tool_name = params.get("name")
            # `"arguments": null` is treated the same as an omitted field.
            arguments = params.get("arguments") or {}

            handlers = {
                "get_dashboard": self._get_intelligent_dashboard,
                "add_project": self._add_project_with_embeddings,
                "semantic_search": self._semantic_search,
            }
            handler = handlers.get(tool_name)
            if handler is None:
                # Raised outside the try block below so the client receives
                # the 400 instead of a 500 produced by the catch-all handler.
                raise HTTPException(
                    status_code=400,
                    detail=f"Unknown tool: {tool_name}"
                )

            db = await self._get_user_database(user.tenant_id)

            try:
                result = await handler(db, user, arguments)
                return {"content": [{"type": "text", "text": json.dumps(result, default=str)}]}
            except HTTPException:
                # Preserve status codes chosen deliberately by a tool.
                raise
            except Exception as e:
                logger.exception("Error calling tool %s: %s", tool_name, e)
                raise HTTPException(status_code=500, detail=str(e)) from e

    @staticmethod
    def _results_of_type(results: List[Dict[str, Any]], content_type: str) -> List[Dict[str, Any]]:
        """Return the results whose `content_type` equals `content_type`."""
        return [r for r in results if r["content_type"] == content_type]

    async def _get_intelligent_dashboard(self, db: DatabaseInterface, user: UserContext, args: Dict[str, Any]) -> Dict[str, Any]:
        """Get intelligent dashboard with RAG-based context-aware filtering.

        With a non-empty `query` argument the dashboard is built purely from
        retrieval results for that query; otherwise a default dashboard is
        built from a fixed "current work" query plus overall counts.
        """
        query = args.get("query", "")

        # Get intelligent retrieval service for user's tenant
        retriever = await self._get_intelligent_retriever(user.tenant_id)

        if query:
            # Use intelligent retrieval for context-aware results
            search_results = await retriever.search(
                user_id=user.user_id,
                tenant_id=user.tenant_id,
                query=query,
                content_types=["projects", "todos", "events"],
                max_results=15,
                intent="status_update"  # Dashboard context
            )
            results = search_results["results"]

            # Organize results by type
            relevant_projects = self._results_of_type(results, "project")
            relevant_todos = self._results_of_type(results, "todo")
            relevant_events = self._results_of_type(results, "event")

            return {
                "type": "intelligent_dashboard",
                "query": query,
                "context_analysis": search_results["context"],
                "relevant_projects": relevant_projects[:5],
                "priority_todos": relevant_todos[:5],
                "upcoming_events": relevant_events[:3],
                "insights": self._generate_dashboard_insights(results),
                "suggestions": await self._generate_contextual_suggestions(user, query, results),
                "retrieval_metadata": search_results["retrieval_metadata"]
            }

        # Smart default dashboard - show contextually relevant items without explicit query
        now = datetime.now()

        # Get current work context using intelligent retrieval
        work_context_results = await retriever.search(
            user_id=user.user_id,
            tenant_id=user.tenant_id,
            query="current work status progress today",
            content_types=["projects", "todos", "events"],
            max_results=20,
            time_scope="today",
            intent="status_update"
        )
        results = work_context_results["results"]

        # Get basic counts for overview
        all_projects = await db.get_projects()
        all_todos = await db.get_todos()
        upcoming_events = await db.get_calendar_events(
            start_date=now,
            end_date=now + timedelta(days=7)
        )

        # Separate results by type from context-aware search
        contextual_projects = self._results_of_type(results, "project")
        contextual_todos = self._results_of_type(results, "todo")
        contextual_events = self._results_of_type(results, "event")

        today = now.date()
        return {
            "type": "smart_dashboard",
            "overview": {
                "total_projects": len(all_projects),
                "active_projects": len([p for p in all_projects if p.status == "active"]),
                "total_todos": len(all_todos),
                "pending_todos": len([t for t in all_todos if not t.completed]),
                "completed_today": len([
                    t for t in all_todos
                    if t.completed and t.updated_date and t.updated_date.date() == today
                ]),
                "upcoming_events": len(upcoming_events)
            },
            "current_focus": {
                "active_projects": contextual_projects[:3],
                "priority_todos": contextual_todos[:5],
                "today_events": contextual_events[:3]
            },
            "insights": self._generate_dashboard_insights(results),
            "suggestions": await self._generate_contextual_suggestions(user, "", results),
            "context_metadata": work_context_results["retrieval_metadata"]
        }

    def _generate_dashboard_insights(self, results: List[Dict[str, Any]]) -> List[str]:
        """Generate insights from dashboard results using simple heuristics.

        Always returns at least one message.
        """
        insights = []

        # Count by content type
        project_count = len(self._results_of_type(results, "project"))
        todo_count = len(self._results_of_type(results, "todo"))
        event_count = len(self._results_of_type(results, "event"))

        # High priority items
        high_priority = len([r for r in results if r.get("metadata", {}).get("priority") == "high"])

        if project_count > todo_count * 2:
            insights.append("You have many active projects - consider focusing on specific tasks")

        if high_priority > 3:
            insights.append(f"You have {high_priority} high-priority items requiring attention")

        if event_count > 5:
            insights.append("Your schedule is quite busy - consider time blocking for deep work")

        # Completion insights
        completed_todos = len([r for r in results if r.get("metadata", {}).get("completed") is True])
        if completed_todos > 0:
            insights.append(f"Great progress! You've completed {completed_todos} tasks recently")

        if not insights:
            insights.append("Your workspace is well-organized - keep up the good work!")

        return insights

    @staticmethod
    def _is_overdue(due_date: Any) -> bool:
        """Return True when `due_date` lies in the past.

        Accepts `datetime`, `date` and ISO-8601 strings. Values that cannot be
        interpreted are reported as not overdue rather than guessed at.
        """
        if isinstance(due_date, str):
            try:
                # A trailing "Z" is only understood by fromisoformat on 3.11+.
                due_date = datetime.fromisoformat(due_date.replace("Z", "+00:00"))
            except ValueError:
                return False
        # datetime is a subclass of date, so it has to be checked first.
        if isinstance(due_date, datetime):
            # Compare naive with naive and aware with aware.
            return due_date < datetime.now(due_date.tzinfo)
        if isinstance(due_date, date):
            return due_date < date.today()
        return False

    async def _generate_contextual_suggestions(self, user: UserContext, query: str, results: List[Dict[str, Any]]) -> List[str]:
        """Generate contextual suggestions based on current state.

        Always returns at least one message. `user` is currently unused.
        """
        suggestions = []

        # Analyze overdue items: incomplete todos whose due date has passed.
        overdue_todos = [
            r for r in results
            if r["content_type"] == "todo"
            and not r.get("metadata", {}).get("completed")
            and self._is_overdue(r.get("metadata", {}).get("due_date"))
        ]
        if overdue_todos:
            suggestions.append(f"You have {len(overdue_todos)} overdue tasks - consider reviewing priorities")

        # Suggest focus areas
        high_relevance = [r for r in results if r.get("relevance_score", 0) > 0.8]
        if len(high_relevance) > 1:
            suggestions.append("Consider focusing on your most relevant items first")

        # Project-task balance
        project_results = self._results_of_type(results, "project")
        todo_results = self._results_of_type(results, "todo")
        if len(project_results) > len(todo_results):
            suggestions.append("Consider breaking down your projects into specific actionable tasks")

        # Time-based suggestions
        if query and ("today" in query.lower() or "now" in query.lower()):
            suggestions.append("Focus on quick wins to build momentum for the day")

        if not suggestions:
            suggestions.append("Everything looks well-organized - great job staying on top of things!")

        return suggestions

    async def _add_project_with_embeddings(self, db: DatabaseInterface, user: UserContext, args: Dict[str, Any]) -> Dict[str, Any]:
        """Add project with vector embeddings for semantic search.

        Raises:
            KeyError: if `args` lacks "name" or "description".
        """
        # One timestamp so created_date and updated_date are equal on insert.
        now = datetime.now()
        project_data = {
            "id": str(uuid.uuid4()),
            "name": args["name"],
            "description": args["description"],
            "priority": args.get("priority", "medium"),
            "status": "active",
            "tags": args.get("tags", []),
            "created_date": now,
            "updated_date": now
        }

        project = Project(**project_data)

        # Generate embeddings for semantic search if vector search is enabled
        if self.config.vector_search.enabled:
            content_text = f"{project.name} {project.description}"
            metadata = {
                "priority": project.priority,
                "status": project.status,
                "tags": project.tags
            }

            embedding = await generate_content_embedding(
                content_text,
                content_type="project",
                metadata=metadata
            )

            # Store embedding with project (depending on database implementation)
            if hasattr(project, '__dict__'):
                project.__dict__['embedding'] = embedding

            logger.info(f"Generated embedding for project {project.id}")

        await db.add_project(project)

        return {
            "message": "Project added successfully with semantic indexing",
            "project": project_data,
            "vector_search_enabled": self.config.vector_search.enabled
        }

    async def _semantic_search(self, db: DatabaseInterface, user: UserContext, args: Dict[str, Any]) -> Dict[str, Any]:
        """Perform intelligent semantic search across all data types.

        Raises:
            KeyError: if `args` lacks "query".
        """
        query = args["query"]
        # Default matches the value advertised in the tool's input schema.
        limit = args.get("limit", DEFAULT_SEARCH_LIMIT)
        search_types = args.get("types", ["projects", "todos", "documents", "events"])

        # Get intelligent retrieval service for user's tenant
        retriever = await self._get_intelligent_retriever(user.tenant_id)

        # Perform intelligent search with context awareness
        intelligent_results = await retriever.search(
            user_id=user.user_id,
            tenant_id=user.tenant_id,
            query=query,
            content_types=search_types,
            max_results=limit,
            similarity_threshold=args.get("similarity_threshold", 0.7),
            intent=args.get("intent"),
            time_scope=args.get("time_scope"),
            priority_filter=args.get("priority_filter")
        )

        return intelligent_results

    async def _text_search_projects(self, db: DatabaseInterface, query: str, limit: int, relevance_score: float) -> List[Dict]:
        """Case-insensitive substring search over project names and descriptions.

        Every hit is reported with the fixed `relevance_score` supplied by the
        caller; no real ranking is performed.
        """
        needle = query.lower()
        all_projects = await db.get_projects()
        matching_projects = [
            p for p in all_projects
            # A project without a description can still match on its name.
            if needle in p.name.lower() or needle in (p.description or "").lower()
        ]
        return [
            {
                "id": p.id,
                "name": p.name,
                "description": p.description,
                "priority": p.priority,
                "relevance_score": relevance_score
            }
            for p in matching_projects[:limit]
        ]

    async def _semantic_search_projects(self, db: DatabaseInterface, query: str, limit: int = 5) -> List[Dict]:
        """Semantic search for projects, with text search as a fallback."""
        if not self.config.vector_search.enabled:
            # Fallback to text search if vector search is disabled
            return await self._text_search_projects(
                db, query, limit, relevance_score=0.85  # Mock score for text search
            )

        if not hasattr(db, 'semantic_search_projects'):
            # Fallback if database doesn't support vector search
            return await self._semantic_search_projects_fallback(db, query, limit)

        try:
            # Generate embedding for search query
            query_embedding = await self.embedding_service.generate_embedding(query)

            # Perform vector search
            results = await db.semantic_search_projects(
                query_embedding,
                limit=limit,
                similarity_threshold=self.config.vector_search.similarity_threshold
            )
        except Exception as e:
            logger.error(f"Vector search failed: {e}")
            # Fallback to text search
            return await self._semantic_search_projects_fallback(db, query, limit)

        return [
            {
                "id": project.id,
                "name": project.name,
                "description": project.description,
                "priority": project.priority,
                "relevance_score": similarity
            }
            for project, similarity in results
        ]

    async def _semantic_search_projects_fallback(self, db: DatabaseInterface, query: str, limit: int) -> List[Dict]:
        """Fallback text search for projects"""
        return await self._text_search_projects(
            db, query, limit, relevance_score=0.75  # Lower score for text-based fallback
        )

    async def _semantic_search_todos(self, db: DatabaseInterface, query: str, limit: int = 5) -> List[Dict]:
        """Semantic search for todos (currently a plain substring search)."""
        # TODO: Implement actual vector search
        needle = query.lower()
        all_todos = await db.get_todos()
        matching_todos = [
            t for t in all_todos
            if needle in t.title.lower() or
            (t.description and needle in t.description.lower())
        ]

        return [
            {
                "id": t.id,
                "title": t.title,
                "description": t.description,
                "priority": t.priority,
                "completed": t.completed,
                "relevance_score": 0.80  # Mock score
            }
            for t in matching_todos[:limit]
        ]

    async def _semantic_search_documents(self, db: DatabaseInterface, query: str, limit: int = 5) -> List[Dict]:
        """Semantic search for documents (not implemented; returns no results)."""
        # TODO: Implement document search with vector embeddings
        return []

    def run(self, host: str = "0.0.0.0", port: int = 8000):
        """Run the HTTP server, blocking the calling thread.

        Starts its own event loop, so it must not be called from a running
        one; use `serve` from async code.
        """
        uvicorn.run(self.app, host=host, port=port)

    async def serve(self, host: str = "0.0.0.0", port: int = 8000):
        """Run the HTTP server inside the already-running event loop."""
        server = uvicorn.Server(uvicorn.Config(self.app, host=host, port=port))
        await server.serve()


async def main():
    """Main entry point for HTTP server"""
    config = Config()
    server = PersonalAssistantHTTPServer(config)

    logger.info("Starting HTTP MCP Server on port 8000")
    # `server.run()` would call `uvicorn.run`, which starts a new event loop
    # and fails when invoked from within `asyncio.run(main())`.
    await server.serve()


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/swapnilsurdi/mcp-pa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.