server.py (34.6 kB)
""" 82ch - Unified MCP Security Framework Single entry point for Observer (MCP Proxy) + Engine (Threat Detection). Runs both components in a single process. """ import os import sys import asyncio from aiohttp import web from utils import safe_print from pathlib import Path # 현재 서버를 실행 중인 파이썬 인터프리터 경로 (cross-platform) PYTHON_CMD = sys.executable # config_finder.py의 절대 경로 BASE_DIR = Path(__file__).resolve().parent CONFIG_FINDER_PATH = BASE_DIR / "transports" / "config_finder.py" # Force UTF-8 encoding for stdin/stdout to handle Unicode properly # This prevents encoding issues on Windows (cp949) and other systems if sys.stdin.encoding != 'utf-8': sys.stdin.reconfigure(encoding='utf-8', errors='replace') if sys.stdout.encoding != 'utf-8': sys.stdout.reconfigure(encoding='utf-8', errors='replace') # Force unbuffered output for real-time logging sys.stdout.reconfigure(line_buffering=True) sys.stderr.reconfigure(line_buffering=True) from transports.stdio_handlers import ( handle_verify_request, handle_verify_response, handle_register_tools ) from transports.config_finder import ClaudeConfigFinder from state import state from config import config # Global flag to track if config has been restored _config_restored = False def restore_original_config(): """Restore original config files from backup""" global _config_restored if _config_restored: return _config_restored = True safe_print("\n[Config] Restoring original configuration...") try: import subprocess result = subprocess.run( [PYTHON_CMD, str(CONFIG_FINDER_PATH), '--restore'], text=True, timeout=10, capture_output=True ) if result.returncode == 0: safe_print("[Config] Configuration restored successfully") else: safe_print(f"[Config] Restore warning: {result.stderr}") except Exception as e: safe_print(f"[Config] Failed to restore configuration: {e}") # Register signal handlers for graceful shutdown with config restoration import signal def signal_handler(signum, frame): """Handle termination signals""" sig_name = signal.Signals(signum).name safe_print(f"\n[Server] Received {sig_name} signal") restore_original_config() sys.exit(0) # Register signal handlers signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # Process termination # Windows-specific: handle console close events (X button, shutdown, logoff) if sys.platform == 'win32': signal.signal(signal.SIGBREAK, signal_handler) # Also use atexit as fallback import atexit atexit.register(restore_original_config) # Use win32api for console control handler (handles X button better) try: import win32api import win32con def console_ctrl_handler(ctrl_type): """Handle Windows console control events""" if ctrl_type in (win32con.CTRL_CLOSE_EVENT, win32con.CTRL_SHUTDOWN_EVENT, win32con.CTRL_LOGOFF_EVENT): safe_print(f"\n[Server] Console control event: {ctrl_type}") restore_original_config() return True # Handled return False # Not handled win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) safe_print("[Server] Windows console control handler registered") except ImportError: safe_print("[Server] win32api not available, using signal handlers only") # Engine components from database import Database from event_hub import EventHub from engines.tools_poisoning_engine import ToolsPoisoningEngine from engines.command_injection_engine import CommandInjectionEngine from engines.file_system_exposure_engine import FileSystemExposureEngine from engines.pii_leak_engine import PIILeakEngine from engines.data_exfiltration_engine import DataExfiltrationEngine # 

# WebSocket handler
from websocket_handler import ws_handler


def setup_engines(db: Database) -> list:
    """Initialize and configure detection engines based on config."""
    engines = []

    # Tools Poisoning Engine (LLM-based)
    if config.get_tools_poisoning_enabled():
        try:
            engine = ToolsPoisoningEngine(db)
            engines.append(engine)
        except Exception as e:
            safe_print(f"[Engine] Failed to initialize ToolsPoisoningEngine: {e}")

    # Command Injection Engine
    if config.get_command_injection_enabled():
        try:
            engine = CommandInjectionEngine(db)
            engines.append(engine)
        except Exception as e:
            safe_print(f"[Engine] Failed to initialize CommandInjectionEngine: {e}")

    # File System Exposure Engine
    if config.get_file_system_exposure_enabled():
        try:
            engine = FileSystemExposureEngine(db)
            engines.append(engine)
        except Exception as e:
            safe_print(f"[Engine] Failed to initialize FileSystemExposureEngine: {e}")

    # PII Leak Engine
    if config.get_pii_leak_enabled():
        try:
            engine = PIILeakEngine(db)
            engines.append(engine)
        except Exception as e:
            safe_print(f"[Engine] Failed to initialize PIILeakEngine: {e}")

    # Data Exfiltration Engine
    if config.get_data_exfiltration_enabled():
        try:
            engine = DataExfiltrationEngine(db)
            engines.append(engine)
        except Exception as e:
            safe_print(f"[Engine] Failed to initialize DataExfiltrationEngine: {e}")

    return engines


async def initialize_engine_system():
    """Initialize the engine detection system."""
    safe_print("=" * 80)
    safe_print("Initializing Engine System")
    safe_print("=" * 80)

    # Initialize database
    db = Database()
    await db.connect()

    # Setup engines
    engines = setup_engines(db)

    if engines:
        safe_print(f"\nActive Detection Engines ({len(engines)}):")
        for i, engine in enumerate(engines, 1):
            safe_print(f"  {i}. {engine.name}")
    else:
        safe_print("\nWarning: No detection engines enabled!")

    # Initialize WebSocket handler
    await ws_handler.start()

    # Initialize EventHub with WebSocket handler
    event_hub = EventHub(engines, db, ws_handler)
    await event_hub.start()

    # Store in global state
    state.event_hub = event_hub

    safe_print("\nEngine system initialized successfully")
    safe_print("=" * 80)

    return db, event_hub


async def handle_health(request):
    """Health check endpoint."""
    return web.Response(
        text='{"status": "ok", "components": ["observer", "engine"]}',
        content_type='application/json'
    )


async def handle_analysis_status(request):
    """Get analysis status for all servers."""
    import json
    from state import state

    statuses = {}
    for server_name, status in state.analysis_status.items():
        statuses[server_name] = {
            'total_tools': status.total_tools,
            'analyzed_tools': status.analyzed_tools,
            'malicious_found': status.malicious_found,
            'status': status.status,
            'started_at': status.started_at.isoformat(),
            'completed_at': status.completed_at.isoformat() if status.completed_at else None,
            'progress_percent': int((status.analyzed_tools / status.total_tools * 100) if status.total_tools > 0 else 0)
        }

    return web.Response(
        text=json.dumps(statuses),
        content_type='application/json'
    )


async def handle_get_tools_safety(request):
    """
    Get safety status for tools of a specific server.

    Request body:
    {
        "mcp_tag": "server_name",
        "filter_dangerous": true  // optional, default true
    }

    Response:
    {
        "tools": {
            "tool_name1": 1,  // safety value (0-3)
            "tool_name2": 3,
            ...
}, "dangerous_tools": ["tool_name2"], // safety=3 tools "filter_enabled": true } """ import json try: data = await request.json() mcp_tag = data.get('mcp_tag') if not mcp_tag: return web.Response( status=400, text=json.dumps({"error": "mcp_tag is required"}), content_type='application/json' ) db = request.app.get('db') if not db: return web.Response( status=500, text=json.dumps({"error": "Database not available"}), content_type='application/json' ) # Get all tools safety status for the server cursor = await db.conn.execute( """ SELECT tool, safety FROM mcpl WHERE mcpTag = ? """, (mcp_tag,) ) rows = await cursor.fetchall() tools_safety = {} dangerous_tools = [] for row in rows: tool_name, safety = row tools_safety[tool_name] = safety if safety is not None else 0 if safety == 3: # 조치필요 dangerous_tools.append(tool_name) filter_enabled = config.get_dangerous_tool_filter_enabled() return web.Response( text=json.dumps({ "tools": tools_safety, "dangerous_tools": dangerous_tools, "filter_enabled": filter_enabled }), content_type='application/json' ) except Exception as e: safe_print(f"[Server] Error in handle_get_tools_safety: {e}") return web.Response( status=500, text=json.dumps({"error": str(e)}), content_type='application/json' ) async def handle_update_tool_safety(request): """ Update safety status for a specific tool manually. Request body: { "mcp_tag": "server_name", "tool_name": "tool_name", "safety": 1 // 0-3 } Response: { "success": true } """ import json try: data = await request.json() mcp_tag = data.get('mcp_tag') tool_name = data.get('tool_name') safety = data.get('safety') if not mcp_tag or not tool_name or safety is None: return web.Response( status=400, text=json.dumps({"error": "mcp_tag, tool_name, and safety are required"}), content_type='application/json' ) if safety not in (0, 1, 2, 3): return web.Response( status=400, text=json.dumps({"error": "safety must be 0, 1, 2, or 3"}), content_type='application/json' ) db = request.app.get('db') if not db: return web.Response( status=500, text=json.dumps({"error": "Database not available"}), content_type='application/json' ) success = await db.set_tool_safety_manual(mcp_tag, tool_name, safety) if success: # Broadcast tool safety update via WebSocket if ws_handler: asyncio.create_task(ws_handler.broadcast_tool_safety_update(mcp_tag, tool_name, safety)) return web.Response( text=json.dumps({"success": True}), content_type='application/json' ) else: return web.Response( status=500, text=json.dumps({"error": "Failed to update safety"}), content_type='application/json' ) except Exception as e: safe_print(f"[Server] Error in handle_update_tool_safety: {e}") return web.Response( status=500, text=json.dumps({"error": str(e)}), content_type='application/json' ) async def handle_export_threats(request): """ Export all detected threats to CSV format. 

    Response: CSV file with all engine_results data
    """
    import csv
    import io
    import json  # needed for the JSON error response in the except block below
    from datetime import datetime

    try:
        db = request.app.get('db')
        if not db:
            return web.Response(
                status=500,
                text='{"error": "Database not available"}',
                content_type='application/json'
            )

        # Query all engine results with raw event details
        query = """
            SELECT
                er.id,
                er.engine_name,
                er.serverName,
                er.producer,
                er.severity,
                er.score,
                er.detail,
                er.created_at,
                re.ts as event_timestamp,
                re.event_type,
                re.pname as app_name
            FROM engine_results er
            LEFT JOIN raw_events re ON er.raw_event_id = re.id
            ORDER BY er.created_at DESC
        """

        async with db.conn.execute(query) as cursor:
            rows = await cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]

        # Create CSV in memory
        output = io.StringIO()
        writer = csv.writer(output)

        # Write header
        writer.writerow(columns)

        # Write data
        for row in rows:
            writer.writerow(row)

        csv_data = output.getvalue()
        output.close()

        # Generate filename with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'82ch_threats_{timestamp}.csv'

        return web.Response(
            body=csv_data,
            headers={
                'Content-Type': 'text/csv',
                'Content-Disposition': f'attachment; filename="{filename}"'
            }
        )

    except Exception as e:
        safe_print(f"[Server] Error in handle_export_threats: {e}")
        return web.Response(
            status=500,
            text=json.dumps({"error": str(e)}),
            content_type='application/json'
        )


async def handle_delete_database(request):
    """
    Clear all data from database tables (faster and safer than deleting files).

    Response:
    {
        "success": true,
        "message": "Database cleared successfully"
    }
    """
    import json

    try:
        db = request.app.get('db')
        if not db:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Database not available"}),
                content_type='application/json'
            )

        safe_print(f"[Server] Clearing all database tables...")

        # Delete all data from tables (keep schema)
        tables_to_clear = [
            'raw_events',
            'rpc_events',
            'engine_results',
            'mcpl'
        ]

        cleared_tables = []
        for table in tables_to_clear:
            try:
                # Delete all rows
                await db.conn.execute(f"DELETE FROM {table}")
                # Reset autoincrement counter
                await db.conn.execute(f"DELETE FROM sqlite_sequence WHERE name='{table}'")
                cleared_tables.append(table)
                safe_print(f"[Server] Cleared table: {table}")
            except Exception as e:
                safe_print(f"[Server] Warning: Could not clear table {table}: {e}")

        # Commit all changes
        await db.conn.commit()

        # Vacuum to reclaim space and optimize
        try:
            await db.conn.execute("VACUUM")
            safe_print(f"[Server] Database vacuumed successfully")
        except Exception as e:
            safe_print(f"[Server] Warning: Vacuum failed: {e}")

        safe_print(f"[Server] Database cleared successfully")

        # Broadcast reload_all to all clients
        from websocket_handler import ws_handler
        if ws_handler:
            asyncio.create_task(ws_handler.broadcast_reload_all())
            safe_print(f"[Server] Broadcasted reload_all to clients")

        return web.Response(
            text=json.dumps({
                "success": True,
                "message": "Database cleared successfully",
                "cleared_tables": cleared_tables,
                "restart_required": False
            }),
            content_type='application/json'
        )

    except Exception as e:
        safe_print(f"[Server] Error in handle_delete_database: {e}")
        import traceback
        traceback.print_exc()
        return web.Response(
            status=500,
            text=json.dumps({"error": str(e)}),
            content_type='application/json'
        )


async def handle_get_custom_rules(request):
    """
    Get all custom rules, optionally filtered by engine name.

    Query params:
        engine_name: Optional engine name filter (e.g., 'pii_leak_engine')

    Response:
    {
        "rules": [
            {
                "id": 1,
                "engine_name": "pii_leak_engine",
                "rule_name": "custom_pattern",
                "rule_content": "rule CustomPattern { ... }",
                "enabled": 1,
                "category": "PII",
                "description": "Custom PII pattern",
                "created_at": "2025-01-01 00:00:00",
                "updated_at": "2025-01-01 00:00:00"
            }
        ]
    }
    """
    import json

    try:
        db = request.app.get('db')
        if not db:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Database not available"}),
                content_type='application/json'
            )

        engine_name = request.rel_url.query.get('engine_name')
        rules = await db.get_custom_rules(engine_name=engine_name)

        return web.Response(
            text=json.dumps({"rules": rules}),
            content_type='application/json'
        )

    except Exception as e:
        safe_print(f"[Server] Error in handle_get_custom_rules: {e}")
        return web.Response(
            status=500,
            text=json.dumps({"error": str(e)}),
            content_type='application/json'
        )


async def handle_add_custom_rule(request):
    """
    Add a new custom YARA rule.

    Request body:
    {
        "engine_name": "pii_leak_engine",
        "rule_name": "custom_pattern",
        "rule_content": "rule CustomPattern { strings: $a = \"pattern\" condition: $a }",
        "category": "PII",  // optional
        "description": "Custom PII pattern"  // optional
    }

    Response:
    {
        "success": true,
        "rule_id": 1
    }
    """
    import json
    import yara

    try:
        data = await request.json()
        engine_name = data.get('engine_name')
        rule_name = data.get('rule_name')
        rule_content = data.get('rule_content')
        category = data.get('category')
        description = data.get('description')

        if not engine_name or not rule_name or not rule_content:
            return web.Response(
                status=400,
                text=json.dumps({"error": "engine_name, rule_name, and rule_content are required"}),
                content_type='application/json'
            )

        # Validate YARA rule syntax
        try:
            yara.compile(source=rule_content)
        except Exception as e:
            return web.Response(
                status=400,
                text=json.dumps({"error": f"Invalid YARA rule syntax: {str(e)}"}),
                content_type='application/json'
            )

        db = request.app.get('db')
        if not db:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Database not available"}),
                content_type='application/json'
            )

        try:
            rule_id = await db.insert_custom_rule(
                engine_name=engine_name,
                rule_name=rule_name,
                rule_content=rule_content,
                category=category,
                description=description
            )

            # Reload rules in the engine
            event_hub = request.app.get('event_hub')
            if event_hub:
                await event_hub.reload_engine_rules(engine_name)

            # Broadcast rule update via WebSocket
            if ws_handler:
                asyncio.create_task(ws_handler.broadcast_custom_rule_update(engine_name))

            return web.Response(
                text=json.dumps({"success": True, "rule_id": rule_id}),
                content_type='application/json'
            )

        except Exception as db_error:
            # Handle database-specific errors (e.g., duplicate rule)
            error_msg = str(db_error)
            if 'already exists' in error_msg:
                return web.Response(
                    status=400,
                    text=json.dumps({"error": error_msg}),
                    content_type='application/json'
                )
            else:
                return web.Response(
                    status=500,
                    text=json.dumps({"error": f"Failed to insert custom rule: {error_msg}"}),
                    content_type='application/json'
                )

    except Exception as e:
        safe_print(f"[Server] Error in handle_add_custom_rule: {e}")
        import traceback
        traceback.print_exc()
        return web.Response(
            status=500,
            text=json.dumps({"error": str(e)}),
            content_type='application/json'
        )


async def handle_delete_custom_rule(request):
    """
    Delete a custom rule by ID.

    URL param:
        rule_id: Rule ID to delete

    Response:
    {
        "success": true
    }
    """
    import json

    try:
        rule_id = int(request.match_info.get('rule_id'))

        db = request.app.get('db')
        if not db:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Database not available"}),
                content_type='application/json'
            )

        # Get rule info before deletion for reload
        rules = await db.get_custom_rules()
        rule_to_delete = next((r for r in rules if r['id'] == rule_id), None)

        success = await db.delete_custom_rule(rule_id)

        if success:
            # Reload rules in the engine if we found the rule
            if rule_to_delete:
                event_hub = request.app.get('event_hub')
                if event_hub:
                    await event_hub.reload_engine_rules(rule_to_delete['engine_name'])

                # Broadcast rule update via WebSocket
                if ws_handler:
                    asyncio.create_task(ws_handler.broadcast_custom_rule_update(rule_to_delete['engine_name']))

            return web.Response(
                text=json.dumps({"success": True}),
                content_type='application/json'
            )
        else:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Failed to delete custom rule"}),
                content_type='application/json'
            )

    except Exception as e:
        safe_print(f"[Server] Error in handle_delete_custom_rule: {e}")
        return web.Response(
            status=500,
            text=json.dumps({"error": str(e)}),
            content_type='application/json'
        )


async def handle_toggle_custom_rule(request):
    """
    Enable or disable a custom rule.

    Request body:
    {
        "rule_id": 1,
        "enabled": true
    }

    Response:
    {
        "success": true
    }
    """
    import json

    try:
        data = await request.json()
        rule_id = data.get('rule_id')
        enabled = data.get('enabled')

        if rule_id is None or enabled is None:
            return web.Response(
                status=400,
                text=json.dumps({"error": "rule_id and enabled are required"}),
                content_type='application/json'
            )

        db = request.app.get('db')
        if not db:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Database not available"}),
                content_type='application/json'
            )

        # Get rule info before toggle for reload
        rules = await db.get_custom_rules()
        rule_to_toggle = next((r for r in rules if r['id'] == rule_id), None)

        success = await db.toggle_custom_rule(rule_id, enabled)

        if success:
            # Reload rules in the engine if we found the rule
            if rule_to_toggle:
                event_hub = request.app.get('event_hub')
                if event_hub:
                    await event_hub.reload_engine_rules(rule_to_toggle['engine_name'])

                # Broadcast rule update via WebSocket
                if ws_handler:
                    asyncio.create_task(ws_handler.broadcast_custom_rule_update(rule_to_toggle['engine_name']))

            return web.Response(
                text=json.dumps({"success": True}),
                content_type='application/json'
            )
        else:
            return web.Response(
                status=500,
                text=json.dumps({"error": "Failed to toggle custom rule"}),
                content_type='application/json'
            )

    except Exception as e:
        safe_print(f"[Server] Error in handle_toggle_custom_rule: {e}")
        return web.Response(
            status=500,
            text=json.dumps({"error": str(e)}),
            content_type='application/json'
        )


def setup_routes(app):
    """Setup application routes."""
    # Health check
    app.router.add_get('/health', handle_health)

    # Analysis status
    app.router.add_get('/analysis/status', handle_analysis_status)

    # WebSocket endpoint
    app.router.add_get('/ws', ws_handler.handle_websocket)

    # STDIO verification API endpoints
    app.router.add_post('/verify/request', handle_verify_request)
    app.router.add_post('/verify/response', handle_verify_response)
    app.router.add_post('/register-tools', handle_register_tools)

    # Tools safety API endpoints
    app.router.add_post('/tools/safety', handle_get_tools_safety)
    app.router.add_post('/tools/safety/update', handle_update_tool_safety)

    # Database management API endpoints
    app.router.add_get('/database/export', handle_export_threats)
    app.router.add_post('/database/delete', handle_delete_database)

    # Custom rules API endpoints
    app.router.add_get('/rules/custom', handle_get_custom_rules)
    app.router.add_post('/rules/custom', handle_add_custom_rule)
    app.router.add_delete('/rules/custom/{rule_id}', handle_delete_custom_rule)
    app.router.add_post('/rules/custom/toggle', handle_toggle_custom_rule)

    safe_print(f"[Server] Routes configured:")
    safe_print(f"  GET    /health - Health check")
    safe_print(f"  GET    /ws - WebSocket endpoint (real-time updates)")
    safe_print(f"  POST   /verify/request - STDIO verification API")
    safe_print(f"  POST   /verify/response - STDIO verification API")
    safe_print(f"  POST   /register-tools - Tool registration")
    safe_print(f"  POST   /tools/safety - Tools safety status")
    safe_print(f"  POST   /tools/safety/update - Update tool safety manually")
    safe_print(f"  GET    /database/export - Export threats to CSV")
    safe_print(f"  POST   /database/delete - Clear database tables")
    safe_print(f"  GET    /rules/custom - Get custom rules")
    safe_print(f"  POST   /rules/custom - Add custom rule")
    safe_print(f"  DELETE /rules/custom/{{rule_id}} - Delete custom rule")
    safe_print(f"  POST   /rules/custom/toggle - Toggle custom rule")


async def on_startup(app):
    """Called when the application starts."""
    state.running = True

    safe_print("\n" + "=" * 80)
    safe_print("82ch - MCP Security Framework")
    safe_print("=" * 80)
    safe_print("Observer + Engine integrated mode")
    safe_print("=" * 80)

    # Configure Claude Desktop config on startup
    import subprocess
    try:
        result = subprocess.run(
            [PYTHON_CMD, str(CONFIG_FINDER_PATH)],
            text=True,
            timeout=30,
            capture_output=True  # capture output so stderr is available for diagnostics
        )
        if result.returncode == 0:
            safe_print("\n[Config] Claude Desktop configured successfully")
        else:
            safe_print(f"\n[Config] Warning: Configuration failed - {result.stderr}")
    except Exception as e:
        safe_print(f"\n[Config] Warning: Failed to configure Claude Desktop - {e}")

    # Initialize engine system
    try:
        db, event_hub = await initialize_engine_system()

        # Store references for cleanup
        app['db'] = db
        app['event_hub'] = event_hub
    except Exception as e:
        safe_print(f"[Server] Warning: Failed to initialize engines: {e}")
        safe_print("[Server] Continuing in Observer-only mode...")

        # Create minimal database without engines
        db = Database()
        await db.connect()
        app['db'] = db

    safe_print(f"\n[Observer] Starting HTTP server...")
    safe_print(f"[Observer] Listening on http://{config.server_host}:{config.server_port}")
    safe_print(f"[Observer] Scan mode: {config.scan_mode}")
    safe_print("\n" + "=" * 80)
    safe_print("All components ready. Waiting for connections...")
    safe_print("Press Ctrl+C to stop")
    safe_print("=" * 80 + "\n")


async def on_shutdown(app):
    """Called when the application shuts down."""
    state.running = False
    safe_print(f"[Server] Cleanup starting...")

    # Restore original config files
    restore_original_config()

    # Close all SSE connections gracefully
    if state.sse_connections:
        safe_print(f"[Server] Closing {len(state.sse_connections)} SSE connections...")
        connections_to_close = list(state.sse_connections.values())

        for conn in connections_to_close:
            try:
                # Send a close event to client if possible
                if conn.client_response and not conn.client_response._eof_sent:
                    try:
                        await conn.client_response.write_eof()
                    except:
                        pass

                # Close target session if exists
                if conn.target_session and not conn.target_session.closed:
                    await conn.target_session.close()
            except Exception as e:
                safe_print(f"[Server] Error closing SSE connection {conn.connection_id}: {e}")

        # Clear all connections
        state.sse_connections.clear()
        safe_print(f"[Server] All SSE connections closed")

    # Stop WebSocket handler
    try:
        await asyncio.wait_for(ws_handler.stop(), timeout=0.5)
    except asyncio.TimeoutError:
        safe_print("[Server] WebSocket handler timeout")
    except Exception as e:
        safe_print(f"[Server] WebSocket handler error: {e}")

    # Stop EventHub
    if state.event_hub:
        try:
            await asyncio.wait_for(state.event_hub.stop(), timeout=0.5)
        except asyncio.TimeoutError:
            safe_print("[Server] EventHub timeout")
        except Exception as e:
            safe_print(f"[Server] EventHub error: {e}")

    # Close database
    if 'db' in app:
        try:
            await asyncio.wait_for(app['db'].close(), timeout=0.5)
        except asyncio.TimeoutError:
            safe_print("[Server] Database timeout")
        except Exception as e:
            safe_print(f"[Server] Database error: {e}")

    safe_print(f"[Server] Cleanup done")


def create_app():
    """Create and configure the aiohttp application."""
    app = web.Application()

    # Setup routes
    setup_routes(app)

    # Setup lifecycle callbacks
    app.on_startup.append(on_startup)
    app.on_shutdown.append(on_shutdown)

    return app


async def start_server():
    """Main entry point."""
    app = create_app()
    runner = web.AppRunner(app)
    await runner.setup()

    site = web.TCPSite(runner, config.server_host, config.server_port)
    await site.start()

    safe_print(f"[Observer] Listening on http://{config.server_host}:{config.server_port}")

    # Run the server
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        safe_print("\n[Server] Interrupted")
    finally:
        # Stop the site FIRST to prevent new connections and close the socket
        try:
            await site.stop()
            safe_print("[Server] Site stopped")
        except Exception as e:
            safe_print(f"[Server] Site stop error: {e}")

        # Then trigger shutdown callbacks to clean up SSE connections, etc.
        try:
            await asyncio.wait_for(app.shutdown(), timeout=2.0)
        except asyncio.TimeoutError:
            safe_print("[Server] App shutdown timeout")
        except Exception as e:
            safe_print(f"[Server] App shutdown error: {e}")

        # Cancel all remaining tasks
        try:
            tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
            if tasks:
                safe_print(f"[Server] Cancelling {len(tasks)} remaining tasks...")
                for task in tasks:
                    task.cancel()
                # Wait for tasks to complete cancellation with suppressed exceptions
                await asyncio.gather(*tasks, return_exceptions=True)
        except Exception as e:
            safe_print(f"[Server] Task cancellation error: {e}")

        # Final cleanup
        try:
            await asyncio.wait_for(runner.cleanup(), timeout=1.0)
        except asyncio.TimeoutError:
            safe_print("[Server] Runner cleanup timeout")
        except Exception as e:
            safe_print(f"[Server] Runner cleanup error: {e}")

        try:
            await asyncio.wait_for(app.cleanup(), timeout=1.0)
        except asyncio.TimeoutError:
            safe_print("[Server] App cleanup timeout")
        except Exception as e:
            safe_print(f"[Server] App cleanup error: {e}")

        safe_print("[Server] Server stopped")


if __name__ == '__main__':
    try:
        asyncio.run(start_server())
    except KeyboardInterrupt:
        # Clean exit, already handled in start_server()
        pass
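
For reference, the snippet below sketches how a client might call a few of the endpoints registered in setup_routes() above. It is a minimal sketch and not part of server.py: the base URL, the "example_server" mcp_tag, and the sample YARA rule are placeholder assumptions, and the actual host/port come from config.server_host and config.server_port in your deployment.

# client_example.py - minimal sketch of exercising the 82ch HTTP API with aiohttp.
# Assumptions: the server from server.py is running and reachable at BASE_URL;
# "example_server" is a placeholder mcp_tag and the YARA rule is illustrative only.
import asyncio
import aiohttp

BASE_URL = "http://127.0.0.1:8000"  # adjust to config.server_host / config.server_port

async def main():
    async with aiohttp.ClientSession() as session:
        # Health check (GET /health)
        async with session.get(f"{BASE_URL}/health") as resp:
            print("health:", await resp.json())

        # Tool safety status for one MCP server (POST /tools/safety)
        async with session.post(f"{BASE_URL}/tools/safety",
                                json={"mcp_tag": "example_server"}) as resp:
            print("tools/safety:", await resp.json())

        # Add a custom YARA rule (POST /rules/custom)
        rule = 'rule ExamplePattern { strings: $a = "example" condition: $a }'
        payload = {
            "engine_name": "pii_leak_engine",
            "rule_name": "example_pattern",
            "rule_content": rule,
            "category": "PII",
            "description": "Illustrative custom rule",
        }
        async with session.post(f"{BASE_URL}/rules/custom", json=payload) as resp:
            print("rules/custom:", await resp.json())

if __name__ == "__main__":
    asyncio.run(main())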
