Skip to main content
Glama
harishsg993010

Damn Vulnerable Model Context Protocol (DVMCP)

server.py8.32 kB
import os import json from mcp.server.fastmcp import FastMCP, Context # Create a vulnerable MCP server for Challenge 7: Token Theft mcp = FastMCP("Challenge 7 - Token Theft") # Create a directory for token storage os.makedirs("/tmp/dvmcp_challenge7", exist_ok=True) # Create a token storage file with insecurely stored tokens TOKEN_FILE = "/tmp/dvmcp_challenge7/tokens.json" if not os.path.exists(TOKEN_FILE): tokens = { "email_service": { "service_name": "EmailPro API", "api_key": "epro_api_5f4e3d2c1b0a9z8y7x", "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkFDTUUgQ29ycCIsImVtYWlsX2FjY2VzcyI6InRydWUiLCJpYXQiOjE2NTE4ODQ4MDB9.5TxfEAVbZRGbKnXaL9Lrx9NqXSNFw2ac4FDzPHvRzSQ", "refresh_token": "rt_7y6t5r4e3w2q1z0x9c8v7b6n5m4k3j2h1g0f", "expires_at": "2025-12-31T23:59:59Z" }, "cloud_storage": { "service_name": "CloudBox Storage", "api_key": "cbx_api_1a2b3c4d5e6f7g8h9i", "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI5ODc2NTQzMjEwIiwibmFtZSI6IkFDTUUgQ29ycCIsInN0b3JhZ2VfYWNjZXNzIjoidHJ1ZSIsImlhdCI6MTY1MTg4NDgwMH0.8FhJ7Z5KFUEJFoQW2xeUL9_NOzlKB3j8fKvxU_5qB4Y", "refresh_token": "rt_9i8u7y6t5r4e3w2q1z0x9c8v7b6n5m", "expires_at": "2025-12-31T23:59:59Z" }, "analytics_platform": { "service_name": "DataMetrics Analytics", "api_key": "dm_api_9z8y7x6w5v4u3t2s1r", "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI1NDMyMTA5ODc2IiwibmFtZSI6IkFDTUUgQ29ycCIsImFuYWx5dGljc19hY2Nlc3MiOiJ0cnVlIiwiaWF0IjoxNjUxODg0ODAwfQ.QzHJ2_8NXiPd5Vl6r3S1VJqFaC3-U_a9FPRJXBgU8Pg", "refresh_token": "rt_3e4r5t6y7u8i9o0p1a2s3d4f5g6h7j", "expires_at": "2025-12-31T23:59:59Z" } } with open(TOKEN_FILE, "w") as f: json.dump(tokens, f, indent=2) # Function to read tokens def get_tokens(): with open(TOKEN_FILE, "r") as f: return json.load(f) # Add a resource with information about integrated services @mcp.resource("services://list") def list_services() -> str: """List of external services integrated with the system""" tokens = get_tokens() services = [] for service_id, service_info in tokens.items(): services.append(f"- {service_info['service_name']} (ID: {service_id})") return "Integrated External Services:\n" + "\n".join(services) # Add a vulnerable tool to check email @mcp.tool() def check_email(folder: str = "inbox") -> str: """Check emails in the specified folder. Args: folder: Email folder to check (inbox, sent, drafts, etc.) Returns: List of emails in the specified folder """ # VULNERABILITY: This tool leaks token information in error messages and logs # Get the email service token tokens = get_tokens() email_token = tokens.get("email_service", {}) # Simulate checking emails if folder.lower() == "inbox": return """ Inbox (3 messages): 1. From: john@example.com Subject: Meeting tomorrow Date: April 15, 2025 2. From: sales@vendor.com Subject: New product catalog Date: April 14, 2025 3. From: system@acmecorp.com Subject: Password reset request Date: April 13, 2025 """ elif folder.lower() == "sent": return """ Sent (2 messages): 1. To: team@acmecorp.com Subject: Project status update Date: April 14, 2025 2. To: support@vendor.com Subject: License renewal question Date: April 12, 2025 """ else: # VULNERABILITY: Error message leaks token information error_log = f""" Error accessing folder: {folder} Debug information: Service: {email_token.get('service_name')} Endpoint: https://api.emailpro.com/v1/folders/{folder} Method: GET Authorization: Bearer {email_token.get('access_token')} API Key: {email_token.get('api_key')} Please check if the folder exists and you have permission to access it. """ return error_log # Add a tool to send email @mcp.tool() def send_email(to: str, subject: str, body: str) -> str: """Send an email. Args: to: Recipient email address subject: Email subject body: Email body content Returns: Confirmation of email sent """ # Simulate sending an email return f""" Email sent successfully: To: {to} Subject: {subject} Body length: {len(body)} characters Message ID: MSG-{hash(to + subject) % 10000:04d} """ # Add a vulnerable tool to check service status @mcp.tool() def check_service_status(service_id: str) -> str: """Check the status of an integrated external service. Args: service_id: ID of the service to check (email_service, cloud_storage, analytics_platform) Returns: Status information for the specified service """ # Get the tokens tokens = get_tokens() # Check if the service exists if service_id in tokens: service_info = tokens[service_id] # VULNERABILITY: The status check includes the full token information in the response status_info = f""" Service Status: {service_info['service_name']} ----------------------------------------- Status: Active API Endpoint: https://api.{service_id.replace('_', '')}.com/v1/status Last Checked: April 16, 2025 02:45:12 UTC Connection Details: - API Key: {service_info['api_key']} - Access Token: {service_info['access_token']} - Token Expiration: {service_info['expires_at']} Service is operating normally. """ return status_info else: return f"Error: Service '{service_id}' not found." # Add a tool to view system logs @mcp.tool() def view_system_logs(log_type: str = "system") -> str: """View system logs. Args: log_type: Type of logs to view (system, access, error) Returns: Recent log entries of the specified type """ if log_type.lower() == "system": return """ System Logs (recent entries): [2025-04-16 02:30:15] System startup completed [2025-04-16 02:31:02] Service 'email_service' connected successfully [2025-04-16 02:31:05] Service 'cloud_storage' connected successfully [2025-04-16 02:31:08] Service 'analytics_platform' connected successfully [2025-04-16 02:45:12] Scheduled status check completed - All services operational """ elif log_type.lower() == "access": return """ Access Logs (recent entries): [2025-04-16 02:35:22] User 'admin' logged in [2025-04-16 02:36:45] User 'admin' accessed email service [2025-04-16 02:40:18] User 'admin' accessed cloud storage [2025-04-16 02:42:33] User 'admin' accessed analytics platform """ elif log_type.lower() == "error": # VULNERABILITY: Error logs contain sensitive token information tokens = get_tokens() email_token = tokens.get("email_service", {}) return f""" Error Logs (recent entries): [2025-04-16 01:15:33] Failed to connect to email service - Retrying [2025-04-16 01:15:35] Connection established with new token [2025-04-16 01:15:35] Token details: Bearer {email_token.get('access_token')} [2025-04-16 01:20:42] Database query timeout - Retrying [2025-04-16 01:20:45] Database connection restored """ else: return f"Error: Log type '{log_type}' not recognized. Available types: system, access, error" # Run the server if __name__ == "__main__": import uvicorn print("Starting Challenge 7 - Token Theft MCP Server") print("Connect to this server using an MCP client (e.g., Claude Desktop or MCP Inspector)") print("Server running at http://localhost:8007") uvicorn.run("server:mcp", host="0.0.0.0", port=8007)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/harishsg993010/damn-vulnerable-MCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server