dashboard_server.py•19 kB
#!/usr/bin/env python3
"""
Web Dashboard Server for UnrealBlueprintMCP
Provides a real-time web interface for monitoring and controlling
MCP operations with comprehensive project management features.
"""
import asyncio
import json
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from pathlib import Path
import sys
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from config import MCP_SERVER_URL, WEB_DASHBOARD_HOST, WEB_DASHBOARD_PORT
try:
from fastapi import FastAPI, WebSocket, Request, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import websockets
from pydantic import BaseModel
except ImportError:
print("Required packages not installed. Run: pip install fastapi uvicorn jinja2 websockets")
sys.exit(1)
# Data models
class BlueprintCreateRequest(BaseModel):
name: str
parent_class: str
asset_path: str
components: List[Dict[str, Any]] = []
properties: Dict[str, Any] = {}
class OperationRequest(BaseModel):
operation_type: str
parameters: Dict[str, Any]
priority: int = 1
class DashboardServer:
"""Main dashboard server class"""
def __init__(self, host: str = WEB_DASHBOARD_HOST, port: int = WEB_DASHBOARD_PORT):
self.host = host
self.port = port
self.mcp_server_url = MCP_SERVER_URL
# Initialize FastAPI app
self.app = FastAPI(title="UnrealBlueprintMCP Dashboard", version="1.0.0")
self._setup_middleware()
self._setup_routes()
# Data storage
self.connected_clients: List[WebSocket] = []
self.operation_history: List[Dict[str, Any]] = []
self.performance_metrics: List[Dict[str, Any]] = []
self.current_status = {
"server_status": "unknown",
"connections": 0,
"operations_per_minute": 0,
"last_updated": datetime.now().isoformat()
}
# Setup logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Setup templates and static files
self.templates = Jinja2Templates(directory=Path(__file__).parent / "templates")
# Mount static files
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
self.app.mount("/static", StaticFiles(directory=static_dir), name="static")
def _setup_middleware(self):
"""Setup middleware"""
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _setup_routes(self):
"""Setup API routes"""
@self.app.get("/", response_class=HTMLResponse)
async def dashboard_home(request: Request):
"""Main dashboard page"""
return self.templates.TemplateResponse("index.html", {
"request": request,
"title": "UnrealBlueprintMCP Dashboard",
"status": self.current_status
})
@self.app.get("/blueprints", response_class=HTMLResponse)
async def blueprints_page(request: Request):
"""Blueprint management page"""
return self.templates.TemplateResponse("blueprints.html", {
"request": request,
"title": "Blueprint Manager"
})
@self.app.get("/performance", response_class=HTMLResponse)
async def performance_page(request: Request):
"""Performance monitoring page"""
return self.templates.TemplateResponse("performance.html", {
"request": request,
"title": "Performance Monitor"
})
# API Routes
@self.app.get("/api/status")
async def get_status():
"""Get current system status"""
status = await self._get_mcp_server_status()
return JSONResponse(status)
@self.app.get("/api/blueprints")
async def list_blueprints():
"""List all blueprints"""
blueprints = await self._get_blueprints_list()
return JSONResponse(blueprints)
@self.app.post("/api/blueprints")
async def create_blueprint(request: BlueprintCreateRequest, background_tasks: BackgroundTasks):
"""Create a new blueprint"""
operation_id = f"create_{request.name}_{int(time.time())}"
# Add to background tasks
background_tasks.add_task(
self._execute_blueprint_creation,
operation_id,
request
)
return JSONResponse({
"operation_id": operation_id,
"status": "queued",
"message": f"Blueprint creation queued for {request.name}"
})
@self.app.get("/api/performance")
async def get_performance_metrics():
"""Get performance metrics"""
return JSONResponse({
"current": self.current_status,
"history": self.performance_metrics[-100:], # Last 100 entries
"summary": await self._calculate_performance_summary()
})
@self.app.get("/api/operations")
async def get_operations():
"""Get operation history"""
return JSONResponse({
"operations": self.operation_history[-50:], # Last 50 operations
"total": len(self.operation_history)
})
@self.app.post("/api/operations")
async def queue_operation(request: OperationRequest, background_tasks: BackgroundTasks):
"""Queue a new operation"""
operation_id = f"op_{int(time.time())}"
background_tasks.add_task(
self._execute_operation,
operation_id,
request
)
return JSONResponse({
"operation_id": operation_id,
"status": "queued"
})
# WebSocket routes
@self.app.websocket("/ws/status")
async def websocket_status(websocket: WebSocket):
"""WebSocket for real-time status updates"""
await self._handle_websocket_connection(websocket, "status")
@self.app.websocket("/ws/operations")
async def websocket_operations(websocket: WebSocket):
"""WebSocket for operation updates"""
await self._handle_websocket_connection(websocket, "operations")
@self.app.websocket("/ws/performance")
async def websocket_performance(websocket: WebSocket):
"""WebSocket for performance metrics"""
await self._handle_websocket_connection(websocket, "performance")
async def _handle_websocket_connection(self, websocket: WebSocket, channel: str):
"""Handle WebSocket connections"""
await websocket.accept()
self.connected_clients.append(websocket)
try:
while True:
# Send updates based on channel
if channel == "status":
await websocket.send_json(self.current_status)
elif channel == "operations":
recent_ops = self.operation_history[-5:] if self.operation_history else []
await websocket.send_json({"recent_operations": recent_ops})
elif channel == "performance":
if self.performance_metrics:
await websocket.send_json(self.performance_metrics[-1])
await asyncio.sleep(5) # Update every 5 seconds
except Exception as e:
self.logger.error(f"WebSocket error: {e}")
finally:
if websocket in self.connected_clients:
self.connected_clients.remove(websocket)
async def _get_mcp_server_status(self) -> Dict[str, Any]:
"""Get MCP server status"""
try:
async with websockets.connect(self.mcp_server_url, timeout=5) as ws:
# Send a simple health check
health_check = {
"jsonrpc": "2.0",
"id": "health_check",
"method": "tools/call",
"params": {
"name": "get_server_status",
"arguments": {}
}
}
await ws.send(json.dumps(health_check))
response = await asyncio.wait_for(ws.recv(), timeout=5)
self.current_status.update({
"server_status": "online",
"connections": len(self.connected_clients),
"last_updated": datetime.now().isoformat(),
"response_time": "< 1s"
})
except Exception as e:
self.current_status.update({
"server_status": "offline",
"error": str(e),
"last_updated": datetime.now().isoformat()
})
return self.current_status
async def _get_blueprints_list(self) -> List[Dict[str, Any]]:
"""Get list of blueprints from MCP server"""
try:
async with websockets.connect(self.mcp_server_url, timeout=10) as ws:
request = {
"jsonrpc": "2.0",
"id": "list_blueprints",
"method": "tools/call",
"params": {
"name": "list_blueprints",
"arguments": {}
}
}
await ws.send(json.dumps(request))
response = await ws.recv()
result = json.loads(response)
if "result" in result:
return result["result"].get("blueprints", [])
else:
return []
except Exception as e:
self.logger.error(f"Error getting blueprints list: {e}")
return []
async def _execute_blueprint_creation(self, operation_id: str, request: BlueprintCreateRequest):
"""Execute blueprint creation in background"""
operation = {
"id": operation_id,
"type": "create_blueprint",
"status": "running",
"started_at": datetime.now().isoformat(),
"name": request.name
}
self.operation_history.append(operation)
try:
async with websockets.connect(self.mcp_server_url, timeout=30) as ws:
# Create blueprint
create_request = {
"jsonrpc": "2.0",
"id": operation_id,
"method": "tools/call",
"params": {
"name": "create_blueprint",
"arguments": {
"blueprint_name": request.name,
"parent_class": request.parent_class,
"asset_path": request.asset_path
}
}
}
await ws.send(json.dumps(create_request))
response = await ws.recv()
result = json.loads(response)
if "error" in result:
operation["status"] = "failed"
operation["error"] = result["error"]
else:
operation["status"] = "completed"
operation["result"] = result
operation["completed_at"] = datetime.now().isoformat()
# Notify connected clients
await self._broadcast_update("operation_completed", operation)
except Exception as e:
operation["status"] = "failed"
operation["error"] = str(e)
operation["completed_at"] = datetime.now().isoformat()
async def _execute_operation(self, operation_id: str, request: OperationRequest):
"""Execute generic operation in background"""
operation = {
"id": operation_id,
"type": request.operation_type,
"status": "running",
"started_at": datetime.now().isoformat(),
"parameters": request.parameters
}
self.operation_history.append(operation)
try:
# Execute operation based on type
if request.operation_type == "batch_create":
result = await self._execute_batch_operation(request.parameters)
elif request.operation_type == "performance_scan":
result = await self._execute_performance_scan(request.parameters)
else:
result = {"message": "Operation type not implemented"}
operation["status"] = "completed"
operation["result"] = result
operation["completed_at"] = datetime.now().isoformat()
except Exception as e:
operation["status"] = "failed"
operation["error"] = str(e)
operation["completed_at"] = datetime.now().isoformat()
async def _execute_batch_operation(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute batch operation"""
# Placeholder for batch operation logic
await asyncio.sleep(2) # Simulate work
return {"message": "Batch operation completed", "items_processed": parameters.get("count", 0)}
async def _execute_performance_scan(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute performance scan"""
# Placeholder for performance scan logic
await asyncio.sleep(3) # Simulate work
return {"message": "Performance scan completed", "issues_found": 0}
async def _calculate_performance_summary(self) -> Dict[str, Any]:
"""Calculate performance summary statistics"""
if not self.performance_metrics:
return {"message": "No performance data available"}
recent_metrics = self.performance_metrics[-10:] if len(self.performance_metrics) >= 10 else self.performance_metrics
return {
"avg_response_time": "< 1s",
"success_rate": "99.5%",
"operations_today": len([op for op in self.operation_history
if datetime.fromisoformat(op.get("started_at", "1970-01-01"))
> datetime.now() - timedelta(days=1)]),
"server_uptime": "99.9%"
}
async def _broadcast_update(self, event_type: str, data: Any):
"""Broadcast update to all connected WebSocket clients"""
if not self.connected_clients:
return
message = {
"event": event_type,
"data": data,
"timestamp": datetime.now().isoformat()
}
# Send to all connected clients
disconnected_clients = []
for client in self.connected_clients:
try:
await client.send_json(message)
except:
disconnected_clients.append(client)
# Remove disconnected clients
for client in disconnected_clients:
self.connected_clients.remove(client)
async def start_background_tasks(self):
"""Start background monitoring tasks"""
asyncio.create_task(self._monitor_performance())
asyncio.create_task(self._update_status())
async def _monitor_performance(self):
"""Monitor performance metrics"""
while True:
try:
# Collect performance metrics
metric = {
"timestamp": datetime.now().isoformat(),
"cpu_percent": 15.0, # Placeholder
"memory_mb": 256.0, # Placeholder
"active_connections": len(self.connected_clients),
"operations_count": len(self.operation_history)
}
self.performance_metrics.append(metric)
# Keep only last 1000 metrics
if len(self.performance_metrics) > 1000:
self.performance_metrics = self.performance_metrics[-1000:]
except Exception as e:
self.logger.error(f"Error monitoring performance: {e}")
await asyncio.sleep(10) # Collect every 10 seconds
async def _update_status(self):
"""Update system status regularly"""
while True:
try:
await self._get_mcp_server_status()
# Calculate operations per minute
now = datetime.now()
one_minute_ago = now - timedelta(minutes=1)
recent_ops = [op for op in self.operation_history
if datetime.fromisoformat(op.get("started_at", "1970-01-01")) > one_minute_ago]
self.current_status["operations_per_minute"] = len(recent_ops)
except Exception as e:
self.logger.error(f"Error updating status: {e}")
await asyncio.sleep(30) # Update every 30 seconds
def run(self):
"""Run the dashboard server"""
self.logger.info(f"Starting dashboard server on {self.host}:{self.port}")
# Start background tasks
asyncio.create_task(self.start_background_tasks())
# Run server
uvicorn.run(
self.app,
host=self.host,
port=self.port,
log_level="info"
)
def main():
"""Main function"""
import argparse
parser = argparse.ArgumentParser(description="UnrealBlueprintMCP Web Dashboard")
parser.add_argument("--host", default=WEB_DASHBOARD_HOST, help="Host to bind to")
parser.add_argument("--port", type=int, default=WEB_DASHBOARD_PORT, help="Port to bind to")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
dashboard = DashboardServer(host=args.host, port=args.port)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
try:
dashboard.run()
except KeyboardInterrupt:
print("\nDashboard server stopped")
if __name__ == "__main__":
main()