We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/tjnull/Ludus-FastMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Handler for automation and orchestration operations."""
from datetime import datetime, timedelta
from typing import Any
import hashlib
from ludus_mcp.core.client import LudusAPIClient
from ludus_mcp.utils.logging import get_logger
logger = get_logger(__name__)
class AutomationOrchestrationHandler:
"""Handler for automation and orchestration."""
def __init__(self, client: LudusAPIClient) -> None:
"""Initialize the automation orchestration handler."""
self.client = client
async def create_deployment_pipeline(
self,
pipeline_name: str,
stages: list[dict],
user_id: str | None = None
) -> dict[str, Any]:
"""Define multi-stage deployment pipelines."""
try:
pipeline_id = hashlib.md5(f"{pipeline_name}{datetime.now()}".encode()).hexdigest()[:8]
# Validate stages
for i, stage in enumerate(stages):
if "name" not in stage:
return {"status": "error", "error": f"Stage {i} missing 'name' field"}
if "action" not in stage:
return {"status": "error", "error": f"Stage {i} missing 'action' field"}
pipeline = {
"pipeline_id": pipeline_id,
"name": pipeline_name,
"created_at": datetime.now().isoformat(),
"stages": stages,
"total_stages": len(stages)
}
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"pipeline": pipeline,
"execution_guide": {
"manual": "Execute each stage using corresponding ludus tools",
"automated": "Use CI/CD platform (GitHub Actions, GitLab CI) to orchestrate",
"example": "Stage 1: ludus.update_range_config -> Stage 2: ludus.deploy_range -> Stage 3: ludus.start_testing"
}
}
except Exception as e:
logger.error(f"Error creating deployment pipeline: {e}")
return {"status": "error", "error": str(e)}
async def schedule_range_tasks(
self,
task_type: str,
schedule: str,
parameters: dict | None = None,
user_id: str | None = None
) -> dict[str, Any]:
"""Schedule recurring tasks (start/stop, snapshots)."""
try:
valid_tasks = ["power_on", "power_off", "snapshot", "health_check"]
if task_type not in valid_tasks:
return {
"status": "error",
"error": f"Invalid task_type. Must be one of: {valid_tasks}"
}
# Validate cron schedule
schedule_parts = schedule.split()
if len(schedule_parts) != 5:
return {
"status": "error",
"error": "Invalid cron schedule format. Expected: 'minute hour day month weekday'"
}
task_id = hashlib.md5(f"{task_type}{schedule}{datetime.now()}".encode()).hexdigest()[:8]
task_config = {
"task_id": task_id,
"task_type": task_type,
"schedule": schedule,
"parameters": parameters or {},
"created_at": datetime.now().isoformat(),
"enabled": True,
"next_run": self._estimate_next_run(schedule)
}
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"task_configuration": task_config,
"implementation": {
"crontab": f"{schedule} ludus-fastmcp-task {task_type}",
"systemd_timer": "Create systemd timer unit",
"k8s_cronjob": "Deploy as Kubernetes CronJob"
}
}
except Exception as e:
logger.error(f"Error scheduling range task: {e}")
return {"status": "error", "error": str(e)}
async def auto_scaling(
self,
rules: list[dict],
user_id: str | None = None
) -> dict[str, Any]:
"""Define auto-scaling rules based on resource usage."""
try:
# Validate rules
for i, rule in enumerate(rules):
required_fields = ["metric", "threshold", "action"]
for field in required_fields:
if field not in rule:
return {
"status": "error",
"error": f"Rule {i} missing required field: {field}"
}
scaling_config = {
"enabled": True,
"rules": rules,
"created_at": datetime.now().isoformat(),
"monitoring_interval_seconds": 60
}
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"scaling_configuration": scaling_config,
"note": "Auto-scaling requires external monitoring and orchestration system",
"implementation_guide": {
"metrics_collection": "Use ludus.get_range_metrics periodically",
"decision_engine": "Compare metrics against thresholds",
"actions": "Execute ludus.update_range_config to adjust resources"
},
"example_rule": {
"metric": "cpu_utilization",
"threshold": 80,
"comparison": "greater_than",
"action": "increase_cpus",
"action_parameters": {"increment": 1}
}
}
except Exception as e:
logger.error(f"Error configuring auto-scaling: {e}")
return {"status": "error", "error": str(e)}
async def health_checks(
self,
check_interval_minutes: int = 5,
alert_on_failure: bool = True,
user_id: str | None = None
) -> dict[str, Any]:
"""Automated health checks with alerting."""
try:
range_info = await self.client.get_range(user_id)
health_check_config = {
"enabled": True,
"interval_minutes": check_interval_minutes,
"alert_on_failure": alert_on_failure,
"checks": [
{
"name": "range_state",
"description": "Check if range is in SUCCESS state",
"endpoint": "ludus.get_range",
"expected": "rangeState == SUCCESS"
},
{
"name": "vm_health",
"description": "Check if all VMs are running",
"endpoint": "ludus.get_range",
"expected": "all VMs have status == running"
},
{
"name": "deployment_logs",
"description": "Check for errors in logs",
"endpoint": "ludus.get_range_logs",
"expected": "no ERROR or FAILED messages"
}
],
"alert_channels": {
"email": "Configure SMTP settings",
"slack": "Use ludus.slack_notifications",
"webhook": "Use ludus.webhook_integration"
}
}
# Perform initial health check
vms = range_info.get("VMs", [])
running_vms = sum(1 for vm in vms if vm.get("status") == "running")
health_status = "healthy" if running_vms == len(vms) else "degraded"
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"health_check_configuration": health_check_config,
"current_health": {
"status": health_status,
"range_state": range_info.get("rangeState"),
"vms_running": f"{running_vms}/{len(vms)}"
},
"implementation": {
"polling": "Use cron or systemd timer to run checks",
"monitoring": "Integrate with Prometheus/Grafana for visualization"
}
}
except Exception as e:
logger.error(f"Error configuring health checks: {e}")
return {"status": "error", "error": str(e)}
def _estimate_next_run(self, schedule: str) -> str:
"""Estimate next run time (simplified)."""
# In production, use croniter library
return f"Estimated based on schedule: {schedule}"