Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
log_analyzer_template.j2โ€ข16 kB
""" Log analyzer module for MockLoop analytics functionality. Provides comprehensive log analysis and insights generation. """ import json import re from datetime import datetime, timedelta from typing import Any, Optional from collections import defaultdict, Counter import statistics class LogAnalyzer: """Advanced log analyzer for MockLoop request logs.""" def __init__(self): self.insights = [] def analyze_logs(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """ Perform comprehensive analysis of request logs. Args: logs: List of log entries from the database Returns: Dictionary containing analysis results and insights """ if not logs: return { "total_requests": 0, "time_range": None, "performance": None, "status_codes": None, "endpoints": None, "methods": None, "insights": [], "analysis_timestamp": datetime.now().isoformat() } analysis = { "total_requests": len(logs), "time_range": self._analyze_time_range(logs), "performance": self._analyze_performance(logs), "status_codes": self._analyze_status_codes(logs), "endpoints": self._analyze_endpoints(logs), "methods": self._analyze_methods(logs), "user_agents": self._analyze_user_agents(logs), "response_sizes": self._analyze_response_sizes(logs), "insights": self._generate_insights(logs), "analysis_timestamp": datetime.now().isoformat() } return analysis def _analyze_time_range(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze time range and request distribution.""" timestamps = [] for log in logs: if log.get('timestamp'): try: # Handle different timestamp formats ts_str = log['timestamp'] if 'T' in ts_str: # ISO format if ts_str.endswith('Z'): ts = datetime.fromisoformat(ts_str[:-1]) else: ts = datetime.fromisoformat(ts_str) else: # Assume standard format ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S') timestamps.append(ts) except (ValueError, TypeError): continue if not timestamps: return None timestamps.sort() start_time = timestamps[0] end_time = timestamps[-1] duration = end_time - start_time # Analyze request distribution over time hourly_distribution = defaultdict(int) for ts in timestamps: hour_key = ts.strftime('%Y-%m-%d %H:00') hourly_distribution[hour_key] += 1 return { "start_time": start_time.isoformat(), "end_time": end_time.isoformat(), "duration_seconds": duration.total_seconds(), "duration_human": self._format_duration(duration), "total_entries": len(timestamps), "requests_per_hour": dict(hourly_distribution), "peak_hour": max(hourly_distribution.items(), key=lambda x: x[1]) if hourly_distribution else None } def _analyze_performance(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze response time performance metrics.""" response_times = [] for log in logs: if log.get('process_time_ms') is not None: try: rt = float(log['process_time_ms']) response_times.append(rt) except (ValueError, TypeError): continue if not response_times: return None response_times.sort() return { "total_requests": len(response_times), "average_ms": round(statistics.mean(response_times), 2), "median_ms": round(statistics.median(response_times), 2), "min_ms": min(response_times), "max_ms": max(response_times), "p95_ms": round(self._percentile(response_times, 95), 2), "p99_ms": round(self._percentile(response_times, 99), 2), "std_dev_ms": round(statistics.stdev(response_times) if len(response_times) > 1 else 0, 2), "slow_requests": len([rt for rt in response_times if rt > 1000]), # > 1 second "fast_requests": len([rt for rt in response_times if rt < 100]) # < 100ms } def _analyze_status_codes(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze HTTP status code distribution.""" status_codes = Counter() for log in logs: if log.get('status_code'): try: status = int(log['status_code']) status_codes[status] += 1 except (ValueError, TypeError): continue if not status_codes: return None total = sum(status_codes.values()) success_codes = sum(count for code, count in status_codes.items() if 200 <= code < 300) error_codes = sum(count for code, count in status_codes.items() if code >= 400) return { "distribution": dict(status_codes), "total_requests": total, "success_count": success_codes, "error_count": error_codes, "success_rate": round((success_codes / total) * 100, 2) if total > 0 else 0, "error_rate": round((error_codes / total) * 100, 2) if total > 0 else 0, "most_common": status_codes.most_common(5) } def _analyze_endpoints(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze endpoint usage patterns.""" endpoints = Counter() endpoint_performance = defaultdict(list) for log in logs: path = log.get('path', '') if path: endpoints[path] += 1 # Track performance per endpoint if log.get('process_time_ms') is not None: try: rt = float(log['process_time_ms']) endpoint_performance[path].append(rt) except (ValueError, TypeError): pass # Calculate performance stats per endpoint endpoint_stats = {} for path, times in endpoint_performance.items(): if times: endpoint_stats[path] = { "request_count": endpoints[path], "avg_response_time": round(statistics.mean(times), 2), "min_response_time": min(times), "max_response_time": max(times) } return { "distribution": dict(endpoints), "total_unique_endpoints": len(endpoints), "most_popular": endpoints.most_common(10), "performance_by_endpoint": endpoint_stats, "endpoint_categories": self._categorize_endpoints(list(endpoints.keys())) } def _analyze_methods(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze HTTP method distribution.""" methods = Counter() for log in logs: method = log.get('method', '').upper() if method: methods[method] += 1 total = sum(methods.values()) return { "distribution": dict(methods), "total_requests": total, "most_common": methods.most_common(), "read_write_ratio": self._calculate_read_write_ratio(methods) } def _analyze_user_agents(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze user agent patterns.""" user_agents = Counter() browsers = Counter() for log in logs: ua = log.get('user_agent', '') if ua: user_agents[ua] += 1 # Categorize browsers/clients if 'curl' in ua.lower(): browsers['curl'] += 1 elif 'firefox' in ua.lower(): browsers['Firefox'] += 1 elif 'chrome' in ua.lower(): browsers['Chrome'] += 1 elif 'safari' in ua.lower(): browsers['Safari'] += 1 elif 'postman' in ua.lower(): browsers['Postman'] += 1 else: browsers['Other'] += 1 return { "unique_user_agents": len(user_agents), "browser_distribution": dict(browsers), "most_common_agents": user_agents.most_common(5) } def _analyze_response_sizes(self, logs: list[dict[str, Any]]) -> dict[str, Any]: """Analyze response size patterns.""" sizes = [] for log in logs: if log.get('response_size') is not None: try: size = int(log['response_size']) sizes.append(size) except (ValueError, TypeError): continue if not sizes: return None total_bytes = sum(sizes) return { "total_responses": len(sizes), "total_bytes": total_bytes, "total_mb": round(total_bytes / (1024 * 1024), 2), "average_bytes": round(statistics.mean(sizes), 2), "median_bytes": statistics.median(sizes), "min_bytes": min(sizes), "max_bytes": max(sizes), "large_responses": len([s for s in sizes if s > 1024 * 1024]) # > 1MB } def _generate_insights(self, logs: list[dict[str, Any]]) -> list[str]: """Generate actionable insights from the analysis.""" insights = [] # Performance insights response_times = [float(log['process_time_ms']) for log in logs if log.get('process_time_ms') is not None] if response_times: avg_time = statistics.mean(response_times) if avg_time > 500: insights.append(f"Average response time is {avg_time:.1f}ms, which may be slow for users") elif avg_time < 50: insights.append(f"Excellent performance with {avg_time:.1f}ms average response time") # Error rate insights status_codes = [log.get('status_code') for log in logs if log.get('status_code')] if status_codes: error_count = len([code for code in status_codes if code >= 400]) error_rate = (error_count / len(status_codes)) * 100 if error_rate > 10: insights.append(f"High error rate detected: {error_rate:.1f}% of requests failed") elif error_rate == 0: insights.append("Perfect success rate - no errors detected") # Traffic patterns endpoints = Counter(log.get('path') for log in logs if log.get('path')) if endpoints: top_endpoint = endpoints.most_common(1)[0] if top_endpoint[1] > len(logs) * 0.5: insights.append(f"Heavy concentration on {top_endpoint[0]} ({top_endpoint[1]} requests)") # Method distribution insights methods = Counter(log.get('method') for log in logs if log.get('method')) if methods: read_methods = methods.get('GET', 0) write_methods = sum(methods.get(m, 0) for m in ['POST', 'PUT', 'DELETE', 'PATCH']) if write_methods == 0: insights.append("All requests are read-only (GET) - consider testing write operations") elif read_methods == 0: insights.append("No read operations detected - unusual traffic pattern") # Time-based insights timestamps = [] for log in logs: if log.get('timestamp'): try: ts_str = log['timestamp'] if 'T' in ts_str: ts = datetime.fromisoformat(ts_str.replace('Z', '')) else: ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S') timestamps.append(ts) except (ValueError, TypeError): continue if len(timestamps) > 1: timestamps.sort() duration = timestamps[-1] - timestamps[0] if duration.total_seconds() < 60: insights.append("High request frequency - all traffic within 1 minute") elif duration.total_seconds() > 3600: insights.append(f"Extended testing period: {self._format_duration(duration)}") return insights def _categorize_endpoints(self, paths: list[str]) -> dict[str, int]: """Categorize endpoints by type.""" categories = defaultdict(int) for path in paths: if '/admin' in path: categories['admin'] += 1 elif '/api' in path: categories['api'] += 1 elif '/auth' in path or '/login' in path or '/token' in path: categories['auth'] += 1 elif '/health' in path or '/status' in path: categories['health'] += 1 elif '/docs' in path or '/openapi' in path: categories['docs'] += 1 else: categories['business'] += 1 return dict(categories) def _calculate_read_write_ratio(self, methods: Counter) -> dict[str, Any]: """Calculate read vs write operation ratio.""" read_methods = methods.get('GET', 0) + methods.get('HEAD', 0) + methods.get('OPTIONS', 0) write_methods = (methods.get('POST', 0) + methods.get('PUT', 0) + methods.get('DELETE', 0) + methods.get('PATCH', 0)) total = read_methods + write_methods if total == 0: return {"read_percentage": 0, "write_percentage": 0, "ratio": "N/A"} read_pct = round((read_methods / total) * 100, 1) write_pct = round((write_methods / total) * 100, 1) return { "read_percentage": read_pct, "write_percentage": write_pct, "ratio": f"{read_methods}:{write_methods}", "read_count": read_methods, "write_count": write_methods } def _percentile(self, data: list[float], percentile: int) -> float: """Calculate percentile value.""" if not data: return 0 data_sorted = sorted(data) index = (percentile / 100) * (len(data_sorted) - 1) if index.is_integer(): return data_sorted[int(index)] else: lower = data_sorted[int(index)] upper = data_sorted[int(index) + 1] return lower + (upper - lower) * (index - int(index)) def _format_duration(self, duration: timedelta) -> str: """Format duration in human-readable format.""" total_seconds = int(duration.total_seconds()) if total_seconds < 60: return f"{total_seconds} seconds" elif total_seconds < 3600: minutes = total_seconds // 60 seconds = total_seconds % 60 return f"{minutes}m {seconds}s" else: hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 return f"{hours}h {minutes}m" def analyze_request_patterns(logs: list[dict[str, Any]]) -> dict[str, Any]: """ Standalone function for quick pattern analysis. Args: logs: List of request log entries Returns: Dictionary with pattern analysis results """ analyzer = LogAnalyzer() return analyzer.analyze_logs(logs)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server