"""
MCP HTTP/SSE Server - Standard Model Context Protocol over HTTP
Exposes all IRIS capabilities (OAuth, Calendar, Email, Users, Booking) via MCP protocol
"""
import asyncio
import json
import logging
import os
import uuid
from pathlib import Path
from typing import Dict, Any, Optional, AsyncGenerator, Union
from datetime import datetime
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel, Field
import hashlib
import psycopg2
from psycopg2.extras import RealDictCursor
import redis
# Load environment variables
# The project-level .env lives three directories above this module.
env_path = Path(__file__).parent.parent.parent / '.env'
if not env_path.exists():
    # No .env: rely on variables already present in the process environment.
    print(f"⚠️ .env file not found at: {env_path}")
else:
    load_dotenv(env_path)
    print(f"✅ Loaded .env from: {env_path}")
    # Echo the auth-bypass switch so its state is visible at startup.
    print(f"🔍 TRUSTY_LAZY_AUTH = {os.getenv('TRUSTY_LAZY_AUTH', 'NOT SET')}")
# Import operations
from .microsoft.graph_client import MicrosoftGraphClient
from .microsoft.operations import (
UserOperations,
CalendarOperations,
EmailOperations,
TeamsOperations,
FileOperations,
BookingOperations
)
from .microsoft.operations.hybrid_booking_operations import HybridBookingOperations
# from .infocert import InfocertOperations # REMOVED: Infocert is a separate MCP system
from .pec import PECOperations
# OAuth functionality migrated to TrustyVault
# Legacy imports removed: oauth_server.database (archived)
# Import thread credentials management
from ..database.credentials import (
save_credentials,
get_credentials,
update_last_used,
save_pec_credentials,
get_pec_credentials,
save_microsoft_credentials,
get_microsoft_credentials
)
# Logging: write to <LOG_DIR>/mcp.log and mirror everything to the console.
import os
log_dir = os.getenv('LOG_DIR', '/app/logs')
os.makedirs(log_dir, exist_ok=True)  # the file handler needs the directory to exist
logging.basicConfig(
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'mcp.log')),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Bearer-token scheme. auto_error=False lets the dependency run even when the
# Authorization header is absent, so the caller decides how to react.
security = HTTPBearer(auto_error=False)

# PostgreSQL connection settings used for API-key lookups; every value can be
# overridden through the environment.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST', 'localhost'),
    port=os.getenv('DB_PORT', '5432'),
    database=os.getenv('DB_NAME', 'iris'),
    user=os.getenv('DB_USER', 'postgres'),
    password=os.getenv('DB_PASSWORD', ''),
)
# ASGI application object; routes and middleware are registered on it below.
app = FastAPI(
    version="1.0.0",
    title="IRIS MCP HTTP Server",
    description="Model Context Protocol over HTTP/SSE for IRIS Personal Assistant",
)

# Fully permissive CORS policy.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# broad - confirm this is intended for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
# Add request logging middleware
# Header names whose values are credentials and must never reach the log files.
_SENSITIVE_HEADERS = frozenset(
    {"authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key"}
)
# JSON keys whose string values are secrets in MCP tool-call payloads.
_SENSITIVE_BODY_KEYS = ("access_token", "refresh_token", "pec_password", "password", "api_key")


def _redact_headers(headers) -> Dict[str, str]:
    """Return a plain dict of the headers with credential values replaced by '***'."""
    return {
        name: ("***" if name.lower() in _SENSITIVE_HEADERS else value)
        for name, value in headers.items()
    }


def _redact_body(text: str) -> str:
    """Replace the string value of every sensitive JSON key in *text* with '***'."""
    import re  # local import: keeps this block independent of the file-level imports

    keys = "|".join(re.escape(key) for key in _SENSITIVE_BODY_KEYS)
    # Matches `"key": "value"` including escaped quotes; the closing quote is
    # optional so a value cut off by truncation is still masked.
    pattern = r'("(?:' + keys + r')"\s*:\s*")(?:[^"\\]|\\.)*"?'
    return re.sub(pattern, r'\1***"', text)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, headers and (for POST) a body preview of every request.

    Credentials are masked before anything is logged: sensitive headers
    (Authorization, Cookie, ...) and secret JSON fields in the body
    (access_token, pec_password, ...) are replaced by '***'. The raw-bytes body
    dump of the previous implementation is no longer written because it could
    not be redacted reliably.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The response produced by the downstream handler, unchanged.
    """
    logger.info(f"🔵 Incoming request: {request.method} {request.url.path}")
    logger.info(f"🔵 Headers: {_redact_headers(request.headers)}")
    # Log body for POST requests
    if request.method == "POST":
        body = await request.body()
        try:
            body_str = body.decode('utf-8')
        except UnicodeDecodeError as e:
            logger.error(f"🔵 Body decode error: {e}")
        else:
            # Redact on the full text first, then truncate, so a secret that
            # straddles the 500-character cut is still masked.
            logger.info(f"🔵 Body (decoded): {_redact_body(body_str)[:500]}")

        # IMPORTANT: Re-wrap the body so FastAPI can read it again
        async def receive():
            return {"type": "http.request", "body": body}

        request._receive = receive
    response = await call_next(request)
    logger.info(f"🟢 Response status: {response.status_code}")
    return response
# Module-level operation singletons.
# The operation classes still take a graph_client argument for backward
# compatibility, but it is deprecated: each action now builds its own
# MicrosoftGraphClient(access_token=...), so None is passed here.
graph_client = None

user_ops = UserOperations(graph_client)
calendar_ops = CalendarOperations(graph_client)
email_ops = EmailOperations(graph_client)
teams_ops = TeamsOperations(graph_client)
file_ops = FileOperations(graph_client)
booking_ops = BookingOperations(graph_client)
hybrid_booking_ops = HybridBookingOperations(graph_client)
# infocert_ops = InfocertOperations() # REMOVED: Infocert is a separate MCP system
pec_ops = PECOperations()

# Registry of open SSE connections, one asyncio.Queue per connection key.
active_connections: Dict[str, asyncio.Queue] = {}
# ==================== SESSION MANAGER (REDIS) ====================
# Redis client for session storage
# Connect to Redis once at import time. When the server is unreachable the
# module keeps working with redis_client = None (session storage disabled).
try:
    redis_client = redis.Redis(
        host=os.getenv('REDIS_HOST', 'swissknife-redis'),
        port=int(os.getenv('REDIS_PORT', 6379)),
        decode_responses=True,
        socket_connect_timeout=5,
    )
    redis_client.ping()  # fail fast if the server cannot be reached
except Exception as e:
    logger.warning(f"⚠️ Redis not available: {e}. Session storage disabled.")
    redis_client = None
else:
    logger.info("✅ Redis connected for session storage")
class SessionManager:
    """Store per-session state as a JSON document in Redis.

    Each session lives under the key ``mcp_session:<session_id>`` and expires
    after ``ttl`` seconds. Every method is best-effort: when the module-level
    ``redis_client`` is None or a Redis call fails, the error is logged and the
    method falls back to a no-op / empty result instead of raising.
    """

    # Substrings that mark a session key as secret; values stored under such
    # keys are never written to the log.
    _SENSITIVE_MARKERS = ("password", "token", "secret", "api_key")

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.key = f"mcp_session:{session_id}"
        self.ttl = 3600  # 1 hour

    @classmethod
    def _mask(cls, key: str, value: Any) -> Any:
        """Return '***' when *key* names a secret, otherwise *value* unchanged."""
        lowered = str(key).lower()
        if any(marker in lowered for marker in cls._SENSITIVE_MARKERS):
            return '***'
        return value

    def save(self, data: Dict[str, Any]):
        """Serialise *data* to JSON and store it with the session TTL."""
        if redis_client:
            try:
                redis_client.setex(self.key, self.ttl, json.dumps(data))
                logger.debug(f"📝 Session saved: {self.session_id}")
            except Exception as e:
                logger.error(f"❌ Failed to save session: {e}")

    def load(self) -> Dict[str, Any]:
        """Return the stored session data, or {} when missing or unreadable."""
        if redis_client:
            try:
                data = redis_client.get(self.key)
                if data:
                    logger.debug(f"📖 Session loaded: {self.session_id}")
                    return json.loads(data)
            except Exception as e:
                logger.error(f"❌ Failed to load session: {e}")
        return {}

    def update(self, key: str, value: Any):
        """Set one key in the session (load, modify, save) and log the change.

        Values of secret-looking keys (passwords, tokens, ...) are masked in
        the log line; previously only ``pec_password`` was masked.
        """
        data = self.load()
        data[key] = value
        self.save(data)
        logger.info(f"🔄 Session updated: {self.session_id} - {key} = {self._mask(key, value)}")

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value stored under *key*, or *default* when absent."""
        data = self.load()
        return data.get(key, default)

    def delete(self):
        """Remove the whole session document from Redis."""
        if redis_client:
            try:
                redis_client.delete(self.key)
                logger.info(f"🗑️ Session deleted: {self.session_id}")
            except Exception as e:
                logger.error(f"❌ Failed to delete session: {e}")
