
Log Analyzer with MCP

Official, by awslabs
analysis_tools.py (6.68 kB)
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import json
from datetime import datetime

import boto3

from . import handle_exceptions
from .utils import get_time_range


class CloudWatchLogsAnalysisTools:
    """Tools for analyzing CloudWatch Logs data."""

    def __init__(self, profile_name=None, region_name=None):
        """Initialize the CloudWatch Logs client.

        Args:
            profile_name: Optional AWS profile name to use for credentials
            region_name: Optional AWS region name to use for API calls
        """
        # Initialize the boto3 CloudWatch Logs client using the specified
        # profile/region or the default credential chain
        self.profile_name = profile_name
        self.region_name = region_name
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.logs_client = session.client("logs")

    @handle_exceptions
    async def summarize_log_activity(
        self,
        log_group_name: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """Generate a summary of log activity over a specified time period.

        Args:
            log_group_name: The log group to analyze
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with activity summary
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)

        # Use CloudWatch Logs Insights to get a summary
        query = """
        stats count(*) as logEvents, count_distinct(stream) as streams
        | sort @timestamp desc
        | limit 1000
        """

        # Start the query
        start_query_response = self.logs_client.start_query(
            logGroupName=log_group_name,
            startTime=start_ts,
            endTime=end_ts,
            queryString=query,
        )
        query_id = start_query_response["queryId"]

        # Poll for query results
        response = None
        while response is None or response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            response = self.logs_client.get_query_results(queryId=query_id)

        # Get the hourly distribution
        hourly_query = """
        stats count(*) as count by bin(1h)
        | sort @timestamp desc
        | limit 24
        """

        # Start the hourly query
        hourly_query_response = self.logs_client.start_query(
            logGroupName=log_group_name,
            startTime=start_ts,
            endTime=end_ts,
            queryString=hourly_query,
        )
        hourly_query_id = hourly_query_response["queryId"]

        # Poll for hourly query results
        hourly_response = None
        while hourly_response is None or hourly_response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            hourly_response = self.logs_client.get_query_results(
                queryId=hourly_query_id
            )

        # Process the main summary results
        summary = {
            "timeRange": {
                "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
                "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
                "hours": hours,
            },
            "logEvents": 0,
            "uniqueStreams": 0,
            "hourlyDistribution": [],
        }

        # Extract the main stats
        for result in response.get("results", []):
            for field in result:
                if field["field"] == "logEvents":
                    summary["logEvents"] = int(field["value"])
                elif field["field"] == "streams":
                    summary["uniqueStreams"] = int(field["value"])

        # Extract the hourly distribution
        for result in hourly_response.get("results", []):
            hour_data = {}
            for field in result:
                if field["field"] == "bin(1h)":
                    hour_data["hour"] = field["value"]
                elif field["field"] == "count":
                    hour_data["count"] = int(field["value"])
            if hour_data:
                summary["hourlyDistribution"].append(hour_data)

        return json.dumps(summary, indent=2)

    @handle_exceptions
    async def find_error_patterns(
        self,
        log_group_name: str,
        hours: int = 24,
        start_time: str = None,
        end_time: str = None,
    ) -> str:
        """Find common error patterns in logs.

        Args:
            log_group_name: The log group to analyze
            hours: Number of hours to look back
            start_time: Start time in ISO8601 format
            end_time: End time in ISO8601 format

        Returns:
            JSON string with error patterns
        """
        start_ts, end_ts = get_time_range(hours, start_time, end_time)

        # Query for error logs
        error_query = """
        filter @message like /(?i)(error|exception|fail|traceback)/
        | stats count(*) as errorCount by @message
        | sort errorCount desc
        | limit 20
        """

        # Start the query
        start_query_response = self.logs_client.start_query(
            logGroupName=log_group_name,
            startTime=start_ts,
            endTime=end_ts,
            queryString=error_query,
        )
        query_id = start_query_response["queryId"]

        # Poll for query results
        response = None
        while response is None or response["status"] == "Running":
            await asyncio.sleep(1)  # Wait before checking again
            response = self.logs_client.get_query_results(queryId=query_id)

        # Process the results
        error_patterns = {
            "timeRange": {
                "start": datetime.fromtimestamp(start_ts / 1000).isoformat(),
                "end": datetime.fromtimestamp(end_ts / 1000).isoformat(),
                "hours": hours,
            },
            "errorPatterns": [],
        }

        for result in response.get("results", []):
            pattern = {}
            for field in result:
                if field["field"] == "@message":
                    pattern["message"] = field["value"]
                elif field["field"] == "errorCount":
                    pattern["count"] = int(field["value"])
            if pattern:
                error_patterns["errorPatterns"].append(pattern)

        return json.dumps(error_patterns, indent=2)
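
For orientation, here is a minimal usage sketch of the class above. The import path, AWS profile/region values, and the log group name "/aws/lambda/my-function" are placeholders, not part of the repository; adjust them to your environment and the project's actual package layout. It assumes the async methods are driven with asyncio.run.

import asyncio
import json

# Hypothetical import path; adjust to the package layout of the repository.
from analysis_tools import CloudWatchLogsAnalysisTools


async def main():
    # Placeholder profile, region, and log group; substitute your own values.
    tools = CloudWatchLogsAnalysisTools(profile_name="default", region_name="us-east-1")
    log_group = "/aws/lambda/my-function"

    # Summarize the last 6 hours of activity (returns a JSON string).
    summary = json.loads(await tools.summarize_log_activity(log_group, hours=6))
    print(summary["logEvents"], "events across", summary["uniqueStreams"], "streams")

    # Surface the most frequent error-like messages over the same window.
    print(await tools.find_error_patterns(log_group, hours=6))


if __name__ == "__main__":
    asyncio.run(main())

Both methods follow the same pattern: start a CloudWatch Logs Insights query with start_query, poll get_query_results roughly once per second until the status is no longer "Running", then flatten the field/value result rows into a JSON summary.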

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/awslabs/Log-Analyzer-with-MCP'
