enterprise_monitoring_tools.py (33.8 kB)
"""Enterprise-grade Cloud Monitoring tools for MCP server.""" import json import asyncio from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Union from collections import defaultdict import structlog from google.cloud import monitoring_v3 from google.cloud import error_reporting from mcp.types import Tool, TextContent from pydantic import BaseModel, Field from ..auth import GCPAuthenticator from ..config import Config from ..exceptions import GCPServiceError, ValidationError logger = structlog.get_logger(__name__) class EnterpriseMonitoringTools: """Enterprise-grade Cloud Monitoring tools for comprehensive observability.""" def __init__(self, authenticator: GCPAuthenticator, config: Config): """Initialize enterprise monitoring tools.""" self.authenticator = authenticator self.config = config async def initialize(self) -> None: """Initialize the monitoring tools.""" if not self.authenticator.monitoring_client: raise GCPServiceError("Monitoring client not initialized") logger.info("Enterprise monitoring tools initialized") async def get_tools(self) -> List[Tool]: """Get available enterprise monitoring tools.""" return [ Tool( name="advanced_metrics_query", description="Query comprehensive metrics across multiple GCP services with advanced aggregation, filtering, and time-series analysis. Perfect for performance monitoring and capacity planning.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to query metrics from" }, "metric_queries": { "type": "array", "items": { "type": "object", "properties": { "metric_type": {"type": "string", "description": "Metric type (e.g., 'compute.googleapis.com/instance/cpu/utilization')"}, "resource_type": {"type": "string", "description": "Resource type filter"}, "filters": { "type": "object", "description": "Additional filters for the metric query" }, "aggregation": { "type": "object", "properties": { "alignment_period": {"type": "string", "default": "60s"}, "per_series_aligner": {"type": "string", "default": "ALIGN_MEAN"}, "cross_series_reducer": {"type": "string", "default": "REDUCE_MEAN"}, "group_by_fields": {"type": "array", "items": {"type": "string"}} } } }, "required": ["metric_type"] }, "description": "List of metric queries to execute" }, "time_range": { "type": "object", "properties": { "start": {"type": "string", "description": "Start time (ISO format or relative like '24h')"}, "end": {"type": "string", "description": "End time (defaults to now)"} }, "required": ["start"] }, "analysis_options": { "type": "object", "properties": { "detect_anomalies": {"type": "boolean", "default": False}, "calculate_trends": {"type": "boolean", "default": False}, "generate_forecasts": {"type": "boolean", "default": False}, "correlation_analysis": {"type": "boolean", "default": False} } } }, "required": ["metric_queries", "time_range"] } ), Tool( name="sla_slo_analysis", description="Analyze Service Level Objectives (SLOs) and Service Level Agreements (SLAs) across services. 
Monitor error budgets, availability, and performance against defined targets.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to analyze" }, "slo_definitions": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string", "description": "SLO name"}, "service": {"type": "string", "description": "Service name"}, "sli_type": {"type": "string", "enum": ["availability", "latency", "error_rate", "throughput"]}, "target": {"type": "number", "description": "SLO target (e.g., 0.99 for 99% availability)"}, "time_window": {"type": "string", "description": "SLO evaluation window (e.g., '30d', '7d')"}, "metric_config": { "type": "object", "properties": { "good_events_filter": {"type": "string"}, "total_events_filter": {"type": "string"}, "threshold": {"type": "number"} } } }, "required": ["name", "service", "sli_type", "target"] } }, "analysis_period": { "type": "string", "description": "Period to analyze SLO performance", "default": "7d" }, "include_error_budget": { "type": "boolean", "description": "Calculate error budget consumption", "default": True } }, "required": ["slo_definitions"] } ), Tool( name="alert_policy_analysis", description="Analyze alerting policies, their effectiveness, and alert fatigue patterns. Optimize alert thresholds and reduce false positives.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to analyze alerting for" }, "analysis_scope": { "type": "object", "properties": { "policy_names": { "type": "array", "items": {"type": "string"}, "description": "Specific alert policies to analyze" }, "severity_levels": { "type": "array", "items": {"type": "string"}, "description": "Alert severity levels to include" }, "notification_channels": { "type": "array", "items": {"type": "string"}, "description": "Notification channels to analyze" } } }, "time_range": { "type": "string", "description": "Time range for alert analysis", "default": "30d" }, "analysis_type": { "type": "string", "enum": ["effectiveness", "fatigue", "coverage", "optimization"], "default": "effectiveness", "description": "Type of alert analysis to perform" } }, "required": [] } ), Tool( name="resource_optimization_analysis", description="Analyze resource utilization and provide optimization recommendations for cost reduction and performance improvement.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to analyze" }, "resource_scope": { "type": "object", "properties": { "resource_types": { "type": "array", "items": {"type": "string"}, "description": "Resource types to analyze (gce_instance, k8s_container, cloud_function)" }, "services": { "type": "array", "items": {"type": "string"}, "description": "Specific services to focus on" }, "environments": { "type": "array", "items": {"type": "string"}, "description": "Environments to include (prod, staging, dev)" } } }, "optimization_targets": { "type": "array", "items": {"type": "string"}, "description": "Optimization targets (cost, performance, reliability, efficiency)", "default": ["cost", "performance"] }, "analysis_period": { "type": "string", "description": "Period for resource analysis", "default": "30d" }, "utilization_thresholds": { "type": "object", "properties": { "cpu_underutilized": {"type": "number", "default": 0.2}, "cpu_overutilized": {"type": "number", 
"default": 0.8}, "memory_underutilized": {"type": "number", "default": 0.3}, "memory_overutilized": {"type": "number", "default": 0.85} } } }, "required": [] } ), Tool( name="custom_dashboard_metrics", description="Generate custom dashboard metrics and visualizations for executive reporting and operational insights.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to include in dashboard" }, "dashboard_config": { "type": "object", "properties": { "dashboard_type": { "type": "string", "enum": ["executive", "operational", "security", "performance", "cost"], "description": "Type of dashboard to generate" }, "time_range": {"type": "string", "default": "7d"}, "refresh_interval": {"type": "string", "default": "5m"}, "include_predictions": {"type": "boolean", "default": False} } }, "kpi_definitions": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "description": {"type": "string"}, "metric_query": {"type": "string"}, "target_value": {"type": "number"}, "threshold_warning": {"type": "number"}, "threshold_critical": {"type": "number"} }, "required": ["name", "metric_query"] } }, "output_format": { "type": "string", "enum": ["json", "csv", "grafana", "monitoring_dashboard"], "default": "json", "description": "Output format for dashboard data" } }, "required": ["dashboard_config"] } ), Tool( name="infrastructure_health_check", description="Comprehensive health check of infrastructure across all GCP services with automated problem detection and remediation suggestions.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to health check" }, "health_check_scope": { "type": "object", "properties": { "services": { "type": "array", "items": {"type": "string"}, "description": "GCP services to check (compute, storage, networking, database)" }, "environments": { "type": "array", "items": {"type": "string"}, "description": "Environments to check" }, "criticality_levels": { "type": "array", "items": {"type": "string"}, "description": "Criticality levels to focus on (critical, high, medium, low)" } } }, "check_categories": { "type": "array", "items": {"type": "string"}, "description": "Categories of health checks (availability, performance, security, cost, compliance)", "default": ["availability", "performance", "security"] }, "severity_threshold": { "type": "string", "enum": ["info", "warning", "error", "critical"], "default": "warning", "description": "Minimum severity level to report" }, "include_recommendations": { "type": "boolean", "description": "Include automated remediation recommendations", "default": True } }, "required": [] } ) ] async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls for enterprise monitoring operations.""" try: if name == "advanced_metrics_query": return await self._advanced_metrics_query(arguments) elif name == "sla_slo_analysis": return await self._sla_slo_analysis(arguments) elif name == "alert_policy_analysis": return await self._alert_policy_analysis(arguments) elif name == "resource_optimization_analysis": return await self._resource_optimization_analysis(arguments) elif name == "custom_dashboard_metrics": return await self._custom_dashboard_metrics(arguments) elif name == "infrastructure_health_check": return await self._infrastructure_health_check(arguments) else: raise 
ValidationError(f"Unknown enterprise monitoring tool: {name}") except Exception as e: logger.error("Enterprise monitoring tool failed", tool=name, error=str(e)) return [TextContent(type="text", text=f"Error executing {name}: {str(e)}")] async def _advanced_metrics_query(self, args: Dict[str, Any]) -> List[TextContent]: """Execute advanced metrics queries with analysis.""" projects = args.get("projects", [self.authenticator.get_project_id()]) metric_queries = args["metric_queries"] time_range = args["time_range"] analysis_options = args.get("analysis_options", {}) # Parse time range start_time = self._parse_time(time_range["start"]) end_time = self._parse_time(time_range.get("end")) if time_range.get("end") else datetime.utcnow() # Execute metric queries results = [] for project in projects: for query in metric_queries: metric_result = await self._execute_metric_query(project, query, start_time, end_time) results.append(metric_result) # Perform analysis if requested analysis_results = {} if analysis_options.get("detect_anomalies"): analysis_results["anomalies"] = await self._detect_metric_anomalies(results) if analysis_options.get("calculate_trends"): analysis_results["trends"] = await self._calculate_metric_trends(results) if analysis_options.get("generate_forecasts"): analysis_results["forecasts"] = await self._generate_metric_forecasts(results) if analysis_options.get("correlation_analysis"): analysis_results["correlations"] = await self._analyze_metric_correlations(results) response = { "metrics_summary": { "total_queries": len(metric_queries), "projects_queried": projects, "time_range": f"{start_time.isoformat()} to {end_time.isoformat()}", "analysis_enabled": list(analysis_options.keys()) }, "metric_results": results, "analysis": analysis_results } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _sla_slo_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Analyze SLA/SLO performance and error budgets.""" projects = args.get("projects", [self.authenticator.get_project_id()]) slo_definitions = args["slo_definitions"] analysis_period = args.get("analysis_period", "7d") include_error_budget = args.get("include_error_budget", True) # Analyze each SLO slo_results = [] for slo in slo_definitions: slo_analysis = await self._analyze_single_slo(projects, slo, analysis_period, include_error_budget) slo_results.append(slo_analysis) # Generate overall SLA summary overall_summary = await self._generate_sla_summary(slo_results) response = { "sla_slo_summary": { "analysis_period": analysis_period, "total_slos": len(slo_definitions), "projects_analyzed": projects }, "slo_results": slo_results, "overall_summary": overall_summary, "recommendations": await self._generate_slo_recommendations(slo_results) } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _alert_policy_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Analyze alert policies and their effectiveness.""" projects = args.get("projects", [self.authenticator.get_project_id()]) analysis_scope = args.get("analysis_scope", {}) time_range = args.get("time_range", "30d") analysis_type = args.get("analysis_type", "effectiveness") # Get alert policies and incidents alert_data = await self._collect_alert_data(projects, analysis_scope, time_range) # Perform analysis based on type if analysis_type == "effectiveness": analysis_results = await self._analyze_alert_effectiveness(alert_data) elif analysis_type == "fatigue": analysis_results = await 
self._analyze_alert_fatigue(alert_data) elif analysis_type == "coverage": analysis_results = await self._analyze_alert_coverage(alert_data) else: # optimization analysis_results = await self._analyze_alert_optimization(alert_data) response = { "alert_analysis": { "analysis_type": analysis_type, "time_range": time_range, "projects_analyzed": projects, "total_policies": len(alert_data.get("policies", [])), "total_incidents": len(alert_data.get("incidents", [])) }, "findings": analysis_results, "optimization_recommendations": await self._generate_alert_recommendations(analysis_results) } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _resource_optimization_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Analyze resource utilization and optimization opportunities.""" projects = args.get("projects", [self.authenticator.get_project_id()]) resource_scope = args.get("resource_scope", {}) optimization_targets = args.get("optimization_targets", ["cost", "performance"]) analysis_period = args.get("analysis_period", "30d") utilization_thresholds = args.get("utilization_thresholds", {}) # Collect resource utilization data resource_data = await self._collect_resource_utilization_data( projects, resource_scope, analysis_period ) # Perform optimization analysis optimization_results = await self._analyze_resource_optimization( resource_data, optimization_targets, utilization_thresholds ) response = { "resource_optimization": { "analysis_period": analysis_period, "optimization_targets": optimization_targets, "projects_analyzed": projects, "total_resources": len(resource_data.get("resources", [])) }, "utilization_analysis": optimization_results["utilization"], "cost_optimization": optimization_results.get("cost", {}), "performance_optimization": optimization_results.get("performance", {}), "recommendations": optimization_results["recommendations"] } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _custom_dashboard_metrics(self, args: Dict[str, Any]) -> List[TextContent]: """Generate custom dashboard metrics.""" projects = args.get("projects", [self.authenticator.get_project_id()]) dashboard_config = args["dashboard_config"] kpi_definitions = args.get("kpi_definitions", []) output_format = args.get("output_format", "json") # Generate dashboard based on type dashboard_type = dashboard_config.get("dashboard_type", "operational") dashboard_data = await self._generate_dashboard_data( projects, dashboard_type, dashboard_config, kpi_definitions ) # Format output based on requested format if output_format == "grafana": formatted_output = await self._format_for_grafana(dashboard_data) elif output_format == "csv": formatted_output = await self._format_for_csv(dashboard_data) else: # json formatted_output = dashboard_data response = { "dashboard_metadata": { "dashboard_type": dashboard_type, "projects_included": projects, "kpis_count": len(kpi_definitions), "output_format": output_format }, "dashboard_data": formatted_output } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _infrastructure_health_check(self, args: Dict[str, Any]) -> List[TextContent]: """Perform comprehensive infrastructure health check.""" projects = args.get("projects", [self.authenticator.get_project_id()]) health_check_scope = args.get("health_check_scope", {}) check_categories = args.get("check_categories", ["availability", "performance", "security"]) severity_threshold = 
args.get("severity_threshold", "warning") include_recommendations = args.get("include_recommendations", True) # Perform health checks by category health_results = {} for category in check_categories: health_results[category] = await self._perform_health_check_category( projects, category, health_check_scope, severity_threshold ) # Generate overall health score overall_health = await self._calculate_overall_health_score(health_results) # Generate recommendations if requested recommendations = [] if include_recommendations: recommendations = await self._generate_health_recommendations(health_results) response = { "health_check_summary": { "projects_checked": projects, "categories_checked": check_categories, "severity_threshold": severity_threshold, "overall_health_score": overall_health["score"], "health_grade": overall_health["grade"] }, "detailed_results": health_results, "critical_issues": overall_health["critical_issues"], "recommendations": recommendations } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] # Helper methods (placeholder implementations for enterprise features) async def _execute_metric_query(self, project: str, query: Dict, start_time: datetime, end_time: datetime) -> Dict: """Execute a single metric query.""" # Enterprise implementation would use Cloud Monitoring API return { "project": project, "metric_type": query["metric_type"], "values": [], # Placeholder for actual metric values "status": "success" } def _parse_time(self, time_str: str) -> datetime: """Parse time string to datetime.""" if time_str.endswith(('h', 'm', 'd')): if time_str.endswith('h'): hours = int(time_str[:-1]) return datetime.utcnow() - timedelta(hours=hours) elif time_str.endswith('m'): minutes = int(time_str[:-1]) return datetime.utcnow() - timedelta(minutes=minutes) elif time_str.endswith('d'): days = int(time_str[:-1]) return datetime.utcnow() - timedelta(days=days) try: return datetime.fromisoformat(time_str.replace('Z', '+00:00')) except ValueError: raise ValidationError(f"Invalid time format: {time_str}") # Placeholder implementations for complex analysis methods async def _detect_metric_anomalies(self, results: List[Dict]) -> List[Dict]: return [{"anomaly": "placeholder"}] async def _calculate_metric_trends(self, results: List[Dict]) -> List[Dict]: return [{"trend": "placeholder"}] async def _generate_metric_forecasts(self, results: List[Dict]) -> List[Dict]: return [{"forecast": "placeholder"}] async def _analyze_metric_correlations(self, results: List[Dict]) -> List[Dict]: return [{"correlation": "placeholder"}] async def _analyze_single_slo(self, projects: List[str], slo: Dict, period: str, include_budget: bool) -> Dict: return {"slo_analysis": "placeholder"} async def _generate_sla_summary(self, slo_results: List[Dict]) -> Dict: return {"summary": "placeholder"} async def _generate_slo_recommendations(self, slo_results: List[Dict]) -> List[str]: return ["slo recommendation placeholder"] async def _collect_alert_data(self, projects: List[str], scope: Dict, time_range: str) -> Dict: return {"policies": [], "incidents": []} async def _analyze_alert_effectiveness(self, alert_data: Dict) -> Dict: return {"effectiveness": "analysis placeholder"} async def _analyze_alert_fatigue(self, alert_data: Dict) -> Dict: return {"fatigue": "analysis placeholder"} async def _analyze_alert_coverage(self, alert_data: Dict) -> Dict: return {"coverage": "analysis placeholder"} async def _analyze_alert_optimization(self, alert_data: Dict) -> Dict: return {"optimization": 
"analysis placeholder"} async def _generate_alert_recommendations(self, analysis_results: Dict) -> List[str]: return ["alert recommendation placeholder"] async def _collect_resource_utilization_data(self, projects: List[str], scope: Dict, period: str) -> Dict: return {"resources": []} async def _analyze_resource_optimization(self, resource_data: Dict, targets: List[str], thresholds: Dict) -> Dict: return { "utilization": {}, "cost": {}, "performance": {}, "recommendations": [] } async def _generate_dashboard_data(self, projects: List[str], dashboard_type: str, config: Dict, kpis: List[Dict]) -> Dict: return {"dashboard": "data placeholder"} async def _format_for_grafana(self, dashboard_data: Dict) -> Dict: return {"grafana": "format placeholder"} async def _format_for_csv(self, dashboard_data: Dict) -> str: return "csv,format,placeholder" async def _perform_health_check_category(self, projects: List[str], category: str, scope: Dict, threshold: str) -> Dict: return {"health_check": "placeholder"} async def _calculate_overall_health_score(self, health_results: Dict) -> Dict: return { "score": 85, "grade": "B", "critical_issues": [] } async def _generate_health_recommendations(self, health_results: Dict) -> List[str]: return ["health recommendation placeholder"]


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JayRajGoyal/gcp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.