#!/usr/bin/env python3
"""
AAP Controller System Extensions & Advanced Features Tool
"""
from typing import Any, Dict, Optional, Union
from fastmcp import FastMCP
from pydantic import Field
from connectors.aap_connector import get_aap_connector
# Shared validation messages, returned to the caller as {"error": <message>}.
_ID_REQUIRED = "resource_id is required"
_DATA_REQUIRED = "resource_data is required"
_ID_AND_DATA_REQUIRED = "resource_id and resource_data are required"

# action -> collection endpoint. These actions list the collection, or fetch
# a single resource when resource_id is supplied.
_BROWSE_ACTIONS = {
    "receptor_management": "receptor_addresses/",
    "constructed_inventories": "constructed_inventories/",
    "labels": "labels/",
    "credential_sources": "credential_input_sources/",
    "approval_workflows": "workflow_approvals/",
}

# action -> endpoint queried with the optional filters as parameters.
_LIST_ACTIONS = {
    "mesh_visualizer": "mesh_visualizer/",
    "list_constructed_inventories": "constructed_inventories/",
    "list_labels": "labels/",
    "list_credential_sources": "credential_input_sources/",
    "list_workflow_approvals": "workflow_approvals/",
    "list_approval_templates": "workflow_approval_templates/",
    "config_subscriptions": "config/subscriptions/",
}

# action -> collection endpoint; fetches exactly one resource by resource_id.
_GET_ACTIONS = {
    "get_constructed_inventory": "constructed_inventories/",
    "get_label": "labels/",
    "get_credential_source": "credential_input_sources/",
    "get_workflow_approval": "workflow_approvals/",
    "get_approval_template": "workflow_approval_templates/",
}

# action -> (endpoint, error message used when resource_data is missing).
_CREATE_ACTIONS = {
    "create_receptor_address": (
        "receptor_addresses/",
        "resource_data is required for creating receptor addresses",
    ),
    "create_constructed_inventory": ("constructed_inventories/", _DATA_REQUIRED),
    "create_label": ("labels/", _DATA_REQUIRED),
    "create_credential_source": ("credential_input_sources/", _DATA_REQUIRED),
    "config_attach": ("config/attach/", _DATA_REQUIRED),
}

# action -> collection endpoint; needs both resource_id and resource_data.
_UPDATE_ACTIONS = {
    "update_receptor_address": "receptor_addresses/",
    "update_constructed_inventory": "constructed_inventories/",
    "update_label": "labels/",
    "update_credential_source": "credential_input_sources/",
}

# action -> (collection endpoint, error message used when resource_id is missing).
_DELETE_ACTIONS = {
    "delete_receptor_address": (
        "receptor_addresses/",
        "resource_id is required for deleting receptor addresses",
    ),
    "delete_constructed_inventory": ("constructed_inventories/", _ID_REQUIRED),
    "delete_label": ("labels/", _ID_REQUIRED),
    "delete_credential_source": ("credential_input_sources/", _ID_REQUIRED),
}

# action -> path segment appended after workflow_approvals/<resource_id>/.
_APPROVAL_DECISIONS = {
    "approve_workflow": "approve",
    "deny_workflow": "deny",
}

# Components reachable under debug/<component>/.
_DEBUG_COMPONENTS = ("dependency_manager", "task_manager", "workflow_manager")

# bulk_operation -> wording used in the "bulk_data is required for ..." error.
# The operation name doubles as the path segment under bulk/.
_BULK_OPERATIONS = {
    "host_create": "bulk host creation",
    "host_delete": "bulk host deletion",
    "job_launch": "bulk job launch",
}

# Advertised in the tool schema; keep in sync with the tables above.
_ACTION_DESCRIPTION = (
    "Action: mesh_visualizer; "
    "receptor_management, create_receptor_address, update_receptor_address, "
    "delete_receptor_address; "
    "constructed_inventories, list_constructed_inventories, "
    "get_constructed_inventory, create_constructed_inventory, "
    "update_constructed_inventory, delete_constructed_inventory; "
    "labels, list_labels, get_label, create_label, update_label, delete_label; "
    "debug_tools; "
    "credential_sources, list_credential_sources, get_credential_source, "
    "create_credential_source, update_credential_source, "
    "delete_credential_source; "
    "approval_workflows, list_workflow_approvals, get_workflow_approval, "
    "approve_workflow, deny_workflow, list_approval_templates, "
    "get_approval_template; "
    "bulk_advanced; "
    "config_attach, config_subscriptions"
)


def _normalize_resource_id(resource_id):
    """Return whole-number floats as ints; leave every other value untouched.

    Without this, an ID that arrives as ``5.0`` would be interpolated into the
    request path as ``.../5.0/`` instead of ``.../5/``.
    """
    if isinstance(resource_id, float) and resource_id.is_integer():
        return int(resource_id)
    return resource_id


