Skip to main content
Glama

UnrealBlueprintMCP

by BestDev
dashboard_server.py19 kB
#!/usr/bin/env python3 """ Web Dashboard Server for UnrealBlueprintMCP Provides a real-time web interface for monitoring and controlling MCP operations with comprehensive project management features. """ import asyncio import json import logging import os import time from datetime import datetime, timedelta from typing import Dict, List, Any, Optional from pathlib import Path import sys # Add parent directory to path for imports sys.path.append(str(Path(__file__).parent.parent)) from config import MCP_SERVER_URL, WEB_DASHBOARD_HOST, WEB_DASHBOARD_PORT try: from fastapi import FastAPI, WebSocket, Request, HTTPException, BackgroundTasks from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware import uvicorn import websockets from pydantic import BaseModel except ImportError: print("Required packages not installed. Run: pip install fastapi uvicorn jinja2 websockets") sys.exit(1) # Data models class BlueprintCreateRequest(BaseModel): name: str parent_class: str asset_path: str components: List[Dict[str, Any]] = [] properties: Dict[str, Any] = {} class OperationRequest(BaseModel): operation_type: str parameters: Dict[str, Any] priority: int = 1 class DashboardServer: """Main dashboard server class""" def __init__(self, host: str = WEB_DASHBOARD_HOST, port: int = WEB_DASHBOARD_PORT): self.host = host self.port = port self.mcp_server_url = MCP_SERVER_URL # Initialize FastAPI app self.app = FastAPI(title="UnrealBlueprintMCP Dashboard", version="1.0.0") self._setup_middleware() self._setup_routes() # Data storage self.connected_clients: List[WebSocket] = [] self.operation_history: List[Dict[str, Any]] = [] self.performance_metrics: List[Dict[str, Any]] = [] self.current_status = { "server_status": "unknown", "connections": 0, "operations_per_minute": 0, "last_updated": datetime.now().isoformat() } # Setup logging logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) # Setup templates and static files self.templates = Jinja2Templates(directory=Path(__file__).parent / "templates") # Mount static files static_dir = Path(__file__).parent / "static" if static_dir.exists(): self.app.mount("/static", StaticFiles(directory=static_dir), name="static") def _setup_middleware(self): """Setup middleware""" self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def _setup_routes(self): """Setup API routes""" @self.app.get("/", response_class=HTMLResponse) async def dashboard_home(request: Request): """Main dashboard page""" return self.templates.TemplateResponse("index.html", { "request": request, "title": "UnrealBlueprintMCP Dashboard", "status": self.current_status }) @self.app.get("/blueprints", response_class=HTMLResponse) async def blueprints_page(request: Request): """Blueprint management page""" return self.templates.TemplateResponse("blueprints.html", { "request": request, "title": "Blueprint Manager" }) @self.app.get("/performance", response_class=HTMLResponse) async def performance_page(request: Request): """Performance monitoring page""" return self.templates.TemplateResponse("performance.html", { "request": request, "title": "Performance Monitor" }) # API Routes @self.app.get("/api/status") async def get_status(): """Get current system status""" status = await self._get_mcp_server_status() return JSONResponse(status) @self.app.get("/api/blueprints") async def list_blueprints(): """List all blueprints""" blueprints = await self._get_blueprints_list() return JSONResponse(blueprints) @self.app.post("/api/blueprints") async def create_blueprint(request: BlueprintCreateRequest, background_tasks: BackgroundTasks): """Create a new blueprint""" operation_id = f"create_{request.name}_{int(time.time())}" # Add to background tasks background_tasks.add_task( self._execute_blueprint_creation, operation_id, request ) return JSONResponse({ "operation_id": operation_id, "status": "queued", "message": f"Blueprint creation queued for {request.name}" }) @self.app.get("/api/performance") async def get_performance_metrics(): """Get performance metrics""" return JSONResponse({ "current": self.current_status, "history": self.performance_metrics[-100:], # Last 100 entries "summary": await self._calculate_performance_summary() }) @self.app.get("/api/operations") async def get_operations(): """Get operation history""" return JSONResponse({ "operations": self.operation_history[-50:], # Last 50 operations "total": len(self.operation_history) }) @self.app.post("/api/operations") async def queue_operation(request: OperationRequest, background_tasks: BackgroundTasks): """Queue a new operation""" operation_id = f"op_{int(time.time())}" background_tasks.add_task( self._execute_operation, operation_id, request ) return JSONResponse({ "operation_id": operation_id, "status": "queued" }) # WebSocket routes @self.app.websocket("/ws/status") async def websocket_status(websocket: WebSocket): """WebSocket for real-time status updates""" await self._handle_websocket_connection(websocket, "status") @self.app.websocket("/ws/operations") async def websocket_operations(websocket: WebSocket): """WebSocket for operation updates""" await self._handle_websocket_connection(websocket, "operations") @self.app.websocket("/ws/performance") async def websocket_performance(websocket: WebSocket): """WebSocket for performance metrics""" await self._handle_websocket_connection(websocket, "performance") async def _handle_websocket_connection(self, websocket: WebSocket, channel: str): """Handle WebSocket connections""" await websocket.accept() self.connected_clients.append(websocket) try: while True: # Send updates based on channel if channel == "status": await websocket.send_json(self.current_status) elif channel == "operations": recent_ops = self.operation_history[-5:] if self.operation_history else [] await websocket.send_json({"recent_operations": recent_ops}) elif channel == "performance": if self.performance_metrics: await websocket.send_json(self.performance_metrics[-1]) await asyncio.sleep(5) # Update every 5 seconds except Exception as e: self.logger.error(f"WebSocket error: {e}") finally: if websocket in self.connected_clients: self.connected_clients.remove(websocket) async def _get_mcp_server_status(self) -> Dict[str, Any]: """Get MCP server status""" try: async with websockets.connect(self.mcp_server_url, timeout=5) as ws: # Send a simple health check health_check = { "jsonrpc": "2.0", "id": "health_check", "method": "tools/call", "params": { "name": "get_server_status", "arguments": {} } } await ws.send(json.dumps(health_check)) response = await asyncio.wait_for(ws.recv(), timeout=5) self.current_status.update({ "server_status": "online", "connections": len(self.connected_clients), "last_updated": datetime.now().isoformat(), "response_time": "< 1s" }) except Exception as e: self.current_status.update({ "server_status": "offline", "error": str(e), "last_updated": datetime.now().isoformat() }) return self.current_status async def _get_blueprints_list(self) -> List[Dict[str, Any]]: """Get list of blueprints from MCP server""" try: async with websockets.connect(self.mcp_server_url, timeout=10) as ws: request = { "jsonrpc": "2.0", "id": "list_blueprints", "method": "tools/call", "params": { "name": "list_blueprints", "arguments": {} } } await ws.send(json.dumps(request)) response = await ws.recv() result = json.loads(response) if "result" in result: return result["result"].get("blueprints", []) else: return [] except Exception as e: self.logger.error(f"Error getting blueprints list: {e}") return [] async def _execute_blueprint_creation(self, operation_id: str, request: BlueprintCreateRequest): """Execute blueprint creation in background""" operation = { "id": operation_id, "type": "create_blueprint", "status": "running", "started_at": datetime.now().isoformat(), "name": request.name } self.operation_history.append(operation) try: async with websockets.connect(self.mcp_server_url, timeout=30) as ws: # Create blueprint create_request = { "jsonrpc": "2.0", "id": operation_id, "method": "tools/call", "params": { "name": "create_blueprint", "arguments": { "blueprint_name": request.name, "parent_class": request.parent_class, "asset_path": request.asset_path } } } await ws.send(json.dumps(create_request)) response = await ws.recv() result = json.loads(response) if "error" in result: operation["status"] = "failed" operation["error"] = result["error"] else: operation["status"] = "completed" operation["result"] = result operation["completed_at"] = datetime.now().isoformat() # Notify connected clients await self._broadcast_update("operation_completed", operation) except Exception as e: operation["status"] = "failed" operation["error"] = str(e) operation["completed_at"] = datetime.now().isoformat() async def _execute_operation(self, operation_id: str, request: OperationRequest): """Execute generic operation in background""" operation = { "id": operation_id, "type": request.operation_type, "status": "running", "started_at": datetime.now().isoformat(), "parameters": request.parameters } self.operation_history.append(operation) try: # Execute operation based on type if request.operation_type == "batch_create": result = await self._execute_batch_operation(request.parameters) elif request.operation_type == "performance_scan": result = await self._execute_performance_scan(request.parameters) else: result = {"message": "Operation type not implemented"} operation["status"] = "completed" operation["result"] = result operation["completed_at"] = datetime.now().isoformat() except Exception as e: operation["status"] = "failed" operation["error"] = str(e) operation["completed_at"] = datetime.now().isoformat() async def _execute_batch_operation(self, parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute batch operation""" # Placeholder for batch operation logic await asyncio.sleep(2) # Simulate work return {"message": "Batch operation completed", "items_processed": parameters.get("count", 0)} async def _execute_performance_scan(self, parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute performance scan""" # Placeholder for performance scan logic await asyncio.sleep(3) # Simulate work return {"message": "Performance scan completed", "issues_found": 0} async def _calculate_performance_summary(self) -> Dict[str, Any]: """Calculate performance summary statistics""" if not self.performance_metrics: return {"message": "No performance data available"} recent_metrics = self.performance_metrics[-10:] if len(self.performance_metrics) >= 10 else self.performance_metrics return { "avg_response_time": "< 1s", "success_rate": "99.5%", "operations_today": len([op for op in self.operation_history if datetime.fromisoformat(op.get("started_at", "1970-01-01")) > datetime.now() - timedelta(days=1)]), "server_uptime": "99.9%" } async def _broadcast_update(self, event_type: str, data: Any): """Broadcast update to all connected WebSocket clients""" if not self.connected_clients: return message = { "event": event_type, "data": data, "timestamp": datetime.now().isoformat() } # Send to all connected clients disconnected_clients = [] for client in self.connected_clients: try: await client.send_json(message) except: disconnected_clients.append(client) # Remove disconnected clients for client in disconnected_clients: self.connected_clients.remove(client) async def start_background_tasks(self): """Start background monitoring tasks""" asyncio.create_task(self._monitor_performance()) asyncio.create_task(self._update_status()) async def _monitor_performance(self): """Monitor performance metrics""" while True: try: # Collect performance metrics metric = { "timestamp": datetime.now().isoformat(), "cpu_percent": 15.0, # Placeholder "memory_mb": 256.0, # Placeholder "active_connections": len(self.connected_clients), "operations_count": len(self.operation_history) } self.performance_metrics.append(metric) # Keep only last 1000 metrics if len(self.performance_metrics) > 1000: self.performance_metrics = self.performance_metrics[-1000:] except Exception as e: self.logger.error(f"Error monitoring performance: {e}") await asyncio.sleep(10) # Collect every 10 seconds async def _update_status(self): """Update system status regularly""" while True: try: await self._get_mcp_server_status() # Calculate operations per minute now = datetime.now() one_minute_ago = now - timedelta(minutes=1) recent_ops = [op for op in self.operation_history if datetime.fromisoformat(op.get("started_at", "1970-01-01")) > one_minute_ago] self.current_status["operations_per_minute"] = len(recent_ops) except Exception as e: self.logger.error(f"Error updating status: {e}") await asyncio.sleep(30) # Update every 30 seconds def run(self): """Run the dashboard server""" self.logger.info(f"Starting dashboard server on {self.host}:{self.port}") # Start background tasks asyncio.create_task(self.start_background_tasks()) # Run server uvicorn.run( self.app, host=self.host, port=self.port, log_level="info" ) def main(): """Main function""" import argparse parser = argparse.ArgumentParser(description="UnrealBlueprintMCP Web Dashboard") parser.add_argument("--host", default=WEB_DASHBOARD_HOST, help="Host to bind to") parser.add_argument("--port", type=int, default=WEB_DASHBOARD_PORT, help="Port to bind to") parser.add_argument("--debug", action="store_true", help="Enable debug mode") args = parser.parse_args() dashboard = DashboardServer(host=args.host, port=args.port) if args.debug: logging.getLogger().setLevel(logging.DEBUG) try: dashboard.run() except KeyboardInterrupt: print("\nDashboard server stopped") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BestDev/unreal_bp_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server