# ==================== AUTHENTICATION ====================
def verify_api_key_db(api_key: str) -> Optional[Dict[str, Any]]:
    """Verify an API key against the ``mcp_api_keys`` table.

    The key is hashed with SHA-256 and matched against active, non-expired
    rows. The same statement stamps ``last_used_at`` on the matching row, so a
    successful check also records usage.

    Args:
        api_key: Clear-text API key supplied by the client.

    Returns:
        A dict with ``id``, ``key_name`` and ``user_email`` for a valid key;
        None when the key is unknown, inactive, expired, or when any database
        error occurs (the error is logged, never raised).
    """
    conn = None
    try:
        # Hash the provided key; the query compares against key_hash only.
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        conn = psycopg2.connect(**DB_CONFIG)
        # The cursor context manager closes the cursor even if the query fails.
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                UPDATE mcp_api_keys
                SET last_used_at = CURRENT_TIMESTAMP
                WHERE key_hash = %s
                AND is_active = TRUE
                AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
                RETURNING id, key_name, user_email
            """, (key_hash,))
            result = cursor.fetchone()
        conn.commit()
        if result:
            logger.info(f"✅ Valid API key: {result['key_name']} (user: {result['user_email']})")
            return dict(result)
        logger.warning("🚫 Invalid or expired API key")
        return None
    except Exception as e:
        logger.error(f"❌ Database error during API key verification: {str(e)}")
        return None
    finally:
        # Always release the connection; previously it leaked whenever
        # execute/fetchone/commit raised.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_error:
                logger.warning(f"⚠️ Failed to close database connection: {close_error}")
async def authenticate(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Security(security)) -> Dict[str, Any]:
    """FastAPI dependency resolving the caller from an API key.

    The key is taken from the ``api_key`` query parameter (used by MCP
    clients) or, failing that, from the ``Authorization: Bearer`` header.

    Returns:
        The user-info dict from ``verify_api_key_db``; or a guest identity
        when no key was sent and TRUSTY_LAZY_AUTH is "true".

    Raises:
        HTTPException: 401 when no key was supplied (and lazy auth is off),
            403 when the supplied key is invalid or expired.
    """
    # Query parameter wins; the header is only consulted when it is absent.
    api_key = request.query_params.get("api_key")
    if not api_key and credentials:
        api_key = credentials.credentials

    if api_key:
        user_info = verify_api_key_db(api_key)
        if user_info:
            return user_info
        raise HTTPException(
            status_code=403,
            detail="Invalid or expired API key"
        )

    # No key at all: optionally allow guest access (authentication bypass).
    if os.getenv("TRUSTY_LAZY_AUTH", "false").lower() == "true":
        logger.info("🟢 TRUSTY_LAZY_AUTH enabled - allowing unauthenticated access")
        return {
            "user_email": "guest@example.com",
            "user_name": "Guest User",
            "api_key": None,
            "lazy_auth": True
        }

    logger.warning("🚫 Missing authentication (no header or query param)")
    raise HTTPException(
        status_code=401,
        detail="Missing authentication. Provide: Authorization: Bearer <key> OR ?api_key=<key>",
        headers={"WWW-Authenticate": "Bearer"}
    )
# ==================== OAUTH HELPER FUNCTIONS ====================
# OAuth functionality removed - migrated to TrustyVault
# IRIS MCP tools now receive credentials via TrustyVault integration
# Tools that need OAuth should use TrustyVault's credential management
# ==================== MODELS ====================
class MCPRequest(BaseModel):
    """Incoming MCP message in JSON-RPC form."""

    # Protocol version tag; defaults to "2.0" when omitted.
    jsonrpc: str = Field(default="2.0")
    # Request identifier: string, number or null are all accepted.
    id: Optional[Union[str, int]] = Field(default=None)
    # Name of the method to invoke (mandatory).
    method: str
    # Optional method arguments.
    params: Optional[Dict[str, Any]] = Field(default=None)
class MCPResponse(BaseModel):
    """Outgoing MCP message in JSON-RPC form."""

    # Protocol version tag; defaults to "2.0".
    jsonrpc: str = Field(default="2.0")
    # Identifier field: string, number or null are all accepted.
    id: Optional[Union[str, int]] = Field(default=None)
    # Optional result payload.
    result: Optional[Dict[str, Any]] = Field(default=None)
    # Optional error payload.
    error: Optional[Dict[str, Any]] = Field(default=None)
# ==================== MCP TOOL DEFINITIONS ====================
# Catalogue of MCP tools: tool name -> {"description", "inputSchema"}.
# Each inputSchema is a JSON-Schema object; every name listed under "required"
# is also declared under "properties" so clients can see its type and purpose.
MCP_TOOLS = {
    # OAuth Tools - DEPRECATED (use TrustyVault for authentication)
    # Removed: oauth_check_status, oauth_get_login_url
    # Calendar Tools
    "calendar_list_events": {
        "description": "List calendar events for a user",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "start_date": {"type": "string", "format": "date-time"},
                "end_date": {"type": "string", "format": "date-time"},
                "max_results": {"type": "integer", "default": 10}
            },
            "required": ["access_token", "microsoft_user"]
        }
    },
    "calendar_create_event": {
        "description": "Create calendar event for meetings with INTERNAL attendees ONLY (same organization, e.g., @infocert.it). Workflow: 1) Detect if ALL attendees are internal (same domain), 2) Use calendar_find_free_time with ALL attendees to find common free slot, 3) Create event directly. DO NOT use for external attendees (Gmail, Yahoo) - use booking_create instead!",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "subject": {"type": "string"},
                "start": {"type": "string", "format": "date-time"},
                "end": {"type": "string", "format": "date-time"},
                "attendees": {"type": "array", "items": {"type": "string"}},
                "body": {"type": "string"},
                "location": {"type": "string"},
                "is_online": {"type": "boolean", "default": False}
            },
            "required": ["access_token", "microsoft_user", "subject", "start", "end"]
        }
    },
    "calendar_update_event": {
        "description": "Update an existing calendar event",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "event_id": {"type": "string"},
                "subject": {"type": "string"},
                "start": {"type": "string"},
                "end": {"type": "string"},
                "body": {"type": "string"},
                "location": {"type": "string"}
            },
            "required": ["access_token", "microsoft_user", "event_id"]
        }
    },
    "calendar_delete_event": {
        "description": "Delete a calendar event",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "event_id": {"type": "string"}
            },
            "required": ["access_token", "microsoft_user", "event_id"]
        }
    },
    "calendar_find_free_time": {
        "description": "Find common free time slots for organizer AND attendees. Use 'attendees' parameter to check availability of ALL participants (not just organizer!). For internal meetings: find common free slot among all attendees. For external meetings: check only organizer's availability (external calendars not accessible). IMPORTANT: Parse user time preferences carefully - 'domani mattina' = tomorrow 9-13, 'tra 3 giorni pomeriggio' = 3 days from now 14-18.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "attendees": {"type": "array", "items": {"type": "string"}, "description": "List of attendee email addresses to check availability for"},
                "duration_minutes": {"type": "integer", "description": "Meeting duration in minutes"},
                "start_date": {"type": "string", "description": "Start of search window (ISO datetime). Example: for 'domani mattina' use tomorrow at 09:00, for 'tra 3 giorni' use 3 days from now."},
                "end_date": {"type": "string", "description": "End of search window (ISO datetime). Example: for 'mattina' use 13:00, for 'pomeriggio' use 18:00, for full day use 18:00."}
            },
            "required": ["access_token", "microsoft_user", "duration_minutes"]
        }
    },
    # Email Tools
    "email_list_messages": {
        "description": "List Microsoft 365 email messages. ⚠️ NOTE: If you see attachments named 'smime.p7m' or 'postacert.eml', those are PEC (Italian Certified Email) messages imported into Microsoft mailbox. To access PEC content and attachments properly, use pec_list_messages + pec_get_attachment tools instead (requires pec_user + pec_password from TrustyVault).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "folder": {"type": "string", "default": "inbox"},
                "max_results": {"type": "integer", "default": 10},
                "filter": {"type": "string"}
            },
            "required": ["access_token", "microsoft_user"]
        }
    },
    "email_send_message": {
        "description": "Send an email message with optional attachments. To attach files: use 'file_path' from pec_get_attachment/email_get_attachment result (NOT download_url or content_bytes). Example: attachments=[{'file_path': '/app/attachments/20231124_abc123_document.pdf'}]",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "to": {"type": "array", "items": {"type": "string"}},
                "subject": {"type": "string"},
                "body": {"type": "string"},
                "cc": {"type": "array", "items": {"type": "string"}},
                "bcc": {"type": "array", "items": {"type": "string"}},
                "attachments": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                            "file_path": {"type": "string", "description": "Path to file on server filesystem (e.g. /app/attachments/file.pdf) - get this from pec_get_attachment or email_get_attachment"},
                            "name": {"type": "string", "description": "Optional: Override filename"},
                            "content_type": {"type": "string", "description": "Optional: MIME type"},
                            "content_bytes": {"type": "string", "description": "Optional: Base64 encoded content (alternative to file_path)"}
                        }
                    },
                    "description": "Attachments with file_path from get_attachment tools"
                }
            },
            "required": ["access_token", "microsoft_user", "to", "subject", "body"]
        }
    },
    "email_get_message": {
        "description": "Get full email message content including body and metadata",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "message_id": {"type": "string", "description": "Message ID to retrieve"}
            },
            "required": ["access_token", "microsoft_user", "message_id"]
        }
    },
    "email_list_attachments": {
        "description": "List all attachments for an email message",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "message_id": {"type": "string", "description": "Message ID to get attachments from"}
            },
            "required": ["access_token", "microsoft_user", "message_id"]
        }
    },
    "email_get_attachment": {
        "description": "Get Microsoft email attachment - file is automatically saved to /app/attachments/. ⚠️ USE ONLY for Microsoft 365 email messages from email_list_messages. DO NOT use for PEC messages or attachments named 'smime.p7m' / 'postacert.eml' - those are PEC envelopes, use pec_list_messages + pec_get_attachment instead. Returns: 1) 'file_path' (use for forwarding), 2) 'markdown_content' (for analysis), 3) 'download_url', 4) 'download_link_html' (HTML link with target='_blank'). WORKFLOW EXAMPLE for forwarding: Step 1) result = email_get_attachment(...), Step 2) Extract: file_path = result['data']['file_path'], Step 3) teams_send_message(attachments=[{'file_path': file_path}]). When showing download link to user, use 'download_link_html' so it opens in new tab. The file_path points to the saved file on server filesystem. Supports PDF, DOCX, XLSX, PPTX, images via MarkItDown.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "message_id": {"type": "string", "description": "Message ID containing the attachment"},
                "attachment_id": {"type": "string", "description": "Attachment ID to download"}
            },
            "required": ["access_token", "microsoft_user", "message_id", "attachment_id"]
        }
    },
    "email_read_attachment_text": {
        "description": "Read and extract text from email attachments (PDF, EML, TXT, etc.) without downloading. Returns extracted text content directly.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "message_id": {"type": "string", "description": "Message ID containing the attachment"},
                "attachment_id": {"type": "string", "description": "Attachment ID to read"}
            },
            "required": ["access_token", "microsoft_user", "message_id", "attachment_id"]
        }
    },
    "email_forward": {
        "description": "Forward an email message to one or more recipients",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "message_id": {"type": "string", "description": "Message ID to forward"},
                "to": {"type": "array", "items": {"type": "string"}, "description": "Recipients email addresses"},
                "comment": {"type": "string", "description": "Optional comment to add to forwarded message"}
            },
            "required": ["access_token", "microsoft_user", "message_id", "to"]
        }
    },
    "email_search": {
        "description": "Search emails using Microsoft Search API with advanced query capabilities. Supports KQL (Keyword Query Language) for precise searches across subject, body, and attachments. Examples: 'belluzzo', 'from:giovanni.belluzzo@infocert.it', 'subject:meeting', 'hasattachments:true'",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "query": {"type": "string", "description": "Search query (supports KQL syntax)"},
                "max_results": {"type": "integer", "default": 25, "description": "Maximum number of results to return"},
                "enable_top_results": {"type": "boolean", "default": False, "description": "Enable relevance-based ranking"}
            },
            "required": ["access_token", "microsoft_user", "query"]
        }
    },
    "email_get_conversation": {
        "description": "Get all messages in an email conversation/thread. Useful for following entire discussion chains. Provide either conversation_id directly or message_id to retrieve its thread.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "conversation_id": {"type": "string", "description": "Conversation ID to retrieve (optional if message_id provided)"},
                "message_id": {"type": "string", "description": "Message ID to get conversation from (optional if conversation_id provided)"}
            },
            "required": ["access_token", "microsoft_user"]
        }
    },
    # User Tools
    "users_get_profile": {
        "description": "Get user profile information. If target_email is omitted, returns YOUR profile (authenticated user via /me endpoint). If target_email is provided, searches for that user's profile. Accepts both UPN and email display - automatically resolves to correct user.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "target_email": {"type": "string", "description": "OPTIONAL - Email or UPN of user to search (e.g., giovanni.albero@infocert.it or GAL1234@infocert.it). If omitted, returns YOUR profile using /me endpoint."}
            },
            "required": ["access_token"]
        }
    },
    "users_search": {
        "description": "Search for users in organization",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "query": {"type": "string"}
            },
            "required": ["access_token", "microsoft_user", "query"]
        }
    },
    # Teams Tools
    "teams_list_chats": {
        "description": "List user's Teams chats (1-on-1 and group chats). Returns chat_id, topic/displayName, and members for each chat. IMPORTANT: When user asks to send message to a group by name (e.g. 'send to LegalVector group'), you MUST: 1) Call teams_list_chats first, 2) Find the chat with matching topic/displayName, 3) Extract its chat_id, 4) Use that chat_id in teams_send_message. DO NOT ask user for member emails if group already exists!",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
            },
            "required": ["access_token", "microsoft_user"]
        }
    },
    "teams_send_message": {
        "description": "Send a message to a Teams chat with optional file attachments. SIMPLIFIED: Provide 'recipient_email' to auto-find/create 1-on-1 chat (e.g., 'giovanni.albero@infocert.it' or 'GAL1234@infocert.it'), OR provide 'chat_id' for existing group chats. ONE OF THE TWO IS REQUIRED (chat_id OR recipient_email). System automatically handles chat discovery and creation. To attach files: use 'file_path' from pec_get_attachment/email_get_attachment result. Example: teams_send_message({recipient_email: 'user@domain.com', message: 'Hello!'})",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "chat_id": {"type": "string", "description": "OPTIONAL - Chat ID to send message to (for existing group chats). Not needed if using recipient_email. ONE OF chat_id OR recipient_email REQUIRED."},
                "recipient_email": {"type": "string", "description": "OPTIONAL - Email display or UPN of recipient for 1-on-1 chat (e.g., 'giovanni.albero@infocert.it' or 'GAL1234@infocert.it'). System auto-finds or creates chat. Use this for simplicity. ONE OF chat_id OR recipient_email REQUIRED."},
                "message": {"type": "string", "description": "Message content"},
                "content_type": {"type": "string", "default": "text", "description": "Content type (text or html)"},
                "attachments": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                            "file_path": {"type": "string", "description": "Path to file on server filesystem (e.g. /app/attachments/file.pdf) - get this from pec_get_attachment or email_get_attachment"},
                            "name": {"type": "string", "description": "Optional: Override filename"},
                            "content_type": {"type": "string", "description": "Optional: MIME type"}
                        }
                    },
                    "description": "Attachments with file_path from get_attachment tools (max 3MB per file)"
                }
            },
            "required": ["access_token", "microsoft_user", "message"]
        }
    },
    "teams_create_chat": {
        "description": "Create or get existing Teams chat with members. IDEMPOTENT: If chat already exists, returns existing chat_id (no error). AUTO-RESOLVES UPNs: Accepts display emails (e.g. 'firstname.lastname@domain.com') and automatically finds correct UPN. Use this directly to send messages - no need to search users or existing chats first. The caller (user_email) is automatically included. For 1-on-1 chat, provide only the other person's email in members list. Returns chat_id that can be used immediately with teams_send_message.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "members": {"type": "array", "items": {"type": "string"}, "description": "List of OTHER member email addresses (caller is auto-included). Accepts display emails or UPNs - system auto-resolves. For 1-on-1, provide only 1 email."},
                "topic": {"type": "string", "description": "Chat topic (for group chats with 3+ people)"}
            },
            "required": ["access_token", "microsoft_user", "members"]
        }
    },
    # Hybrid Booking Tools
    "booking_create": {
        "description": "Create booking for meetings with ONE external attendee (Gmail, Yahoo, etc.) + optional internal attendees. IMPORTANT: Always ask user for their email (organizer_email) if not already known. WORKFLOW: 1) Ask organizer email if needed, 2) Ask if they want to PROPOSE slots or CONFIRM specific time, 3a) If PROPOSE: Find 3 free slots and send booking link, OR 3b) If CONFIRM: Create calendar event immediately with specific time. Example: 'Meeting with filippo@gmail.com tomorrow at 10' → Ask email → Ask 'Propose slots or confirm 10am?' → If confirm: set direct_booking=true + proposed_times=['2025-11-26T10:00:00'].",
        "inputSchema": {
            "type": "object",
            "properties": {
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com) - same as organizer_email"},
                "organizer_email": {"type": "string", "description": "Email of the meeting organizer (REQUIRED - must be a valid Microsoft account with OAuth access)"},
                "external_email": {"type": "string", "description": "Email of external attendee (non-Microsoft account like Gmail)"},
                "subject": {"type": "string", "description": "Meeting subject/title"},
                "duration_minutes": {"type": "integer", "description": "Meeting duration in minutes"},
                "internal_attendees": {
                    "type": "array",
                    "description": "Optional: List of internal attendees (@organization) to include. System will check their availability before proposing slots. Format: [{'email': 'user@domain.com', 'name': 'User Name'}]",
                    "items": {
                        "type": "object",
                        "properties": {
                            "email": {"type": "string"},
                            "name": {"type": "string"}
                        },
                        "required": ["email"]
                    }
                },
                "proposed_times": {"type": "array", "items": {"type": "string"}, "description": "Optional: Specific ISO datetime strings to propose. If not provided, system finds 3 available slots automatically"},
                "direct_booking": {"type": "boolean", "description": "If true, creates calendar event immediately with first proposed_time (no booking link). If false (default), sends booking link with proposed slots for external attendee to choose.", "default": False},
                "attachments": {
                    "type": "array",
                    "description": "Optional: Files to attach to meeting invite (will be included when external user confirms booking)",
                    "items": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string", "description": "Filename with extension"},
                            "contentType": {"type": "string", "description": "MIME type (e.g., application/pdf, application/vnd.openxmlformats-officedocument.presentationml.presentation)"},
                            "contentBytes": {"type": "string", "description": "Base64-encoded file content"}
                        },
                        "required": ["name", "contentBytes"]
                    }
                }
            },
            "required": ["access_token", "microsoft_user", "organizer_email", "external_email", "subject", "duration_minutes"]
        }
    },
    "booking_check_status": {
        "description": "Check status of booking request",
        "inputSchema": {
            "type": "object",
            "properties": {
                # Declared because they are listed under "required" below.
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "session_id": {"type": "string"}
            },
            "required": ["access_token", "microsoft_user", "session_id"]
        }
    },
    "booking_list": {
        "description": "List all booking requests",
        "inputSchema": {
            "type": "object",
            "properties": {
                "organizer_email": {"type": "string"}
            }
        }
    },
    "booking_cancel": {
        "description": "Cancel a booking request",
        "inputSchema": {
            "type": "object",
            "properties": {
                # Declared because they are listed under "required" below.
                "access_token": {"type": "string", "description": "Microsoft Graph API access token from TrustyVault"},
                "microsoft_user": {"type": "string", "description": "Microsoft user UPN from TrustyVault (e.g., user@domain.com)"},
                "session_id": {"type": "string"}
            },
            "required": ["access_token", "microsoft_user", "session_id"]
        }
    },
    # REMOVED: Infocert Digital Signature Tools (separate MCP system)
    # PEC (Posta Elettronica Certificata) Tools
    "pec_list_messages": {
        "description": "List PEC (Italian Certified Email) messages from INBOX with automatic unwrapping. Extracts original content from postacert.eml. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"},
                "pec_password": {"type": "string", "description": "PEC password from TrustyVault"},
                "folder": {"type": "string", "description": "IMAP folder (default: INBOX)", "default": "INBOX"},
                "max_results": {"type": "integer", "description": "Maximum number of messages (default: 10)", "default": 10}
            },
            "required": ["pec_user", "pec_password"]
        }
    },
    "pec_get_message": {
        "description": "Get single PEC message with automatic sbustamento (unwrapping). Returns COMPACT summary by default (optimized for LLM agents). Set compact=false for full body content. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"},
                "pec_password": {"type": "string", "description": "PEC password from TrustyVault"},
                "message_id": {"type": "string", "description": "Message ID from pec_list_messages"},
                "compact": {
                    "type": "boolean",
                    "description": "If true (default), returns compact summary with body preview (300 chars). If false, returns full body content (can be large, 50-100KB). Attachments data never included - use pec_get_attachment() to download.",
                    "default": True
                }
            },
            "required": ["pec_user", "pec_password", "message_id"]
        }
    },
    "pec_get_attachment": {
        "description": "Get PEC (Italian Certified Email) attachment with automatic unwrapping - file is automatically saved to /app/attachments/. ⚠️ USE ONLY for PEC messages from pec_list_messages. If you see smime.p7m or postacert.eml attachments in Microsoft email, those ARE PEC messages - use pec_list_messages to access them properly with automatic unwrapping. Returns: 1) 'file_path' (use for forwarding), 2) 'markdown_content' (for analysis), 3) 'download_url', 4) 'download_link_html' (HTML link with target='_blank'). WORKFLOW EXAMPLE for forwarding: Step 1) result = pec_get_attachment(...), Step 2) Extract: file_path = result['data']['file_path'], Step 3) teams_send_message(attachments=[{'file_path': file_path}]). When showing download link to user, use 'download_link_html' so it opens in new tab. The file_path points to the saved file on server filesystem. Supports PDF, DOCX, XLSX, PPTX, images via MarkItDown. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"},
                "pec_password": {"type": "string", "description": "PEC password from TrustyVault"},
                "message_id": {"type": "string", "description": "Message ID"},
                "attachment_index": {"type": "integer", "description": "Index of attachment (0-based)"}
            },
            "required": ["pec_user", "pec_password", "message_id", "attachment_index"]
        }
    },
    "pec_send_message": {
        "description": "Send PEC (Italian Certified Email) message via SMTP. CREDENTIALS: pec_user and pec_password are REQUIRED parameters.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "pec_user": {"type": "string", "description": "PEC username/email from TrustyVault"},
                "pec_password": {"type": "string", "description": "PEC password from TrustyVault"},
                "to": {"type": "string", "description": "Recipient email address"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Email body (text)"},
                "attachments": {"type": "array", "description": "Optional attachments (base64 encoded)", "items": {"type": "object"}}
            },
            "required": ["pec_user", "pec_password", "to", "subject", "body"]
        }
    },
    # File Management Tools
    "file_upload": {
        "description": "Upload file from any interface (WhatsApp, Teams, PWA, web) and get public download URL. Accepts base64-encoded file content and saves to server. Returns file_path (for internal use with email/teams/pec tools) and download_url (public link for TrustySign, user download, etc.). UNIVERSAL: Works with all agent interfaces.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "filename": {"type": "string", "description": "Original filename (e.g., 'document.pdf')"},
                "content_base64": {"type": "string", "description": "Base64-encoded file content from user upload widget"},
                "content_type": {"type": "string", "description": "MIME type (e.g., 'application/pdf', 'image/jpeg')"}
            },
            "required": ["filename", "content_base64"]
        }
    }
}
# ==================== TOOL EXECUTION ====================
def check_oauth_error(result: Dict[str, Any], user_email: str) -> Dict[str, Any]:
    """Add OAuth login instructions to a failed result that needs re-authentication.

    The result is modified in place (and returned) only when it reports a
    failure whose error message contains "No valid access token available".
    Results whose ``error`` is missing or is not a dictionary (for example a
    plain error string) are returned untouched instead of raising.

    Args:
        result: Operation result dictionary.
        user_email: User email; accepted for caller compatibility, not used here.

    Returns:
        The same result object, with ``oauth_required``, ``login_url`` and
        ``instructions`` added to ``result["error"]`` when authentication is required.
    """
    if not isinstance(result, dict):
        return result

    error = result.get("error")
    if not isinstance(error, dict):
        # Nothing structured to annotate (missing error, None, or a bare string).
        return result

    message = str(error.get("message") or "")
    if result.get("success") or "No valid access token available" not in message:
        return result

    # Derive the base URL from the configured OAuth callback URL.
    redirect_uri = os.getenv("OAUTH_REDIRECT_URI", "https://trustypa.brainaihub.tech/oauth/callback")
    base_url = redirect_uri.replace("/oauth/callback", "")
    login_url = f"{base_url}/oauth/login"

    error["oauth_required"] = True
    error["login_url"] = login_url
    error["instructions"] = f"Please authenticate by visiting: {login_url}"
    return result
# ==================== TOOL REGISTRY ====================
# Tool name -> action name, grouped by the operation type that serves the tool.
_TOOL_ACTIONS_BY_OPERATION = {
    # Calendar Tools
    "calendar": {
        "calendar_list_events": "list",
        "calendar_create_event": "create",
        "calendar_update_event": "update",
        "calendar_delete_event": "delete",
        "calendar_find_free_time": "find_times",
    },
    # Email Tools
    "email": {
        "email_list_messages": "list",
        "email_send_message": "send",
        "email_get_message": "get",
        "email_list_attachments": "list_attachments",
        "email_get_attachment": "get_attachment",
        "email_read_attachment_text": "read_attachment_text",
        "email_forward": "forward",
        "email_search": "search",
        "email_get_conversation": "get_conversation",
    },
    # User Tools
    "user": {
        "users_get_profile": "get",
        "users_search": "search",
    },
    # Teams Tools
    "teams": {
        "teams_list_chats": "list_chats",
        "teams_send_message": "send_message",
        "teams_create_chat": "create_chat",
    },
}

# Generic mapping of tool_name -> (operation_type, action_name, user_email_param).
# Every registered tool names its user-email parameter "user_email".
TOOL_REGISTRY = {
    tool_name: (operation_type, action_name, "user_email")
    for operation_type, actions in _TOOL_ACTIONS_BY_OPERATION.items()
    for tool_name, action_name in actions.items()
}
# Substrings that mark a parameter name as secret; matching values are never logged.
_SENSITIVE_PARAM_MARKERS = ("password", "secret", "token")

# PEC tool name -> action name passed to pec_ops.execute().
_PEC_TOOL_ACTIONS = {
    "pec_list_messages": "list_messages",
    "pec_get_message": "get_message",
    "pec_get_attachment": "get_attachment",
    "pec_send_message": "send_message",
}


def _redact_params(params: Any) -> Any:
    """Return a shallow, log-safe copy of params (secrets masked, file payload summarized)."""
    if not isinstance(params, dict):
        return params
    safe: Dict[str, Any] = {}
    for key, value in params.items():
        lowered = str(key).lower()
        if any(marker in lowered for marker in _SENSITIVE_PARAM_MARKERS):
            safe[key] = "***REDACTED***"
        elif key == "content_base64":
            # Uploaded file content is large and potentially confidential.
            size = len(value) if isinstance(value, str) else 0
            safe[key] = f"<{size} base64 chars>"
        else:
            safe[key] = value
    return safe


async def _run_blocking(func, *args):
    """Run a synchronous callable on the default executor so the event loop is not blocked."""
    return await asyncio.get_running_loop().run_in_executor(None, func, *args)


async def execute_tool(tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Execute an MCP tool and return its result.

    Dispatch order: hybrid booking tools (async methods), PEC tools, file_upload,
    calendar_create_event, then every tool listed in TOOL_REGISTRY. Synchronous
    operations are run on the default executor.

    Args:
        tool_name: Name of the tool to run.
        params: Tool arguments as received from the client.

    Returns:
        The operation result. Unknown tools and unexpected exceptions yield
        ``{"error": "<message>"}`` rather than raising.
    """
    try:
        # Never write credentials or raw file content to the log.
        logger.info(f"Executing tool: {tool_name} with params: {_redact_params(params)}")

        # === SPECIAL TOOLS (Booking) ===
        # Hybrid Booking Tools - use async direct methods (not BaseOperation)
        if tool_name == "booking_create":
            result = await hybrid_booking_ops.create_booking_request(
                organizer_email=params["organizer_email"],
                external_email=params["external_email"],
                subject=params["subject"],
                duration_minutes=params["duration_minutes"],
                proposed_times=params.get("proposed_times", []),
                internal_attendees=params.get("internal_attendees", []),
                attachments=params.get("attachments", []),
                direct_booking=params.get("direct_booking", False),
            )
            return check_oauth_error(result, params["organizer_email"])

        if tool_name == "booking_check_status":
            return await hybrid_booking_ops.get_booking_status(session_id=params["session_id"])

        if tool_name == "booking_list":
            result = await hybrid_booking_ops.list_bookings(
                organizer_email=params.get("organizer_email")
            )
            if params.get("organizer_email"):
                return check_oauth_error(result, params["organizer_email"])
            return result

        if tool_name == "booking_cancel":
            return await hybrid_booking_ops.cancel_booking(session_id=params["session_id"])

        # === INFOCERT TOOLS REMOVED (separate MCP system) ===

        # === PEC TOOLS ===
        pec_action = _PEC_TOOL_ACTIONS.get(tool_name)
        if pec_action is not None:
            return await _run_blocking(pec_ops.execute, pec_action, params)

        # === FILE UPLOAD ===
        if tool_name == "file_upload":
            # Imported lazily, only when an upload is actually requested.
            from .attachment_helper import save_attachment
            import base64

            try:
                content_base64 = params.get("content_base64", "")
                filename = params.get("filename", "file")
                content_type = params.get("content_type", "application/octet-stream")

                try:
                    content_bytes = base64.b64decode(content_base64)
                except (ValueError, TypeError) as e:
                    return {
                        "success": False,
                        "error": {"code": "INVALID_BASE64", "message": f"Invalid base64 encoding: {str(e)}"}
                    }

                # Save file using attachment_helper
                file_info = save_attachment(content_bytes, filename)
                return {
                    "success": True,
                    "file_path": file_info["file_path"],
                    "download_url": file_info["download_url"],
                    "download_link_html": file_info["download_link_html"],
                    "filename": filename,
                    "size": len(content_bytes),
                    "content_type": content_type
                }
            except Exception as e:
                logger.error(f"❌ file_upload error: {e}", exc_info=True)
                return {
                    "success": False,
                    "error": {"code": "UPLOAD_FAILED", "message": str(e)}
                }

        # === CALENDAR EVENT CREATION ===
        # The event is created immediately. The agent picks booking_create itself
        # when it wants flexible scheduling with multiple options.
        # There is deliberately no retry here: creation is not idempotent, and
        # repeating it after a late failure would create a duplicate event.
        if tool_name == "calendar_create_event":
            logger.info(f"📅 Creating Outlook event with attendees: {params.get('attendees', [])}")
            result = await _run_blocking(calendar_ops.execute, "create", params)
            return check_oauth_error(result, params.get("user_email", ""))

        # === GENERIC TOOL EXECUTION (Registry-based) ===
        if tool_name in TOOL_REGISTRY:
            operation_type, action, user_email_param = TOOL_REGISTRY[tool_name]
            operation_map = {
                "calendar": calendar_ops,
                "email": email_ops,
                "user": user_ops,
                "teams": teams_ops
                # "infocert": infocert_ops  # REMOVED: separate MCP system
            }
            operation = operation_map.get(operation_type)
            if not operation:
                return {"error": f"Unknown operation type: {operation_type}"}

            # operation.execute() is sync and makes blocking HTTP requests.
            result = await _run_blocking(operation.execute, action, params)

            # Apply OAuth error checking if user_email is present
            if user_email_param and user_email_param in params:
                return check_oauth_error(result, params[user_email_param])
            return result

        return {"error": f"Unknown tool: {tool_name}"}
    except Exception as e:
        logger.error(f"Error executing tool {tool_name}: {str(e)}", exc_info=True)
        return {"error": str(e)}
