"""
Log analyzer module for MockLoop analytics functionality.
Provides comprehensive log analysis and insights generation.
"""
import json
import re
from datetime import datetime, timedelta
from typing import Any, Optional
from collections import defaultdict, Counter
import statistics
class LogAnalyzer:
"""Advanced log analyzer for MockLoop request logs."""
def __init__(self):
self.insights = []
def analyze_logs(self, logs: list[dict[str, Any]]) -> dict[str, Any]:
"""
Perform comprehensive analysis of request logs.
Args:
logs: List of log entries from the database
Returns:
Dictionary containing analysis results and insights
"""
if not logs:
return {
"total_requests": 0,
"time_range": None,
"performance": None,
"status_codes": None,
"endpoints": None,
"methods": None,
"insights": [],
"analysis_timestamp": datetime.now().isoformat()
}
analysis = {
"total_requests": len(logs),
"time_range": self._analyze_time_range(logs),
"performance": self._analyze_performance(logs),
"status_codes": self._analyze_status_codes(logs),
"endpoints": self._analyze_endpoints(logs),
"methods": self._analyze_methods(logs),
"user_agents": self._analyze_user_agents(logs),
"response_sizes": self._analyze_response_sizes(logs),
"insights": self._generate_insights(logs),
"analysis_timestamp": datetime.now().isoformat()
}
return analysis
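
    # Illustrative shape of the returned analysis (all values hypothetical):
    # {
    #     "total_requests": 42,
    #     "time_range": {"start_time": "...", "end_time": "...", "peak_hour": ["2024-01-01 14:00", 7], ...},
    #     "performance": {"average_ms": 12.5, "p95_ms": 40.1, ...},
    #     "status_codes": {"success_rate": 97.6, "error_rate": 2.4, ...},
    #     "endpoints": {...}, "methods": {...}, "user_agents": {...},
    #     "response_sizes": {...}, "insights": ["..."],
    #     "analysis_timestamp": "2024-01-01T12:00:00"
    # }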
    def _analyze_time_range(self, logs: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
"""Analyze time range and request distribution."""
timestamps = []
for log in logs:
if log.get('timestamp'):
try:
# Handle different timestamp formats
ts_str = log['timestamp']
if 'T' in ts_str:
# ISO format
                        if ts_str.endswith('Z'):
                            # fromisoformat() rejects the trailing 'Z' (UTC designator)
                            # before Python 3.11, so strip it and parse as naive UTC
                            ts = datetime.fromisoformat(ts_str[:-1])
else:
ts = datetime.fromisoformat(ts_str)
else:
# Assume standard format
ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
timestamps.append(ts)
except (ValueError, TypeError):
continue
if not timestamps:
return None
timestamps.sort()
start_time = timestamps[0]
end_time = timestamps[-1]
duration = end_time - start_time
# Analyze request distribution over time
hourly_distribution = defaultdict(int)
for ts in timestamps:
hour_key = ts.strftime('%Y-%m-%d %H:00')
hourly_distribution[hour_key] += 1
return {
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"duration_seconds": duration.total_seconds(),
"duration_human": self._format_duration(duration),
"total_entries": len(timestamps),
"requests_per_hour": dict(hourly_distribution),
"peak_hour": max(hourly_distribution.items(), key=lambda x: x[1]) if hourly_distribution else None
}
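
    # Hourly bucketing above: a request logged at 2024-01-01 14:37:05 lands in
    # the '2024-01-01 14:00' bucket; peak_hour is the (bucket, count) pair with
    # the highest count.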
    def _analyze_performance(self, logs: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
"""Analyze response time performance metrics."""
response_times = []
for log in logs:
if log.get('process_time_ms') is not None:
try:
rt = float(log['process_time_ms'])
response_times.append(rt)
except (ValueError, TypeError):
continue
if not response_times:
return None
response_times.sort()
return {
"total_requests": len(response_times),
"average_ms": round(statistics.mean(response_times), 2),
"median_ms": round(statistics.median(response_times), 2),
"min_ms": min(response_times),
"max_ms": max(response_times),
"p95_ms": round(self._percentile(response_times, 95), 2),
"p99_ms": round(self._percentile(response_times, 99), 2),
"std_dev_ms": round(statistics.stdev(response_times) if len(response_times) > 1 else 0, 2),
"slow_requests": len([rt for rt in response_times if rt > 1000]), # > 1 second
"fast_requests": len([rt for rt in response_times if rt < 100]) # < 100ms
}
    def _analyze_status_codes(self, logs: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
"""Analyze HTTP status code distribution."""
status_codes = Counter()
for log in logs:
if log.get('status_code'):
try:
status = int(log['status_code'])
status_codes[status] += 1
except (ValueError, TypeError):
continue
if not status_codes:
return None
total = sum(status_codes.values())
        # Note: 3xx (redirect) codes count toward neither success nor error
        success_count = sum(count for code, count in status_codes.items() if 200 <= code < 300)
        error_count = sum(count for code, count in status_codes.items() if code >= 400)
        return {
            "distribution": dict(status_codes),
            "total_requests": total,
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": round((success_count / total) * 100, 2) if total > 0 else 0,
            "error_rate": round((error_count / total) * 100, 2) if total > 0 else 0,
"most_common": status_codes.most_common(5)
}
def _analyze_endpoints(self, logs: list[dict[str, Any]]) -> dict[str, Any]:
"""Analyze endpoint usage patterns."""
endpoints = Counter()
endpoint_performance = defaultdict(list)
for log in logs:
path = log.get('path', '')
if path:
endpoints[path] += 1
# Track performance per endpoint
if log.get('process_time_ms') is not None:
try:
rt = float(log['process_time_ms'])
endpoint_performance[path].append(rt)
except (ValueError, TypeError):
pass
# Calculate performance stats per endpoint
endpoint_stats = {}
for path, times in endpoint_performance.items():
if times:
endpoint_stats[path] = {
"request_count": endpoints[path],
"avg_response_time": round(statistics.mean(times), 2),
"min_response_time": min(times),
"max_response_time": max(times)
}
return {
"distribution": dict(endpoints),
"total_unique_endpoints": len(endpoints),
"most_popular": endpoints.most_common(10),
"performance_by_endpoint": endpoint_stats,
"endpoint_categories": self._categorize_endpoints(list(endpoints.keys()))
}
def _analyze_methods(self, logs: list[dict[str, Any]]) -> dict[str, Any]:
"""Analyze HTTP method distribution."""
methods = Counter()
for log in logs:
method = log.get('method', '').upper()
if method:
methods[method] += 1
total = sum(methods.values())
return {
"distribution": dict(methods),
"total_requests": total,
"most_common": methods.most_common(),
"read_write_ratio": self._calculate_read_write_ratio(methods)
}
def _analyze_user_agents(self, logs: list[dict[str, Any]]) -> dict[str, Any]:
"""Analyze user agent patterns."""
user_agents = Counter()
browsers = Counter()
for log in logs:
ua = log.get('user_agent', '')
if ua:
user_agents[ua] += 1
# Categorize browsers/clients
if 'curl' in ua.lower():
browsers['curl'] += 1
elif 'firefox' in ua.lower():
browsers['Firefox'] += 1
elif 'chrome' in ua.lower():
browsers['Chrome'] += 1
elif 'safari' in ua.lower():
browsers['Safari'] += 1
elif 'postman' in ua.lower():
browsers['Postman'] += 1
else:
browsers['Other'] += 1
return {
"unique_user_agents": len(user_agents),
"browser_distribution": dict(browsers),
"most_common_agents": user_agents.most_common(5)
}
    def _analyze_response_sizes(self, logs: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
"""Analyze response size patterns."""
sizes = []
for log in logs:
if log.get('response_size') is not None:
try:
size = int(log['response_size'])
sizes.append(size)
except (ValueError, TypeError):
continue
if not sizes:
return None
total_bytes = sum(sizes)
return {
"total_responses": len(sizes),
"total_bytes": total_bytes,
"total_mb": round(total_bytes / (1024 * 1024), 2),
"average_bytes": round(statistics.mean(sizes), 2),
"median_bytes": statistics.median(sizes),
"min_bytes": min(sizes),
"max_bytes": max(sizes),
"large_responses": len([s for s in sizes if s > 1024 * 1024]) # > 1MB
}
def _generate_insights(self, logs: list[dict[str, Any]]) -> list[str]:
"""Generate actionable insights from the analysis."""
insights = []
# Performance insights
        response_times = []
        for log in logs:
            if log.get('process_time_ms') is not None:
                try:
                    response_times.append(float(log['process_time_ms']))
                except (ValueError, TypeError):
                    continue
if response_times:
avg_time = statistics.mean(response_times)
if avg_time > 500:
insights.append(f"Average response time is {avg_time:.1f}ms, which may be slow for users")
elif avg_time < 50:
insights.append(f"Excellent performance with {avg_time:.1f}ms average response time")
# Error rate insights
        status_codes = []
        for log in logs:
            if log.get('status_code'):
                try:
                    status_codes.append(int(log['status_code']))
                except (ValueError, TypeError):
                    continue
        if status_codes:
            error_count = len([code for code in status_codes if code >= 400])
            error_rate = (error_count / len(status_codes)) * 100
if error_rate > 10:
insights.append(f"High error rate detected: {error_rate:.1f}% of requests failed")
elif error_rate == 0:
insights.append("Perfect success rate - no errors detected")
# Traffic patterns
endpoints = Counter(log.get('path') for log in logs if log.get('path'))
if endpoints:
top_endpoint = endpoints.most_common(1)[0]
if top_endpoint[1] > len(logs) * 0.5:
insights.append(f"Heavy concentration on {top_endpoint[0]} ({top_endpoint[1]} requests)")
# Method distribution insights
        methods = Counter(log.get('method', '').upper() for log in logs if log.get('method'))
if methods:
read_methods = methods.get('GET', 0)
write_methods = sum(methods.get(m, 0) for m in ['POST', 'PUT', 'DELETE', 'PATCH'])
if write_methods == 0:
insights.append("All requests are read-only (GET) - consider testing write operations")
elif read_methods == 0:
insights.append("No read operations detected - unusual traffic pattern")
# Time-based insights
timestamps = []
for log in logs:
if log.get('timestamp'):
try:
ts_str = log['timestamp']
if 'T' in ts_str:
ts = datetime.fromisoformat(ts_str.replace('Z', ''))
else:
ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
timestamps.append(ts)
except (ValueError, TypeError):
continue
if len(timestamps) > 1:
timestamps.sort()
duration = timestamps[-1] - timestamps[0]
if duration.total_seconds() < 60:
insights.append("High request frequency - all traffic within 1 minute")
elif duration.total_seconds() > 3600:
insights.append(f"Extended testing period: {self._format_duration(duration)}")
return insights
def _categorize_endpoints(self, paths: list[str]) -> dict[str, int]:
"""Categorize endpoints by type."""
categories = defaultdict(int)
for path in paths:
if '/admin' in path:
categories['admin'] += 1
elif '/api' in path:
categories['api'] += 1
elif '/auth' in path or '/login' in path or '/token' in path:
categories['auth'] += 1
elif '/health' in path or '/status' in path:
categories['health'] += 1
elif '/docs' in path or '/openapi' in path:
categories['docs'] += 1
else:
categories['business'] += 1
return dict(categories)
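
    # Worked example (hypothetical paths):
    #   ['/api/users', '/admin/logs', '/health', '/users']
    #   -> {'api': 1, 'admin': 1, 'health': 1, 'business': 1}
    # Note the check order: '/admin' is tested before '/api', so a path like
    # '/api/admin' is counted as 'admin'.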
def _calculate_read_write_ratio(self, methods: Counter) -> dict[str, Any]:
"""Calculate read vs write operation ratio."""
read_methods = methods.get('GET', 0) + methods.get('HEAD', 0) + methods.get('OPTIONS', 0)
write_methods = (methods.get('POST', 0) + methods.get('PUT', 0) +
methods.get('DELETE', 0) + methods.get('PATCH', 0))
total = read_methods + write_methods
if total == 0:
return {"read_percentage": 0, "write_percentage": 0, "ratio": "N/A"}
read_pct = round((read_methods / total) * 100, 1)
write_pct = round((write_methods / total) * 100, 1)
return {
"read_percentage": read_pct,
"write_percentage": write_pct,
"ratio": f"{read_methods}:{write_methods}",
"read_count": read_methods,
"write_count": write_methods
}
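
    # Worked example: Counter({'GET': 8, 'POST': 2})
    # -> read=8, write=2, read_percentage=80.0, write_percentage=20.0, ratio="8:2"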
def _percentile(self, data: list[float], percentile: int) -> float:
"""Calculate percentile value."""
if not data:
return 0
data_sorted = sorted(data)
index = (percentile / 100) * (len(data_sorted) - 1)
if index.is_integer():
return data_sorted[int(index)]
else:
lower = data_sorted[int(index)]
upper = data_sorted[int(index) + 1]
return lower + (upper - lower) * (index - int(index))
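
    # Worked example (linear interpolation): data=[100, 200, 300, 400], percentile=95
    #   index = 0.95 * (4 - 1) = 2.85 -> lower=data[2]=300, upper=data[3]=400
    #   result = 300 + (400 - 300) * 0.85 = 385.0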
def _format_duration(self, duration: timedelta) -> str:
"""Format duration in human-readable format."""
total_seconds = int(duration.total_seconds())
if total_seconds < 60:
return f"{total_seconds} seconds"
elif total_seconds < 3600:
minutes = total_seconds // 60
seconds = total_seconds % 60
return f"{minutes}m {seconds}s"
else:
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
return f"{hours}h {minutes}m"
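
    # Examples: timedelta(seconds=45) -> "45 seconds";
    #           150s -> "2m 30s"; 3725s -> "1h 2m"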
def analyze_request_patterns(logs: list[dict[str, Any]]) -> dict[str, Any]:
"""
Standalone function for quick pattern analysis.
Args:
logs: List of request log entries
Returns:
Dictionary with pattern analysis results
"""
analyzer = LogAnalyzer()
return analyzer.analyze_logs(logs)
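
# Minimal usage sketch with hypothetical sample data. The field names below
# ('timestamp', 'method', 'path', 'status_code', 'process_time_ms',
# 'response_size', 'user_agent') match what this analyzer reads, but the
# actual MockLoop log schema may differ.
if __name__ == "__main__":
    sample_logs = [
        {
            "timestamp": "2024-01-01T12:00:00",
            "method": "GET",
            "path": "/api/users",
            "status_code": 200,
            "process_time_ms": 12.5,
            "response_size": 2048,
            "user_agent": "curl/8.0.1",
        },
        {
            "timestamp": "2024-01-01T12:00:30",
            "method": "POST",
            "path": "/api/users",
            "status_code": 500,
            "process_time_ms": 250.0,
            "response_size": 512,
            "user_agent": "PostmanRuntime/7.36.0",
        },
    ]
    # Tuples (e.g. peak_hour) serialize as JSON arrays
    print(json.dumps(analyze_request_patterns(sample_logs), indent=2))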