Skip to main content
Glama
enterprise_logging_tools.py40.2 kB
"""Enterprise-grade Cloud Logging tools for MCP server.""" import json import re import asyncio from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Union, Set from collections import defaultdict, Counter from concurrent.futures import ThreadPoolExecutor, as_completed import structlog from google.cloud import logging as cloud_logging from google.cloud.logging import Resource from mcp.types import Tool, TextContent from pydantic import BaseModel, Field, validator from ..auth import GCPAuthenticator from ..config import Config from ..exceptions import GCPServiceError, ValidationError logger = structlog.get_logger(__name__) class EnterpriseLoggingTools: """Enterprise-grade Cloud Logging tools for complex queries and analysis.""" def __init__(self, authenticator: GCPAuthenticator, config: Config): """Initialize enterprise logging tools.""" self.authenticator = authenticator self.config = config self.cache = {} self.executor = ThreadPoolExecutor(max_workers=10) async def initialize(self) -> None: """Initialize the logging tools.""" if not self.authenticator.logging_client: raise GCPServiceError("Logging client not initialized") logger.info("Enterprise logging tools initialized") async def get_tools(self) -> List[Tool]: """Get available enterprise logging tools.""" return [ Tool( name="advanced_log_query", description="Advanced multi-project log querying with complex filters, aggregations, and analytics. Supports enterprise-scale log analysis across multiple GCP projects with sophisticated filtering.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "List of GCP project IDs to query (supports multi-project queries)" }, "filter_expression": { "type": "string", "description": "Advanced Cloud Logging filter expression using AND/OR logic, regex patterns, and complex conditions" }, "time_range": { "type": "object", "properties": { "start": {"type": "string", "description": "Start time (ISO format or relative like '24h', '7d')"}, "end": {"type": "string", "description": "End time (ISO format, defaults to now)"} }, "required": ["start"] }, "aggregation": { "type": "object", "properties": { "group_by": { "type": "array", "items": {"type": "string"}, "description": "Fields to group by (e.g., ['resource.type', 'severity'])" }, "time_window": {"type": "string", "description": "Time window for aggregation (1m, 5m, 1h, 1d)"}, "metrics": { "type": "array", "items": {"type": "string"}, "description": "Metrics to calculate (count, rate, percentiles)" } } }, "advanced_options": { "type": "object", "properties": { "include_audit_logs": {"type": "boolean", "default": False}, "include_system_logs": {"type": "boolean", "default": True}, "exclude_patterns": { "type": "array", "items": {"type": "string"}, "description": "Regex patterns to exclude from results" }, "sampling_rate": {"type": "number", "minimum": 0.1, "maximum": 1.0, "default": 1.0}, "max_results": {"type": "integer", "minimum": 1, "maximum": 50000, "default": 1000} } } }, "required": ["time_range"] } ), Tool( name="error_root_cause_analysis", description="Comprehensive error analysis and root cause investigation across services. Identifies error patterns, correlations, and provides actionable insights for incident response.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to analyze" }, "analysis_scope": { "type": "object", "properties": { "services": { "type": "array", "items": {"type": "string"}, "description": "Specific services to analyze (optional)" }, "environments": { "type": "array", "items": {"type": "string"}, "description": "Environments to include (prod, staging, dev)" }, "minimum_severity": {"type": "string", "enum": ["WARNING", "ERROR", "CRITICAL"], "default": "ERROR"} } }, "time_window": { "type": "string", "description": "Analysis time window (1h, 6h, 24h, 7d)", "default": "6h" }, "correlation_analysis": { "type": "object", "properties": { "enable_service_correlation": {"type": "boolean", "default": True}, "enable_temporal_correlation": {"type": "boolean", "default": True}, "correlation_threshold": {"type": "number", "minimum": 0.5, "maximum": 1.0, "default": 0.8} } }, "output_format": { "type": "string", "enum": ["detailed", "summary", "actionable"], "default": "detailed", "description": "Level of detail in the analysis output" } }, "required": [] } ), Tool( name="security_log_analysis", description="Security-focused log analysis for threat detection, compliance monitoring, and security incident investigation. Analyzes audit logs, access patterns, and security events.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs for security analysis" }, "analysis_type": { "type": "string", "enum": ["threat_detection", "compliance_audit", "access_analysis", "security_incidents"], "default": "threat_detection", "description": "Type of security analysis to perform" }, "time_range": { "type": "string", "description": "Time range for security analysis", "default": "24h" }, "security_scope": { "type": "object", "properties": { "include_audit_logs": {"type": "boolean", "default": True}, "include_vpc_flows": {"type": "boolean", "default": True}, "include_dns_logs": {"type": "boolean", "default": True}, "include_firewall_logs": {"type": "boolean", "default": True} } }, "threat_indicators": { "type": "array", "items": {"type": "string"}, "description": "Custom threat indicators or IOCs to search for" }, "compliance_framework": { "type": "string", "enum": ["SOC2", "PCI-DSS", "HIPAA", "GDPR", "CUSTOM"], "description": "Compliance framework for audit analysis" } }, "required": [] } ), Tool( name="performance_log_analysis", description="Performance-focused log analysis for identifying bottlenecks, latency issues, and optimization opportunities across services and infrastructure.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to analyze" }, "performance_scope": { "type": "object", "properties": { "resource_types": { "type": "array", "items": {"type": "string"}, "description": "Resource types to analyze (k8s_container, gce_instance, cloud_function, etc.)" }, "services": { "type": "array", "items": {"type": "string"}, "description": "Specific services to focus on" }, "performance_metrics": { "type": "array", "items": {"type": "string"}, "description": "Performance metrics to analyze (latency, throughput, errors, resource_usage)" } } }, "analysis_period": { "type": "string", "description": "Time period for performance analysis", "default": "4h" }, "thresholds": { "type": "object", "properties": { "latency_threshold_ms": {"type": "number", "default": 1000}, "error_rate_threshold": {"type": "number", "default": 0.05}, "cpu_threshold": {"type": "number", "default": 0.8}, "memory_threshold": {"type": "number", "default": 0.8} } } }, "required": [] } ), Tool( name="log_pattern_discovery", description="Automated pattern discovery and anomaly detection in logs using advanced analytics. Identifies unusual patterns, emerging issues, and trending topics.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs for pattern analysis" }, "discovery_scope": { "type": "object", "properties": { "log_types": { "type": "array", "items": {"type": "string"}, "description": "Types of logs to analyze (application, system, security, audit)" }, "pattern_types": { "type": "array", "items": {"type": "string"}, "description": "Types of patterns to discover (errors, warnings, anomalies, trends)" } } }, "time_window": { "type": "string", "description": "Time window for pattern discovery", "default": "12h" }, "baseline_period": { "type": "string", "description": "Baseline period for anomaly detection", "default": "7d" }, "sensitivity": { "type": "string", "enum": ["low", "medium", "high"], "default": "medium", "description": "Sensitivity level for anomaly detection" }, "clustering": { "type": "object", "properties": { "enable_message_clustering": {"type": "boolean", "default": True}, "enable_temporal_clustering": {"type": "boolean", "default": True}, "similarity_threshold": {"type": "number", "minimum": 0.5, "maximum": 1.0, "default": 0.7} } } }, "required": [] } ), Tool( name="cross_service_trace_analysis", description="Analyze logs across multiple services to trace requests, identify dependencies, and troubleshoot distributed system issues.", inputSchema={ "type": "object", "properties": { "projects": { "type": "array", "items": {"type": "string"}, "description": "GCP project IDs to trace across" }, "trace_scope": { "type": "object", "properties": { "trace_id": {"type": "string", "description": "Specific trace ID to follow"}, "request_id": {"type": "string", "description": "Request ID to trace"}, "user_id": {"type": "string", "description": "User ID to trace requests for"}, "service_path": { "type": "array", "items": {"type": "string"}, "description": "Expected service call path" } } }, "time_range": { "type": "string", "description": "Time range for trace analysis", "default": "2h" }, "analysis_depth": { "type": "string", "enum": ["shallow", "deep", "comprehensive"], "default": "deep", "description": "Depth of trace analysis" } }, "required": [] } ) ] async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls for enterprise logging operations.""" try: if name == "advanced_log_query": return await self._advanced_log_query(arguments) elif name == "error_root_cause_analysis": return await self._error_root_cause_analysis(arguments) elif name == "security_log_analysis": return await self._security_log_analysis(arguments) elif name == "performance_log_analysis": return await self._performance_log_analysis(arguments) elif name == "log_pattern_discovery": return await self._log_pattern_discovery(arguments) elif name == "cross_service_trace_analysis": return await self._cross_service_trace_analysis(arguments) else: raise ValidationError(f"Unknown enterprise logging tool: {name}") except Exception as e: logger.error("Enterprise logging tool failed", tool=name, error=str(e)) return [TextContent(type="text", text=f"Error executing {name}: {str(e)}")] async def _advanced_log_query(self, args: Dict[str, Any]) -> List[TextContent]: """Execute advanced multi-project log query.""" projects = args.get("projects", [self.authenticator.get_project_id()]) filter_expr = args.get("filter_expression", "") time_range = args["time_range"] aggregation = args.get("aggregation", {}) advanced_options = args.get("advanced_options", {}) start_time = self._parse_time(time_range["start"]) end_time = self._parse_time(time_range.get("end")) if time_range.get("end") else datetime.utcnow() # Build comprehensive filter filter_parts = [] # Time filter if start_time: filter_parts.append(f'timestamp>="{start_time.isoformat()}Z"') if end_time: filter_parts.append(f'timestamp<="{end_time.isoformat()}Z"') # Custom filter expression if filter_expr: filter_parts.append(f"({filter_expr})") # Advanced options if not advanced_options.get("include_audit_logs", False): filter_parts.append('NOT log_name:"cloudaudit.googleapis.com"') if not advanced_options.get("include_system_logs", True): filter_parts.append('NOT log_name:"compute.googleapis.com"') final_filter = " AND ".join(filter_parts) max_results = advanced_options.get("max_results", 1000) # Execute queries across multiple projects all_results = [] query_tasks = [] for project in projects: task = self._execute_project_query(project, final_filter, max_results // len(projects)) query_tasks.append(task) # Execute queries in parallel results = await asyncio.gather(*query_tasks, return_exceptions=True) for result in results: if isinstance(result, Exception): logger.error("Project query failed", error=str(result)) else: all_results.extend(result) # Apply aggregation if requested if aggregation.get("group_by"): aggregated_results = self._aggregate_results(all_results, aggregation) response = { "query_summary": { "total_entries": len(all_results), "projects_queried": projects, "time_range": f"{start_time.isoformat()} to {end_time.isoformat()}", "filter": final_filter }, "aggregated_results": aggregated_results, "sample_entries": all_results[:10] } else: # Apply exclude patterns if advanced_options.get("exclude_patterns"): all_results = self._apply_exclude_patterns(all_results, advanced_options["exclude_patterns"]) response = { "query_summary": { "total_entries": len(all_results), "projects_queried": projects, "time_range": f"{start_time.isoformat()} to {end_time.isoformat()}", "filter": final_filter }, "results": all_results[:max_results] } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _error_root_cause_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Perform comprehensive error root cause analysis.""" projects = args.get("projects", [self.authenticator.get_project_id()]) analysis_scope = args.get("analysis_scope", {}) time_window = args.get("time_window", "6h") correlation_analysis = args.get("correlation_analysis", {}) output_format = args.get("output_format", "detailed") start_time = datetime.utcnow() - self._parse_duration(time_window) minimum_severity = analysis_scope.get("minimum_severity", "ERROR") # Build filter for error analysis filter_parts = [ f'timestamp>="{start_time.isoformat()}Z"', f'severity>={minimum_severity}' ] if analysis_scope.get("services"): service_filters = [f'resource.labels.service_name="{svc}"' for svc in analysis_scope["services"]] filter_parts.append(f"({' OR '.join(service_filters)})") if analysis_scope.get("environments"): env_filters = [f'labels.environment="{env}"' for env in analysis_scope["environments"]] filter_parts.append(f"({' OR '.join(env_filters)})") filter_expr = " AND ".join(filter_parts) # Collect error data from all projects all_errors = [] for project in projects: errors = await self._execute_project_query(project, filter_expr, 5000) all_errors.extend(errors) # Perform root cause analysis analysis_results = await self._perform_rca_analysis(all_errors, correlation_analysis) # Format results based on output format if output_format == "summary": response = self._format_rca_summary(analysis_results) elif output_format == "actionable": response = self._format_rca_actionable(analysis_results) else: # detailed response = self._format_rca_detailed(analysis_results) return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _security_log_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Perform security-focused log analysis.""" projects = args.get("projects", [self.authenticator.get_project_id()]) analysis_type = args.get("analysis_type", "threat_detection") time_range = args.get("time_range", "24h") security_scope = args.get("security_scope", {}) threat_indicators = args.get("threat_indicators", []) compliance_framework = args.get("compliance_framework") start_time = datetime.utcnow() - self._parse_duration(time_range) # Build security-specific filters security_filters = [] if security_scope.get("include_audit_logs", True): security_filters.append('log_name:"cloudaudit.googleapis.com"') if security_scope.get("include_vpc_flows", True): security_filters.append('log_name:"compute.googleapis.com/vpc_flows"') if security_scope.get("include_dns_logs", True): security_filters.append('log_name:"dns.googleapis.com"') if security_scope.get("include_firewall_logs", True): security_filters.append('log_name:"compute.googleapis.com/firewall"') base_filter = f'timestamp>="{start_time.isoformat()}Z"' if security_filters: base_filter += f' AND ({" OR ".join(security_filters)})' # Add threat indicator filters if threat_indicators: indicator_filters = [f'textPayload:"{indicator}"' for indicator in threat_indicators] base_filter += f' AND ({" OR ".join(indicator_filters)})' # Collect security logs all_security_logs = [] for project in projects: logs = await self._execute_project_query(project, base_filter, 10000) all_security_logs.extend(logs) # Perform security analysis based on type if analysis_type == "threat_detection": analysis_results = await self._threat_detection_analysis(all_security_logs) elif analysis_type == "compliance_audit": analysis_results = await self._compliance_audit_analysis(all_security_logs, compliance_framework) elif analysis_type == "access_analysis": analysis_results = await self._access_pattern_analysis(all_security_logs) else: # security_incidents analysis_results = await self._security_incident_analysis(all_security_logs) response = { "security_analysis": { "analysis_type": analysis_type, "time_range": time_range, "projects_analyzed": projects, "total_security_events": len(all_security_logs) }, "findings": analysis_results, "recommendations": self._generate_security_recommendations(analysis_results) } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _performance_log_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Perform performance-focused log analysis.""" projects = args.get("projects", [self.authenticator.get_project_id()]) performance_scope = args.get("performance_scope", {}) analysis_period = args.get("analysis_period", "4h") thresholds = args.get("thresholds", {}) start_time = datetime.utcnow() - self._parse_duration(analysis_period) # Build performance-specific filters filter_parts = [f'timestamp>="{start_time.isoformat()}Z"'] if performance_scope.get("resource_types"): resource_filters = [f'resource.type="{rt}"' for rt in performance_scope["resource_types"]] filter_parts.append(f"({' OR '.join(resource_filters)})") if performance_scope.get("services"): service_filters = [f'resource.labels.service_name="{svc}"' for svc in performance_scope["services"]] filter_parts.append(f"({' OR '.join(service_filters)})") # Add performance-related log filters perf_keywords = ["latency", "timeout", "slow", "performance", "response_time", "duration"] keyword_filters = [f'textPayload:"{keyword}"' for keyword in perf_keywords] filter_parts.append(f"({' OR '.join(keyword_filters)})") filter_expr = " AND ".join(filter_parts) # Collect performance logs all_perf_logs = [] for project in projects: logs = await self._execute_project_query(project, filter_expr, 5000) all_perf_logs.extend(logs) # Perform performance analysis performance_analysis = await self._analyze_performance_logs(all_perf_logs, thresholds) response = { "performance_analysis": { "analysis_period": analysis_period, "projects_analyzed": projects, "total_performance_events": len(all_perf_logs), "thresholds_applied": thresholds }, "findings": performance_analysis, "optimization_recommendations": self._generate_performance_recommendations(performance_analysis) } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _log_pattern_discovery(self, args: Dict[str, Any]) -> List[TextContent]: """Discover patterns and anomalies in logs.""" projects = args.get("projects", [self.authenticator.get_project_id()]) discovery_scope = args.get("discovery_scope", {}) time_window = args.get("time_window", "12h") baseline_period = args.get("baseline_period", "7d") sensitivity = args.get("sensitivity", "medium") clustering = args.get("clustering", {}) current_time = datetime.utcnow() analysis_start = current_time - self._parse_duration(time_window) baseline_start = current_time - self._parse_duration(baseline_period) # Collect current period logs current_filter = f'timestamp>="{analysis_start.isoformat()}Z"' current_logs = [] for project in projects: logs = await self._execute_project_query(project, current_filter, 10000) current_logs.extend(logs) # Collect baseline logs for comparison baseline_filter = f'timestamp>="{baseline_start.isoformat()}Z" AND timestamp<"{analysis_start.isoformat()}Z"' baseline_logs = [] for project in projects: logs = await self._execute_project_query(project, baseline_filter, 20000) baseline_logs.extend(logs) # Perform pattern discovery pattern_analysis = await self._discover_log_patterns( current_logs, baseline_logs, sensitivity, clustering ) response = { "pattern_discovery": { "analysis_window": time_window, "baseline_period": baseline_period, "sensitivity": sensitivity, "current_period_logs": len(current_logs), "baseline_period_logs": len(baseline_logs) }, "discovered_patterns": pattern_analysis["patterns"], "anomalies": pattern_analysis["anomalies"], "trending_topics": pattern_analysis["trends"], "insights": pattern_analysis["insights"] } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] async def _cross_service_trace_analysis(self, args: Dict[str, Any]) -> List[TextContent]: """Analyze traces across multiple services.""" projects = args.get("projects", [self.authenticator.get_project_id()]) trace_scope = args.get("trace_scope", {}) time_range = args.get("time_range", "2h") analysis_depth = args.get("analysis_depth", "deep") start_time = datetime.utcnow() - self._parse_duration(time_range) # Build trace-specific filters filter_parts = [f'timestamp>="{start_time.isoformat()}Z"'] if trace_scope.get("trace_id"): filter_parts.append(f'trace="{trace_scope["trace_id"]}"') elif trace_scope.get("request_id"): filter_parts.append(f'labels.request_id="{trace_scope["request_id"]}"') elif trace_scope.get("user_id"): filter_parts.append(f'labels.user_id="{trace_scope["user_id"]}"') filter_expr = " AND ".join(filter_parts) # Collect trace logs all_trace_logs = [] for project in projects: logs = await self._execute_project_query(project, filter_expr, 5000) all_trace_logs.extend(logs) # Perform trace analysis trace_analysis = await self._analyze_cross_service_traces(all_trace_logs, analysis_depth) response = { "trace_analysis": { "time_range": time_range, "analysis_depth": analysis_depth, "total_trace_events": len(all_trace_logs), "trace_scope": trace_scope }, "service_flow": trace_analysis["service_flow"], "timing_analysis": trace_analysis["timing"], "error_points": trace_analysis["errors"], "recommendations": trace_analysis["recommendations"] } return [TextContent(type="text", text=json.dumps(response, indent=2, default=str))] # Helper methods (implementations would be quite extensive) async def _execute_project_query(self, project_id: str, filter_expr: str, limit: int) -> List[Dict[str, Any]]: """Execute a query against a specific project.""" try: client = self.authenticator.logging_client entries = client.list_entries( resource_names=[f"projects/{project_id}"], filter_=filter_expr, order_by="timestamp desc", page_size=limit ) results = [] count = 0 for entry in entries: if count >= limit: break results.append({ "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity, "resource": { "type": entry.resource.type if entry.resource else None, "labels": dict(entry.resource.labels) if entry.resource else {} }, "message": str(entry.payload), "log_name": entry.log_name, "labels": dict(entry.labels) if entry.labels else {}, "project_id": project_id }) count += 1 return results except Exception as e: logger.error("Project query failed", project=project_id, error=str(e)) return [] def _parse_time(self, time_str: Optional[str]) -> Optional[datetime]: """Parse time string to datetime.""" if not time_str: return None # Try relative time first if time_str.endswith(('h', 'm', 'd')): return datetime.utcnow() - self._parse_duration(time_str) # Try ISO format try: return datetime.fromisoformat(time_str.replace('Z', '+00:00')) except ValueError: raise ValidationError(f"Invalid time format: {time_str}") def _parse_duration(self, duration_str: str) -> timedelta: """Parse duration string to timedelta.""" if duration_str.endswith('h'): hours = int(duration_str[:-1]) return timedelta(hours=hours) elif duration_str.endswith('m'): minutes = int(duration_str[:-1]) return timedelta(minutes=minutes) elif duration_str.endswith('d'): days = int(duration_str[:-1]) return timedelta(days=days) else: raise ValidationError(f"Invalid duration format: {duration_str}") # Placeholder implementations for complex analysis methods def _aggregate_results(self, results: List[Dict], aggregation: Dict) -> Dict: """Aggregate results based on specified grouping.""" # Implementation would group results by specified fields return {"aggregated": "results placeholder"} async def _perform_rca_analysis(self, errors: List[Dict], correlation: Dict) -> Dict: """Perform root cause analysis on errors.""" # Implementation would analyze error patterns, correlations, and root causes return {"rca": "analysis placeholder"} def _format_rca_summary(self, analysis: Dict) -> Dict: """Format RCA results as summary.""" return {"summary": "format placeholder"} def _format_rca_actionable(self, analysis: Dict) -> Dict: """Format RCA results as actionable insights.""" return {"actionable": "format placeholder"} def _format_rca_detailed(self, analysis: Dict) -> Dict: """Format RCA results with full details.""" return {"detailed": "format placeholder"} async def _threat_detection_analysis(self, logs: List[Dict]) -> Dict: """Analyze logs for security threats.""" return {"threats": "analysis placeholder"} async def _compliance_audit_analysis(self, logs: List[Dict], framework: str) -> Dict: """Analyze logs for compliance.""" return {"compliance": "analysis placeholder"} async def _access_pattern_analysis(self, logs: List[Dict]) -> Dict: """Analyze access patterns.""" return {"access_patterns": "analysis placeholder"} async def _security_incident_analysis(self, logs: List[Dict]) -> Dict: """Analyze security incidents.""" return {"incidents": "analysis placeholder"} def _generate_security_recommendations(self, analysis: Dict) -> List[str]: """Generate security recommendations.""" return ["security recommendation placeholder"] async def _analyze_performance_logs(self, logs: List[Dict], thresholds: Dict) -> Dict: """Analyze performance logs.""" return {"performance": "analysis placeholder"} def _generate_performance_recommendations(self, analysis: Dict) -> List[str]: """Generate performance recommendations.""" return ["performance recommendation placeholder"] async def _discover_log_patterns(self, current_logs: List[Dict], baseline_logs: List[Dict], sensitivity: str, clustering: Dict) -> Dict: """Discover patterns in logs.""" return { "patterns": [], "anomalies": [], "trends": [], "insights": [] } async def _analyze_cross_service_traces(self, logs: List[Dict], depth: str) -> Dict: """Analyze cross-service traces.""" return { "service_flow": [], "timing": {}, "errors": [], "recommendations": [] } def _apply_exclude_patterns(self, results: List[Dict], patterns: List[str]) -> List[Dict]: """Apply exclude patterns to filter results.""" filtered = [] for result in results: message = result.get("message", "") exclude = False for pattern in patterns: if re.search(pattern, message, re.IGNORECASE): exclude = True break if not exclude: filtered.append(result) return filtered

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JayRajGoyal/gcp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server