# ==================== MCP PROTOCOL HANDLERS ====================
def _switch_guest_session_to_user(session: Optional["SessionManager"], tool_params: Dict[str, Any]) -> Optional["SessionManager"]:
    """Return the session to use for this call, moving from a guest session to the per-user one when state carries a user_id."""
    if not (session and "state" in tool_params and isinstance(tool_params["state"], dict)):
        return session
    state = tool_params["state"]
    if "user_id" not in state:
        return session
    if "guest@example.com" not in session.session_id:
        return session

    user_id = state["user_id"]
    target_session_id = f"session_user_{user_id}"
    target_session = SessionManager(target_session_id)

    # Copy the guest data only when the user session has not been initialised yet.
    if not target_session.get("user_id"):
        logger.info(f"🔄 Migrating session from guest to user_id: {user_id}")
        old_data = session.load()
        old_data["user_id"] = user_id
        target_session.save(old_data)
        logger.info(f"✅ Session created: {target_session_id}")

    # NOTE(review): nesting reconstructed from a source that lost its indentation;
    # the mapping update is idempotent, so it is applied on every switch - confirm.
    connection_id = session.get("connection_id", "guest@example.com")
    active_connections[f"{connection_id}_session"] = target_session_id

    logger.info(f"📂 Using user session: {target_session_id}")
    return target_session


