#!/usr/bin/env python3
"""
AAP Controller Bulk Operations & Utilities Tool
"""
from typing import Any, Dict, Optional, List, Union
from fastmcp import FastMCP
from pydantic import Field
from connectors.aap_connector import get_aap_connector
def register_bulk_tools(mcp: FastMCP):
    """Register the bulk operations and utilities tool with the MCP server.

    Defines a single tool, ``bulk_operations_utilities``, and registers it
    through ``mcp.tool()``. The tool dispatches on its ``action`` argument
    and talks to the controller via ``get_aap_connector()``.

    Args:
        mcp: The FastMCP server instance the tool is registered on.
    """
    # Function-scope import so this block does not depend on edits to the
    # file-level import section.
    from datetime import datetime, timedelta, timezone

    # Age (in days) after which finished jobs are reported as cleanup
    # candidates when the caller does not pass its own ``finished__lt``.
    default_retention_days = 30

    def _run_for_each(ids, id_key, call):
        """Invoke ``call(item_id)`` per ID, recording a result or error for each."""
        outcomes = []
        for item_id in ids:
            try:
                outcomes.append({id_key: item_id, "result": call(item_id)})
            except Exception as e:
                # Best-effort: one failing item must not abort the batch.
                outcomes.append({id_key: item_id, "error": str(e)})
        return outcomes

    # NOTE: the tool docstring below is presumably surfaced to MCP clients as
    # the tool description, so its text is kept unchanged.
    @mcp.tool()
    def bulk_operations_utilities(
        action: str = Field(description="Action: bulk_job_launch, bulk_host_update, copy_resource, export_data, import_data, cleanup_jobs, get_api_versions, list_labels, create_label, health_check"),
        resource_type: Optional[str] = Field(None, description="Resource type for bulk operations"),
        resource_ids: Optional[List[int]] = Field(None, description="List of resource IDs for bulk operations"),
        bulk_data: Optional[Dict[str, Any]] = Field(None, description="Data for bulk operations"),
        copy_data: Optional[Dict[str, Any]] = Field(None, description="Copy operation data"),
        export_params: Optional[Dict[str, Any]] = Field(None, description="Export parameters"),
        import_data: Optional[Dict[str, Any]] = Field(None, description="Import data"),
        label_data: Optional[Dict[str, Any]] = Field(None, description="Label data"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for operations")
    ) -> Dict[str, Any]:
        """
        Bulk operations and utilities tool.
        Handles bulk actions, resource copying, data import/export, and system utilities.
        """
        # Returns the connector's response for single-call actions, a
        # {"bulk_results": [...]} dict for bulk actions, or an {"error": ...}
        # dict on validation failure or any unexpected exception.
        try:
            # Bulk Operations
            if action == "bulk_job_launch":
                if not resource_ids or not bulk_data:
                    return {"error": "resource_ids and bulk_data are required"}
                return {
                    "bulk_results": _run_for_each(
                        resource_ids,
                        "template_id",
                        lambda template_id: get_aap_connector().post(
                            f"job_templates/{template_id}/launch/", bulk_data
                        ),
                    )
                }

            if action == "bulk_host_update":
                if not resource_ids or not bulk_data:
                    return {"error": "resource_ids and bulk_data are required"}
                return {
                    "bulk_results": _run_for_each(
                        resource_ids,
                        "host_id",
                        lambda host_id: get_aap_connector().patch(
                            f"hosts/{host_id}/", bulk_data
                        ),
                    )
                }

            # Copy Operations
            if action == "copy_resource":
                if not resource_type or not copy_data:
                    return {"error": "resource_type and copy_data are required"}
                resource_id = copy_data.get("id")
                if not resource_id:
                    return {"error": "Resource ID required in copy_data"}
                endpoint_map = {
                    "job_template": f"job_templates/{resource_id}/copy/",
                    "workflow_template": f"workflow_job_templates/{resource_id}/copy/",
                    "inventory": f"inventories/{resource_id}/copy/",
                    "project": f"projects/{resource_id}/copy/",
                }
                endpoint = endpoint_map.get(resource_type)
                if not endpoint:
                    return {"error": f"Unsupported resource type: {resource_type}"}
                return get_aap_connector().post(endpoint, copy_data)

            # Data Operations
            if action == "export_data":
                # Work on a copy so the caller's dict is untouched, and pop
                # "resource": it selects the endpoint and must not also be
                # sent as a query parameter.
                params = dict(export_params or {})
                resource = str(params.pop("resource", "jobs")).strip("/")
                return get_aap_connector().get(f"{resource}/", params)

            if action == "import_data":
                if not import_data:
                    return {"error": "import_data is required"}
                # Placeholder: importing configurations is not implemented.
                return {"message": "Import functionality would be implemented based on specific requirements"}

            # Cleanup Operations
            if action == "cleanup_jobs":
                # Copy before applying defaults so ``filters`` is not mutated.
                params = dict(filters or {})
                params.setdefault("status__in", "successful,failed,error,canceled")
                if "finished__lt" not in params:
                    # A datetime lookup needs a real timestamp rather than a
                    # phrase such as "30 days ago"; use an ISO-8601 UTC cutoff.
                    cutoff = datetime.now(timezone.utc) - timedelta(days=default_retention_days)
                    params["finished__lt"] = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")
                jobs = get_aap_connector().get("jobs/", params)
                # This only reports candidates; nothing is deleted here.
                return {"cleanup_candidates": jobs.get("count", 0), "jobs": jobs}

            # System Utilities
            if action == "get_api_versions":
                return get_aap_connector().get("")  # Root endpoint shows API versions

            if action == "health_check":
                try:
                    ping = get_aap_connector().get("ping/")
                    config = get_aap_connector().get("config/")
                    return {
                        "ping": ping,
                        "version": config.get("version"),
                        "status": "healthy" if ping else "unhealthy",
                    }
                except Exception as e:
                    # Any failure while probing is reported as unhealthy
                    # rather than as a generic tool error.
                    return {"status": "unhealthy", "error": str(e)}

            # Label Operations
            if action == "list_labels":
                return get_aap_connector().get("labels/", filters or {})

            if action == "create_label":
                if not label_data:
                    return {"error": "label_data is required"}
                return get_aap_connector().post("labels/", label_data)

            return {"error": f"Unknown action: {action}"}
        except Exception as e:
            # Top-level boundary: surface failures as an error payload
            # instead of raising out of the tool.
            return {"error": f"Bulk operations/Utilities failed: {str(e)}"}