cloud_monitoring.py
import datetime
import json
from typing import Dict, List, Optional

from google.api import metric_pb2  # MetricDescriptor and its enums live in google.api, not monitoring_v3
from google.cloud import monitoring_v3
from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp

from services import client_instances


def register(mcp_instance):
    """Register all Cloud Monitoring resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://monitoring/{project_id}/metrics")
    def list_metrics_resource(project_id: str = None) -> str:
        """List all available metrics for a GCP project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            name = f"projects/{project_id}"
            metrics = client.list_metric_descriptors(name=name)

            result = []
            for metric in metrics:
                result.append(
                    {
                        "name": metric.name,
                        "type": metric.type,
                        "display_name": metric.display_name,
                        "description": metric.description,
                        "kind": metric_pb2.MetricDescriptor.MetricKind.Name(
                            metric.metric_kind
                        ),
                        "value_type": metric_pb2.MetricDescriptor.ValueType.Name(
                            metric.value_type
                        ),
                        "unit": metric.unit,
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://monitoring/{project_id}/alerts")
    def list_alerts_resource(project_id: str = None) -> str:
        """List all alert policies for a GCP project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            name = f"projects/{project_id}"
            policies = client.list_alert_policies(name=name)

            result = []
            for policy in policies:
                result.append(
                    {
                        "name": policy.name,
                        "display_name": policy.display_name,
                        "enabled": policy.enabled,
                        "conditions_count": len(policy.conditions),
                        # proto-plus exposes Timestamp fields as datetime objects
                        "creation_time": policy.creation_record.mutate_time.isoformat()
                        if policy.creation_record and policy.creation_record.mutate_time
                        else None,
                        "notification_channels": [
                            chan.split("/")[-1]
                            for chan in policy.notification_channels
                        ]
                        if policy.notification_channels
                        else [],
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://monitoring/{project_id}/alert/{alert_id}")
    def get_alert_resource(project_id: str = None, alert_id: str = None) -> str:
        """Get details for a specific alert policy"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            name = f"projects/{project_id}/alertPolicies/{alert_id}"
            policy = client.get_alert_policy(name=name)

            conditions = []
            for condition in policy.conditions:
                condition_data = {
                    "name": condition.name,
                    "display_name": condition.display_name,
                    "type": condition.condition_type_name
                    if hasattr(condition, "condition_type_name")
                    else None,
                }

                # Add condition-specific details
                # ("field in message" is the proto-plus field-presence check)
                if "condition_threshold" in condition:
                    threshold = condition.condition_threshold
                    condition_data["threshold"] = {
                        "filter": threshold.filter,
                        "comparison": threshold.comparison.name
                        if threshold.comparison
                        else None,
                        "threshold_value": threshold.threshold_value,
                        "duration": f"{threshold.duration.seconds}s"
                        if threshold.duration
                        else None,
                        "aggregation": {
                            "alignment_period": f"{threshold.aggregations[0].alignment_period.seconds}s"
                            if threshold.aggregations
                            and threshold.aggregations[0].alignment_period
                            else None,
                            "per_series_aligner": threshold.aggregations[0].per_series_aligner.name
                            if threshold.aggregations
                            and threshold.aggregations[0].per_series_aligner
                            else None,
                            "cross_series_reducer": threshold.aggregations[0].cross_series_reducer.name
                            if threshold.aggregations
                            and threshold.aggregations[0].cross_series_reducer
                            else None,
                        }
                        if threshold.aggregations and len(threshold.aggregations) > 0
                        else None,
                    }

                conditions.append(condition_data)

            result = {
                "name": policy.name,
                "display_name": policy.display_name,
                "enabled": policy.enabled,
                "conditions": conditions,
                "combiner": policy.combiner.name if policy.combiner else None,
                "notification_channels": list(policy.notification_channels),
                "creation_time": policy.creation_record.mutate_time.isoformat()
                if policy.creation_record and policy.creation_record.mutate_time
                else None,
                "documentation": {
                    "content": policy.documentation.content,
                    "mime_type": policy.documentation.mime_type,
                }
                if "documentation" in policy
                else None,
            }
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://monitoring/{project_id}/notification_channels")
    def list_notification_channels_resource(project_id: str = None) -> str:
        """List notification channels for a GCP project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            name = f"projects/{project_id}"
            channels = client.list_notification_channels(name=name)

            result = []
            for channel in channels:
                result.append(
                    {
                        "name": channel.name,
                        "type": channel.type,
                        "display_name": channel.display_name,
                        "description": channel.description,
                        "verification_status": channel.verification_status.name
                        if channel.verification_status
                        else None,
                        "enabled": channel.enabled,
                        "labels": dict(channel.labels) if channel.labels else {},
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def list_metrics(project_id: str = None, filter_str: str = "") -> str:
        """
        List metrics in a GCP project with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Optional filter string to narrow results (e.g., "metric.type = starts_with(\"compute.googleapis.com\")")
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            parent = f"projects/{project_id}"

            request = monitoring_v3.ListMetricDescriptorsRequest(
                name=parent, filter=filter_str
            )

            print(f"Listing metrics for project {project_id}...")
            metrics = client.list_metric_descriptors(request=request)

            result = []
            for metric in metrics:
                result.append(
                    {
                        "name": metric.name,
                        "type": metric.type,
                        "display_name": metric.display_name,
                        "description": metric.description,
                        "kind": metric_pb2.MetricDescriptor.MetricKind.Name(
                            metric.metric_kind
                        ),
                        "value_type": metric_pb2.MetricDescriptor.ValueType.Name(
                            metric.value_type
                        ),
                        "unit": metric.unit,
                    }
                )
            return json.dumps(
                {"status": "success", "metrics": result, "count": len(result)}, indent=2
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def fetch_metric_timeseries(
        metric_type: str,
        project_id: str = None,
        filter_additions: str = "",
        hours: int = 1,
        alignment_period_seconds: int = 60,
    ) -> str:
        """
        Fetch time series data for a specific metric

        Args:
            metric_type: The metric type (e.g., "compute.googleapis.com/instance/cpu/utilization")
            project_id: GCP project ID (defaults to configured project)
            filter_additions: Additional filter criteria (e.g., "resource.labels.instance_id = \"my-instance\"")
            hours: Number of hours of data to fetch (default: 1)
            alignment_period_seconds: Data point alignment period in seconds (default: 60)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Build the filter
            filter_str = f'metric.type = "{metric_type}"'
            if filter_additions:
                filter_str += f" AND {filter_additions}"

            # Calculate time interval
            now = datetime.datetime.utcnow()
            seconds = int(now.timestamp())
            end_time = Timestamp(seconds=seconds)
            start_time = Timestamp(seconds=seconds - hours * 3600)

            # Create interval
            interval = monitoring_v3.TimeInterval(
                end_time=end_time, start_time=start_time
            )

            # Create aggregation
            aggregation = monitoring_v3.Aggregation(
                alignment_period=Duration(seconds=alignment_period_seconds),
                per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
            )

            # Build request
            request = monitoring_v3.ListTimeSeriesRequest(
                name=f"projects/{project_id}",
                filter=filter_str,
                interval=interval,
                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                aggregation=aggregation,
            )

            print(f"Fetching time series data for {metric_type}...")
            time_series = client.list_time_series(request=request)

            result = []
            for series in time_series:
                data_points = []
                for point in series.points:
                    # proto-plus exposes Timestamp fields as datetime objects
                    point_time = point.interval.end_time.isoformat()

                    # Handle different value types ("field in message" checks oneof presence)
                    if "double_value" in point.value:
                        value = point.value.double_value
                    elif "int64_value" in point.value:
                        value = point.value.int64_value
                    elif "bool_value" in point.value:
                        value = point.value.bool_value
                    elif "string_value" in point.value:
                        value = point.value.string_value
                    elif "distribution_value" in point.value:
                        value = (
                            "distribution"  # Simplified, as distributions are complex
                        )
                    else:
                        value = None

                    data_points.append({"time": point_time, "value": value})

                series_data = {
                    "metric": dict(series.metric.labels)
                    if series.metric and series.metric.labels
                    else {},
                    "resource": {
                        "type": series.resource.type,
                        "labels": dict(series.resource.labels)
                        if series.resource and series.resource.labels
                        else {},
                    }
                    if series.resource
                    else {},
                    "points": data_points,
                }
                result.append(series_data)

            return json.dumps(
                {
                    "status": "success",
                    "metric_type": metric_type,
                    "time_range": {
                        "start": start_time.ToDatetime().isoformat(),
                        "end": end_time.ToDatetime().isoformat(),
                    },
                    "time_series": result,
                    "count": len(result),
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def list_alert_policies(project_id: str = None, filter_str: str = "") -> str:
        """
        List alert policies in a GCP project with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Optional filter string (e.g., "display_name = \"High CPU Alert\"")
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            parent = f"projects/{project_id}"

            request = monitoring_v3.ListAlertPoliciesRequest(
                name=parent, filter=filter_str
            )

            print(f"Listing alert policies for project {project_id}...")
            policies = client.list_alert_policies(request=request)

            result = []
            for policy in policies:
                policy_data = {
                    "name": policy.name,
                    "display_name": policy.display_name,
                    "enabled": policy.enabled,
                    "conditions_count": len(policy.conditions),
                    "combiner": policy.combiner.name if policy.combiner else None,
                    "notification_channels": [
                        chan.split("/")[-1]
                        for chan in policy.notification_channels
                    ]
                    if policy.notification_channels
                    else [],
                    "creation_time": policy.creation_record.mutate_time.isoformat()
                    if policy.creation_record and policy.creation_record.mutate_time
                    else None,
                }
                result.append(policy_data)

            return json.dumps(
                {"status": "success", "alert_policies": result, "count": len(result)},
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_metric_threshold_alert(
        display_name: str,
        metric_type: str,
        filter_str: str,
        threshold_value: float,
        project_id: str = None,
        comparison: str = "COMPARISON_GT",
        duration_seconds: int = 300,
        alignment_period_seconds: int = 60,
        aligner: str = "ALIGN_MEAN",
        reducer: str = "REDUCE_MEAN",
        notification_channels: Optional[List[str]] = None,
        documentation: str = "",
        enabled: bool = True,
    ) -> str:
        """
        Create a metric threshold alert policy

        Args:
            display_name: Human-readable name for the alert
            metric_type: The metric to alert on (e.g., "compute.googleapis.com/instance/cpu/utilization")
            filter_str: Filter string to define which resources to monitor
            threshold_value: The threshold value to trigger the alert
            project_id: GCP project ID (defaults to configured project)
            comparison: Comparison type (COMPARISON_GT, COMPARISON_GE, COMPARISON_LT, COMPARISON_LE, COMPARISON_EQ, COMPARISON_NE)
            duration_seconds: Duration in seconds the condition must be met to trigger
            alignment_period_seconds: Period in seconds for data point alignment
            aligner: Per-series aligner (ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN, etc.)
            reducer: Cross-series reducer (REDUCE_MEAN, REDUCE_MAX, REDUCE_MIN, etc.)
            notification_channels: List of notification channel IDs
            documentation: Documentation for the alert (markdown supported)
            enabled: Whether the alert should be enabled
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Validate and convert enums
            try:
                comparison_enum = getattr(monitoring_v3.ComparisonType, comparison)
                aligner_enum = getattr(monitoring_v3.Aggregation.Aligner, aligner)
                reducer_enum = getattr(monitoring_v3.Aggregation.Reducer, reducer)
            except AttributeError as e:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Invalid enum value: {str(e)}. Please check documentation for valid values.",
                    },
                    indent=2,
                )

            # Prepare notification channels with full paths
            full_notification_channels = []
            if notification_channels:
                for channel in notification_channels:
                    if not channel.startswith(
                        f"projects/{project_id}/notificationChannels/"
                    ):
                        channel = (
                            f"projects/{project_id}/notificationChannels/{channel}"
                        )
                    full_notification_channels.append(channel)

            # Create aggregation
            aggregation = monitoring_v3.Aggregation(
                alignment_period=Duration(seconds=alignment_period_seconds),
                per_series_aligner=aligner_enum,
                cross_series_reducer=reducer_enum,
                group_by_fields=[],
            )

            # Create condition threshold
            condition_threshold = monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=f'metric.type = "{metric_type}" AND {filter_str}',
                comparison=comparison_enum,
                threshold_value=threshold_value,
                duration=Duration(seconds=duration_seconds),
                aggregations=[aggregation],
            )

            # Create condition
            condition = monitoring_v3.AlertPolicy.Condition(
                display_name=f"Threshold condition for {display_name}",
                condition_threshold=condition_threshold,
            )

            # Create alert policy (proto-plus accepts a plain bool for the
            # BoolValue "enabled" field)
            alert_policy = monitoring_v3.AlertPolicy(
                display_name=display_name,
                conditions=[condition],
                combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
                notification_channels=full_notification_channels,
                enabled=enabled,
            )

            # Add documentation if provided
            if documentation:
                alert_policy.documentation = monitoring_v3.AlertPolicy.Documentation(
                    content=documentation, mime_type="text/markdown"
                )

            # Create the request
            request = monitoring_v3.CreateAlertPolicyRequest(
                name=f"projects/{project_id}", alert_policy=alert_policy
            )

            print(f"Creating alert policy: {display_name}...")
            response = client.create_alert_policy(request=request)

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "display_name": response.display_name,
                    "enabled": response.enabled,
                    "conditions_count": len(response.conditions),
                    "notification_channels": list(response.notification_channels),
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def update_alert_policy(
        alert_id: str,
        project_id: str = None,
        display_name: Optional[str] = None,
        notification_channels: Optional[List[str]] = None,
        enabled: Optional[bool] = None,
        documentation: Optional[str] = None,
    ) -> str:
        """
        Update an existing alert policy

        Args:
            alert_id: ID of the alert to update
            project_id: GCP project ID (defaults to configured project)
            display_name: New name for the alert
            notification_channels: List of notification channel IDs
            enabled: Whether the alert should be enabled
            documentation: Documentation for the alert (markdown supported)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Get the existing policy
            name = f"projects/{project_id}/alertPolicies/{alert_id}"
            policy = client.get_alert_policy(name=name)

            # Update fields if provided
            if display_name:
                policy.display_name = display_name

            if notification_channels is not None:
                full_notification_channels = []
                for channel in notification_channels:
                    if not channel.startswith(
                        f"projects/{project_id}/notificationChannels/"
                    ):
                        channel = (
                            f"projects/{project_id}/notificationChannels/{channel}"
                        )
                    full_notification_channels.append(channel)
                policy.notification_channels = full_notification_channels

            if enabled is not None:
                policy.enabled = enabled

            if documentation is not None:
                policy.documentation = monitoring_v3.AlertPolicy.Documentation(
                    content=documentation, mime_type="text/markdown"
                )

            # Create update mask
            update_mask = []
            if display_name:
                update_mask.append("display_name")
            if notification_channels is not None:
                update_mask.append("notification_channels")
            if enabled is not None:
                update_mask.append("enabled")
            if documentation is not None:
                update_mask.append("documentation")

            # Update the policy
            request = monitoring_v3.UpdateAlertPolicyRequest(
                alert_policy=policy, update_mask={"paths": update_mask}
            )

            print(f"Updating alert policy: {policy.name}...")
            response = client.update_alert_policy(request=request)

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "display_name": response.display_name,
                    "enabled": response.enabled,
                    "conditions_count": len(response.conditions),
                    "notification_channels": [
                        chan.split("/")[-1]
                        for chan in response.notification_channels
                    ]
                    if response.notification_channels
                    else [],
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_alert_policy(alert_id: str, project_id: str = None) -> str:
        """
        Delete an alert policy

        Args:
            alert_id: ID of the alert to delete
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()
            name = f"projects/{project_id}/alertPolicies/{alert_id}"

            print(f"Deleting alert policy: {alert_id}...")
            client.delete_alert_policy(name=name)

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Alert policy {alert_id} successfully deleted",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_notification_channel(
        display_name: str,
        channel_type: str,
        labels: Dict[str, str],
        project_id: str = None,
        description: str = "",
        enabled: bool = True,
    ) -> str:
        """
        Create a notification channel

        Args:
            display_name: Human-readable name for the channel
            channel_type: Type of channel (email, sms, slack, pagerduty, etc.)
            labels: Channel-specific configuration (e.g., {"email_address": "user@example.com"})
            project_id: GCP project ID (defaults to configured project)
            description: Optional description for the channel
            enabled: Whether the channel should be enabled
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Create notification channel (plain bool for the BoolValue "enabled" field)
            notification_channel = monitoring_v3.NotificationChannel(
                type=channel_type,
                display_name=display_name,
                description=description,
                labels=labels,
                enabled=enabled,
            )

            # Create the request
            request = monitoring_v3.CreateNotificationChannelRequest(
                name=f"projects/{project_id}", notification_channel=notification_channel
            )

            print(f"Creating notification channel: {display_name} ({channel_type})...")
            response = client.create_notification_channel(request=request)

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "type": response.type,
                    "display_name": response.display_name,
                    "description": response.description,
                    "verification_status": response.verification_status.name
                    if response.verification_status
                    else None,
                    "enabled": response.enabled,
                    "labels": dict(response.labels) if response.labels else {},
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def create_alert_prompt() -> str:
        """Prompt for creating a new alert policy"""
        return """
        I need to create a new alert policy in Cloud Monitoring. Please help me with:

        1. Selecting the appropriate metric type for my alert
        2. Setting up sensible thresholds and durations
        3. Understanding the different comparison types
        4. Best practices for alert documentation
        5. Setting up notification channels

        I'd like to create an alert that triggers when:
        """

    @mcp_instance.prompt()
    def monitor_resources_prompt() -> str:
        """Prompt for guidance on monitoring GCP resources"""
        return """
        I need to set up monitoring for my GCP resources. Please help me understand:

        1. What are the most important metrics I should be monitoring for:
           - Compute Engine instances
           - Cloud SQL databases
           - Cloud Storage buckets
           - App Engine applications
           - Kubernetes Engine clusters

        2. What are recommended thresholds for alerts on these resources?

        3. How should I organize my monitoring to keep it manageable?

        4. What visualization options do I have in Cloud Monitoring?
        """
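
Note that register(mcp_instance) only attaches the resources, tools, and prompts above; a host server still has to construct the MCP instance, initialize the services.client_instances module, and run the server. Below is a minimal wiring sketch, assuming the FastMCP server from the official MCP Python SDK; the server name, file name, and entry-point layout are illustrative assumptions, not taken from this repository.

# server.py -- hypothetical entry point; only register() comes from the file above.
from mcp.server.fastmcp import FastMCP  # assumes the official MCP Python SDK

import cloud_monitoring  # the module shown above

mcp = FastMCP("gcp-monitoring")  # illustrative server name
cloud_monitoring.register(mcp)   # attaches the gcp://monitoring/... resources, tools, and prompts

if __name__ == "__main__":
    mcp.run()  # FastMCP serves over stdio by default

The decorator shapes used in the file (@mcp_instance.resource("gcp://monitoring/{project_id}/metrics"), @mcp_instance.tool(), @mcp_instance.prompt()) match FastMCP's API, but any MCP framework exposing the same decorators would work equally well here.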