def _inject_pec_credentials(tool_name: str, tool_params: Dict[str, Any], thread_id: Any, session: Optional["SessionManager"]) -> None:
    """Fill missing PEC credentials in tool_params from the DB, falling back to the session."""
    if not tool_name.startswith("pec_"):
        return
    if "pec_email" in tool_params and tool_params["pec_email"]:
        return
    # NOTE(review): the pec_* tool schemas advertise "pec_user", while this lookup
    # keys on "pec_email" - confirm which name pec_ops expects.

    # Try DB first (persistent)
    creds = get_pec_credentials(thread_id)
    if creds and creds.pec_email and creds.pec_password:
        tool_params["pec_email"] = creds.pec_email
        tool_params["pec_password"] = creds.pec_password
        tool_params["_credentials_auto_filled"] = True
        logger.info(f"✅ Auto-injected PEC credentials from DB: {creds.pec_email}")
    # Fallback to session (backward compatibility)
    elif session:
        stored_pec_email = session.get("pec_email")
        stored_pec_password = session.get("pec_password")
        if stored_pec_email and stored_pec_password:
            tool_params["pec_email"] = stored_pec_email
            tool_params["pec_password"] = stored_pec_password
            tool_params["_credentials_auto_filled"] = True
            logger.info(f"🔄 Auto-injected PEC credentials from session: {stored_pec_email}")


