Log Analyzer with MCP

Official, by awslabs
client.py (24.1 kB)
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import argparse
import json
import sys
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Set up argument parser for the CLI
parser = argparse.ArgumentParser(description="CloudWatch Logs MCP Client")
parser.add_argument(
    "--profile", type=str, help="AWS profile name to use for credentials"
)
parser.add_argument("--region", type=str, help="AWS region name to use for API calls")
subparsers = parser.add_subparsers(dest="command", help="Command to execute")

# List log groups command
list_groups_parser = subparsers.add_parser(
    "list-groups", help="List CloudWatch log groups"
)
list_groups_parser.add_argument("--prefix", help="Filter log groups by name prefix")
list_groups_parser.add_argument(
    "--limit",
    type=int,
    default=50,
    help="Maximum number of log groups to return (default: 50)",
)
list_groups_parser.add_argument(
    "--next-token", help="Token for pagination to get the next set of results"
)
list_groups_parser.add_argument(
    "--use-tool", action="store_true", help="Use the tool interface instead of resource"
)
list_groups_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
list_groups_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get log group details command
group_details_parser = subparsers.add_parser(
    "group-details", help="Get detailed information about a log group"
)
group_details_parser.add_argument("log_group_name", help="The name of the log group")
group_details_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
group_details_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

# List log streams command
list_streams_parser = subparsers.add_parser(
    "list-streams", help="List log streams for a specific log group"
)
list_streams_parser.add_argument("log_group_name", help="The name of the log group")
list_streams_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
list_streams_parser.add_argument(
    "--region", help="AWS region name to use for API calls"
)

# Get log events command
get_events_parser = subparsers.add_parser(
    "get-events", help="Get log events from a specific log stream"
)
get_events_parser.add_argument("log_group_name", help="The name of the log group")
get_events_parser.add_argument("log_stream_name", help="The name of the log stream")
get_events_parser.add_argument(
    "--profile", help="AWS profile name to use for credentials"
)
get_events_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get log sample command
sample_parser = subparsers.add_parser(
    "sample", help="Get a sample of recent logs from a log group"
)
sample_parser.add_argument("log_group_name", help="The name of the log group")
sample_parser.add_argument(
    "--limit", type=int, default=10, help="Number of logs to sample (default: 10)"
)
sample_parser.add_argument("--profile", help="AWS profile name to use for credentials")
sample_parser.add_argument("--region", help="AWS region name to use for API calls")

