"""Cloud Logging tools for MCP server."""
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from collections import Counter
import structlog
from google.cloud import logging as cloud_logging
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field, validator
from ..auth import GCPAuthenticator
from ..config import Config
from ..exceptions import GCPServiceError, ValidationError
logger = structlog.get_logger(__name__)
class LogQuery(BaseModel):
"""Log query parameters."""
project_ids: List[str] = Field(default_factory=list, description="List of GCP project IDs")
filter: Optional[str] = Field(None, description="Cloud Logging filter expression")
start_time: Optional[str] = Field(None, description="Start time (ISO or relative)")
end_time: Optional[str] = Field(None, description="End time (ISO or relative)")
limit: int = Field(100, ge=1, le=10000, description="Maximum results")
order_by: str = Field("timestamp desc", description="Sort order")
resource_types: List[str] = Field(default_factory=list, description="Resource types to filter")
severities: List[str] = Field(default_factory=list, description="Log severities to include")
include_audit_logs: bool = Field(False, description="Include audit logs")
@validator('severities')
def validate_severities(cls, v):
valid_severities = {'DEFAULT', 'DEBUG', 'INFO', 'NOTICE', 'WARNING', 'ERROR', 'CRITICAL', 'ALERT', 'EMERGENCY'}
for severity in v:
if severity.upper() not in valid_severities:
raise ValueError(f"Invalid severity: {severity}")
return [s.upper() for s in v]
class LogAnalysis(BaseModel):
"""Log analysis results."""
total_entries: int
time_range: str
error_patterns: List[Dict[str, Any]]
resource_breakdown: Dict[str, int]
severity_distribution: Dict[str, int]
timeline: List[Dict[str, Any]]
recommendations: List[str]
class SecurityLogQuery(BaseModel):
"""Security-focused log query."""
project_ids: List[str] = Field(default_factory=list)
time_range: str = Field("24h", description="Time range for security analysis")
include_audit: bool = Field(True, description="Include audit logs")
include_vpc: bool = Field(True, description="Include VPC flow logs")
include_dns: bool = Field(True, description="Include DNS logs")
threat_indicators: List[str] = Field(default_factory=list, description="Custom threat indicators")
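# Illustrative use of the query models above (a sketch; these models are not yet
# wired into the tool handlers below, so the exact call site is an assumption):
#
#   query = LogQuery(
#       project_ids=["my-project"],        # hypothetical project ID
#       filter='severity>=ERROR',
#       start_time="1h",
#       severities=["error", "critical"],  # normalized to upper case by the validator
#   )
#   assert query.severities == ["ERROR", "CRITICAL"]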
class LoggingTools:
"""Cloud Logging tools for analyzing GCP logs."""
def __init__(self, authenticator: GCPAuthenticator, config: Config):
"""Initialize logging tools.
Args:
authenticator: GCP authenticator instance
config: Configuration object
"""
self.authenticator = authenticator
self.config = config
async def initialize(self) -> None:
"""Initialize the logging tools."""
if not self.authenticator.logging_client:
raise GCPServiceError("Logging client not initialized")
logger.info("Logging tools initialized")
async def get_tools(self) -> List[Tool]:
"""Get available logging tools."""
return [
Tool(
name="query_logs",
description="Query GCP Cloud Logging for log entries. Useful for investigating issues, finding errors, and analyzing application behavior.",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "GCP project ID (optional, uses default if not specified)"
},
"filter": {
"type": "string",
"description": "Log filter using Cloud Logging syntax (e.g., 'severity>=ERROR', 'resource.type=\"gce_instance\"')"
},
"start_time": {
"type": "string",
"description": "Start time in ISO format (e.g., '2024-01-01T00:00:00Z') or relative (e.g., '1h', '30m', '1d')"
},
"end_time": {
"type": "string",
"description": "End time in ISO format (optional, defaults to now)"
},
"limit": {
"type": "integer",
"description": "Maximum number of log entries to return (default: 100, max: 1000)",
"minimum": 1,
"maximum": 1000
}
},
"required": []
}
),
Tool(
name="analyze_error_logs",
description="Analyze error logs to identify patterns, common issues, and root causes. Great for troubleshooting and incident response.",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "GCP project ID (optional)"
},
"service_name": {
"type": "string",
"description": "Service or application name to focus analysis on"
},
"time_range": {
"type": "string",
"description": "Time range for analysis (e.g., '1h', '6h', '1d', '7d')",
"default": "1h"
},
"severity": {
"type": "string",
"description": "Minimum severity level (DEBUG, INFO, NOTICE, WARNING, ERROR, CRITICAL, ALERT, EMERGENCY)",
"default": "ERROR"
}
},
"required": []
}
),
Tool(
name="get_recent_errors",
description="Get the most recent error logs for quick troubleshooting. Perfect for immediate incident response.",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "GCP project ID (optional)"
},
"count": {
"type": "integer",
"description": "Number of recent errors to retrieve (default: 20, max: 100)",
"minimum": 1,
"maximum": 100,
"default": 20
},
"resource_type": {
"type": "string",
"description": "GCP resource type to filter by (e.g., 'gce_instance', 'k8s_container', 'cloud_function')"
}
},
"required": []
}
),
Tool(
name="search_logs_by_message",
description="Search logs by message content or pattern. Useful for finding specific events or tracking particular operations.",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "GCP project ID (optional)"
},
"search_term": {
"type": "string",
"description": "Text to search for in log messages"
},
"time_range": {
"type": "string",
"description": "Time range to search (e.g., '1h', '6h', '1d')",
"default": "1h"
},
"case_sensitive": {
"type": "boolean",
"description": "Whether search should be case sensitive",
"default": False
}
},
"required": ["search_term"]
}
)
]
async def handle_tool_call(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls for logging operations."""
try:
if name == "query_logs":
return await self._query_logs(arguments)
elif name == "analyze_error_logs":
return await self._analyze_error_logs(arguments)
elif name == "get_recent_errors":
return await self._get_recent_errors(arguments)
elif name == "search_logs_by_message":
return await self._search_logs_by_message(arguments)
else:
raise ValidationError(f"Unknown logging tool: {name}")
except Exception as e:
logger.error("Logging tool failed", tool=name, error=str(e))
return [TextContent(type="text", text=f"Error executing {name}: {str(e)}")]
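    # Usage sketch (assumes an already-initialized LoggingTools instance named
    # `tools`, called from async code; the argument values are illustrative):
    #
    #   contents = await tools.handle_tool_call(
    #       "query_logs",
    #       {"filter": "severity>=ERROR", "start_time": "1h", "limit": 50},
    #   )
    #   print(contents[0].text)  # JSON summary of the matching entries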
async def _query_logs(self, args: Dict[str, Any]) -> List[TextContent]:
"""Query logs with specified filters."""
project_id = args.get("project_id") or self.authenticator.get_project_id()
filter_str = args.get("filter", "")
limit = min(args.get("limit", 100), self.config.max_results)
# Parse time range
start_time = self._parse_time(args.get("start_time"))
end_time = self._parse_time(args.get("end_time")) if args.get("end_time") else datetime.utcnow()
# Build filter
filter_parts = []
if filter_str:
filter_parts.append(filter_str)
if start_time:
filter_parts.append(f'timestamp>="{start_time.isoformat()}Z"')
if end_time:
filter_parts.append(f'timestamp<="{end_time.isoformat()}Z"')
final_filter = " AND ".join(filter_parts) if filter_parts else ""
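        # Example of a resulting filter (illustrative values):
        #   severity>=ERROR AND timestamp>="2024-01-01T11:00:00Z" AND timestamp<="2024-01-01T12:00:00Z"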
# Query logs
client = self.authenticator.logging_client
entries = client.list_entries(
resource_names=[f"projects/{project_id}"],
filter_=final_filter,
order_by="timestamp desc",
page_size=limit
)
# Format results
results = []
count = 0
try:
for entry in entries:
if count >= limit:
break
# Safe payload extraction
try:
if hasattr(entry.payload, 'get'):
# JSON payload
message = json.dumps(entry.payload)
else:
# Text payload
message = str(entry.payload)
except Exception:
message = "<unable to parse log payload>"
results.append({
"timestamp": entry.timestamp.isoformat() if entry.timestamp else None,
"severity": entry.severity,
"resource": {
"type": entry.resource.type if entry.resource else None,
"labels": dict(entry.resource.labels) if entry.resource else {}
},
"message": message,
"log_name": entry.log_name,
"labels": dict(entry.labels) if entry.labels else {}
})
count += 1
except Exception as e:
logger.warning("Error processing log entries", error=str(e))
if not results:
raise GCPServiceError(f"Failed to process log entries: {e}")
summary = f"Found {len(results)} log entries"
if final_filter:
summary += f" matching filter: {final_filter}"
response = {
"summary": summary,
"count": len(results),
"entries": results
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
async def _analyze_error_logs(self, args: Dict[str, Any]) -> List[TextContent]:
"""Analyze error logs for patterns."""
project_id = args.get("project_id") or self.authenticator.get_project_id()
service_name = args.get("service_name")
time_range = args.get("time_range", "1h")
        severity = args.get("severity", "ERROR").upper()
# Build filter for errors
start_time = datetime.utcnow() - self._parse_duration(time_range)
filter_parts = [
f'timestamp>="{start_time.isoformat()}Z"',
f'severity>={severity}'
]
if service_name:
filter_parts.append(f'resource.labels.service_name="{service_name}"')
filter_str = " AND ".join(filter_parts)
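        # Example of a resulting filter (illustrative values; "checkout" is a hypothetical service name):
        #   timestamp>="2024-01-01T11:00:00Z" AND severity>=ERROR AND resource.labels.service_name="checkout"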
# Query error logs
client = self.authenticator.logging_client
entries = client.list_entries(
resource_names=[f"projects/{project_id}"],
filter_=filter_str,
order_by="timestamp desc",
page_size=500
)
        # Analyze patterns
        error_patterns = Counter()
        error_timeline = []
        resource_errors = Counter()
        for entry in entries:
            if len(error_timeline) >= 500:  # page_size alone does not cap how many entries the iterator yields
                break
# Safe payload extraction
try:
if hasattr(entry.payload, 'get'):
# JSON payload
message = json.dumps(entry.payload)
else:
# Text payload
message = str(entry.payload)
except Exception:
message = "<unable to parse log payload>"
timestamp = entry.timestamp.isoformat() if entry.timestamp else "unknown"
resource_type = entry.resource.type if entry.resource else "unknown"
            # Count error patterns
            error_patterns[message] += 1
# Track timeline
error_timeline.append({
"timestamp": timestamp,
"severity": entry.severity,
"message": message[:200] + "..." if len(message) > 200 else message
})
            # Count by resource
            resource_errors[resource_type] += 1
        # Sort patterns by frequency
        top_patterns = error_patterns.most_common(10)
analysis = {
"summary": f"Analyzed {len(error_timeline)} error entries in the last {time_range}",
"time_range": time_range,
"total_errors": len(error_timeline),
"top_error_patterns": [
{"message": msg, "count": count} for msg, count in top_patterns
],
"errors_by_resource": resource_errors,
"recent_errors": error_timeline[:20]
}
return [TextContent(type="text", text=json.dumps(analysis, indent=2))]
async def _get_recent_errors(self, args: Dict[str, Any]) -> List[TextContent]:
"""Get recent error logs."""
project_id = args.get("project_id") or self.authenticator.get_project_id()
count = min(args.get("count", 20), 100)
resource_type = args.get("resource_type")
# Build filter
filter_parts = ["severity>=ERROR"]
if resource_type:
filter_parts.append(f'resource.type="{resource_type}"')
filter_str = " AND ".join(filter_parts)
# Query recent errors
client = self.authenticator.logging_client
entries = client.list_entries(
resource_names=[f"projects/{project_id}"],
filter_=filter_str,
order_by="timestamp desc",
page_size=count
)
        errors = []
        for entry in entries:
            if len(errors) >= count:  # page_size alone does not cap total results
                break
# Safe payload extraction
try:
if hasattr(entry.payload, 'get'):
# JSON payload
message = json.dumps(entry.payload)
else:
# Text payload
message = str(entry.payload)
except Exception:
message = "<unable to parse log payload>"
errors.append({
"timestamp": entry.timestamp.isoformat() if entry.timestamp else None,
"severity": entry.severity,
"resource": {
"type": entry.resource.type if entry.resource else None,
"labels": dict(entry.resource.labels) if entry.resource else {}
},
"message": message,
"log_name": entry.log_name
})
response = {
"summary": f"Retrieved {len(errors)} recent errors",
"errors": errors
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
async def _search_logs_by_message(self, args: Dict[str, Any]) -> List[TextContent]:
"""Search logs by message content."""
project_id = args.get("project_id") or self.authenticator.get_project_id()
search_term = args["search_term"]
time_range = args.get("time_range", "1h")
case_sensitive = args.get("case_sensitive", False)
# Build filter
start_time = datetime.utcnow() - self._parse_duration(time_range)
filter_parts = [
f'timestamp>="{start_time.isoformat()}Z"'
]
        # Add text search using the Cloud Logging "has" (:) operator, which does
        # substring matching. Both branches use ":", so the case_sensitive flag
        # effectively controls which payload fields are searched rather than
        # enforcing strict case sensitivity.
        if case_sensitive:
            filter_parts.append(f'textPayload:"{search_term}"')
        else:
            # Search text, JSON, and proto payload fields
search_conditions = [
f'textPayload:"{search_term}"',
f'jsonPayload.message:"{search_term}"',
f'protoPayload.description:"{search_term}"'
]
filter_parts.append(f'({" OR ".join(search_conditions)})')
filter_str = " AND ".join(filter_parts)
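        # Example of a resulting filter (illustrative, case-insensitive branch, search_term "connection reset"):
        #   timestamp>="2024-01-01T11:00:00Z" AND (textPayload:"connection reset"
        #   OR jsonPayload.message:"connection reset" OR protoPayload.description:"connection reset")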
# Query logs
client = self.authenticator.logging_client
entries = client.list_entries(
resource_names=[f"projects/{project_id}"],
filter_=filter_str,
order_by="timestamp desc",
page_size=100
)
        results = []
        for entry in entries:
            if len(results) >= 100:  # page_size alone does not cap total results
                break
# Safe payload extraction
try:
if hasattr(entry.payload, 'get'):
# JSON payload
message = json.dumps(entry.payload)
else:
# Text payload
message = str(entry.payload)
except Exception:
message = "<unable to parse log payload>"
results.append({
"timestamp": entry.timestamp.isoformat() if entry.timestamp else None,
"severity": entry.severity,
"message": message,
"resource": entry.resource.type if entry.resource else None,
"log_name": entry.log_name
})
response = {
"summary": f"Found {len(results)} log entries containing '{search_term}'",
"search_term": search_term,
"time_range": time_range,
"results": results
}
return [TextContent(type="text", text=json.dumps(response, indent=2))]
def _parse_time(self, time_str: Optional[str]) -> Optional[datetime]:
"""Parse time string to datetime."""
if not time_str:
return None
# Try relative time first (e.g., "1h", "30m", "1d")
if time_str.endswith(('h', 'm', 'd', 's')):
return datetime.utcnow() - self._parse_duration(time_str)
        # Try ISO format
        try:
            if 'T' in time_str:
                parsed = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
            else:
                # Try parsing as date only
                parsed = datetime.fromisoformat(time_str + 'T00:00:00+00:00')
        except ValueError:
            raise ValidationError(f"Invalid time format: {time_str}. Use ISO format (2024-01-01T00:00:00Z) or relative (1h, 30m, 1d)")
        # Normalize to naive UTC so callers can safely append "Z" when building timestamp filters
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed
def _parse_duration(self, duration_str: str) -> timedelta:
"""Parse duration string to timedelta."""
try:
if duration_str.endswith('s'):
seconds = int(duration_str[:-1])
return timedelta(seconds=seconds)
elif duration_str.endswith('m'):
minutes = int(duration_str[:-1])
return timedelta(minutes=minutes)
elif duration_str.endswith('h'):
hours = int(duration_str[:-1])
return timedelta(hours=hours)
elif duration_str.endswith('d'):
days = int(duration_str[:-1])
return timedelta(days=days)
else:
raise ValidationError(f"Invalid duration format: {duration_str}. Use format like '1h', '30m', '1d', '60s'")
        except ValueError as e:
            raise ValidationError(f"Invalid duration format: {duration_str}. Use format like '1h', '30m', '1d', '60s'") from e
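    # Illustrative behaviour of the helpers above (examples, not executed):
    #   _parse_duration("30m")  -> timedelta(minutes=30)
    #   _parse_duration("2d")   -> timedelta(days=2)
    #   _parse_time("1h")       -> roughly datetime.utcnow() - timedelta(hours=1)
    #   _parse_time("2024-01-01T00:00:00Z") -> datetime(2024, 1, 1)  (naive UTC)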