def _add_credential_reminder(result: Any, tool_params: Dict[str, Any]) -> Any:
    """Best effort: note in a JSON-string result which PEC account was auto-filled."""
    try:
        if isinstance(result, str):
            result_data = json.loads(result)
            if isinstance(result_data, dict) and "data" in result_data:
                if "metadata" not in result_data:
                    result_data["metadata"] = {}
                result_data["metadata"]["pec_credentials_used"] = {
                    "pec_email": tool_params.get("pec_email"),
                    "note": "Credentials auto-filled from session - remember these for next PEC operations"
                }
                result = json.dumps(result_data)
                logger.info(f"📌 Added credential reminder to response for agent")
    except (ValueError, TypeError) as e:
        logger.warning(f"Failed to add credential reminder: {e}")
    return result


def _persist_pec_credentials(tool_name: str, tool_params: Dict[str, Any], thread_id: Any, session: Optional["SessionManager"]) -> None:
    """Save PEC credentials used by this call to the DB (keyed by thread_id) and mirror them into the session."""
    if not tool_name.startswith("pec_"):
        return
    pec_email = tool_params.get("pec_email")
    pec_password = tool_params.get("pec_password")
    if not (pec_email and pec_password):
        return
    try:
        save_pec_credentials(thread_id, pec_email, pec_password)
        logger.info(f"💾 Saved PEC credentials to DB: {pec_email}")
    except Exception as e:
        logger.error(f"Failed to save PEC credentials to DB: {e}")
    # Also save to session (backward compatibility)
    if session:
        session.update("pec_email", pec_email)
        session.update("pec_password", pec_password)