# Get recent errors command
errors_parser = subparsers.add_parser(
    "recent-errors", help="Get recent error logs from a log group"
)
errors_parser.add_argument(
    "log_group_name", help="The name of the log group to analyze"
)
errors_parser.add_argument(
    "--hours", type=int, default=24, help="Number of hours to look back (default: 24)"
)
errors_parser.add_argument("--profile", help="AWS profile name to use for credentials") errors_parser.add_argument("--region", help="AWS region name to use for API calls") # Get log metrics command metrics_parser = subparsers.add_parser( "metrics", help="Get log volume metrics for a log group" ) metrics_parser.add_argument( "log_group_name", help="The name of the log group to analyze" ) metrics_parser.add_argument( "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" ) metrics_parser.add_argument("--profile", help="AWS profile name to use for credentials") metrics_parser.add_argument("--region", help="AWS region name to use for API calls") # Analyze log structure command structure_parser = subparsers.add_parser( "structure", help="Analyze the structure of logs in a log group" ) structure_parser.add_argument( "log_group_name", help="The name of the log group to analyze" ) structure_parser.add_argument( "--profile", help="AWS profile name to use for credentials" ) structure_parser.add_argument("--region", help="AWS region name to use for API calls") # Get analyze logs prompt command prompt_parser = subparsers.add_parser( "get-prompt", help="Get a prompt for analyzing CloudWatch logs" ) prompt_parser.add_argument( "log_group_name", help="The name of the log group to analyze" ) prompt_parser.add_argument("--profile", help="AWS profile name to use for credentials") prompt_parser.add_argument("--region", help="AWS region name to use for API calls") # Get list groups prompt command list_prompt_parser = subparsers.add_parser( "list-prompt", help="Get a prompt for listing CloudWatch log groups" ) list_prompt_parser.add_argument( "--prefix", help="Optional prefix to filter log groups by name" ) list_prompt_parser.add_argument( "--profile", help="AWS profile name to use for credentials" ) list_prompt_parser.add_argument("--region", help="AWS region name to use for API calls") # Search logs command search_parser = subparsers.add_parser( "search", help="Search for patterns in CloudWatch logs" ) search_parser.add_argument("log_group_name", help="The name of the log group to search") search_parser.add_argument( "query", help="The search query (CloudWatch Logs Insights syntax)" ) search_parser.add_argument( "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" ) search_parser.add_argument( "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" ) search_parser.add_argument( "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)" ) search_parser.add_argument("--profile", help="AWS profile name to use for credentials") search_parser.add_argument("--region", help="AWS region name to use for API calls") # Search multiple log groups command search_multi_parser = subparsers.add_parser( "search-multi", help="Search for patterns across multiple CloudWatch log groups" ) search_multi_parser.add_argument( "log_group_names", nargs="+", help="List of log group names to search" ) search_multi_parser.add_argument( "query", help="The search query (CloudWatch Logs Insights syntax)" ) search_multi_parser.add_argument( "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" ) search_multi_parser.add_argument( "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" ) search_multi_parser.add_argument( "--end-time", type=str, help="End time (ISO8601, e.g. 
2024-06-01T23:59:59Z)" ) search_multi_parser.add_argument( "--profile", help="AWS profile name to use for credentials" ) search_multi_parser.add_argument( "--region", help="AWS region name to use for API calls" ) # Summarize log activity command summarize_parser = subparsers.add_parser( "summarize", help="Generate a summary of log activity" ) summarize_parser.add_argument( "log_group_name", help="The name of the log group to analyze" ) summarize_parser.add_argument( "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" ) summarize_parser.add_argument( "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" ) summarize_parser.add_argument( "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)" ) summarize_parser.add_argument( "--profile", help="AWS profile name to use for credentials" ) summarize_parser.add_argument("--region", help="AWS region name to use for API calls") # Find error patterns command errors_parser = subparsers.add_parser( "find-errors", help="Find common error patterns in logs" ) errors_parser.add_argument( "log_group_name", help="The name of the log group to analyze" ) errors_parser.add_argument( "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" ) errors_parser.add_argument( "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" ) errors_parser.add_argument( "--end-time", type=str, help="End time (ISO8601, e.g. 2024-06-01T23:59:59Z)" ) errors_parser.add_argument("--profile", help="AWS profile name to use for credentials") errors_parser.add_argument("--region", help="AWS region name to use for API calls") # Correlate logs command correlate_parser = subparsers.add_parser( "correlate", help="Correlate logs across multiple AWS services" ) correlate_parser.add_argument( "log_group_names", nargs="+", help="List of log group names to search" ) correlate_parser.add_argument("search_term", help="Term to search for in logs") correlate_parser.add_argument( "--hours", type=int, default=24, help="Number of hours to look back (default: 24)" ) correlate_parser.add_argument( "--start-time", type=str, help="Start time (ISO8601, e.g. 2024-06-01T00:00:00Z)" ) correlate_parser.add_argument( "--end-time", type=str, help="End time (ISO8601, e.g. 
2024-06-01T23:59:59Z)" ) correlate_parser.add_argument( "--profile", help="AWS profile name to use for credentials" ) correlate_parser.add_argument("--region", help="AWS region name to use for API calls") def add_aws_config_args(tool_args, args): """Add profile and region arguments to tool calls if specified.""" if args.profile: tool_args["profile"] = args.profile if args.region: tool_args["region"] = args.region return tool_args async def main(): """Main function to run the CloudWatch Logs MCP client.""" args = parser.parse_args() # Determine the server path (relative or absolute) script_dir = os.path.dirname(os.path.abspath(__file__)) server_path = os.path.join(script_dir, "cw-mcp-server", "server.py") # Prepare server arguments server_args = [server_path] if args.profile: server_args.extend(["--profile", args.profile]) if args.region: server_args.extend(["--region", args.region]) # Create server parameters server_params = StdioServerParameters(command="python3", args=server_args, env=None) # Connect to the server async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the client session await session.initialize() # Check if a command was specified if args.command is None: parser.print_help() return try: # Execute the requested command if args.command == "list-groups": if args.use_tool: # Use the tool interface tool_args = {} if args.prefix: tool_args["prefix"] = args.prefix if args.limit: tool_args["limit"] = args.limit if args.next_token: tool_args["next_token"] = args.next_token tool_args = add_aws_config_args(tool_args, args) result = await session.call_tool( "list_log_groups", arguments=tool_args ) print_json_response(result) else: # Use the resource interface # Build query string for parameters if provided if args.prefix: # If prefix is provided, use the filtered endpoint resource_uri = f"logs://groups/filter/{args.prefix}" else: resource_uri = "logs://groups" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "group-details": resource_uri = f"logs://groups/{args.log_group_name}" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "list-streams": resource_uri = f"logs://groups/{args.log_group_name}/streams" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "get-events": resource_uri = f"logs://groups/{args.log_group_name}/streams/{args.log_stream_name}" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "sample": resource_uri = ( f"logs://groups/{args.log_group_name}/sample?limit={args.limit}" ) content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "recent-errors": resource_uri = f"logs://groups/{args.log_group_name}/recent-errors?hours={args.hours}" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "metrics": resource_uri = f"logs://groups/{args.log_group_name}/metrics?hours={args.hours}" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "structure": resource_uri = f"logs://groups/{args.log_group_name}/structure" content, _ = await session.read_resource(resource_uri) print_json_response(content) elif args.command == "get-prompt": # Get the analyze logs prompt from the server arguments = {"log_group_name": args.log_group_name} arguments = 
add_aws_config_args(arguments, args) result = await session.get_prompt( "analyze_cloudwatch_logs", arguments=arguments, ) # Extract and print the prompt text prompt_messages = result.messages if prompt_messages and len(prompt_messages) > 0: message = prompt_messages[0] if hasattr(message, "content") and hasattr( message.content, "text" ): print(message.content.text) else: print( json.dumps( message, default=lambda x: x.__dict__, indent=2 ) ) else: print("No prompt received.") elif args.command == "list-prompt": # Get arguments for the prompt arguments = {} if args.prefix: arguments["prefix"] = args.prefix arguments = add_aws_config_args(arguments, args) # Get the list logs prompt from the server result = await session.get_prompt( "list_cloudwatch_log_groups", arguments=arguments ) # Extract and print the prompt text prompt_messages = result.messages if prompt_messages and len(prompt_messages) > 0: message = prompt_messages[0] if hasattr(message, "content") and hasattr( message.content, "text" ): print(message.content.text) else: print( json.dumps( message, default=lambda x: x.__dict__, indent=2 ) ) else: print("No prompt received.") elif args.command == "search": tool_args = { "log_group_name": args.log_group_name, "query": args.query, } if args.start_time: tool_args["start_time"] = args.start_time if args.end_time: tool_args["end_time"] = args.end_time if not (args.start_time or args.end_time): tool_args["hours"] = args.hours tool_args = add_aws_config_args(tool_args, args) result = await session.call_tool( "search_logs", arguments=tool_args, ) print_json_response(result) elif args.command == "search-multi": tool_args = { "log_group_names": args.log_group_names, "query": args.query, } if args.start_time: tool_args["start_time"] = args.start_time if args.end_time: tool_args["end_time"] = args.end_time if not (args.start_time or args.end_time): tool_args["hours"] = args.hours tool_args = add_aws_config_args(tool_args, args) result = await session.call_tool( "search_logs_multi", arguments=tool_args, ) print_json_response(result) elif args.command == "summarize": tool_args = { "log_group_name": args.log_group_name, } if args.start_time: tool_args["start_time"] = args.start_time if args.end_time: tool_args["end_time"] = args.end_time if not (args.start_time or args.end_time): tool_args["hours"] = args.hours tool_args = add_aws_config_args(tool_args, args) result = await session.call_tool( "summarize_log_activity", arguments=tool_args, ) print_json_response(result) elif args.command == "find-errors": tool_args = { "log_group_name": args.log_group_name, } if args.start_time: tool_args["start_time"] = args.start_time if args.end_time: tool_args["end_time"] = args.end_time if not (args.start_time or args.end_time): tool_args["hours"] = args.hours tool_args = add_aws_config_args(tool_args, args) result = await session.call_tool( "find_error_patterns", arguments=tool_args, ) print_json_response(result) elif args.command == "correlate": tool_args = { "log_group_names": args.log_group_names, "search_term": args.search_term, } if args.start_time: tool_args["start_time"] = args.start_time if args.end_time: tool_args["end_time"] = args.end_time if not (args.start_time or args.end_time): tool_args["hours"] = args.hours tool_args = add_aws_config_args(tool_args, args) result = await session.call_tool( "correlate_logs", arguments=tool_args, ) print_json_response(result) except Exception as e: print(f"Error: {str(e)}", file=sys.stderr) sys.exit(1) def print_json_response(content: str | tuple | object | 
None): """Print JSON content in a formatted way. Args: content: The content to print, which could be: - String (direct JSON content) - Tuple (from read_resource, where the first element is the content) - Object with .content or .text attributes (from CallToolResult) - None """ try: # Handle None case if content is None: print("No content received.") return # For Session.read_resource responses, which returns tuple (meta, content) # but we found that sometimes content is None if isinstance(content, tuple): meta, content_text = ( content if len(content) >= 2 else (content[0] if len(content) == 1 else None, None) ) # If we have usable content in the second element, use it if content_text is not None: content = content_text # Otherwise, if meta looks usable, try that elif isinstance(meta, str) and meta != "meta": content = meta # We don't have usable content in the tuple else: print("No usable content found in the response.") return # Handle object with content attribute (from CallToolResult) if hasattr(content, "content"): content = content.content # Handle object with text attribute if hasattr(content, "text"): content = content.text # Handle CallToolResult content from mcp_types which can be a list if isinstance(content, list) and all(hasattr(item, "text") for item in content): # Extract text from each item extracted_texts = [item.text for item in content if item.text] if extracted_texts: content = extracted_texts[0] # Use the first text element # Handle if content is a custom object with __str__ method if not isinstance(content, (str, bytes, bytearray)) and hasattr( content, "__str__" ): content = str(content) # Try to handle various formats if isinstance(content, str): try: # Try to parse as JSON parsed = json.loads(content) print(json.dumps(parsed, indent=2)) except json.JSONDecodeError: # Not valid JSON, just print the string print(content) elif isinstance(content, (dict, list)): # Direct Python objects print(json.dumps(content, indent=2, default=lambda x: str(x))) else: # Fall back to string representation print(content) except Exception as e: # Catch-all for any unexpected errors print(f"Error processing response: {e}") print(content) if __name__ == "__main__": asyncio.run(main())
