Skip to main content
Glama

CloudWatch Logs MCP Server

by serkanh
main.py (8.58 kB)
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "mcp[cli]>=1.6.0",  # Note: Comma is required between dependencies
#   "boto3>=1.28.0"
# ]
# ///
"""
CloudWatch Logs MCP Server

An MCP server that provides tools for accessing AWS CloudWatch logs.
"""

import json
import logging
import os
import sys
import traceback
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import boto3
from botocore.exceptions import BotoCoreError, ClientError
from mcp.server.fastmcp import FastMCP

# Configure logging to stderr; the server is run over the stdio transport
# (see the entry point at the bottom), so log output is kept off stdout.
logging.basicConfig(
    level=logging.INFO,  # Default to INFO level
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger("cloudwatch-logs-mcp")

# Upper bounds for a single tool call. CloudWatch Logs returns results in
# pages linked by ``nextToken``; these caps keep one invocation from paging
# through an unbounded amount of data.
_MAX_LOG_GROUPS = 1000
_MAX_LOG_EVENTS = 10_000
_MAX_PAGES = 25

# Seconds per unit for relative time strings such as "30m" or "2d".
_RELATIVE_TIME_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

# Initialize the MCP server
logger.info("Starting CloudWatch Logs MCP Server")
logger.debug(f"Python version: {sys.version}")
logger.debug(f"Current working directory: {os.getcwd()}")

try:
    logger.info("Initializing CloudWatch Logs MCP")
    mcp = FastMCP("cloudwatch-logs")
    logger.info("MCP server initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize MCP server: {str(e)}", exc_info=True)
    sys.exit(1)


@mcp.tool(
    description="List available CloudWatch log groups"
)
async def list_groups(
    prefix: Optional[str] = None,
    region: Optional[str] = None,
    accessKeyId: Optional[str] = None,
    secretAccessKey: Optional[str] = None,
    sessionToken: Optional[str] = None,
) -> str:
    """List available CloudWatch log groups.

    Args:
        prefix: Only return groups whose name starts with this prefix.
        region: AWS region; the boto3 default is used when omitted.
        accessKeyId: Optional explicit AWS access key id.
        secretAccessKey: Optional explicit AWS secret access key.
        sessionToken: Optional session token for temporary credentials.

    Returns:
        A JSON array of objects with ``logGroupName``, ``creationTime`` and
        ``storedBytes``. On failure, a JSON object ``{"error": "<message>"}``
        is returned instead of raising.
    """
    try:
        logger.info(f"Listing CloudWatch log groups (prefix: {prefix}, region: {region})")

        client = _get_cloudwatch_client(
            region=region,
            access_key_id=accessKeyId,
            secret_access_key=secretAccessKey,
            session_token=sessionToken,
        )

        # List log groups
        kwargs: Dict[str, Any] = {}
        if prefix:
            kwargs["logGroupNamePrefix"] = prefix

        # A single describe_log_groups call only returns the first page, so
        # follow nextToken to avoid silently truncating the listing.
        log_groups = _collect_pages(
            client.describe_log_groups,
            kwargs,
            result_key="logGroups",
            max_items=_MAX_LOG_GROUPS,
        )

        # Format the response
        formatted_groups = [
            {
                "logGroupName": group.get("logGroupName"),
                "creationTime": group.get("creationTime"),
                "storedBytes": group.get("storedBytes"),
            }
            for group in log_groups
        ]

        response_json = json.dumps(formatted_groups, ensure_ascii=True)
        logger.info(f"Returning {len(formatted_groups)} log groups")
        return response_json

    except Exception as e:
        logger.error(f"Error in list_groups: {str(e)}", exc_info=True)
        # Return error as JSON instead of raising
        return json.dumps({"error": str(e)}, ensure_ascii=True)


@mcp.tool(
    description="Get CloudWatch logs from a specific log group and stream"
)
async def get_logs(
    logGroupName: str,
    logStreamName: Optional[str] = None,
    startTime: Optional[str] = None,
    endTime: Optional[str] = None,
    filterPattern: Optional[str] = None,
    region: Optional[str] = None,
    accessKeyId: Optional[str] = None,
    secretAccessKey: Optional[str] = None,
    sessionToken: Optional[str] = None,
) -> str:
    """Get CloudWatch logs from a specific log group and stream.

    Args:
        logGroupName: Name of the log group to query.
        logStreamName: Restrict the query to this single stream.
        startTime: ISO-8601 timestamp, or a relative offset into the past
            such as ``"30s"``, ``"15m"``, ``"2h"`` or ``"7d"``.
        endTime: Same formats as ``startTime``.
        filterPattern: CloudWatch Logs filter pattern.
        region: AWS region; the boto3 default is used when omitted.
        accessKeyId: Optional explicit AWS access key id.
        secretAccessKey: Optional explicit AWS secret access key.
        sessionToken: Optional session token for temporary credentials.

    Returns:
        A JSON array of objects with ``timestamp`` (ISO-8601 including the
        UTC offset), ``message`` and ``logStreamName``. On failure, a JSON
        object ``{"error": "<message>"}`` is returned instead of raising.
    """
    try:
        logger.info(
            f"Getting CloudWatch logs for group: {logGroupName}, stream: {logStreamName}, "
            f"startTime: {startTime}, endTime: {endTime}, filterPattern: {filterPattern}, "
            f"region: {region}"
        )

        client = _get_cloudwatch_client(
            region=region,
            access_key_id=accessKeyId,
            secret_access_key=secretAccessKey,
            session_token=sessionToken,
        )

        # Parse start and end times
        start_time_ms = _parse_relative_time(startTime) if startTime else None
        end_time_ms = _parse_relative_time(endTime) if endTime else None

        # Get logs
        kwargs: Dict[str, Any] = {
            "logGroupName": logGroupName,
        }

        if logStreamName:
            kwargs["logStreamNames"] = [logStreamName]

        if filterPattern:
            kwargs["filterPattern"] = filterPattern

        # Compare against None: an epoch timestamp of 0 is falsy but valid.
        if start_time_ms is not None:
            kwargs["startTime"] = start_time_ms

        if end_time_ms is not None:
            kwargs["endTime"] = end_time_ms

        # Use filter_log_events for more flexible querying. A response may
        # hold no events yet still carry a nextToken, so keep paging (within
        # the caps) instead of trusting the first page.
        events = _collect_pages(
            client.filter_log_events,
            kwargs,
            result_key="events",
            max_items=_MAX_LOG_EVENTS,
        )

        # Format the response
        formatted_events = [
            {
                "timestamp": _format_event_timestamp(event.get("timestamp")),
                "message": event.get("message"),
                "logStreamName": event.get("logStreamName"),
            }
            for event in events
        ]

        response_json = json.dumps(formatted_events, ensure_ascii=True, default=str)
        logger.info(f"Returning {len(formatted_events)} log events")
        return response_json

    except Exception as e:
        logger.error(f"Error in get_logs: {str(e)}", exc_info=True)
        # Return error as JSON
        return json.dumps({"error": str(e)}, ensure_ascii=True)


def _collect_pages(
    fetch: Callable[..., Dict[str, Any]],
    request: Dict[str, Any],
    result_key: str,
    max_items: int,
    max_pages: int = _MAX_PAGES,
) -> List[Any]:
    """Call *fetch* repeatedly, following ``nextToken``, and gather results.

    Args:
        fetch: Client method to call with ``**request``.
        request: Base keyword arguments; not mutated.
        result_key: Key in each response that holds the list of items.
        max_items: Stop once this many items were collected; extra items
            from the last page are discarded.
        max_pages: Maximum number of calls to make.

    Returns:
        At most ``max_items`` items, in the order they were returned.
    """
    page_request = dict(request)
    items: List[Any] = []

    for _ in range(max_pages):
        response = fetch(**page_request)
        items.extend(response.get(result_key, []))

        next_token = response.get("nextToken")
        if not next_token:
            return items
        if len(items) >= max_items:
            break
        page_request["nextToken"] = next_token

    # Reaching this point means more data was available than we fetched.
    logger.warning(
        f"Result for '{result_key}' truncated at {min(len(items), max_items)} items "
        f"(limits: {max_items} items, {max_pages} pages)"
    )
    return items[:max_items]


def _format_event_timestamp(timestamp: Any) -> Any:
    """Convert an epoch-milliseconds value to an ISO-8601 string.

    The result carries the local UTC offset so that it is unambiguous to the
    caller. ``None`` is passed through, and a value that cannot be converted
    is returned as ``str(timestamp)``.
    """
    if timestamp is None:
        return None
    try:
        return datetime.fromtimestamp(timestamp / 1000).astimezone().isoformat()
    except (OverflowError, OSError, ValueError, TypeError):
        return str(timestamp)


def _get_cloudwatch_client(
    region: Optional[str] = None,
    access_key_id: Optional[str] = None,
    secret_access_key: Optional[str] = None,
    session_token: Optional[str] = None,
) -> Any:
    """Get a CloudWatch Logs client.

    Explicit credentials are used only when both ``access_key_id`` and
    ``secret_access_key`` are given; otherwise boto3's default credential
    resolution applies. ``session_token`` is only honoured together with
    explicit credentials.

    Raises:
        Exception: Whatever boto3 raises while creating the session or
            client is logged and re-raised.
    """
    # Create session with credentials if provided
    session_kwargs: Dict[str, Any] = {}

    if region:
        session_kwargs["region_name"] = region
        logger.debug(f"Using region: {region}")
    else:
        logger.debug("Using default region")

    if access_key_id and secret_access_key:
        logger.debug("Using provided AWS credentials")
        session_kwargs["aws_access_key_id"] = access_key_id
        session_kwargs["aws_secret_access_key"] = secret_access_key
        if session_token:
            session_kwargs["aws_session_token"] = session_token
    else:
        if access_key_id or secret_access_key:
            # Half a key pair is unusable; make the fallback visible rather
            # than silently running under a different identity.
            logger.warning(
                "Only one of access key id / secret access key was provided; "
                "ignoring it and using default AWS credentials"
            )
        logger.debug("Using default AWS credentials")

    # Create session and client
    try:
        session = boto3.Session(**session_kwargs)
        return session.client("logs")
    except Exception as e:
        logger.error(f"Error creating CloudWatch client: {str(e)}", exc_info=True)
        raise


def _parse_relative_time(time_str: str) -> Optional[int]:
    """Parse a relative time string into a timestamp.

    Accepts either an ISO-8601 date/time (a trailing ``Z`` is treated as
    ``+00:00``) or a relative offset ``<digits><unit>`` with unit ``s``,
    ``m``, ``h`` or ``d``, meaning that long before now.

    Returns:
        Milliseconds since the epoch, or ``None`` for an empty string.

    Raises:
        ValueError: If the string matches neither format.
    """
    if not time_str:
        return None

    logger.debug(f"Parsing time string: {time_str}")

    # Check if it's an ISO format date
    try:
        dt = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
    except ValueError:
        logger.debug("Not an ISO format date, trying relative time format")
    else:
        timestamp = int(dt.timestamp() * 1000)
        logger.debug(f"Parsed ISO format date: {dt.isoformat()}, timestamp: {timestamp}")
        return timestamp

    # Parse relative time
    unit = time_str[-1]
    amount = time_str[:-1]
    if unit in _RELATIVE_TIME_UNITS and amount.isdigit():
        value = int(amount)
        seconds = value * _RELATIVE_TIME_UNITS[unit]
        dt = datetime.now() - timedelta(seconds=seconds)
        timestamp = int(dt.timestamp() * 1000)
        logger.debug(f"Parsed relative time: {value}{unit}, timestamp: {timestamp}")
        return timestamp

    error_msg = f"Invalid time format: {time_str}"
    logger.error(error_msg)
    raise ValueError(error_msg)


if __name__ == "__main__":
    try:
        logger.info("Starting main execution block")

        # Skip AWS auth validation for now
        logger.info("Skipping AWS auth check to avoid early failures")

        # Run the MCP server without any extras
        logger.info("Starting CloudWatch Logs MCP server with stdio transport")

        # Explicitly handle synchronous initialization
        logger.info("Running MCP server...")
        mcp.run(transport="stdio")
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"FATAL ERROR: {str(e)}", exc_info=True)
        sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/serkanh/cloudwatch-logs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.