def register_system_extensions_tools(mcp: FastMCP):
    """Register system extensions and advanced features tools with the MCP server.

    Args:
        mcp: Server instance whose ``tool()`` decorator is used to register
            ``system_extensions_management``.
    """

    @mcp.tool()
    def system_extensions_management(
        action: str = Field(description=_ACTION_DESCRIPTION),
        resource_id: Optional[Union[int, float]] = Field(None, description="Resource ID for specific operations"),
        resource_type: Optional[str] = Field(None, description="Resource type for operations"),
        resource_data: Optional[Dict[str, Any]] = Field(None, description="Resource data for create/update operations"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing operations"),
        debug_component: Optional[str] = Field(None, description="Debug component: dependency_manager, task_manager, workflow_manager"),
        approval_data: Optional[Dict[str, Any]] = Field(None, description="Approval workflow data"),
        bulk_operation: Optional[str] = Field(None, description="Bulk operation type: host_create, host_delete, job_launch"),
        bulk_data: Optional[Dict[str, Any]] = Field(None, description="Bulk operation data"),
    ) -> Dict[str, Any]:
        """
        System extensions and advanced features management tool.
        Handles mesh visualization, receptor management, constructed inventories, labels, debug tools, and more.

        The group actions (receptor_management, constructed_inventories, labels,
        credential_sources, approval_workflows) list the collection, or fetch a
        single resource when resource_id is given. The remaining actions are
        explicit list/get/create/update/delete style operations.

        Args:
            action: Name of the operation to perform.
            resource_id: ID of the target resource; whole-number floats are
                converted to int before being placed in the request path.
            resource_type: Accepted for interface compatibility; not used here.
            resource_data: Payload for create/update/config_attach actions.
            filters: Parameters forwarded to listing requests.
            debug_component: Component to query for the debug_tools action.
            approval_data: Payload for approve_workflow / deny_workflow.
            bulk_operation: Operation to run for the bulk_advanced action.
            bulk_data: Payload for the selected bulk operation.

        Returns:
            Whatever the connector call returns, or ``{"error": <message>}``
            when validation fails, the action is unknown, or an exception is
            raised while handling the request.
        """
        try:
            resource_id = _normalize_resource_id(resource_id)

            # List the collection, or fetch one resource when an ID is given.
            if action in _BROWSE_ACTIONS:
                endpoint = _BROWSE_ACTIONS[action]
                if resource_id:
                    return get_aap_connector().get(f"{endpoint}{resource_id}/")
                return get_aap_connector().get(endpoint, filters or {})

            if action in _LIST_ACTIONS:
                return get_aap_connector().get(_LIST_ACTIONS[action], filters or {})

            if action in _GET_ACTIONS:
                if not resource_id:
                    return {"error": _ID_REQUIRED}
                return get_aap_connector().get(f"{_GET_ACTIONS[action]}{resource_id}/")

            if action in _CREATE_ACTIONS:
                endpoint, missing_data_error = _CREATE_ACTIONS[action]
                if not resource_data:
                    return {"error": missing_data_error}
                return get_aap_connector().post(endpoint, resource_data)

            if action in _UPDATE_ACTIONS:
                if not resource_id or not resource_data:
                    return {"error": _ID_AND_DATA_REQUIRED}
                return get_aap_connector().patch(
                    f"{_UPDATE_ACTIONS[action]}{resource_id}/", resource_data
                )

            if action in _DELETE_ACTIONS:
                endpoint, missing_id_error = _DELETE_ACTIONS[action]
                if not resource_id:
                    return {"error": missing_id_error}
                return get_aap_connector().delete(f"{endpoint}{resource_id}/")

            if action in _APPROVAL_DECISIONS:
                if not resource_id:
                    return {"error": _ID_REQUIRED}
                decision = _APPROVAL_DECISIONS[action]
                # The decision is posted even when no approval_data is supplied.
                return get_aap_connector().post(
                    f"workflow_approvals/{resource_id}/{decision}/", approval_data or {}
                )

            if action == "debug_tools":
                if not debug_component:
                    return {"error": "debug_component is required (dependency_manager, task_manager, workflow_manager)"}
                if debug_component not in _DEBUG_COMPONENTS:
                    return {"error": f"Unknown debug component: {debug_component}"}
                return get_aap_connector().get(f"debug/{debug_component}/")

            if action == "bulk_advanced":
                if not bulk_operation:
                    return {"error": "bulk_operation is required (host_create, host_delete, job_launch)"}
                if bulk_operation not in _BULK_OPERATIONS:
                    return {"error": f"Unknown bulk operation: {bulk_operation}"}
                if not bulk_data:
                    return {"error": f"bulk_data is required for {_BULK_OPERATIONS[bulk_operation]}"}
                return get_aap_connector().post(f"bulk/{bulk_operation}/", bulk_data)

            return {"error": f"Unknown action: {action}"}
        except Exception as e:
            # Tool boundary: report failures to the caller instead of raising.
            return {"error": f"System extensions management failed: {str(e)}"}