def _remember_attachment(session: "SessionManager", tool_name: str, result: Any) -> None:
    """Store a reference to the attachment returned by a get_attachment tool as the session's last_attachment."""
    logger.info(f"🔍 Checking if we should save attachment reference for {tool_name}")
    try:
        # The result may be a JSON string; parse it first.
        result_parsed = json.loads(result) if isinstance(result, str) else result
        logger.info(f"📋 Parsed result type: {type(result_parsed)}, keys: {result_parsed.keys() if isinstance(result_parsed, dict) else 'not a dict'}")

        # Navigate through nested structure: result -> data -> data -> file_path
        file_info = result_parsed
        if isinstance(file_info, dict) and "data" in file_info:
            file_info = file_info["data"]
            logger.info(f"📋 First level data, keys: {file_info.keys() if isinstance(file_info, dict) else 'not a dict'}")
        # For PEC, there's another nested 'data' level
        if isinstance(file_info, dict) and "data" in file_info:
            file_info = file_info["data"]
            logger.info(f"📋 Second level data, keys: {file_info.keys() if isinstance(file_info, dict) else 'not a dict'}")

        if isinstance(file_info, dict) and "file_path" in file_info:
            attachment_ref = {
                "file_path": file_info["file_path"],
                "filename": file_info.get("filename", ""),
                "download_url": file_info.get("download_url", ""),
                "content_type": file_info.get("content_type", ""),
                "timestamp": datetime.now().isoformat()
            }
            session.update("last_attachment", attachment_ref)
            logger.info(f"💾 Saved attachment reference: {file_info.get('filename')} at {file_info['file_path']}")
        else:
            logger.warning(f"⚠️ No file_path found in file_info: {file_info if isinstance(file_info, dict) else type(file_info)}")
    except Exception as e:
        logger.error(f"❌ Error saving attachment reference: {e}", exc_info=True)


