Log Analyzer with MCP

Official · by awslabs
cloudwatch_logs_resource.py (20.6 kB)
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List
import re
from collections import Counter


class CloudWatchLogsResource:
    """Resource class for handling CloudWatch Logs resources."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs resource client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Store the profile name and region for later use
        self.profile_name = profile_name
        self.region_name = region_name

        # Initialize boto3 CloudWatch Logs client using specified profile/region or default credential chain
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    def get_log_groups(
        self, prefix: str = None, limit: int = 50, next_token: str = None
    ) -> str:
        """
        Get a list of CloudWatch Log Groups with optional filtering and pagination.

        Args:
            prefix: Optional prefix to filter log groups by name
            limit: Maximum number of log groups to return (default: 50)
            next_token: Token for pagination to get the next set of results

        Returns:
            JSON string with log groups information
        """
        kwargs = {"limit": limit}
        if prefix:
            kwargs["logGroupNamePrefix"] = prefix
        if next_token:
            kwargs["nextToken"] = next_token

        response = self.logs_client.describe_log_groups(**kwargs)
        log_groups = response.get("logGroups", [])

        # Format the log groups information
        formatted_groups = []
        for group in log_groups:
            formatted_groups.append(
                {
                    "name": group.get("logGroupName"),
                    "arn": group.get("arn"),
                    "storedBytes": group.get("storedBytes"),
                    "creationTime": datetime.fromtimestamp(
                        group.get("creationTime", 0) / 1000
                    ).isoformat(),
                }
            )

        # Include the nextToken if available
        result = {"logGroups": formatted_groups}
        if "nextToken" in response:
            result["nextToken"] = response["nextToken"]

        return json.dumps(result, indent=2)

    def get_log_group_details(self, log_group_name: str) -> str:
        """Get detailed information about a specific log group."""
        try:
            response = self.logs_client.describe_log_groups(
                logGroupNamePrefix=log_group_name, limit=1
            )
            log_groups = response.get("logGroups", [])

            if not log_groups:
                return json.dumps(
                    {"error": f"Log group '{log_group_name}' not found"}, indent=2
                )

            log_group = log_groups[0]

            # Get retention policy
            retention = "Never Expire"
            if "retentionInDays" in log_group:
                retention = f"{log_group['retentionInDays']} days"

            # Get metrics for the log group
            session = boto3.Session(
                profile_name=self.profile_name, region_name=self.region_name
            )
            cloudwatch = session.client("cloudwatch")
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(days=1)

            metrics_response = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingBytes",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,
                Statistics=["Sum"],
            )

            # Format the detailed information
            details = {
                "name": log_group.get("logGroupName"),
                "arn": log_group.get("arn"),
                "storedBytes": log_group.get("storedBytes"),
                "creationTime": datetime.fromtimestamp(
                    log_group.get("creationTime", 0) / 1000
                ).isoformat(),
                "retentionPolicy": retention,
                "metricFilterCount": log_group.get("metricFilterCount", 0),
                "kmsKeyId": log_group.get("kmsKeyId", "Not encrypted with KMS"),
                "dailyIncomingBytes": [
                    {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
                    for point in metrics_response.get("Datapoints", [])
                ],
            }

            return json.dumps(details, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_streams(self, log_group_name: str, limit: int = 20) -> str:
        """
        Get a list of log streams for a specific log group.

        Args:
            log_group_name: The name of the log group
            limit: Maximum number of streams to return (default: 20)
        """
        try:
            response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy="LastEventTime",
                descending=True,
                limit=limit,
            )
            log_streams = response.get("logStreams", [])

            formatted_streams = []
            for stream in log_streams:
                last_event_time = stream.get("lastEventTimestamp", 0)
                first_event_time = stream.get("firstEventTimestamp", 0)

                formatted_streams.append(
                    {
                        "name": stream.get("logStreamName"),
                        "firstEventTime": datetime.fromtimestamp(
                            first_event_time / 1000
                        ).isoformat()
                        if first_event_time
                        else None,
                        "lastEventTime": datetime.fromtimestamp(
                            last_event_time / 1000
                        ).isoformat()
                        if last_event_time
                        else None,
                        "storedBytes": stream.get("storedBytes"),
                    }
                )

            return json.dumps(formatted_streams, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_events(
        self, log_group_name: str, log_stream_name: str, limit: int = 100
    ) -> str:
        """
        Get log events from a specific log stream.

        Args:
            log_group_name: The name of the log group
            log_stream_name: The name of the log stream
            limit: Maximum number of events to return (default: 100)
        """
        try:
            response = self.logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                limit=limit,
                startFromHead=False,
            )
            events = response.get("events", [])

            formatted_events = []
            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "ingestionTime": datetime.fromtimestamp(
                            event.get("ingestionTime", 0) / 1000
                        ).isoformat(),
                    }
                )

            return json.dumps(formatted_events, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_sample(self, log_group_name: str, limit: int = 10) -> str:
        """Get a sample of recent logs from a log group."""
        try:
            # First get the most recent stream
            stream_response = self.logs_client.describe_log_streams(
                logGroupName=log_group_name,
                orderBy="LastEventTime",
                descending=True,
                limit=1,
            )
            log_streams = stream_response.get("logStreams", [])

            if not log_streams:
                return json.dumps(
                    {"error": f"No streams found in log group '{log_group_name}'"},
                    indent=2,
                )

            # Get events from the most recent stream
            log_stream_name = log_streams[0].get("logStreamName")
            response = self.logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                limit=limit,
                startFromHead=False,
            )
            events = response.get("events", [])

            formatted_events = []
            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "streamName": log_stream_name,
                    }
                )

            return json.dumps(
                {
                    "description": f"Sample of {len(formatted_events)} recent logs from '{log_group_name}'",
                    "logStream": log_stream_name,
                    "events": formatted_events,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_recent_errors(self, log_group_name: str, hours: int = 24) -> str:
        """Get recent error logs from a log group."""
        try:
            # Calculate start time
            end_time = int(datetime.now().timestamp() * 1000)
            start_time = int(
                (datetime.now() - timedelta(hours=hours)).timestamp() * 1000
            )

            # Use filter_log_events to search for errors across all streams
            # Common error patterns to search for
            error_patterns = [
                "ERROR",
                "Error",
                "error",
                "exception",
                "Exception",
                "EXCEPTION",
                "fail",
                "Fail",
                "FAIL",
            ]
            filter_pattern = " ".join([f'"{pattern}"' for pattern in error_patterns])

            response = self.logs_client.filter_log_events(
                logGroupName=log_group_name,
                filterPattern=f"{filter_pattern}",
                startTime=start_time,
                endTime=end_time,
                limit=100,
            )
            events = response.get("events", [])

            formatted_events = []
            for event in events:
                formatted_events.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            event.get("timestamp", 0) / 1000
                        ).isoformat(),
                        "message": event.get("message"),
                        "logStreamName": event.get("logStreamName"),
                    }
                )

            return json.dumps(
                {
                    "description": f"Recent errors from '{log_group_name}' in the last {hours} hours",
                    "totalErrors": len(formatted_events),
                    "events": formatted_events,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def get_log_metrics(self, log_group_name: str, hours: int = 24) -> str:
        """Get log volume metrics for a log group."""
        try:
            # Create CloudWatch client
            session = boto3.Session(
                profile_name=self.profile_name, region_name=self.region_name
            )
            cloudwatch = session.client("cloudwatch")

            # Calculate start and end times
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=hours)

            # Get incoming bytes
            incoming_bytes = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingBytes",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=["Sum"],
            )

            # Get incoming log events
            incoming_events = cloudwatch.get_metric_statistics(
                Namespace="AWS/Logs",
                MetricName="IncomingLogEvents",
                Dimensions=[
                    {"Name": "LogGroupName", "Value": log_group_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=["Sum"],
            )

            # Format metrics data
            bytes_datapoints = incoming_bytes.get("Datapoints", [])
            events_datapoints = incoming_events.get("Datapoints", [])

            bytes_datapoints.sort(key=lambda x: x["Timestamp"])
            events_datapoints.sort(key=lambda x: x["Timestamp"])

            bytes_data = [
                {"timestamp": point["Timestamp"].isoformat(), "bytes": point["Sum"]}
                for point in bytes_datapoints
            ]
            events_data = [
                {"timestamp": point["Timestamp"].isoformat(), "events": point["Sum"]}
                for point in events_datapoints
            ]

            # Calculate totals
            total_bytes = sum(point["Sum"] for point in bytes_datapoints)
            total_events = sum(point["Sum"] for point in events_datapoints)

            return json.dumps(
                {
                    "description": f"Log metrics for '{log_group_name}' over the last {hours} hours",
                    "totalBytes": total_bytes,
                    "totalEvents": total_events,
                    "bytesByHour": bytes_data,
                    "eventsByHour": events_data,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def analyze_log_structure(self, log_group_name: str) -> str:
        """Analyze and provide information about the structure of logs."""
        try:
            # Get a sample of logs to analyze
            sample_data = json.loads(self.get_log_sample(log_group_name, 50))

            if "error" in sample_data:
                return json.dumps(sample_data, indent=2)

            events = sample_data.get("events", [])
            if not events:
                return json.dumps(
                    {"error": "No log events found for analysis"}, indent=2
                )

            # Analyze the structure
            structure_info = {
                "description": f"Log structure analysis for '{log_group_name}'",
                "sampleSize": len(events),
                "format": self._detect_log_format(events),
                "commonPatterns": self._extract_common_patterns(events),
                "fieldAnalysis": self._analyze_fields(events),
            }

            return json.dumps(structure_info, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    def _detect_log_format(self, events: List[Dict]) -> str:
        """Detect the format of logs (JSON, plaintext, etc.)."""
        json_count = 0
        key_value_count = 0
        xml_count = 0

        for event in events:
            message = event.get("message", "")

            # Check for JSON format
            if message.strip().startswith("{") and message.strip().endswith("}"):
                try:
                    json.loads(message)
                    json_count += 1
                    continue
                except json.JSONDecodeError:
                    pass

            # Check for XML format
            if message.strip().startswith("<") and message.strip().endswith(">"):
                xml_count += 1
                continue

            # Check for key-value pairs
            if re.search(r"\w+=[\'\"][^\'\"]*[\'\"]|\w+=\S+", message):
                key_value_count += 1

        total = len(events)
        if json_count > total * 0.7:
            return "JSON"
        elif xml_count > total * 0.7:
            return "XML"
        elif key_value_count > total * 0.7:
            return "Key-Value Pairs"
        else:
            return "Plaintext/Unstructured"

    def _extract_common_patterns(self, events: List[Dict]) -> Dict:
        """Extract common patterns from log messages."""
        # Look for common log patterns
        level_pattern = re.compile(
            r"\b(DEBUG|INFO|WARN|WARNING|ERROR|FATAL|CRITICAL)\b"
        )
        timestamp_patterns = [
            re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"),  # ISO format
            re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"),  # Common datetime format
            re.compile(r"\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}"),  # MM/DD/YYYY format
        ]

        # Count occurrences
        levels = Counter()
        has_timestamp = 0

        for event in events:
            message = event.get("message", "")

            # Check log levels
            level_match = level_pattern.search(message)
            if level_match:
                levels[level_match.group(0)] += 1

            # Check timestamps in message content (not event timestamp)
            for pattern in timestamp_patterns:
                if pattern.search(message):
                    has_timestamp += 1
                    break

        return {
            "logLevels": dict(levels),
            "containsTimestamp": has_timestamp,
            "timestampPercentage": round((has_timestamp / len(events)) * 100, 2)
            if events
            else 0,
        }

    def _analyze_fields(self, events: List[Dict]) -> Dict:
        """Analyze fields in structured log messages."""
        format_type = self._detect_log_format(events)

        if format_type == "JSON":
            # Try to extract fields from JSON logs
            fields_count = Counter()

            for event in events:
                message = event.get("message", "")
                try:
                    json_data = json.loads(message)
                    for key in json_data.keys():
                        fields_count[key] += 1
                except json.JSONDecodeError:
                    continue

            # Get the most common fields
            common_fields = [
                {
                    "field": field,
                    "occurrences": count,
                    "percentage": round((count / len(events)) * 100, 2),
                }
                for field, count in fields_count.most_common(10)
            ]

            return {"commonFields": common_fields, "uniqueFields": len(fields_count)}

        elif format_type == "Key-Value Pairs":
            # Try to extract key-value pairs
            key_pattern = re.compile(r"(\w+)=[\'\"]?([^\'\"\s]*)[\'\"]?")
            fields_count = Counter()

            for event in events:
                message = event.get("message", "")
                matches = key_pattern.findall(message)
                for key, _ in matches:
                    fields_count[key] += 1

            # Get the most common fields
            common_fields = [
                {
                    "field": field,
                    "occurrences": count,
                    "percentage": round((count / len(events)) * 100, 2),
                }
                for field, count in fields_count.most_common(10)
            ]

            return {"commonFields": common_fields, "uniqueFields": len(fields_count)}

        else:
            return {
                "analysis": f"Field analysis not applicable for {format_type} format"
            }
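
For context, here is a minimal usage sketch of the class above. It is not part of the source file: it assumes AWS credentials are already configured for boto3's default credential chain, that the module is importable as cloudwatch_logs_resource, and the log group names shown are placeholders only.

# Minimal usage sketch (assumptions: module importable as cloudwatch_logs_resource,
# AWS credentials configured; "/aws/lambda/my-function" is a placeholder log group).
import json

from cloudwatch_logs_resource import CloudWatchLogsResource

# No profile/region arguments: falls back to the default credential chain,
# as in the class's __init__.
resource = CloudWatchLogsResource()

# List up to 10 log groups whose names start with "/aws/lambda".
groups = json.loads(resource.get_log_groups(prefix="/aws/lambda", limit=10))
for group in groups.get("logGroups", []):
    print(group["name"], group["storedBytes"])

# Pull a small recent-log sample and a structure analysis for one group.
print(resource.get_log_sample("/aws/lambda/my-function", limit=5))
print(resource.analyze_log_structure("/aws/lambda/my-function"))

Every method returns a JSON string (including error cases), so callers decode with json.loads rather than handling exceptions directly.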
