logging_tools.py (22.3 kB)
"""Cloud Logging tools for MCP server.""" import json import re from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Union from collections import defaultdict, Counter import structlog from google.cloud import logging as cloud_logging from google.cloud.logging import Resource from mcp.types import Tool, TextContent from pydantic import BaseModel, Field, validator from ..auth import GCPAuthenticator from ..config import Config from ..exceptions import GCPServiceError, ValidationError logger = structlog.get_logger(__name__) class LogQuery(BaseModel): """Log query parameters.""" project_ids: List[str] = Field(default_factory=list, description="List of GCP project IDs") filter: Optional[str] = Field(None, description="Cloud Logging filter expression") start_time: Optional[str] = Field(None, description="Start time (ISO or relative)") end_time: Optional[str] = Field(None, description="End time (ISO or relative)") limit: int = Field(100, ge=1, le=10000, description="Maximum results") order_by: str = Field("timestamp desc", description="Sort order") resource_types: List[str] = Field(default_factory=list, description="Resource types to filter") severities: List[str] = Field(default_factory=list, description="Log severities to include") include_audit_logs: bool = Field(False, description="Include audit logs") @validator('severities') def validate_severities(cls, v): valid_severities = {'DEFAULT', 'DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERROR', 'CRITICAL', 'ALERT', 'EMERGENCY'} for severity in v: if severity.upper() not in valid_severities: raise ValueError(f"Invalid severity: {severity}") return [s.upper() for s in v] class LogAnalysis(BaseModel): """Log analysis results.""" total_entries: int time_range: str error_patterns: List[Dict[str, Any]] resource_breakdown: Dict[str, int] severity_distribution: Dict[str, int] timeline: List[Dict[str, Any]] recommendations: List[str] class SecurityLogQuery(BaseModel): """Security-focused log query.""" project_ids: List[str] = Field(default_factory=list) time_range: str = Field("24h", description="Time range for security analysis") include_audit: bool = Field(True, description="Include audit logs") include_vpc: bool = Field(True, description="Include VPC flow logs") include_dns: bool = Field(True, description="Include DNS logs") threat_indicators: List[str] = Field(default_factory=list, description="Custom threat indicators") class LoggingTools: """Cloud Logging tools for analyzing GCP logs.""" def __init__(self, authenticator: GCPAuthenticator, config: Config): """Initialize logging tools. Args: authenticator: GCP authenticator instance config: Configuration object """ self.authenticator = authenticator self.config = config async def initialize(self) -> None: """Initialize the logging tools.""" if not self.authenticator.logging_client: raise GCPServiceError("Logging client not initialized") logger.info("Logging tools initialized") async def get_tools(self) -> List[Tool]: """Get available logging tools.""" return [ Tool( name="query_logs", description="Query GCP Cloud Logging for log entries. 
Useful for investigating issues, finding errors, and analyzing application behavior.", inputSchema={ "type": "object", "properties": { "project_id": { "type": "string", "description": "GCP project ID (optional, uses default if not specified)" }, "filter": { "type": "string", "description": "Log filter using Cloud Logging syntax (e.g., 'severity>=ERROR', 'resource.type=\"gce_instance\"')" }, "start_time": { "type": "string", "description": "Start time in ISO format (e.g., '2024-01-01T00:00:00Z') or relative (e.g., '1h', '30m', '1d')" }, "end_time": { "type": "string", "description": "End time in ISO format (optional, defaults to now)" }, "limit": { "type": "integer", "description": "Maximum number of log entries to return (default: 100, max: 1000)", "minimum": 1, "maximum": 1000 } }, "required": [] } ), Tool( name="analyze_error_logs", description="Analyze error logs to identify patterns, common issues, and root causes. Great for troubleshooting and incident response.", inputSchema={ "type": "object", "properties": { "project_id": { "type": "string", "description": "GCP project ID (optional)" }, "service_name": { "type": "string", "description": "Service or application name to focus analysis on" }, "time_range": { "type": "string", "description": "Time range for analysis (e.g., '1h', '6h', '1d', '7d')", "default": "1h" }, "severity": { "type": "string", "description": "Minimum severity level (DEBUG, INFO, NOTICE, WARNING, ERROR, CRITICAL, ALERT, EMERGENCY)", "default": "ERROR" } }, "required": [] } ), Tool( name="get_recent_errors", description="Get the most recent error logs for quick troubleshooting. Perfect for immediate incident response.", inputSchema={ "type": "object", "properties": { "project_id": { "type": "string", "description": "GCP project ID (optional)" }, "count": { "type": "integer", "description": "Number of recent errors to retrieve (default: 20, max: 100)", "minimum": 1, "maximum": 100, "default": 20 }, "resource_type": { "type": "string", "description": "GCP resource type to filter by (e.g., 'gce_instance', 'k8s_container', 'cloud_function')" } }, "required": [] } ), Tool( name="search_logs_by_message", description="Search logs by message content or pattern. 
Useful for finding specific events or tracking particular operations.", inputSchema={ "type": "object", "properties": { "project_id": { "type": "string", "description": "GCP project ID (optional)" }, "search_term": { "type": "string", "description": "Text to search for in log messages" }, "time_range": { "type": "string", "description": "Time range to search (e.g., '1h', '6h', '1d')", "default": "1h" }, "case_sensitive": { "type": "boolean", "description": "Whether search should be case sensitive", "default": False } }, "required": ["search_term"] } ) ] async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls for logging operations.""" try: if name == "query_logs": return await self._query_logs(arguments) elif name == "analyze_error_logs": return await self._analyze_error_logs(arguments) elif name == "get_recent_errors": return await self._get_recent_errors(arguments) elif name == "search_logs_by_message": return await self._search_logs_by_message(arguments) else: raise ValidationError(f"Unknown logging tool: {name}") except Exception as e: logger.error("Logging tool failed", tool=name, error=str(e)) return [TextContent(type="text", text=f"Error executing {name}: {str(e)}")] async def _query_logs(self, args: Dict[str, Any]) -> List[TextContent]: """Query logs with specified filters.""" project_id = args.get("project_id") or self.authenticator.get_project_id() filter_str = args.get("filter", "") limit = min(args.get("limit", 100), self.config.max_results) # Parse time range start_time = self._parse_time(args.get("start_time")) end_time = self._parse_time(args.get("end_time")) if args.get("end_time") else datetime.utcnow() # Build filter filter_parts = [] if filter_str: filter_parts.append(filter_str) if start_time: filter_parts.append(f'timestamp>="{start_time.isoformat()}Z"') if end_time: filter_parts.append(f'timestamp<="{end_time.isoformat()}Z"') final_filter = " AND ".join(filter_parts) if filter_parts else "" # Query logs client = self.authenticator.logging_client entries = client.list_entries( resource_names=[f"projects/{project_id}"], filter_=final_filter, order_by="timestamp desc", page_size=limit ) # Format results results = [] count = 0 try: for entry in entries: if count >= limit: break # Safe payload extraction try: if hasattr(entry.payload, 'get'): # JSON payload message = json.dumps(entry.payload) else: # Text payload message = str(entry.payload) except Exception: message = "<unable to parse log payload>" results.append({ "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity, "resource": { "type": entry.resource.type if entry.resource else None, "labels": dict(entry.resource.labels) if entry.resource else {} }, "message": message, "log_name": entry.log_name, "labels": dict(entry.labels) if entry.labels else {} }) count += 1 except Exception as e: logger.warning("Error processing log entries", error=str(e)) if not results: raise GCPServiceError(f"Failed to process log entries: {e}") summary = f"Found {len(results)} log entries" if final_filter: summary += f" matching filter: {final_filter}" response = { "summary": summary, "count": len(results), "entries": results } return [TextContent(type="text", text=json.dumps(response, indent=2))] async def _analyze_error_logs(self, args: Dict[str, Any]) -> List[TextContent]: """Analyze error logs for patterns.""" project_id = args.get("project_id") or self.authenticator.get_project_id() service_name = args.get("service_name") time_range = 
args.get("time_range", "1h") severity = args.get("severity", "ERROR") # Build filter for errors start_time = datetime.utcnow() - self._parse_duration(time_range) filter_parts = [ f'timestamp>="{start_time.isoformat()}Z"', f'severity>={severity}' ] if service_name: filter_parts.append(f'resource.labels.service_name="{service_name}"') filter_str = " AND ".join(filter_parts) # Query error logs client = self.authenticator.logging_client entries = client.list_entries( resource_names=[f"projects/{project_id}"], filter_=filter_str, order_by="timestamp desc", page_size=500 ) # Analyze patterns error_patterns = {} error_timeline = [] resource_errors = {} for entry in entries: # Safe payload extraction try: if hasattr(entry.payload, 'get'): # JSON payload message = json.dumps(entry.payload) else: # Text payload message = str(entry.payload) except Exception: message = "<unable to parse log payload>" timestamp = entry.timestamp.isoformat() if entry.timestamp else "unknown" resource_type = entry.resource.type if entry.resource else "unknown" # Count error patterns if message in error_patterns: error_patterns[message] += 1 else: error_patterns[message] = 1 # Track timeline error_timeline.append({ "timestamp": timestamp, "severity": entry.severity, "message": message[:200] + "..." if len(message) > 200 else message }) # Count by resource if resource_type in resource_errors: resource_errors[resource_type] += 1 else: resource_errors[resource_type] = 1 # Sort patterns by frequency top_patterns = sorted(error_patterns.items(), key=lambda x: x[1], reverse=True)[:10] analysis = { "summary": f"Analyzed {len(error_timeline)} error entries in the last {time_range}", "time_range": time_range, "total_errors": len(error_timeline), "top_error_patterns": [ {"message": msg, "count": count} for msg, count in top_patterns ], "errors_by_resource": resource_errors, "recent_errors": error_timeline[:20] } return [TextContent(type="text", text=json.dumps(analysis, indent=2))] async def _get_recent_errors(self, args: Dict[str, Any]) -> List[TextContent]: """Get recent error logs.""" project_id = args.get("project_id") or self.authenticator.get_project_id() count = min(args.get("count", 20), 100) resource_type = args.get("resource_type") # Build filter filter_parts = ["severity>=ERROR"] if resource_type: filter_parts.append(f'resource.type="{resource_type}"') filter_str = " AND ".join(filter_parts) # Query recent errors client = self.authenticator.logging_client entries = client.list_entries( resource_names=[f"projects/{project_id}"], filter_=filter_str, order_by="timestamp desc", page_size=count ) errors = [] for entry in entries: # Safe payload extraction try: if hasattr(entry.payload, 'get'): # JSON payload message = json.dumps(entry.payload) else: # Text payload message = str(entry.payload) except Exception: message = "<unable to parse log payload>" errors.append({ "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity, "resource": { "type": entry.resource.type if entry.resource else None, "labels": dict(entry.resource.labels) if entry.resource else {} }, "message": message, "log_name": entry.log_name }) response = { "summary": f"Retrieved {len(errors)} recent errors", "errors": errors } return [TextContent(type="text", text=json.dumps(response, indent=2))] async def _search_logs_by_message(self, args: Dict[str, Any]) -> List[TextContent]: """Search logs by message content.""" project_id = args.get("project_id") or self.authenticator.get_project_id() search_term = 
args["search_term"] time_range = args.get("time_range", "1h") case_sensitive = args.get("case_sensitive", False) # Build filter start_time = datetime.utcnow() - self._parse_duration(time_range) filter_parts = [ f'timestamp>="{start_time.isoformat()}Z"' ] # Add text search - Cloud Logging text search syntax # Use contains operator for case-insensitive search if case_sensitive: filter_parts.append(f'textPayload:"{search_term}"') else: # For case-insensitive, we'll search both textPayload and jsonPayload search_conditions = [ f'textPayload:"{search_term}"', f'jsonPayload.message:"{search_term}"', f'protoPayload.description:"{search_term}"' ] filter_parts.append(f'({" OR ".join(search_conditions)})') filter_str = " AND ".join(filter_parts) # Query logs client = self.authenticator.logging_client entries = client.list_entries( resource_names=[f"projects/{project_id}"], filter_=filter_str, order_by="timestamp desc", page_size=100 ) results = [] for entry in entries: # Safe payload extraction try: if hasattr(entry.payload, 'get'): # JSON payload message = json.dumps(entry.payload) else: # Text payload message = str(entry.payload) except Exception: message = "<unable to parse log payload>" results.append({ "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity, "message": message, "resource": entry.resource.type if entry.resource else None, "log_name": entry.log_name }) response = { "summary": f"Found {len(results)} log entries containing '{search_term}'", "search_term": search_term, "time_range": time_range, "results": results } return [TextContent(type="text", text=json.dumps(response, indent=2))] def _parse_time(self, time_str: Optional[str]) -> Optional[datetime]: """Parse time string to datetime.""" if not time_str: return None # Try relative time first (e.g., "1h", "30m", "1d") if time_str.endswith(('h', 'm', 'd', 's')): return datetime.utcnow() - self._parse_duration(time_str) # Try ISO format try: if 'T' in time_str: return datetime.fromisoformat(time_str.replace('Z', '+00:00')) else: # Try parsing as date only return datetime.fromisoformat(time_str + 'T00:00:00+00:00') except ValueError: raise ValidationError(f"Invalid time format: {time_str}. Use ISO format (2024-01-01T00:00:00Z) or relative (1h, 30m, 1d)") def _parse_duration(self, duration_str: str) -> timedelta: """Parse duration string to timedelta.""" try: if duration_str.endswith('s'): seconds = int(duration_str[:-1]) return timedelta(seconds=seconds) elif duration_str.endswith('m'): minutes = int(duration_str[:-1]) return timedelta(minutes=minutes) elif duration_str.endswith('h'): hours = int(duration_str[:-1]) return timedelta(hours=hours) elif duration_str.endswith('d'): days = int(duration_str[:-1]) return timedelta(days=days) else: raise ValidationError(f"Invalid duration format: {duration_str}. Use format like '1h', '30m', '1d', '60s'") except ValueError as e: raise ValidationError(f"Invalid duration format: {duration_str}. Use format like '1h', '30m', '1d', '60s'")