async def handle_mcp_request(request: MCPRequest, session: Optional[SessionManager] = None) -> Optional[MCPResponse]:
    """Handle an incoming MCP JSON-RPC request.

    Supported methods: ``initialize``, ``tools/list``, ``tools/call`` and ``ping``.
    For ``tools/call`` the handler may switch a guest session to a per-user
    session, auto-fills PEC credentials, runs the tool, and then persists
    credentials and the last attachment reference.

    Args:
        request: Parsed JSON-RPC request.
        session: Session bound to the caller's SSE connection, if any.

    Returns:
        An MCPResponse, or None when the request carries no id (notification).
        Unknown tools and malformed arguments yield error -32602, unknown
        methods -32601, and unexpected exceptions -32603.
    """
    try:
        method = request.method
        params = request.params or {}
        logger.info(f"Handling MCP request: {method}")

        # Requests without an id never get a response.
        if request.id is None:
            if method.startswith("notifications/"):
                logger.info(f"Received notification: {method}")
            else:
                logger.warning(f"Received request without id: {method}")
            return None

        # Initialize handshake
        if method == "initialize":
            return MCPResponse(
                id=request.id,
                result={
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": "iris-mcp-server",
                        "version": "1.0.0"
                    }
                }
            )

        # List available tools
        if method == "tools/list":
            tools = [
                {
                    "name": name,
                    "description": info["description"],
                    "inputSchema": info["inputSchema"]
                }
                for name, info in MCP_TOOLS.items()
            ]
            return MCPResponse(id=request.id, result={"tools": tools})

        # Ping
        if method == "ping":
            return MCPResponse(id=request.id, result={})

        if method != "tools/call":
            return MCPResponse(
                id=request.id,
                error={
                    "code": -32601,
                    "message": f"Method not found: {method}"
                }
            )

        # ---------- tools/call ----------
        tool_name = params.get("name")
        if tool_name not in MCP_TOOLS:
            return MCPResponse(
                id=request.id,
                error={
                    "code": -32602,
                    "message": f"Unknown tool: {tool_name}"
                }
            )

        # "arguments" may be omitted or sent as null; anything other than an
        # object is a client error, not an internal one.
        tool_params = params.get("arguments") or {}
        if not isinstance(tool_params, dict):
            return MCPResponse(
                id=request.id,
                error={
                    "code": -32602,
                    "message": "Invalid params: 'arguments' must be an object"
                }
            )

        # ========== MIGRATE SESSION FROM GUEST TO USER_ID ==========
        session = _switch_guest_session_to_user(session, tool_params)

        # ========== AUTO-INJECT CREDENTIALS FROM DB (PERSISTENT!) ==========
        # NOTE: Microsoft tools now use access_token from TrustyVault (passed directly)
        # Only PEC tools still use thread_id-based credential lookup
        thread_id = tool_params.get("thread_id")
        if thread_id:
            # str() keeps the log line safe if the client sends a non-string id.
            logger.info(f"🔍 Looking up credentials for thread_id: {str(thread_id)[:20]}...")
            _inject_pec_credentials(tool_name, tool_params, thread_id, session)
        else:
            logger.warning("⚠️ No thread_id in tool_params - cannot lookup persistent credentials")

        # ========== EXECUTE TOOL WITH CREDENTIALS ==========
        result = await execute_tool(tool_name, tool_params)

        # ========== INJECT CREDENTIAL REMINDER IN RESPONSE ==========
        if tool_name.startswith("pec_") and tool_params.get("_credentials_auto_filled"):
            result = _add_credential_reminder(result, tool_params)

        # ========== SAVE CREDENTIALS TO DB + SESSION ==========
        # NOTE: Microsoft credentials no longer saved - access_token comes from TrustyVault
        if thread_id:
            _persist_pec_credentials(tool_name, tool_params, thread_id, session)

        # Session-only persistence (backward compatibility) when no thread_id is given
        if session and not thread_id:
            if "user_email" in tool_params and tool_params["user_email"]:
                session.update("user_email", tool_params["user_email"])

            if tool_name.startswith("pec_"):
                if "pec_email" in tool_params and tool_params["pec_email"]:
                    session.update("pec_email", tool_params["pec_email"])
                if "pec_password" in tool_params and tool_params["pec_password"]:
                    session.update("pec_password", tool_params["pec_password"])
                    logger.info(f"💾 PEC credentials saved to session: {tool_params.get('pec_email')}")

            # NOTE(review): kept inside the "no thread_id" branch because the
            # flattened source does not show the original nesting - confirm.
            if tool_name in ["pec_get_attachment", "email_get_attachment"]:
                _remember_attachment(session, tool_name, result)

        # NOTE(review): a result that is already a JSON string is serialized a
        # second time here; left unchanged because clients may depend on it.
        return MCPResponse(
            id=request.id,
            result={
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(result, indent=2)
                    }
                ]
            }
        )
    except Exception as e:
        logger.error(f"Error handling MCP request: {str(e)}", exc_info=True)
        return MCPResponse(
            id=request.id,
            error={
                "code": -32603,
                "message": f"Internal error: {str(e)}"
            }
        )
# ==================== HTTP ENDPOINTS ====================
@app.get("/mcp/")
async def mcp_root():
    """Describe the server: identity, protocol, endpoint paths and tool count."""
    endpoint_paths = {
        "health": "/mcp/health",
        "sse": "/mcp/sse",
        "tools": "/mcp/tools",
        "messages": "/mcp/messages"
    }
    info = {
        "service": "IRIS MCP HTTP Server",
        "version": "1.0.0",
        "protocol": "MCP over HTTP/SSE",
        "status": "online",
        "endpoints": endpoint_paths,
    }
    # Values computed per request go last, keeping the original key order.
    info["tools_count"] = len(MCP_TOOLS)
    info["timestamp"] = datetime.utcnow().isoformat()
    return info
@app.get("/mcp/health")
async def health_check():
    """Report liveness together with the current UTC timestamp and the tool count."""
    now_iso = datetime.utcnow().isoformat()
    registered_tools = len(MCP_TOOLS)
    return dict(
        status="healthy",
        service="IRIS MCP HTTP Server",
        version="1.0.0",
        timestamp=now_iso,
        protocol="MCP over HTTP/SSE",
        tools_count=registered_tools,
    )
@app.get("/mcp/attachments/download/{filename}")
async def download_attachment(filename: str):
    """
    Serve a stored attachment file publicly.

    Public URL: https://trustypa.brainaihub.tech/iris/attachments/download/{filename}
    (nginx routes /iris/ to iris-app:8001/mcp/)

    Files are read from /app/attachments. The public URL lets files be used for:
    - User download (email/pec attachments)
    - Cross-tool usage (TrustySign signing IRIS files)
    - External services

    Responds 403 when the resolved path escapes the attachments directory and
    404 when the path does not exist or is not a regular file.
    """
    base_dir = Path("/app/attachments")
    requested = base_dir / filename

    # Security: the fully resolved path must stay inside the attachments directory.
    stays_inside = requested.resolve().is_relative_to(base_dir.resolve())
    if not stays_inside:
        raise HTTPException(status_code=403, detail="Access denied")

    # Only existing regular files are served.
    if not (requested.exists() and requested.is_file()):
        raise HTTPException(status_code=404, detail="File not found")

    # Always sent as a generic binary download under the requested name.
    return FileResponse(path=str(requested), filename=filename, media_type='application/octet-stream')
