"""
Advanced analytics and insights engine for MCP system
Provides data analysis, pattern recognition, and business intelligence
"""
import asyncio
import logging
import statistics
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

import numpy as np

logger = logging.getLogger(__name__)
class InsightType(Enum):
"""Types of insights that can be generated"""
USAGE_PATTERN = "usage_pattern"
PERFORMANCE_TREND = "performance_trend"
ERROR_ANALYSIS = "error_analysis"
USER_BEHAVIOR = "user_behavior"
SYSTEM_HEALTH = "system_health"
COST_OPTIMIZATION = "cost_optimization"
SECURITY_ANOMALY = "security_anomaly"
class InsightSeverity(Enum):
"""Severity levels for insights"""
INFO = "info"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Insight:
"""Represents a generated insight"""
insight_id: str
type: InsightType
severity: InsightSeverity
title: str
description: str
data: Dict[str, Any]
recommendations: List[str]
confidence: float
created_at: datetime = field(default_factory=datetime.now)
tags: List[str] = field(default_factory=list)
@dataclass
class AnalyticsMetric:
"""Analytics metric definition"""
name: str
value: float
unit: str
timestamp: datetime
metadata: Dict[str, Any] = field(default_factory=dict)
class DataCollector:
"""Collects and preprocesses data for analytics"""
def __init__(self):
self.metrics_buffer: List[AnalyticsMetric] = []
self.max_buffer_size = 10000
def collect_query_metrics(self, query_data: Dict[str, Any]) -> AnalyticsMetric:
"""Collect metrics from query execution"""
return AnalyticsMetric(
name="query_execution_time",
value=query_data.get('execution_time', 0),
unit="seconds",
timestamp=datetime.now(),
metadata={
'query_type': query_data.get('query_type'),
'success': query_data.get('success'),
'result_count': query_data.get('result_count', 0)
}
)
def collect_api_metrics(self, api_data: Dict[str, Any]) -> AnalyticsMetric:
"""Collect metrics from API calls"""
return AnalyticsMetric(
name="api_response_time",
value=api_data.get('response_time', 0),
unit="milliseconds",
timestamp=datetime.now(),
metadata={
'endpoint': api_data.get('endpoint'),
'status_code': api_data.get('status_code'),
'user_id': api_data.get('user_id')
}
)
def collect_system_metrics(self, system_data: Dict[str, Any]) -> List[AnalyticsMetric]:
"""Collect system performance metrics"""
metrics = []
for metric_name, value in system_data.items():
if isinstance(value, (int, float)):
metric = AnalyticsMetric(
name=f"system_{metric_name}",
value=value,
unit=self._get_metric_unit(metric_name),
timestamp=datetime.now()
)
metrics.append(metric)
return metrics
def _get_metric_unit(self, metric_name: str) -> str:
"""Get appropriate unit for metric"""
unit_mapping = {
'cpu_percent': 'percent',
'memory_percent': 'percent',
'disk_usage': 'percent',
'response_time': 'milliseconds',
'query_count': 'count',
'error_count': 'count'
}
return unit_mapping.get(metric_name, 'count')
def add_metric(self, metric: AnalyticsMetric):
"""Add metric to buffer"""
self.metrics_buffer.append(metric)
# Maintain buffer size
if len(self.metrics_buffer) > self.max_buffer_size:
self.metrics_buffer = self.metrics_buffer[-self.max_buffer_size:]
def get_metrics(self,
metric_name: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> List[AnalyticsMetric]:
"""Get metrics based on filters"""
        filtered_metrics = list(self.metrics_buffer)  # copy so callers cannot mutate the buffer
if metric_name:
filtered_metrics = [m for m in filtered_metrics if m.name == metric_name]
if start_time:
filtered_metrics = [m for m in filtered_metrics if m.timestamp >= start_time]
if end_time:
filtered_metrics = [m for m in filtered_metrics if m.timestamp <= end_time]
return filtered_metrics
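
# A usage sketch for DataCollector (hypothetical values): collect one API
# metric, buffer it, then pull the last hour of matching metrics back out:
#
#     collector = DataCollector()
#     collector.add_metric(collector.collect_api_metrics(
#         {'response_time': 120, 'endpoint': '/search', 'status_code': 200}))
#     recent = collector.get_metrics(
#         metric_name="api_response_time",
#         start_time=datetime.now() - timedelta(hours=1),
#     )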
class PatternAnalyzer:
"""Analyzes patterns in data"""
def __init__(self):
self.pattern_cache: Dict[str, Any] = {}
def analyze_usage_patterns(self, metrics: List[AnalyticsMetric]) -> Dict[str, Any]:
"""Analyze usage patterns"""
if not metrics:
return {}
# Group by hour of day
hourly_usage = defaultdict(list)
daily_usage = defaultdict(list)
for metric in metrics:
hour = metric.timestamp.hour
day = metric.timestamp.strftime('%Y-%m-%d')
hourly_usage[hour].append(metric.value)
daily_usage[day].append(metric.value)
# Calculate hourly averages
hourly_averages = {
hour: statistics.mean(values)
for hour, values in hourly_usage.items()
}
# Find peak hours
peak_hour = max(hourly_averages.items(), key=lambda x: x[1]) if hourly_averages else (0, 0)
low_hour = min(hourly_averages.items(), key=lambda x: x[1]) if hourly_averages else (0, 0)
# Calculate trends
daily_totals = {day: sum(values) for day, values in daily_usage.items()}
trend = self._calculate_trend(list(daily_totals.values()))
return {
'hourly_averages': hourly_averages,
'peak_hour': {'hour': peak_hour[0], 'value': peak_hour[1]},
'low_hour': {'hour': low_hour[0], 'value': low_hour[1]},
'daily_totals': daily_totals,
'trend': trend,
'total_requests': len(metrics)
}
def analyze_performance_trends(self, metrics: List[AnalyticsMetric]) -> Dict[str, Any]:
"""Analyze performance trends"""
if not metrics:
return {}
values = [m.value for m in metrics]
timestamps = [m.timestamp for m in metrics]
# Basic statistics
avg_performance = statistics.mean(values)
median_performance = statistics.median(values)
std_dev = statistics.stdev(values) if len(values) > 1 else 0
        # Percentiles (cast from numpy floats so results stay JSON-serializable;
        # values is non-empty because empty metrics return early above)
        p95 = float(np.percentile(values, 95))
        p99 = float(np.percentile(values, 99))
# Trend analysis
trend = self._calculate_trend(values)
# Anomaly detection (simple threshold-based)
threshold = avg_performance + 2 * std_dev
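        # Under a roughly normal distribution, mean + 2 standard deviations
        # flags about the top 2.3% of points as anomalous.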
        anomalies = [
            {'timestamp': timestamps[i].isoformat(), 'value': values[i]}
            for i, value in enumerate(values)
            if value > threshold
        ]
return {
'average': avg_performance,
'median': median_performance,
'std_dev': std_dev,
'p95': p95,
'p99': p99,
'trend': trend,
'anomalies': anomalies,
'anomaly_count': len(anomalies)
}
def analyze_error_patterns(self, error_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze error patterns"""
if not error_data:
return {}
# Error frequency by type
error_types = Counter(error['type'] for error in error_data if 'type' in error)
# Error frequency by endpoint
endpoints = Counter(error['endpoint'] for error in error_data if 'endpoint' in error)
# Error trends over time
error_timeline = defaultdict(int)
for error in error_data:
if 'timestamp' in error:
hour = error['timestamp'].strftime('%Y-%m-%d %H:00')
error_timeline[hour] += 1
# Most common error messages
messages = Counter(error['message'][:100] for error in error_data if 'message' in error)
return {
'total_errors': len(error_data),
'error_types': dict(error_types.most_common(10)),
'error_endpoints': dict(endpoints.most_common(10)),
'error_timeline': dict(error_timeline),
'common_messages': dict(messages.most_common(5))
}
def _calculate_trend(self, values: List[float]) -> str:
"""Calculate trend direction"""
if len(values) < 2:
return "insufficient_data"
# Simple linear regression slope
n = len(values)
x = list(range(n))
y = values
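        # Least-squares slope over evenly spaced points:
        #     slope = (n*sum(xy) - sum(x)*sum(y)) / (n*sum(x^2) - sum(x)^2)
        # With x = 0..n-1 and n >= 2, the denominator is strictly positive.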
        slope = (
            (n * sum(x[i] * y[i] for i in range(n)) - sum(x) * sum(y))
            / (n * sum(x[i] ** 2 for i in range(n)) - sum(x) ** 2)
        )
if abs(slope) < 0.01:
return "stable"
elif slope > 0:
return "increasing"
else:
return "decreasing"
class InsightGenerator:
"""Generates actionable insights from analyzed data"""
def __init__(self):
        self.insight_rules: Dict[str, Callable[[Dict[str, Any]], Optional[Insight]]] = {
'high_error_rate': self._check_high_error_rate,
'performance_degradation': self._check_performance_degradation,
'unusual_usage_pattern': self._check_unusual_usage,
'resource_exhaustion': self._check_resource_exhaustion,
'security_anomaly': self._check_security_anomaly
}
def generate_insights(self, analysis_results: Dict[str, Any]) -> List[Insight]:
"""Generate insights from analysis results"""
insights = []
for rule_name, rule_func in self.insight_rules.items():
try:
insight = rule_func(analysis_results)
if insight:
insights.append(insight)
except Exception as e:
logger.error(f"Error generating insight '{rule_name}': {e}")
return insights
def _check_high_error_rate(self, data: Dict[str, Any]) -> Optional[Insight]:
"""Check for high error rates"""
        error_analysis = data.get('error_analysis', {})
        usage_data = data.get('usage_analysis', {})
        total_errors = error_analysis.get('total_errors', 0)
        # 'total_requests' is produced by analyze_usage_patterns, not the
        # performance analysis; guard against division by zero as well.
        total_requests = usage_data.get('total_requests', 0) or 1
        error_rate = (total_errors / total_requests) * 100
if error_rate > 5: # More than 5% error rate
severity = InsightSeverity.HIGH if error_rate > 15 else InsightSeverity.MEDIUM
return Insight(
insight_id=f"error_rate_{int(datetime.now().timestamp())}",
type=InsightType.ERROR_ANALYSIS,
severity=severity,
title="High Error Rate Detected",
description=f"Error rate is {error_rate:.2f}%, which exceeds the acceptable threshold.",
data={
'error_rate': error_rate,
'total_errors': total_errors,
'total_requests': total_requests,
'top_errors': error_analysis.get('error_types', {})
},
recommendations=[
"Investigate most common error types",
"Check system logs for root causes",
"Consider implementing circuit breakers",
"Review error handling in critical paths"
],
confidence=0.9,
tags=['errors', 'reliability']
)
return None
def _check_performance_degradation(self, data: Dict[str, Any]) -> Optional[Insight]:
"""Check for performance degradation"""
performance = data.get('performance_analysis', {})
trend = performance.get('trend')
avg_performance = performance.get('average', 0)
p95 = performance.get('p95', 0)
if trend == "increasing" and avg_performance > 1.0: # More than 1 second average
severity = InsightSeverity.HIGH if p95 > 5.0 else InsightSeverity.MEDIUM
return Insight(
insight_id=f"perf_degradation_{int(datetime.now().timestamp())}",
type=InsightType.PERFORMANCE_TREND,
severity=severity,
title="Performance Degradation Detected",
description=f"Response times are increasing. Average: {avg_performance:.2f}s, P95: {p95:.2f}s",
data={
'average_response_time': avg_performance,
'p95_response_time': p95,
'trend': trend,
'anomaly_count': performance.get('anomaly_count', 0)
},
recommendations=[
"Check database query performance",
"Review system resource utilization",
"Consider implementing caching",
"Analyze slow query logs",
"Scale resources if needed"
],
confidence=0.8,
tags=['performance', 'optimization']
)
return None
def _check_unusual_usage(self, data: Dict[str, Any]) -> Optional[Insight]:
"""Check for unusual usage patterns"""
usage = data.get('usage_analysis', {})
peak_hour_data = usage.get('peak_hour', {})
peak_value = peak_hour_data.get('value', 0)
low_hour_data = usage.get('low_hour', {})
low_value = low_hour_data.get('value', 0)
if peak_value > 0 and low_value > 0:
ratio = peak_value / low_value
if ratio > 10: # Peak is 10x higher than low
return Insight(
insight_id=f"usage_pattern_{int(datetime.now().timestamp())}",
type=InsightType.USAGE_PATTERN,
severity=InsightSeverity.LOW,
title="Significant Usage Variation Detected",
description=f"Usage varies significantly throughout the day (ratio: {ratio:.1f}x)",
data={
'peak_hour': peak_hour_data.get('hour'),
'peak_value': peak_value,
'low_hour': low_hour_data.get('hour'),
'low_value': low_value,
'ratio': ratio
},
recommendations=[
"Consider auto-scaling based on usage patterns",
"Implement load balancing for peak hours",
"Review resource allocation",
"Consider usage-based pricing models"
],
confidence=0.7,
tags=['usage', 'optimization']
)
return None
def _check_resource_exhaustion(self, data: Dict[str, Any]) -> Optional[Insight]:
"""Check for resource exhaustion signs"""
system_data = data.get('system_metrics', {})
cpu_usage = system_data.get('cpu_percent', 0)
memory_usage = system_data.get('memory_percent', 0)
if cpu_usage > 80 or memory_usage > 85:
severity = InsightSeverity.CRITICAL if (cpu_usage > 95 or memory_usage > 95) else InsightSeverity.HIGH
return Insight(
insight_id=f"resource_exhaustion_{int(datetime.now().timestamp())}",
type=InsightType.SYSTEM_HEALTH,
severity=severity,
title="High Resource Utilization",
description=f"System resources are heavily utilized (CPU: {cpu_usage:.1f}%, Memory: {memory_usage:.1f}%)",
data={
'cpu_usage': cpu_usage,
'memory_usage': memory_usage,
'threshold_cpu': 80,
'threshold_memory': 85
},
recommendations=[
"Scale up system resources",
"Optimize resource-intensive operations",
"Implement resource monitoring alerts",
"Review memory leaks and CPU bottlenecks"
],
confidence=0.95,
tags=['resources', 'scaling', 'critical']
)
return None
def _check_security_anomaly(self, data: Dict[str, Any]) -> Optional[Insight]:
"""Check for security anomalies"""
security_data = data.get('security_metrics', {})
failed_auth_rate = security_data.get('failed_auth_rate', 0)
unusual_access_patterns = security_data.get('unusual_access_patterns', [])
if failed_auth_rate > 10: # More than 10% failed auth rate
return Insight(
insight_id=f"security_anomaly_{int(datetime.now().timestamp())}",
type=InsightType.SECURITY_ANOMALY,
severity=InsightSeverity.HIGH,
title="High Authentication Failure Rate",
description=f"Authentication failure rate is {failed_auth_rate:.2f}%",
data={
'failed_auth_rate': failed_auth_rate,
'unusual_patterns': unusual_access_patterns
},
recommendations=[
"Review authentication logs",
"Implement account lockout policies",
"Check for brute force attacks",
"Enable multi-factor authentication"
],
confidence=0.85,
tags=['security', 'authentication']
)
return None
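
# Custom checks can be plugged into the rule registry at runtime. A sketch
# (`check_cost_spike` is hypothetical); a rule takes the analysis-results
# dict and returns an Insight or None:
#
#     generator = InsightGenerator()
#     generator.insight_rules['cost_spike'] = check_cost_spike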
class InsightsEngine:
"""Main insights engine that coordinates all analytics components"""
def __init__(self):
self.data_collector = DataCollector()
self.pattern_analyzer = PatternAnalyzer()
self.insight_generator = InsightGenerator()
self.insights_history: List[Insight] = []
async def analyze_system_data(self,
time_window_hours: int = 24) -> Dict[str, Any]:
"""Perform comprehensive system analysis"""
end_time = datetime.now()
start_time = end_time - timedelta(hours=time_window_hours)
# Get metrics for analysis
metrics = self.data_collector.get_metrics(
start_time=start_time,
end_time=end_time
)
if not metrics:
return {
'status': 'no_data',
'message': 'No metrics available for analysis'
}
# Perform different types of analysis
analysis_results = {}
# Usage pattern analysis
api_metrics = [m for m in metrics if m.name == 'api_response_time']
if api_metrics:
analysis_results['usage_analysis'] = self.pattern_analyzer.analyze_usage_patterns(api_metrics)
# Performance analysis
query_metrics = [m for m in metrics if m.name == 'query_execution_time']
if query_metrics:
analysis_results['performance_analysis'] = self.pattern_analyzer.analyze_performance_trends(query_metrics)
# System metrics analysis
system_metrics = [m for m in metrics if m.name.startswith('system_')]
if system_metrics:
analysis_results['system_metrics'] = self._aggregate_system_metrics(system_metrics)
# Generate insights
insights = self.insight_generator.generate_insights(analysis_results)
self.insights_history.extend(insights)
return {
'status': 'success',
'analysis_period': {
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'hours': time_window_hours
},
'metrics_analyzed': len(metrics),
'analysis_results': analysis_results,
'insights': [self._insight_to_dict(insight) for insight in insights],
'insights_count': len(insights)
}
def _aggregate_system_metrics(self, metrics: List[AnalyticsMetric]) -> Dict[str, Any]:
"""Aggregate system metrics"""
aggregated = {}
# Group by metric name
grouped = defaultdict(list)
for metric in metrics:
metric_name = metric.name.replace('system_', '')
grouped[metric_name].append(metric.value)
# Calculate aggregates
for metric_name, values in grouped.items():
aggregated[metric_name] = statistics.mean(values)
return aggregated
def _insight_to_dict(self, insight: Insight) -> Dict[str, Any]:
"""Convert insight to dictionary"""
return {
'insight_id': insight.insight_id,
'type': insight.type.value,
'severity': insight.severity.value,
'title': insight.title,
'description': insight.description,
'data': insight.data,
'recommendations': insight.recommendations,
'confidence': insight.confidence,
'created_at': insight.created_at.isoformat(),
'tags': insight.tags
}
async def get_dashboard_data(self) -> Dict[str, Any]:
"""Get data for analytics dashboard"""
recent_insights = [
self._insight_to_dict(insight)
for insight in self.insights_history[-10:] # Last 10 insights
]
# Insight summary
insight_counts = Counter(insight.severity.value for insight in self.insights_history)
insight_types = Counter(insight.type.value for insight in self.insights_history)
# Recent metrics summary
recent_metrics = self.data_collector.get_metrics(
start_time=datetime.now() - timedelta(hours=1)
)
return {
'summary': {
'total_insights': len(self.insights_history),
'critical_insights': insight_counts.get('critical', 0),
'high_insights': insight_counts.get('high', 0),
'recent_metrics_count': len(recent_metrics)
},
'insight_distribution': {
'by_severity': dict(insight_counts),
'by_type': dict(insight_types)
},
'recent_insights': recent_insights,
'health_status': self._calculate_health_status()
}
def _calculate_health_status(self) -> str:
"""Calculate overall system health status"""
        recent = self.insights_history[-20:]
        recent_critical = sum(
            1 for insight in recent
            if insight.severity == InsightSeverity.CRITICAL
        )
        recent_high = sum(
            1 for insight in recent
            if insight.severity == InsightSeverity.HIGH
        )
if recent_critical > 0:
return "critical"
elif recent_high > 2:
return "warning"
elif recent_high > 0:
return "caution"
else:
return "healthy"
def add_metric(self, metric: AnalyticsMetric):
"""Add a metric for analysis"""
self.data_collector.add_metric(metric)
def get_insights(self,
insight_type: Optional[InsightType] = None,
severity: Optional[InsightSeverity] = None,
limit: int = 50) -> List[Dict[str, Any]]:
"""Get insights with filters"""
        filtered_insights = list(self.insights_history)  # copy so the sort below never reorders history
if insight_type:
filtered_insights = [i for i in filtered_insights if i.type == insight_type]
if severity:
filtered_insights = [i for i in filtered_insights if i.severity == severity]
# Sort by creation time (newest first)
filtered_insights.sort(key=lambda x: x.created_at, reverse=True)
return [
self._insight_to_dict(insight)
for insight in filtered_insights[:limit]
]
# Global insights engine instance
_insights_engine: Optional[InsightsEngine] = None
def get_insights_engine() -> InsightsEngine:
"""Get global insights engine instance"""
global _insights_engine
if _insights_engine is None:
_insights_engine = InsightsEngine()
return _insights_engine
def setup_analytics() -> InsightsEngine:
"""Setup and configure analytics engine"""
global _insights_engine
_insights_engine = InsightsEngine()
return _insights_engine
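

if __name__ == "__main__":
    # Minimal end-to-end sketch with hypothetical values: buffer a few query
    # metrics, analyze a one-hour window, and report how many insights the
    # rule registry produced.
    engine = setup_analytics()
    for i in range(5):
        engine.add_metric(engine.data_collector.collect_query_metrics({
            'execution_time': 0.2 + 0.1 * i,
            'query_type': 'select',
            'success': True,
            'result_count': 10 * i,
        }))
    report = asyncio.run(engine.analyze_system_data(time_window_hours=1))
    print(f"Status: {report['status']}, "
          f"insights generated: {report.get('insights_count', 0)}")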