Skip to main content
Glama
controller_monitoring.py (5.17 kB)
#!/usr/bin/env python3
"""
AAP Controller Monitoring & Analytics Tool
"""

from typing import Any, Dict, Optional, Union

from fastmcp import FastMCP
from pydantic import Field

from connectors.aap_connector import get_aap_connector

# Actions that map to a single GET with the caller's filters as parameters.
_FILTERED_ENDPOINTS = {
    "list_activity_stream": "activity_stream/",
    "list_instance_groups": "instance_groups/",
    "list_instances": "instances/",
}

# Actions that map to a single GET without any parameters.
_UNFILTERED_ENDPOINTS = {
    "ping": "ping/",
    "config": "config/",
}

# (result key, endpoint) pairs fetched, in order, for the "dashboard" action.
_DASHBOARD_SECTIONS = (
    ("jobs", "jobs/"),
    ("inventories", "inventories/"),
    ("projects", "projects/"),
    ("templates", "job_templates/"),
)

# Job status values grouped into the buckets reported by "job_stats".
_FAILED_STATUSES = ("failed", "error", "canceled")
_ACTIVE_STATUSES = ("running", "pending", "waiting")


def _resource_path_id(resource_id: Union[int, float]) -> Union[int, float]:
    """Return *resource_id* in a form that is safe to embed in a URL path.

    The tool accepts floats, so a value such as ``5.0`` would otherwise be
    rendered as ``jobs/5.0/job_events/``. Whole floats are converted to int;
    fractional (or non-finite) floats are rejected.

    Raises:
        ValueError: if *resource_id* is a float that is not a whole number.
    """
    if isinstance(resource_id, float):
        if not resource_id.is_integer():
            raise ValueError(
                f"resource_id must be a whole number, got {resource_id}"
            )
        return int(resource_id)
    return resource_id


def _summarize_jobs(jobs: Dict[str, Any]) -> Dict[str, Any]:
    """Build the "job_stats" summary from one jobs listing response.

    ``total_jobs`` is the listing's ``count`` field, while the status buckets
    are tallied from the ``results`` actually present in the response. The
    success rate is therefore computed over the examined jobs
    (``sampled_jobs``) rather than over ``count``: dividing a tally taken from
    one page by the overall count would understate the rate whenever the
    response holds fewer jobs than ``count``.
    """
    total = jobs.get("count", 0)
    statuses = [job.get("status", "") for job in jobs.get("results", [])]
    sampled = len(statuses)

    successful = sum(1 for status in statuses if status == "successful")
    failed = sum(1 for status in statuses if status in _FAILED_STATUSES)
    running = sum(1 for status in statuses if status in _ACTIVE_STATUSES)

    return {
        "total_jobs": total,
        "sampled_jobs": sampled,
        "successful": successful,
        "failed": failed,
        "running": running,
        "success_rate": (successful / sampled * 100) if sampled > 0 else 0,
    }


def register_monitoring_tools(mcp: FastMCP):
    """Register monitoring tools with the MCP server.

    Defines the ``monitoring_analytics`` tool and attaches it to *mcp* through
    the ``mcp.tool()`` decorator. Nothing is returned.
    """

    @mcp.tool()
    def monitoring_analytics(
        action: str = Field(description="Action: dashboard, job_stats, list_activity_stream, list_job_events, get_metrics, ping, config, get_license, list_instance_groups, list_instances"),
        resource_id: Optional[Union[int, float]] = Field(None, description="Resource ID for specific queries"),
        time_period: Optional[str] = Field(None, description="Time period for analytics (day, week, month)"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing and analytics")
    ) -> Dict[str, Any]:
        """
        Monitoring and analytics tool.
        Handles dashboards, metrics, activity streams, and system information.
        """
        # The docstring above and the Field descriptions are kept verbatim:
        # they are presumably surfaced to MCP clients as the tool description.
        #
        # Every failure is reported as an {"error": ...} dict instead of being
        # raised. NOTE(review): `time_period` is accepted but not used by any
        # action yet; it is retained so the tool's interface stays stable.
        try:
            params = filters or {}

            # Simple pass-through actions.
            if action in _FILTERED_ENDPOINTS:
                return get_aap_connector().get(_FILTERED_ENDPOINTS[action], params)
            if action in _UNFILTERED_ENDPOINTS:
                return get_aap_connector().get(_UNFILTERED_ENDPOINTS[action])

            # Dashboard Operations
            if action == "dashboard":
                dashboard_data = {}
                try:
                    for section, endpoint in _DASHBOARD_SECTIONS:
                        dashboard_data[section] = get_aap_connector().get(
                            endpoint, {"page_size": 10}
                        )
                except Exception as e:
                    # Best effort: keep the sections fetched so far and record
                    # why the remaining ones are missing.
                    dashboard_data["error"] = str(e)
                return dashboard_data

            if action == "job_stats":
                return _summarize_jobs(get_aap_connector().get("jobs/", params))

            # Activity and Events
            if action == "list_job_events":
                # A falsy resource_id (None or 0) means "no specific job".
                if resource_id:
                    job_id = _resource_path_id(resource_id)
                    return get_aap_connector().get(f"jobs/{job_id}/job_events/", params)
                return get_aap_connector().get("job_events/", params)

            # System Information
            if action == "get_metrics":
                try:
                    config = get_aap_connector().get("config/")
                    return {
                        "version": config.get("version"),
                        "license_info": config.get("license_info", {}),
                        "analytics_status": config.get("analytics_status"),
                        "install_uuid": config.get("install_uuid"),
                    }
                except Exception as e:
                    return {"error": f"Could not retrieve metrics: {str(e)}"}

            if action == "get_license":
                config = get_aap_connector().get("config/")
                return {
                    "license_info": config.get("license_info", {}),
                    "license_type": config.get("license_type"),
                    "subscription_name": config.get("subscription_name"),
                }

            return {"error": f"Unknown action: {action}"}

        except Exception as e:
            return {"error": f"Monitoring/Analytics failed: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anshulbehl/aap-mcp-pilot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.