@app.get("/mcp/sse")
async def mcp_sse_endpoint(request: Request, user_info: Dict[str, Any] = Depends(authenticate)):
    """SSE endpoint for MCP protocol (server to client messages).

    Registers a message queue for the authenticated user in
    ``active_connections`` (a new connection replaces an older one for the same
    user), binds a session named after the user, and streams queued messages.
    A keepalive ping is sent whenever no message arrives within 30 seconds.
    """
    # Use user email as connection ID (one connection per user)
    connection_id = user_info['user_email']
    queue: asyncio.Queue = asyncio.Queue()
    active_connections[connection_id] = queue

    # Use user_email as session_id so credentials persist across SSE reconnects
    session_id = f"session_{connection_id}"  # e.g., "session_yyi9910@infocert.it"
    session = SessionManager(session_id)
    session.update('connection_id', connection_id)
    session.update('user_email', user_info['user_email'])

    # Store session_id in connection metadata
    session_key = f"{connection_id}_session"
    active_connections[session_key] = session_id
    logger.info(f"New SSE connection: {connection_id} (session: {session_id})")

    async def event_generator() -> AsyncGenerator[Dict[str, str], None]:
        try:
            # The MCP SSE transport starts with an "endpoint" event that tells
            # the client where to send its POST requests.
            yield {
                "event": "endpoint",
                "data": "/mcp/message"
            }

            # Keep connection alive and send messages from queue
            while True:
                try:
                    message = await asyncio.wait_for(queue.get(), timeout=30.0)
                    logger.info(f"SSE sending message to {connection_id}: {message}")
                    yield {
                        "event": "message",
                        "data": json.dumps(message)
                    }
                except asyncio.TimeoutError:
                    # Check if client disconnected before sending ping
                    if await request.is_disconnected():
                        logger.info(f"SSE client disconnected during timeout: {connection_id}")
                        break
                    logger.debug(f"SSE sending keepalive ping to {connection_id}")
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "jsonrpc": "2.0",
                            "method": "ping"
                        })
                    }
        finally:
            # Only tear down the registration if it still belongs to THIS
            # connection. After a reconnect the entry holds the newer queue, and
            # deleting it would cut off the live connection.
            if active_connections.get(connection_id) is queue:
                del active_connections[connection_id]
                # Drop the in-memory session reference only; the stored session
                # itself is left untouched so a reconnect can reuse it.
                active_connections.pop(session_key, None)
                logger.info(f"SSE connection closed: {connection_id} (session preserved in Redis)")
            else:
                logger.info(f"SSE connection closed: {connection_id} (superseded by a newer connection)")

    return EventSourceResponse(event_generator())
@app.post("/mcp/message")
async def mcp_message_endpoint(raw_request: Request, user_info: Dict[str, Any] = Depends(authenticate)):
    """HTTP endpoint for MCP protocol (client to server messages).

    Parses the JSON-RPC request, processes it with the caller's session and
    queues the response on the caller's SSE connection.

    Returns:
        202 when the message was accepted (the response, if any, is delivered
        via SSE), 400 when the caller has no active SSE connection, and 422
        with a JSON-RPC error body when the request cannot be processed.
    """
    no_connection_error = {
        "jsonrpc": "2.0",
        "error": {
            "code": -32000,
            "message": "No active SSE connection. Connect to /mcp/sse first."
        }
    }
    try:
        user_email = user_info['user_email']

        # Log only the size: the body can carry credentials and file content.
        body = await raw_request.body()
        logger.info(f"Raw request body received: {len(body)} bytes")

        # Parse as MCPRequest
        data = await raw_request.json()
        request = MCPRequest(**data)
        logger.info(f"Received MCP message: {request.method} from {user_email}")

        # The response is delivered over SSE, so a connection must exist.
        if user_email not in active_connections:
            logger.error(f"No active SSE connection for user: {user_email}")
            return JSONResponse(status_code=400, content=no_connection_error)

        # Get session for this connection
        session_key = f"{user_email}_session"
        session = None
        if session_key in active_connections:
            session_id = active_connections[session_key]
            session = SessionManager(session_id)
            logger.debug(f"📂 Using session: {session_id}")

        # Process the request with session
        response = await handle_mcp_request(request, session)

        # If it's a notification (no response), just acknowledge
        if response is None:
            logger.info(f"Notification processed (no response): {request.method}")
            return JSONResponse(status_code=202, content={"status": "accepted"})

        # The SSE stream may have closed while the request was being processed;
        # report that as a missing connection rather than as a parse error.
        queue = active_connections.get(user_email)
        if queue is None:
            logger.error(f"No active SSE connection for user: {user_email}")
            return JSONResponse(status_code=400, content=no_connection_error)

        await queue.put(response.dict(exclude_none=True))
        logger.info(f"Response queued for SSE delivery to {user_email}")

        # Return 202 Accepted (response will be sent via SSE)
        return JSONResponse(status_code=202, content={"status": "accepted"})
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error processing MCP message: {e}", exc_info=True)
        return JSONResponse(
            status_code=422,
            content={
                "jsonrpc": "2.0",
                "error": {
                    "code": -32700,
                    "message": f"Parse error: {str(e)}"
                }
            }
        )
@app.get("/mcp/tools")
async def list_tools(user_info: Dict[str, Any] = Depends(authenticate)):
    """Return name, description and input schema of every registered tool."""
    catalog = []
    for tool_name, spec in MCP_TOOLS.items():
        entry = {
            "name": tool_name,
            "description": spec["description"],
            "inputSchema": spec["inputSchema"]
        }
        catalog.append(entry)
    return {"tools": catalog}
# ==================== BOOKING OPERATIONS ENDPOINT ====================
class OperationRequest(BaseModel):
    """Standard operation request for bookings endpoint"""
    # Required: name of the action, forwarded to booking_ops.execute() by the
    # /mcp/bookings_operations endpoint.
    action: str = Field(..., description="Action to perform")
    # Optional: action arguments; each request gets its own fresh empty dict by default.
    params: Dict[str, Any] = Field(default_factory=dict, description="Action parameters")
@app.post("/mcp/bookings_operations")
async def bookings_operations(request: OperationRequest):
    """
    Booking operations endpoint (Microsoft Bookings + Hybrid Booking).

    This endpoint handles both Microsoft Bookings API operations and
    custom Hybrid Booking workflow for meetings with external attendees.

    Supported actions:
    - Microsoft Bookings API (list_businesses, create_appointment, etc.)
    - Hybrid Booking Operations (create_hybrid_session, confirm_hybrid_booking, etc.)

    Raises:
        HTTPException: 400 with the operation result when the operation reports
            failure; 500 with an INTERNAL_ERROR payload on unexpected exceptions.
    """
    # NOTE(review): unlike the other /mcp endpoints, this one takes no
    # Depends(authenticate) - confirm that it is meant to be unauthenticated.
    logger.info(f"📋 Bookings operation: {request.action}")
    logger.info(f"📋 Parameters: {request.params}")
    try:
        # booking_ops.execute() is synchronous; run it on the default executor
        # (as the other operations in this file are) to keep the event loop free.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, booking_ops.execute, request.action, request.params)

        if not result.get("success"):
            logger.error(f"❌ Operation {request.action} failed: {result}")
            raise HTTPException(status_code=400, detail=result)

        logger.info(f"✅ Operation {request.action} completed successfully")
        return result
    except HTTPException:
        # Let the deliberate 400 through instead of converting it into a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Exception in bookings_operations: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail={
            "success": False,
            "error": {
                "code": "INTERNAL_ERROR",
                "message": str(e)
            }
        })
# ==================== MAIN ====================
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app with uvicorn on all interfaces.
    import uvicorn

    # Listening port comes from MCP_HTTP_PORT and falls back to 8001.
    port = int(os.getenv("MCP_HTTP_PORT", "8001"))

    for startup_line in (
        f"Starting IRIS MCP HTTP Server on port {port}",
        f"Available tools: {len(MCP_TOOLS)}",
    ):
        logger.info(startup_line)

    server_options = {"host": "0.0.0.0", "port": port, "log_level": "info"}
    uvicorn.run(app, **server_options)