Skip to main content
Glama
http_server.py76.6 kB
""" MCP HTTP/SSE Server - Standard Model Context Protocol over HTTP Exposes all IRIS capabilities (OAuth, Calendar, Email, Users, Booking) via MCP protocol """ import asyncio import json import logging import os import uuid from pathlib import Path from typing import Dict, Any, Optional, AsyncGenerator, Union from datetime import datetime from dotenv import load_dotenv from fastapi import FastAPI, Request, HTTPException, Security, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse from sse_starlette.sse import EventSourceResponse from pydantic import BaseModel, Field import hashlib import psycopg2 from psycopg2.extras import RealDictCursor import redis # Load environment variables env_path = Path(__file__).parent.parent.parent / '.env' if env_path.exists(): load_dotenv(env_path) print(f"✅ Loaded .env from: {env_path}") print(f"🔍 TRUSTY_LAZY_AUTH = {os.getenv('TRUSTY_LAZY_AUTH', 'NOT SET')}") else: print(f"⚠️ .env file not found at: {env_path}") # Import operations from .microsoft.graph_client import MicrosoftGraphClient from .microsoft.operations import ( UserOperations, CalendarOperations, EmailOperations, TeamsOperations, FileOperations, BookingOperations ) from .microsoft.operations.hybrid_booking_operations import HybridBookingOperations # from .infocert import InfocertOperations # REMOVED: Infocert is a separate MCP system from .pec import PECOperations # OAuth functionality migrated to TrustyVault # Legacy imports removed: oauth_server.database (archived) # Import thread credentials management from ..database.credentials import ( save_credentials, get_credentials, update_last_used, save_pec_credentials, get_pec_credentials, save_microsoft_credentials, get_microsoft_credentials ) # Configure logging import os log_dir = os.getenv('LOG_DIR', '/app/logs') os.makedirs(log_dir, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(log_dir, 'mcp.log')), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Security: HTTP Bearer token scheme security = HTTPBearer(auto_error=False) # Database configuration for API keys DB_CONFIG = { 'host': os.getenv('DB_HOST', 'localhost'), 'port': os.getenv('DB_PORT', '5432'), 'database': os.getenv('DB_NAME', 'iris'), 'user': os.getenv('DB_USER', 'postgres'), 'password': os.getenv('DB_PASSWORD', '') } # Create FastAPI app app = FastAPI( title="IRIS MCP HTTP Server", description="Model Context Protocol over HTTP/SSE for IRIS Personal Assistant", version="1.0.0" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Add request logging middleware @app.middleware("http") async def log_requests(request: Request, call_next): """Log all incoming requests""" logger.info(f"🔵 Incoming request: {request.method} {request.url.path}") logger.info(f"🔵 Headers: {dict(request.headers)}") # Log body for POST requests if request.method == "POST": body = await request.body() logger.info(f"🔵 Body (raw bytes): {body[:500]}") # First 500 bytes try: body_str = body.decode('utf-8') logger.info(f"🔵 Body (decoded): {body_str[:500]}") except Exception as e: logger.error(f"🔵 Body decode error: {e}") # IMPORTANT: Re-wrap the body so FastAPI can read it again async def receive(): return {"type": "http.request", "body": body} request._receive = receive response = await call_next(request) logger.info(f"🟢 Response status: {response.status_code}") return response # Initialize operations (graph_client no longer needed - each action creates its own) # Legacy: Operations still accept graph_client parameter for backward compatibility graph_client = None # Deprecated: Each action now creates MicrosoftGraphClient(access_token=...) user_ops = UserOperations(graph_client) calendar_ops = CalendarOperations(graph_client) email_ops = EmailOperations(graph_client) teams_ops = TeamsOperations(graph_client) file_ops = FileOperations(graph_client) booking_ops = BookingOperations(graph_client) hybrid_booking_ops = HybridBookingOperations(graph_client) # infocert_ops = InfocertOperations() # REMOVED: Infocert is a separate MCP system pec_ops = PECOperations() # Active SSE connections active_connections: Dict[str, asyncio.Queue] = {} # ==================== SESSION MANAGER (REDIS) ==================== # Redis client for session storage try: redis_client = redis.Redis( host=os.getenv('REDIS_HOST', 'swissknife-redis'), port=int(os.getenv('REDIS_PORT', 6379)), decode_responses=True, socket_connect_timeout=5 ) redis_client.ping() logger.info("✅ Redis connected for session storage") except Exception as e: logger.warning(f"⚠️ Redis not available: {e}. Session storage disabled.") redis_client = None class SessionManager: """Manages user session state (user_email, PEC credentials) in Redis""" def __init__(self, session_id: str): self.session_id = session_id self.key = f"mcp_session:{session_id}" self.ttl = 3600 # 1 hour def save(self, data: Dict[str, Any]): """Save session data to Redis""" if redis_client: try: redis_client.setex(self.key, self.ttl, json.dumps(data)) logger.debug(f"📝 Session saved: {self.session_id}") except Exception as e: logger.error(f"❌ Failed to save session: {e}") def load(self) -> Dict[str, Any]: """Load session data from Redis""" if redis_client: try: data = redis_client.get(self.key) if data: logger.debug(f"📖 Session loaded: {self.session_id}") return json.loads(data) except Exception as e: logger.error(f"❌ Failed to load session: {e}") return {} def update(self, key: str, value: Any): """Update a specific key in session""" data = self.load() data[key] = value self.save(data) logger.info(f"🔄 Session updated: {self.session_id} - {key} = {value if key != 'pec_password' else '***'}") def get(self, key: str, default: Any = None) -> Any: """Get a specific key from session""" data = self.load() return data.get(key, default) def delete(self): """Delete session from Redis""" if redis_client: try: redis_client.delete(self.key) logger.info(f"🗑️ Session deleted: {self.session_id}") except Exception as e: logger.error(f"❌ Failed to delete session: {e}") # ==================== AUTHENTICATION ==================== def verify_api_key_db(api_key: str) -> Optional[Dict[str, Any]]: """Verify API key against database and return user info""" try: # Hash the provided key key_hash = hashlib.sha256(api_key.encode()).hexdigest() # Check database conn = psycopg2.connect(**DB_CONFIG) cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute(""" UPDATE mcp_api_keys SET last_used_at = CURRENT_TIMESTAMP WHERE key_hash = %s AND is_active = TRUE AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP) RETURNING id, key_name, user_email """, (key_hash,)) result = cursor.fetchone() conn.commit() cursor.close() conn.close() if result: logger.info(f"✅ Valid API key: {result['key_name']} (user: {result['user_email']})") return dict(result) else: logger.warning(f"🚫 Invalid or expired API key") return None except Exception as e: logger.error(f"❌ Database error during API key verification: {str(e)}") return None async def authenticate(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Security(security)) -> Dict[str, Any]: """FastAPI dependency for API key authentication - supports both header and query param""" # Try query parameter first (for MCP clients) api_key = request.query_params.get("api_key") if not api_key: # Fallback to Authorization header if credentials: api_key = credentials.credentials if not api_key: # Check if TRUSTY_LAZY_AUTH is enabled (bypass authentication) lazy_auth = os.getenv("TRUSTY_LAZY_AUTH", "false").lower() == "true" if lazy_auth: logger.info("🟢 TRUSTY_LAZY_AUTH enabled - allowing unauthenticated access") return { "user_email": "guest@example.com", "user_name": "Guest User", "api_key": None, "lazy_auth": True } logger.warning("🚫 Missing authentication (no header or query param)") raise HTTPException( status_code=401, detail="Missing authentication. Provide: Authorization: Bearer <key> OR ?api_key=<key>", headers={"WWW-Authenticate": "Bearer"} ) user_info = verify_api_key_db(api_key) if not user_info: raise HTTPException( status_code=403, detail="Invalid or expired API key" ) return user_info # ==================== OAUTH HELPER FUNCTIONS ==================== # OAuth functionality removed - migrated to TrustyVault # IRIS MCP tools now receive credentials via TrustyVault integration # Tools that need OAuth should use TrustyVault's credential management # ==================== MODELS ==================== class MCPRequest(BaseModel): """MCP JSON-RPC request""" jsonrpc: str = "2.0" id: Optional[Union[str, int]] = None # JSON-RPC allows string, number or null method: str params: Optional[Dict[str, Any]] = None class MCPResponse(BaseModel): """MCP JSON-RPC response""" jsonrpc: str = "2.0" id: Optional[Union[str, int]] = None # JSON-RPC allows string, number or null result: Optional[Dict[str, Any]] = None error: Optional[Dict[str, Any]] = None # ==================== MCP TOOL DEFINITIONS ==================== MCP_TOOLS = { # OAuth Tools - DEPRECATED (use TrustyVault for authentication) # Removed: oauth_check_status, oauth_get_login_url # Calendar Tools "calendar_list_events": { "description": "List calendar events for a user", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "start_date": {"type": "string", "format": "date-time"}, "end_date": {"type": "string", "format": "date-time"}, "max_results": {"type": "integer", "default": 10} }, "required": ["access_token", "microsoft_user"] } }, "calendar_create_event": { "description": "Create calendar event for meetings with INTERNAL attendees ONLY (same organization, e.g., @infocert.it). Workflow: 1) Detect if ALL attendees are internal (same domain), 2) Use calendar_find_free_time with ALL attendees to find common free slot, 3) Create event directly. DO NOT use for external attendees (Gmail, Yahoo) - use booking_create instead!", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "subject": {"type": "string"}, "start": {"type": "string", "format": "date-time"}, "end": {"type": "string", "format": "date-time"}, "attendees": {"type": "array", "items": {"type": "string"}}, "body": {"type": "string"}, "location": {"type": "string"}, "is_online": {"type": "boolean", "default": False} }, "required": ["access_token", "microsoft_user", "subject", "start", "end"] } }, "calendar_update_event": { "description": "Update an existing calendar event", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "event_id": {"type": "string"}, "subject": {"type": "string"}, "start": {"type": "string"}, "end": {"type": "string"}, "body": {"type": "string"}, "location": {"type": "string"} }, "required": ["access_token", "microsoft_user", "event_id"] } }, "calendar_delete_event": { "description": "Delete a calendar event", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "event_id": {"type": "string"} }, "required": ["access_token", "microsoft_user", "event_id"] } }, "calendar_find_free_time": { "description": "Find common free time slots for organizer AND attendees. Use 'attendees' parameter to check availability of ALL participants (not just organizer!). For internal meetings: find common free slot among all attendees. For external meetings: check only organizer's availability (external calendars not accessible). IMPORTANT: Parse user time preferences carefully - 'domani mattina' = tomorrow 9-13, 'tra 3 giorni pomeriggio' = 3 days from now 14-18.", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "attendees": {"type": "array", "items": {"type": "string"}, "description": "List of attendee email addresses to check availability for"}, "duration_minutes": {"type": "integer", "description": "Meeting duration in minutes"}, "start_date": {"type": "string", "description": "Start of search window (ISO datetime). Example: for 'domani mattina' use tomorrow at 09:00, for 'tra 3 giorni' use 3 days from now."}, "end_date": {"type": "string", "description": "End of search window (ISO datetime). Example: for 'mattina' use 13:00, for 'pomeriggio' use 18:00, for full day use 18:00."} }, "required": ["access_token", "microsoft_user", "duration_minutes"] } }, # Email Tools "email_list_messages": { "description": "List Microsoft 365 email messages. ⚠️ NOTE: If you see attachments named 'smime.p7m' or 'postacert.eml', those are PEC (Italian Certified Email) messages imported into Microsoft mailbox. To access PEC content and attachments properly, use pec_list_messages + pec_get_attachment tools instead (requires pec_user + pec_password from TrustyVault).", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "folder": {"type": "string", "default": "inbox"}, "max_results": {"type": "integer", "default": 10}, "filter": {"type": "string"} }, "required": ["access_token", "microsoft_user"] } }, "email_send_message": { "description": "Send an email message with optional attachments. To attach files: use 'file_path' from pec_get_attachment/email_get_attachment result (NOT download_url or content_bytes). Example: attachments=[{'file_path': '/app/attachments/20231124_abc123_document.pdf'}]", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "to": {"type": "array", "items": {"type": "string"}}, "subject": {"type": "string"}, "body": {"type": "string"}, "cc": {"type": "array", "items": {"type": "string"}}, "bcc": {"type": "array", "items": {"type": "string"}}, "attachments": { "type": "array", "items": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "file_path": {"type": "string", "description": "Path to file on server filesystem (e.g. /app/attachments/file.pdf) - get this from pec_get_attachment or email_get_attachment"}, "name": {"type": "string", "description": "Optional: Override filename"}, "content_type": {"type": "string", "description": "Optional: MIME type"}, "content_bytes": {"type": "string", "description": "Optional: Base64 encoded content (alternative to file_path)"} } }, "description": "Attachments with file_path from get_attachment tools" } }, "required": ["access_token", "microsoft_user", "to", "subject", "body"] } }, "email_get_message": { "description": "Get full email message content including body and metadata", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "message_id": {"type": "string", "description": "Message ID to retrieve"} }, "required": ["access_token", "microsoft_user", "message_id"] } }, "email_list_attachments": { "description": "List all attachments for an email message", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "message_id": {"type": "string", "description": "Message ID to get attachments from"} }, "required": ["access_token", "microsoft_user", "message_id"] } }, "email_get_attachment": { "description": "Get Microsoft email attachment - file is automatically saved to /app/attachments/. ⚠️ USE ONLY for Microsoft 365 email messages from email_list_messages. DO NOT use for PEC messages or attachments named 'smime.p7m' / 'postacert.eml' - those are PEC envelopes, use pec_list_messages + pec_get_attachment instead. Returns: 1) 'file_path' (use for forwarding), 2) 'markdown_content' (for analysis), 3) 'download_url', 4) 'download_link_html' (HTML link with target='_blank'). WORKFLOW EXAMPLE for forwarding: Step 1) result = email_get_attachment(...), Step 2) Extract: file_path = result['data']['file_path'], Step 3) teams_send_message(attachments=[{'file_path': file_path}]). When showing download link to user, use 'download_link_html' so it opens in new tab. The file_path points to the saved file on server filesystem. Supports PDF, DOCX, XLSX, PPTX, images via MarkItDown.", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "message_id": {"type": "string", "description": "Message ID containing the attachment"}, "attachment_id": {"type": "string", "description": "Attachment ID to download"} }, "required": ["access_token", "microsoft_user", "message_id", "attachment_id"] } }, "email_read_attachment_text": { "description": "Read and extract text from email attachments (PDF, EML, TXT, etc.) without downloading. Returns extracted text content directly.", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "message_id": {"type": "string", "description": "Message ID containing the attachment"}, "attachment_id": {"type": "string", "description": "Attachment ID to read"} }, "required": ["access_token", "microsoft_user", "message_id", "attachment_id"] } }, "email_forward": { "description": "Forward an email message to one or more recipients", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "message_id": {"type": "string", "description": "Message ID to forward"}, "to": {"type": "array", "items": {"type": "string"}, "description": "Recipients email addresses"}, "comment": {"type": "string", "description": "Optional comment to add to forwarded message"} }, "required": ["access_token", "microsoft_user", "message_id", "to"] } }, "email_search": { "description": "Search emails using Microsoft Search API with advanced query capabilities. Supports KQL (Keyword Query Language) for precise searches across subject, body, and attachments. Examples: 'belluzzo', 'from:giovanni.belluzzo@infocert.it', 'subject:meeting', 'hasattachments:true'", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "query": {"type": "string", "description": "Search query (supports KQL syntax)"}, "max_results": {"type": "integer", "default": 25, "description": "Maximum number of results to return"}, "enable_top_results": {"type": "boolean", "default": False, "description": "Enable relevance-based ranking"} }, "required": ["access_token", "microsoft_user", "query"] } }, "email_get_conversation": { "description": "Get all messages in an email conversation/thread. Useful for following entire discussion chains. Provide either conversation_id directly or message_id to retrieve its thread.", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "conversation_id": {"type": "string", "description": "Conversation ID to retrieve (optional if message_id provided)"}, "message_id": {"type": "string", "description": "Message ID to get conversation from (optional if conversation_id provided)"} }, "required": ["access_token", "microsoft_user"] } }, # User Tools "users_get_profile": { "description": "Get user profile information. If target_email is omitted, returns YOUR profile (authenticated user via /me endpoint). If target_email is provided, searches for that user's profile. Accepts both UPN and email display - automatically resolves to correct user.", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "target_email": {"type": "string", "description": "OPTIONAL - Email or UPN of user to search (e.g., giovanni.albero@infocert.it or GAL1234@infocert.it). If omitted, returns YOUR profile using /me endpoint."} }, "required": ["access_token"] } }, "users_search": { "description": "Search for users in organization", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "query": {"type": "string"} }, "required": ["access_token", "microsoft_user", "query"] } }, # Teams Tools "teams_list_chats": { "description": "List user's Teams chats (1-on-1 and group chats). Returns chat_id, topic/displayName, and members for each chat. IMPORTANT: When user asks to send message to a group by name (e.g. 'send to LegalVector group'), you MUST: 1) Call teams_list_chats first, 2) Find the chat with matching topic/displayName, 3) Extract its chat_id, 4) Use that chat_id in teams_send_message. DO NOT ask user for member emails if group already exists!", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, }, "required": ["access_token", "microsoft_user"] } }, "teams_send_message": { "description": "Send a message to a Teams chat with optional file attachments. SIMPLIFIED: Provide 'recipient_email' to auto-find/create 1-on-1 chat (e.g., 'giovanni.albero@infocert.it' or 'GAL1234@infocert.it'), OR provide 'chat_id' for existing group chats. ONE OF THE TWO IS REQUIRED (chat_id OR recipient_email). System automatically handles chat discovery and creation. To attach files: use 'file_path' from pec_get_attachment/email_get_attachment result. Example: teams_send_message({recipient_email: 'user@domain.com', message: 'Hello!'})", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "chat_id": {"type": "string", "description": "OPTIONAL - Chat ID to send message to (for existing group chats). Not needed if using recipient_email. ONE OF chat_id OR recipient_email REQUIRED."}, "recipient_email": {"type": "string", "description": "OPTIONAL - Email display or UPN of recipient for 1-on-1 chat (e.g., 'giovanni.albero@infocert.it' or 'GAL1234@infocert.it'). System auto-finds or creates chat. Use this for simplicity. ONE OF chat_id OR recipient_email REQUIRED."}, "message": {"type": "string", "description": "Message content"}, "content_type": {"type": "string", "default": "text", "description": "Content type (text or html)"}, "attachments": { "type": "array", "items": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "file_path": {"type": "string", "description": "Path to file on server filesystem (e.g. /app/attachments/file.pdf) - get this from pec_get_attachment or email_get_attachment"}, "name": {"type": "string", "description": "Optional: Override filename"}, "content_type": {"type": "string", "description": "Optional: MIME type"} } }, "description": "Attachments with file_path from get_attachment tools (max 3MB per file)" } }, "required": ["access_token", "microsoft_user", "message"] } }, "teams_create_chat": { "description": "Create or get existing Teams chat with members. IDEMPOTENT: If chat already exists, returns existing chat_id (no error). AUTO-RESOLVES UPNs: Accepts display emails (e.g. 'firstname.lastname@domain.com') and automatically finds correct UPN. Use this directly to send messages - no need to search users or existing chats first. The caller (user_email) is automatically included. For 1-on-1 chat, provide only the other person's email in members list. Returns chat_id that can be used immediately with teams_send_message.", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"}, "members": {"type": "array", "items": {"type": "string"}, "description": "List of OTHER member email addresses (caller is auto-included). Accepts display emails or UPNs - system auto-resolves. For 1-on-1, provide only 1 email."}, "topic": {"type": "string", "description": "Chat topic (for group chats with 3+ people)"} }, "required": ["access_token", "microsoft_user", "members"] } }, # Hybrid Booking Tools "booking_create": { "description": "Create booking for meetings with ONE external attendee (Gmail, Yahoo, etc.) + optional internal attendees. IMPORTANT: Always ask user for their email (organizer_email) if not already known. WORKFLOW: 1) Ask organizer email if needed, 2) Ask if they want to PROPOSE slots or CONFIRM specific time, 3a) If PROPOSE: Find 3 free slots and send booking link, OR 3b) If CONFIRM: Create calendar event immediately with specific time. Example: 'Meeting with filippo@gmail.com tomorrow at 10' → Ask email → Ask 'Propose slots or confirm 10am?' → If confirm: set direct_booking=true + proposed_times=['2025-11-26T10:00:00'].", "inputSchema": { "type": "object", "properties": { "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"}, "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com) - same as organizer_email"}, "organizer_email": {"type": "string", "description": "Email of the meeting organizer (REQUIRED - must be a valid Microsoft account with OAuth access)"}, "external_email": {"type": "string", "description": "Email of external attendee (non-Microsoft account like Gmail)"}, "subject": {"type": "string", "description": "Meeting subject/title"}, "duration_minutes": {"type": "integer", "description": "Meeting duration in minutes"}, "internal_attendees": { "type": "array", "description": "Optional: List of internal attendees (@organization) to include. System will check their availability before proposing slots. Format: [{'email': 'user@domain.com', 'name': 'User Name'}]", "items": { "type": "object", "properties": { "email": {"type": "string"}, "name": {"type": "string"} }, "required": ["email"] } }, "proposed_times": {"type": "array", "items": {"type": "string"}, "description": "Optional: Specific ISO datetime strings to propose. If not provided, system finds 3 available slots automatically"}, "direct_booking": {"type": "boolean", "description": "If true, creates calendar event immediately with first proposed_time (no booking link). If false (default), sends booking link with proposed slots for external attendee to choose.", "default": False}, "attachments": { "type": "array", "description": "Optional: Files to attach to meeting invite (will be included when external user confirms booking)", "items": { "type": "object", "properties": { "name": {"type": "string", "description": "Filename with extension"}, "contentType": {"type": "string", "description": "MIME type (e.g., application/pdf, application/vnd.openxmlformats-officedocument.presentationml.presentation)"}, "contentBytes": {"type": "string", "description": "Base64-encoded file content"} }, "required": ["name", "contentBytes"] } } }, "required": ["access_token", "microsoft_user", "organizer_email", "external_email", "subject", "duration_minutes"] } }, "booking_check_status": { "description": "Check status of booking request", "inputSchema": { "type": "object", "properties": { "session_id": {"type": "string"} }, "required": ["access_token", "microsoft_user", "session_id"] } }, "booking_list": { "description": "List all booking requests", "inputSchema": { "type": "object", "properties": { "organizer_email": {"type": "string"} } } }, "booking_cancel": { "description": "Cancel a booking request", "inputSchema": { "type": "object", "properties": { "session_id": {"type": "string"} }, "required": ["access_token", "microsoft_user", "session_id"] } }, # REMOVED: Infocert Digital Signature Tools (separate MCP system) # PEC (Posta Elettronica Certificata) Tools "pec_list_messages": { "description": "List PEC (Italian Certified Email) messages from INBOX with automatic unwrapping. Extracts original content from postacert.eml. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.", "inputSchema": { "type": "object", "properties": { "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"}, "pec_password": {"type": "string", "description": "PEC password from TrustyVault"}, "folder": {"type": "string", "description": "IMAP folder (default: INBOX)", "default": "INBOX"}, "max_results": {"type": "integer", "description": "Maximum number of messages (default: 10)", "default": 10} }, "required": ["pec_user", "pec_password"] } }, "pec_get_message": { "description": "Get single PEC message with automatic sbustamento (unwrapping). Returns COMPACT summary by default (optimized for LLM agents). Set compact=false for full body content. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.", "inputSchema": { "type": "object", "properties": { "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"}, "pec_password": {"type": "string", "description": "PEC password from TrustyVault"}, "message_id": {"type": "string", "description": "Message ID from pec_list_messages"}, "compact": { "type": "boolean", "description": "If true (default), returns compact summary with body preview (300 chars). If false, returns full body content (can be large, 50-100KB). Attachments data never included - use pec_get_attachment() to download.", "default": True } }, "required": ["pec_user", "pec_password", "message_id"] } }, "pec_get_attachment": { "description": "Get PEC (Italian Certified Email) attachment with automatic unwrapping - file is automatically saved to /app/attachments/. ⚠️ USE ONLY for PEC messages from pec_list_messages. If you see smime.p7m or postacert.eml attachments in Microsoft email, those ARE PEC messages - use pec_list_messages to access them properly with automatic unwrapping. Returns: 1) 'file_path' (use for forwarding), 2) 'markdown_content' (for analysis), 3) 'download_url', 4) 'download_link_html' (HTML link with target='_blank'). WORKFLOW EXAMPLE for forwarding: Step 1) result = pec_get_attachment(...), Step 2) Extract: file_path = result['data']['file_path'], Step 3) teams_send_message(attachments=[{'file_path': file_path}]). When showing download link to user, use 'download_link_html' so it opens in new tab. The file_path points to the saved file on server filesystem. Supports PDF, DOCX, XLSX, PPTX, images via MarkItDown. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.", "inputSchema": { "type": "object", "properties": { "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"}, "pec_password": {"type": "string", "description": "PEC password from TrustyVault"}, "message_id": {"type": "string", "description": "Message ID"}, "attachment_index": {"type": "integer", "description": "Index of attachment (0-based)"} }, "required": ["pec_user", "pec_password", "message_id", "attachment_index"] } }, "pec_send_message": { "description": "Send PEC (Italian Certified Email) message via SMTP. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.", "inputSchema": { "type": "object", "properties": { "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"}, "pec_password": {"type": "string", "description": "PEC password from TrustyVault"}, "to": {"type": "string", "description": "Recipient email address"}, "subject": {"type": "string", "description": "Email subject"}, "body": {"type": "string", "description": "Email body (text)"}, "attachments": {"type": "array", "description": "Optional attachments (base64 encoded)", "items": {"type": "object"}} }, "required": ["pec_user", "pec_password", "to", "subject", "body"] } }, # File Management Tools "file_upload": { "description": "Upload file from any interface (WhatsApp, Teams, PWA, web) and get public download URL. Accepts base64-encoded file content and saves to server. Returns file_path (for internal use with email/teams/pec tools) and download_url (public link for TrustySign, user download, etc.). UNIVERSAL: Works with all agent interfaces.", "inputSchema": { "type": "object", "properties": { "filename": {"type": "string", "description": "Original filename (e.g., 'document.pdf')"}, "content_base64": {"type": "string", "description": "Base64-encoded file content from user upload widget"}, "content_type": {"type": "string", "description": "MIME type (e.g., 'application/pdf', 'image/jpeg')"} }, "required": ["filename", "content_base64"] } } } # ==================== TOOL EXECUTION ==================== def check_oauth_error(result: Dict[str, Any], user_email: str) -> Dict[str, Any]: """Check if result contains OAuth error and add login URL if needed. Args: result: Operation result dictionary user_email: User email for logging purposes only Returns: Result with OAuth login URL added if authentication required """ error_message = result.get("error", {}).get("message", "") if not result.get("success") and "No valid access token available" in error_message: # Extract base URL from OAUTH_REDIRECT_URI or use default redirect_uri = os.getenv("OAUTH_REDIRECT_URI", "https://trustypa.brainaihub.tech/oauth/callback") # Remove /oauth/callback if present to get base URL base_url = redirect_uri.replace("/oauth/callback", "") login_url = f"{base_url}/oauth/login" result["error"]["oauth_required"] = True result["error"]["login_url"] = login_url result["error"]["instructions"] = f"Please authenticate by visiting: {login_url}" return result # ==================== TOOL REGISTRY ==================== # Generic mapping of tool_name -> (operation_type, action_name, user_email_param) TOOL_REGISTRY = { # Calendar Tools "calendar_list_events": ("calendar", "list", "user_email"), "calendar_create_event": ("calendar", "create", "user_email"), "calendar_update_event": ("calendar", "update", "user_email"), "calendar_delete_event": ("calendar", "delete", "user_email"), "calendar_find_free_time": ("calendar", "find_times", "user_email"), # Email Tools "email_list_messages": ("email", "list", "user_email"), "email_send_message": ("email", "send", "user_email"), "email_get_message": ("email", "get", "user_email"), "email_list_attachments": ("email", "list_attachments", "user_email"), "email_get_attachment": ("email", "get_attachment", "user_email"), "email_read_attachment_text": ("email", "read_attachment_text", "user_email"), "email_forward": ("email", "forward", "user_email"), "email_search": ("email", "search", "user_email"), "email_get_conversation": ("email", "get_conversation", "user_email"), # User Tools "users_get_profile": ("user", "get", "user_email"), "users_search": ("user", "search", "user_email"), # Teams Tools "teams_list_chats": ("teams", "list_chats", "user_email"), "teams_send_message": ("teams", "send_message", "user_email"), "teams_create_chat": ("teams", "create_chat", "user_email"), } async def execute_tool(tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]: """Execute an MCP tool and return the result""" try: logger.info(f"Executing tool: {tool_name} with params: {params}") # === SPECIAL TOOLS (Booking) === # Hybrid Booking Tools - use async direct methods (not BaseOperation) if tool_name == "booking_create": result = await hybrid_booking_ops.create_booking_request( organizer_email=params["organizer_email"], external_email=params["external_email"], subject=params["subject"], duration_minutes=params["duration_minutes"], proposed_times=params.get("proposed_times", []), internal_attendees=params.get("internal_attendees", []), attachments=params.get("attachments", []), direct_booking=params.get("direct_booking", False) ) return check_oauth_error(result, params["organizer_email"]) elif tool_name == "booking_check_status": result = await hybrid_booking_ops.get_booking_status( session_id=params["session_id"] ) return result elif tool_name == "booking_list": result = await hybrid_booking_ops.list_bookings( organizer_email=params.get("organizer_email") ) if params.get("organizer_email"): return check_oauth_error(result, params["organizer_email"]) return result elif tool_name == "booking_cancel": result = await hybrid_booking_ops.cancel_booking( session_id=params["session_id"] ) return result # === INFOCERT TOOLS REMOVED (separate MCP system) === # === PEC TOOLS === elif tool_name == "pec_list_messages": loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, pec_ops.execute, "list_messages", params ) return result elif tool_name == "pec_get_message": loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, pec_ops.execute, "get_message", params ) return result elif tool_name == "pec_get_attachment": loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, pec_ops.execute, "get_attachment", params ) return result elif tool_name == "pec_send_message": loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, pec_ops.execute, "send_message", params ) return result elif tool_name == "file_upload": # Import attachment helper from .attachment_helper import save_attachment import base64 try: # Decode base64 content content_base64 = params.get("content_base64", "") filename = params.get("filename", "file") content_type = params.get("content_type", "application/octet-stream") # Decode base64 try: content_bytes = base64.b64decode(content_base64) except Exception as e: return { "success": False, "error": {"code": "INVALID_BASE64", "message": f"Invalid base64 encoding: {str(e)}"} } # Save file using attachment_helper file_info = save_attachment(content_bytes, filename) return { "success": True, "file_path": file_info["file_path"], "download_url": file_info["download_url"], "download_link_html": file_info["download_link_html"], "filename": filename, "size": len(content_bytes), "content_type": content_type } except Exception as e: logger.error(f"❌ file_upload error: {e}", exc_info=True) return { "success": False, "error": {"code": "UPLOAD_FAILED", "message": str(e)} } # === HYBRID BOOKING AUTO-DETECTION === # Intercept calendar_create_event with external attendees elif tool_name == "calendar_create_event": try: # REMOVED AUTO-REDIRECT TO HYBRID BOOKING # Agent is now responsible for choosing: # - calendar_create_event: for fixed date/time meetings (immediate creation) # - booking_create: for flexible scheduling with multiple options # This ensures calendar_create_event behaves as expected: # Creates the event immediately without waiting for confirmation logger.info(f"📅 Creating Outlook event with attendees: {params.get('attendees', [])}") loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, calendar_ops.execute, "create", params) return check_oauth_error(result, params["user_email"]) except Exception as e: logger.error(f"❌ Error in calendar_create_event auto-detection: {e}", exc_info=True) # Fallback to normal calendar creation logger.warning(f"⚠️ Falling back to normal calendar creation") loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, calendar_ops.execute, "create", params) return check_oauth_error(result, params["user_email"]) # === GENERIC TOOL EXECUTION (Registry-based) === elif tool_name in TOOL_REGISTRY: operation_type, action, user_email_param = TOOL_REGISTRY[tool_name] # Get the operation instance operation_map = { "calendar": calendar_ops, "email": email_ops, "user": user_ops, "teams": teams_ops # "infocert": infocert_ops # REMOVED: separate MCP system } operation = operation_map.get(operation_type) if not operation: return {"error": f"Unknown operation type: {operation_type}"} # Execute the action in a thread pool to avoid blocking event loop # (operation.execute() is sync and makes blocking HTTP requests) loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, operation.execute, action, params) # Apply OAuth error checking if user_email is present if user_email_param and user_email_param in params: return check_oauth_error(result, params[user_email_param]) return result else: return {"error": f"Unknown tool: {tool_name}"} except Exception as e: logger.error(f"Error executing tool {tool_name}: {str(e)}", exc_info=True) return {"error": str(e)} # ==================== MCP PROTOCOL HANDLERS ==================== async def handle_mcp_request(request: MCPRequest, session: Optional[SessionManager] = None) -> Optional[MCPResponse]: """Handle incoming MCP JSON-RPC request. Returns None for notifications.""" try: method = request.method params = request.params or {} logger.info(f"Handling MCP request: {method}") # Handle notifications (no response needed) if request.id is None: if method.startswith("notifications/"): logger.info(f"Received notification: {method}") return None else: logger.warning(f"Received request without id: {method}") return None # Initialize handshake if method == "initialize": return MCPResponse( id=request.id, result={ "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "iris-mcp-server", "version": "1.0.0" } } ) # List available tools elif method == "tools/list": tools = [ { "name": name, "description": info["description"], "inputSchema": info["inputSchema"] } for name, info in MCP_TOOLS.items() ] return MCPResponse( id=request.id, result={"tools": tools} ) # Execute tool elif method == "tools/call": tool_name = params.get("name") tool_params = params.get("arguments", {}) if tool_name not in MCP_TOOLS: return MCPResponse( id=request.id, error={ "code": -32602, "message": f"Unknown tool: {tool_name}" } ) # ========== MIGRATE SESSION FROM GUEST TO USER_ID ========== # Check if state contains user_id and session needs migration if session and "state" in tool_params and isinstance(tool_params["state"], dict): state = tool_params["state"] if "user_id" in state: user_id = state["user_id"] current_session_id = session.session_id # If currently using guest session AND not already migrated, do migration if "guest@example.com" in current_session_id: # Check if already migrated for this user target_session_id = f"session_user_{user_id}" target_session = SessionManager(target_session_id) # Only migrate if target session exists and has user_id set # Otherwise create/migrate if not target_session.get("user_id"): logger.info(f"🔄 Migrating session from guest to user_id: {user_id}") # Copy existing data from guest session to user session old_data = session.load() old_data["user_id"] = user_id target_session.save(old_data) logger.info(f"✅ Session created: {target_session_id}") # Update active_connections mapping to point to user session connection_id = session.get("connection_id", "guest@example.com") active_connections[f"{connection_id}_session"] = target_session_id # Switch to user session session = target_session logger.info(f"📂 Using user session: {target_session_id}") # ========== AUTO-INJECT CREDENTIALS FROM DB (PERSISTENT!) ========== # NOTE: Microsoft tools now use access_token from TrustyVault (passed directly) # Only PEC tools still use thread_id-based credential lookup # Get thread_id from tool params (for PEC tools only now) thread_id = tool_params.get("thread_id") if thread_id: logger.info(f"🔍 Looking up credentials for thread_id: {thread_id[:20]}...") # Auto-inject PEC credentials if missing if tool_name.startswith("pec_"): if "pec_email" not in tool_params or not tool_params["pec_email"]: # Try DB first (persistent) creds = get_pec_credentials(thread_id) if creds and creds.pec_email and creds.pec_password: tool_params["pec_email"] = creds.pec_email tool_params["pec_password"] = creds.pec_password tool_params["_credentials_auto_filled"] = True logger.info(f"✅ Auto-injected PEC credentials from DB: {creds.pec_email}") # Fallback to session (backward compatibility) elif session: stored_pec_email = session.get("pec_email") stored_pec_password = session.get("pec_password") if stored_pec_email and stored_pec_password: tool_params["pec_email"] = stored_pec_email tool_params["pec_password"] = stored_pec_password tool_params["_credentials_auto_filled"] = True logger.info(f"🔄 Auto-injected PEC credentials from session: {stored_pec_email}") # Auto-inject Infocert credentials if missing (future use) else: logger.warning("⚠️ No thread_id in tool_params - cannot lookup persistent credentials") # ========== EXECUTE TOOL WITH CREDENTIALS ========== result = await execute_tool(tool_name, tool_params) # ========== INJECT CREDENTIAL REMINDER IN RESPONSE ========== # If PEC credentials were auto-injected, add them to response so agent remembers if tool_name.startswith("pec_") and tool_params.get("_credentials_auto_filled"): try: # Parse result and add credential info if isinstance(result, str): result_data = json.loads(result) if isinstance(result_data, dict) and "data" in result_data: # Add credential reminder to metadata if "metadata" not in result_data: result_data["metadata"] = {} result_data["metadata"]["pec_credentials_used"] = { "pec_email": tool_params.get("pec_email"), "note": "Credentials auto-filled from session - remember these for next PEC operations" } result = json.dumps(result_data) logger.info(f"📌 Added credential reminder to response for agent") except Exception as e: logger.warning(f"Failed to add credential reminder: {e}") # ========== SAVE CREDENTIALS TO DB + SESSION ========== # NOTE: Microsoft credentials no longer saved - access_token comes from TrustyVault if thread_id: # Save PEC credentials if provided if tool_name.startswith("pec_"): pec_email = tool_params.get("pec_email") pec_password = tool_params.get("pec_password") if pec_email and pec_password: try: save_pec_credentials(thread_id, pec_email, pec_password) logger.info(f"💾 Saved PEC credentials to DB: {pec_email}") except Exception as e: logger.error(f"Failed to save PEC credentials to DB: {e}") # Also save to session (backward compatibility) if session: session.update("pec_email", pec_email) session.update("pec_password", pec_password) # Also save to session if available (backward compatibility) if session and not thread_id: # Save user_email if provided if "user_email" in tool_params and tool_params["user_email"]: session.update("user_email", tool_params["user_email"]) # Save PEC credentials if provided if tool_name.startswith("pec_"): if "pec_email" in tool_params and tool_params["pec_email"]: session.update("pec_email", tool_params["pec_email"]) if "pec_password" in tool_params and tool_params["pec_password"]: session.update("pec_password", tool_params["pec_password"]) logger.info(f"💾 PEC credentials saved to session: {tool_params.get('pec_email')}") # Save file_path from attachment operations for later reuse if tool_name in ["pec_get_attachment", "email_get_attachment"]: logger.info(f"🔍 Checking if we should save attachment reference for {tool_name}") try: # Result is a JSON string, parse it first result_parsed = json.loads(result) if isinstance(result, str) else result logger.info(f"📋 Parsed result type: {type(result_parsed)}, keys: {result_parsed.keys() if isinstance(result_parsed, dict) else 'not a dict'}") # Navigate through nested structure: result -> data -> data -> file_path file_info = result_parsed if isinstance(file_info, dict) and "data" in file_info: file_info = file_info["data"] logger.info(f"📋 First level data, keys: {file_info.keys() if isinstance(file_info, dict) else 'not a dict'}") # For PEC, there's another nested 'data' level if isinstance(file_info, dict) and "data" in file_info: file_info = file_info["data"] logger.info(f"📋 Second level data, keys: {file_info.keys() if isinstance(file_info, dict) else 'not a dict'}") if isinstance(file_info, dict) and "file_path" in file_info: # Save with metadata for easy retrieval attachment_ref = { "file_path": file_info["file_path"], "filename": file_info.get("filename", ""), "download_url": file_info.get("download_url", ""), "content_type": file_info.get("content_type", ""), "timestamp": datetime.now().isoformat() } session.update("last_attachment", attachment_ref) logger.info(f"💾 Saved attachment reference: {file_info.get('filename')} at {file_info['file_path']}") else: logger.warning(f"⚠️ No file_path found in file_info: {file_info if isinstance(file_info, dict) else type(file_info)}") except Exception as e: logger.error(f"❌ Error saving attachment reference: {e}", exc_info=True) return MCPResponse( id=request.id, result={ "content": [ { "type": "text", "text": json.dumps(result, indent=2) } ] } ) # Ping elif method == "ping": return MCPResponse(id=request.id, result={}) else: return MCPResponse( id=request.id, error={ "code": -32601, "message": f"Method not found: {method}" } ) except Exception as e: logger.error(f"Error handling MCP request: {str(e)}", exc_info=True) return MCPResponse( id=request.id, error={ "code": -32603, "message": f"Internal error: {str(e)}" } ) # ==================== HTTP ENDPOINTS ==================== @app.get("/mcp/") async def mcp_root(): """MCP root endpoint - Server information""" return { "service": "IRIS MCP HTTP Server", "version": "1.0.0", "protocol": "MCP over HTTP/SSE", "status": "online", "endpoints": { "health": "/mcp/health", "sse": "/mcp/sse", "tools": "/mcp/tools", "messages": "/mcp/messages" }, "tools_count": len(MCP_TOOLS), "timestamp": datetime.utcnow().isoformat() } @app.get("/mcp/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "service": "IRIS MCP HTTP Server", "version": "1.0.0", "timestamp": datetime.utcnow().isoformat(), "protocol": "MCP over HTTP/SSE", "tools_count": len(MCP_TOOLS) } @app.get("/mcp/attachments/download/{filename}") async def download_attachment(filename: str): """ Serve attachment files publicly. Public URL: https://trustypa.brainaihub.tech/iris/attachments/download/{filename} (nginx routes /iris/ to iris-app:8001/mcp/) Files are stored in /app/attachments with token prefix for uniqueness. This endpoint makes files accessible via public URL for: - User download (email/pec attachments) - Cross-tool usage (TrustySign signing IRIS files) - External services """ attachments_dir = Path("/app/attachments") file_path = attachments_dir / filename # Security: Prevent directory traversal if not file_path.resolve().is_relative_to(attachments_dir.resolve()): raise HTTPException(status_code=403, detail="Access denied") # Check file exists if not file_path.exists() or not file_path.is_file(): raise HTTPException(status_code=404, detail="File not found") # Serve file with proper content type return FileResponse( path=str(file_path), filename=filename, media_type='application/octet-stream' ) @app.get("/mcp/sse") async def mcp_sse_endpoint(request: Request, user_info: Dict[str, Any] = Depends(authenticate)): """SSE endpoint for MCP protocol (server to client messages)""" # Use user email as connection ID (one connection per user) connection_id = user_info['user_email'] queue: asyncio.Queue = asyncio.Queue() active_connections[connection_id] = queue # Create or reuse session for this user (persistent across reconnections) # Use user_email as session_id so credentials persist across SSE reconnects session_id = f"session_{connection_id}" # e.g., "session_yyi9910@infocert.it" session = SessionManager(session_id) session.update('connection_id', connection_id) session.update('user_email', user_info['user_email']) # Store session_id in connection metadata active_connections[f"{connection_id}_session"] = session_id logger.info(f"New SSE connection: {connection_id} (session: {session_id})") async def event_generator() -> AsyncGenerator[str, None]: try: # Send endpoint event as required by MCP SSE specification # This tells the client where to send POST requests yield { "event": "endpoint", "data": "/mcp/message" } # Keep connection alive and send messages from queue while True: try: # Wait for message with timeout (non-blocking check for disconnect) message = await asyncio.wait_for(queue.get(), timeout=30.0) logger.info(f"SSE sending message to {connection_id}: {message}") # Send as SSE message event yield { "event": "message", "data": json.dumps(message) } except asyncio.TimeoutError: # Check if client disconnected before sending ping if await request.is_disconnected(): logger.info(f"SSE client disconnected during timeout: {connection_id}") break # Send keepalive ping logger.debug(f"SSE sending keepalive ping to {connection_id}") yield { "event": "message", "data": json.dumps({ "jsonrpc": "2.0", "method": "ping" }) } finally: # Cleanup connection (but keep session in Redis for reconnections) if connection_id in active_connections: del active_connections[connection_id] # Remove session reference from memory (but NOT from Redis) # Session will expire automatically after TTL (default 1 hour) # This allows credentials to persist across SSE reconnections session_key = f"{connection_id}_session" if session_key in active_connections: del active_connections[session_key] logger.info(f"SSE connection closed: {connection_id} (session preserved in Redis)") return EventSourceResponse(event_generator()) @app.post("/mcp/message") async def mcp_message_endpoint(raw_request: Request, user_info: Dict[str, Any] = Depends(authenticate)): """HTTP endpoint for MCP protocol (client to server messages)""" try: user_email = user_info['user_email'] # Log raw body for debugging body = await raw_request.body() logger.info(f"Raw request body: {body.decode('utf-8')}") # Parse as MCPRequest data = await raw_request.json() request = MCPRequest(**data) logger.info(f"Received MCP message: {request.method} from {user_email}") # Find the user's SSE connection queue if user_email not in active_connections: logger.error(f"No active SSE connection for user: {user_email}") return JSONResponse( status_code=400, content={ "jsonrpc": "2.0", "error": { "code": -32000, "message": "No active SSE connection. Connect to /mcp/sse first." } } ) # Get session for this connection session_key = f"{user_email}_session" session = None if session_key in active_connections: session_id = active_connections[session_key] session = SessionManager(session_id) logger.debug(f"📂 Using session: {session_id}") # Process the request with session response = await handle_mcp_request(request, session) # If it's a notification (no response), just acknowledge if response is None: logger.info(f"Notification processed (no response): {request.method}") return JSONResponse(status_code=202, content={"status": "accepted"}) # Put response in the SSE queue queue = active_connections[user_email] await queue.put(response.dict(exclude_none=True)) logger.info(f"Response queued for SSE delivery to {user_email}") # Return 202 Accepted (response will be sent via SSE) return JSONResponse(status_code=202, content={"status": "accepted"}) except HTTPException: raise except Exception as e: logger.error(f"Error processing MCP message: {e}", exc_info=True) return JSONResponse( status_code=422, content={ "jsonrpc": "2.0", "error": { "code": -32700, "message": f"Parse error: {str(e)}" } } ) @app.get("/mcp/tools") async def list_tools(user_info: Dict[str, Any] = Depends(authenticate)): """Convenience endpoint to list available tools""" return { "tools": [ { "name": name, "description": info["description"], "inputSchema": info["inputSchema"] } for name, info in MCP_TOOLS.items() ] } # ==================== BOOKING OPERATIONS ENDPOINT ==================== class OperationRequest(BaseModel): """Standard operation request for bookings endpoint""" action: str = Field(..., description="Action to perform") params: Dict[str, Any] = Field(default_factory=dict, description="Action parameters") @app.post("/mcp/bookings_operations") async def bookings_operations(request: OperationRequest): """ Booking operations endpoint (Microsoft Bookings + Hybrid Booking). This endpoint handles both Microsoft Bookings API operations and custom Hybrid Booking workflow for meetings with external attendees. Supported actions: - Microsoft Bookings API (list_businesses, create_appointment, etc.) - Hybrid Booking Operations (create_hybrid_session, confirm_hybrid_booking, etc.) """ logger.info(f"📋 Bookings operation: {request.action}") logger.info(f"📋 Parameters: {request.params}") try: result = booking_ops.execute(request.action, request.params) logger.info(f"✅ Operation {request.action} completed successfully") if not result["success"]: logger.error(f"❌ Operation {request.action} failed: {result}") raise HTTPException(status_code=400, detail=result) return result except Exception as e: logger.error(f"❌ Exception in bookings_operations: {e}", exc_info=True) raise HTTPException(status_code=500, detail={ "success": False, "error": { "code": "INTERNAL_ERROR", "message": str(e) } }) # ==================== MAIN ==================== if __name__ == "__main__": import uvicorn port = int(os.getenv("MCP_HTTP_PORT", "8001")) logger.info(f"Starting IRIS MCP HTTP Server on port {port}") logger.info(f"Available tools: {len(MCP_TOOLS)}") uvicorn.run( app, host="0.0.0.0", port=port, log_level="info" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ilvolodel/iris-legacy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server