We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Handler for metrics and analytics operations."""
from datetime import datetime, timedelta
from typing import Any
import json
from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.utils.logging import get_logger
logger = get_logger(__name__)
class MetricsHandler:
"""Handler for metrics and analytics."""
def __init__(self, client: LudusAPIClient) -> None:
"""Initialize the metrics handler."""
self.client = client
async def get_range_metrics(self, user_id: str | None = None) -> dict[str, Any]:
"""
Get comprehensive range metrics including VM resource utilization.
Args:
user_id: Optional user ID (admin only)
Returns:
Dictionary with range metrics
"""
try:
range_info = await self.client.get_range(user_id)
vms = range_info.get("VMs", [])
# Calculate VM statistics
total_vms = len(vms)
running_vms = sum(1 for vm in vms if vm.get("status") == "running")
stopped_vms = sum(1 for vm in vms if vm.get("status") == "stopped")
# Calculate resource allocations
total_memory_mb = sum(vm.get("memory", 0) for vm in vms)
total_cpus = sum(vm.get("cpus", 0) for vm in vms)
total_disk_gb = sum(vm.get("disk", 0) for vm in vms)
# VM distribution by OS
os_distribution = {}
for vm in vms:
os_name = vm.get("template", "unknown")
os_distribution[os_name] = os_distribution.get(os_name, 0) + 1
metrics = {
"status": "success",
"timestamp": datetime.now().isoformat(),
"range_state": range_info.get("rangeState", "UNKNOWN"),
"vm_statistics": {
"total_vms": total_vms,
"running_vms": running_vms,
"stopped_vms": stopped_vms,
"utilization_percentage": (running_vms / total_vms * 100) if total_vms > 0 else 0
},
"resource_allocation": {
"total_memory_mb": total_memory_mb,
"total_memory_gb": round(total_memory_mb / 1024, 2),
"total_cpus": total_cpus,
"total_disk_gb": total_disk_gb,
"average_memory_per_vm": round(total_memory_mb / total_vms, 2) if total_vms > 0 else 0,
"average_cpus_per_vm": round(total_cpus / total_vms, 2) if total_vms > 0 else 0
},
"os_distribution": os_distribution,
"network_count": len(range_info.get("networks", [])),
"testing_enabled": range_info.get("testingEnabled", False),
"last_deployment": range_info.get("lastDeployment")
}
logger.info(f"Retrieved metrics for range: {total_vms} VMs, {running_vms} running")
return metrics
except Exception as e:
logger.error(f"Error getting range metrics: {e}")
return {
"status": "error",
"error": str(e)
}
async def get_deployment_metrics(self, user_id: str | None = None) -> dict[str, Any]:
"""
Get deployment statistics and patterns.
Args:
user_id: Optional user ID (admin only)
Returns:
Dictionary with deployment metrics
"""
try:
range_info = await self.client.get_range(user_id)
logs = await self.client.get_range_logs(user_id)
# Parse deployment state
range_state = range_info.get("rangeState", "UNKNOWN")
last_deployment = range_info.get("lastDeployment")
# Analyze logs for deployment patterns
log_lines = logs.split("\n") if logs else []
error_count = sum(1 for line in log_lines if "error" in line.lower() or "failed" in line.lower())
warning_count = sum(1 for line in log_lines if "warning" in line.lower())
# Calculate deployment success
deployment_successful = range_state == "SUCCESS"
metrics = {
"status": "success",
"timestamp": datetime.now().isoformat(),
"current_deployment": {
"state": range_state,
"successful": deployment_successful,
"last_deployment_time": last_deployment,
"vm_count": range_info.get("numberOfVMs", 0)
},
"log_analysis": {
"total_lines": len(log_lines),
"error_count": error_count,
"warning_count": warning_count,
"has_errors": error_count > 0
},
"health_score": self._calculate_health_score(
range_state, error_count, warning_count, range_info
)
}
logger.info(f"Retrieved deployment metrics: state={range_state}, health={metrics['health_score']}")
return metrics
except Exception as e:
logger.error(f"Error getting deployment metrics: {e}")
return {
"status": "error",
"error": str(e)
}
def _calculate_health_score(
self,
range_state: str,
error_count: int,
warning_count: int,
range_info: dict
) -> dict[str, Any]:
"""Calculate a health score for the range."""
score = 100.0
issues = []
# Deduct points for state
if range_state == "FAILED":
score -= 50
issues.append("Deployment failed")
elif range_state == "DEPLOYING":
score -= 10
issues.append("Deployment in progress")
elif range_state == "DELETED":
score = 0
issues.append("Range deleted")
# Deduct points for errors and warnings
score -= min(error_count * 5, 30)
score -= min(warning_count * 2, 15)
if error_count > 0:
issues.append(f"{error_count} errors in logs")
if warning_count > 0:
issues.append(f"{warning_count} warnings in logs")
# Check VM health
vms = range_info.get("VMs", [])
if vms:
running = sum(1 for vm in vms if vm.get("status") == "running")
if running < len(vms):
ratio = running / len(vms)
score -= (1 - ratio) * 20
issues.append(f"Only {running}/{len(vms)} VMs running")
score = max(0, min(100, score))
return {
"score": round(score, 2),
"grade": self._score_to_grade(score),
"issues": issues
}
def _score_to_grade(self, score: float) -> str:
"""Convert health score to letter grade."""
if score >= 90:
return "A"
elif score >= 80:
return "B"
elif score >= 70:
return "C"
elif score >= 60:
return "D"
else:
return "F"
async def get_cost_estimation(self, user_id: str | None = None) -> dict[str, Any]:
"""
Estimate resource costs based on VM configurations.
Args:
user_id: Optional user ID (admin only)
Returns:
Dictionary with cost estimations
"""
try:
range_info = await self.client.get_range(user_id)
vms = range_info.get("VMs", [])
# Cost calculation constants (example rates - adjust as needed)
COST_PER_GB_RAM_HOUR = 0.01 # $0.01 per GB RAM per hour
COST_PER_CPU_HOUR = 0.05 # $0.05 per CPU per hour
COST_PER_GB_DISK_MONTH = 0.10 # $0.10 per GB disk per month
# Calculate costs
total_memory_gb = sum(vm.get("memory", 0) for vm in vms) / 1024
total_cpus = sum(vm.get("cpus", 0) for vm in vms)
total_disk_gb = sum(vm.get("disk", 0) for vm in vms)
# Hourly costs
hourly_ram_cost = total_memory_gb * COST_PER_GB_RAM_HOUR
hourly_cpu_cost = total_cpus * COST_PER_CPU_HOUR
hourly_total = hourly_ram_cost + hourly_cpu_cost
# Monthly costs
monthly_ram_cost = hourly_ram_cost * 24 * 30
monthly_cpu_cost = hourly_cpu_cost * 24 * 30
monthly_disk_cost = total_disk_gb * COST_PER_GB_DISK_MONTH
monthly_total = monthly_ram_cost + monthly_cpu_cost + monthly_disk_cost
estimation = {
"status": "success",
"timestamp": datetime.now().isoformat(),
"disclaimer": "These are estimated costs. Actual costs may vary based on your infrastructure provider.",
"resource_summary": {
"total_memory_gb": round(total_memory_gb, 2),
"total_cpus": total_cpus,
"total_disk_gb": total_disk_gb,
"vm_count": len(vms)
},
"hourly_costs": {
"ram": round(hourly_ram_cost, 4),
"cpu": round(hourly_cpu_cost, 4),
"total": round(hourly_total, 4)
},
"daily_costs": {
"total": round(hourly_total * 24, 2)
},
"monthly_costs": {
"ram": round(monthly_ram_cost, 2),
"cpu": round(monthly_cpu_cost, 2),
"disk": round(monthly_disk_cost, 2),
"total": round(monthly_total, 2)
},
"cost_breakdown_per_vm": self._calculate_per_vm_costs(vms)
}
logger.info(f"Cost estimation: ${monthly_total:.2f}/month for {len(vms)} VMs")
return estimation
except Exception as e:
logger.error(f"Error calculating cost estimation: {e}")
return {
"status": "error",
"error": str(e)
}
def _calculate_per_vm_costs(self, vms: list[dict]) -> list[dict]:
"""Calculate cost breakdown per VM."""
COST_PER_GB_RAM_HOUR = 0.01
COST_PER_CPU_HOUR = 0.05
vm_costs = []
for vm in vms:
memory_gb = vm.get("memory", 0) / 1024
cpus = vm.get("cpus", 0)
hourly_cost = (memory_gb * COST_PER_GB_RAM_HOUR) + (cpus * COST_PER_CPU_HOUR)
monthly_cost = hourly_cost * 24 * 30
vm_costs.append({
"name": vm.get("name", "unknown"),
"template": vm.get("template", "unknown"),
"hourly_cost": round(hourly_cost, 4),
"monthly_cost": round(monthly_cost, 2)
})
return vm_costs
async def export_metrics(
self,
format: str = "json",
user_id: str | None = None
) -> dict[str, Any]:
"""
Export metrics in various formats (json, prometheus, csv).
Args:
format: Export format (json, prometheus, csv)
user_id: Optional user ID (admin only)
Returns:
Dictionary with exported metrics
"""
try:
# Gather all metrics
range_metrics = await self.get_range_metrics(user_id)
deployment_metrics = await self.get_deployment_metrics(user_id)
cost_metrics = await self.get_cost_estimation(user_id)
combined_metrics = {
"export_timestamp": datetime.now().isoformat(),
"range_metrics": range_metrics,
"deployment_metrics": deployment_metrics,
"cost_metrics": cost_metrics
}
if format.lower() == "json":
return {
"status": "success",
"format": "json",
"data": json.dumps(combined_metrics, indent=2)
}
elif format.lower() == "prometheus":
prometheus_data = self._convert_to_prometheus(combined_metrics)
return {
"status": "success",
"format": "prometheus",
"data": prometheus_data
}
elif format.lower() == "csv":
csv_data = self._convert_to_csv(combined_metrics)
return {
"status": "success",
"format": "csv",
"data": csv_data
}
else:
return {
"status": "error",
"error": f"Unsupported format: {format}. Supported formats: json, prometheus, csv"
}
except Exception as e:
logger.error(f"Error exporting metrics: {e}")
return {
"status": "error",
"error": str(e)
}
def _convert_to_prometheus(self, metrics: dict) -> str:
"""Convert metrics to Prometheus format."""
lines = []
lines.append("# HELP ludus_range_vms_total Total number of VMs in range")
lines.append("# TYPE ludus_range_vms_total gauge")
if metrics.get("range_metrics", {}).get("status") == "success":
rm = metrics["range_metrics"]
vm_stats = rm.get("vm_statistics", {})
lines.append(f"ludus_range_vms_total {vm_stats.get('total_vms', 0)}")
lines.append(f"ludus_range_vms_running {vm_stats.get('running_vms', 0)}")
lines.append(f"ludus_range_vms_stopped {vm_stats.get('stopped_vms', 0)}")
res_alloc = rm.get("resource_allocation", {})
lines.append(f"ludus_range_memory_gb_total {res_alloc.get('total_memory_gb', 0)}")
lines.append(f"ludus_range_cpus_total {res_alloc.get('total_cpus', 0)}")
lines.append(f"ludus_range_disk_gb_total {res_alloc.get('total_disk_gb', 0)}")
if metrics.get("deployment_metrics", {}).get("status") == "success":
dm = metrics["deployment_metrics"]
health = dm.get("health_score", {})
lines.append(f"ludus_range_health_score {health.get('score', 0)}")
return "\n".join(lines)
def _convert_to_csv(self, metrics: dict) -> str:
"""Convert metrics to CSV format."""
lines = []
lines.append("metric_name,value,unit,timestamp")
timestamp = metrics.get("export_timestamp", "")
if metrics.get("range_metrics", {}).get("status") == "success":
rm = metrics["range_metrics"]
vm_stats = rm.get("vm_statistics", {})
lines.append(f"total_vms,{vm_stats.get('total_vms', 0)},count,{timestamp}")
lines.append(f"running_vms,{vm_stats.get('running_vms', 0)},count,{timestamp}")
lines.append(f"stopped_vms,{vm_stats.get('stopped_vms', 0)},count,{timestamp}")
res_alloc = rm.get("resource_allocation", {})
lines.append(f"total_memory,{res_alloc.get('total_memory_gb', 0)},GB,{timestamp}")
lines.append(f"total_cpus,{res_alloc.get('total_cpus', 0)},count,{timestamp}")
lines.append(f"total_disk,{res_alloc.get('total_disk_gb', 0)},GB,{timestamp}")
return "\n".join(lines)