"""
Metrics collection and analytics for Amicus MCP.
Provides lightweight event tracking, time-series storage, and analytics
for cluster monitoring, performance optimization, and fine-tuning feedback.
"""
import json
import sqlite3
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
from contextlib import contextmanager
class MetricsCollector:
"""
Time-series metrics collector using SQLite.
Tracks cluster activity, task execution, performance, and quality metrics.
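    Example (illustrative usage; the path and metric names are arbitrary):
        collector = MetricsCollector(Path("/tmp/metrics.db"))
        collector.record("task.completed", {"duration_ms": 125.0}, node_id="node-1")
        avg_ms = collector.aggregate("task.completed", "duration_ms", "avg")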
"""
def __init__(self, db_path: Path):
"""
Initialize metrics collector.
Args:
db_path: Path to metrics SQLite database
"""
self.db_path = db_path
self._ensure_schema()
def _ensure_schema(self):
"""Create database schema if it doesn't exist"""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._get_conn() as conn:
# Main events table
conn.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
metric TEXT NOT NULL,
node_id TEXT,
data TEXT
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_metric ON events(metric)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_node_id ON events(node_id)")
# Aggregates table
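            # One row per period/metric/aggregation pair (enforced by the
            # UNIQUE constraint), intended for pre-computed rollups of events.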
conn.execute("""
CREATE TABLE IF NOT EXISTS aggregates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
period_start REAL NOT NULL,
period_end REAL NOT NULL,
metric TEXT NOT NULL,
aggregation TEXT NOT NULL,
value REAL,
UNIQUE(period_start, metric, aggregation)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_period ON aggregates(period_start, period_end)")
# Metadata table
conn.execute("""
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT
)
""")
# Insert default config if not exists
conn.execute("""
INSERT OR IGNORE INTO metadata (key, value) VALUES
('version', '1.0'),
('enabled', 'true'),
('retention_days', '30')
""")
@contextmanager
def _get_conn(self):
"""Get database connection with context manager"""
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        except Exception:
            # Roll back the open transaction so a failed write is not committed
            conn.rollback()
            raise
        finally:
            conn.close()
def record(self, metric: str, data: Dict[str, Any], node_id: Optional[str] = None):
"""
Record a metric event.
Args:
metric: Metric name (e.g., "node.registered", "task.completed")
data: Metric data dictionary
node_id: Optional node ID
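        Example (illustrative; metric name and fields are arbitrary):
            collector.record(
                "task.completed",
                {"duration_ms": 125.0, "status": "ok"},
                node_id="node-1",
            )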
"""
# Check if metrics are enabled
if not self.is_enabled():
return
timestamp = time.time()
data_json = json.dumps(self._sanitize_data(data))
with self._get_conn() as conn:
conn.execute(
"INSERT INTO events (timestamp, metric, node_id, data) VALUES (?, ?, ?, ?)",
(timestamp, metric, node_id, data_json)
)
def query(
self,
metric: Optional[str] = None,
since: Optional[float] = None,
until: Optional[float] = None,
node_id: Optional[str] = None,
limit: int = 1000
) -> List[Dict[str, Any]]:
"""
Query metric events.
Args:
metric: Filter by metric name
since: Unix timestamp for start of range
until: Unix timestamp for end of range
node_id: Filter by node ID
limit: Maximum number of results
Returns:
List of metric events
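        Example (illustrative; fetches the last hour of one metric):
            events = collector.query(
                metric="task.completed",
                since=time.time() - 3600,
            )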
"""
query = "SELECT timestamp, metric, node_id, data FROM events WHERE 1=1"
params = []
if metric:
query += " AND metric = ?"
params.append(metric)
        if since is not None:
            query += " AND timestamp >= ?"
            params.append(since)
        if until is not None:
            query += " AND timestamp <= ?"
            params.append(until)
if node_id:
query += " AND node_id = ?"
params.append(node_id)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
with self._get_conn() as conn:
cursor = conn.execute(query, params)
results = []
for row in cursor.fetchall():
results.append({
'timestamp': row[0],
'metric': row[1],
'node_id': row[2],
'data': json.loads(row[3]) if row[3] else {}
})
return results
def aggregate(
self,
metric: str,
field: str,
aggregation: str = "avg",
since: Optional[float] = None,
until: Optional[float] = None
) -> float:
"""
Aggregate a numeric field across metric events.
Args:
metric: Metric name
field: Field name to aggregate
aggregation: Aggregation function (avg, sum, min, max, count)
since: Unix timestamp for start of range
until: Unix timestamp for end of range
Returns:
Aggregated value
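        Note: the aggregate is computed in Python over at most 10,000
        matching events, so very large ranges may be truncated.
        Example (illustrative):
            avg_ms = collector.aggregate("task.completed", "duration_ms", "avg")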
"""
events = self.query(metric=metric, since=since, until=until, limit=10000)
values = []
for event in events:
if field in event['data']:
try:
values.append(float(event['data'][field]))
except (ValueError, TypeError):
pass
if not values:
return 0.0
if aggregation == "avg":
return sum(values) / len(values)
elif aggregation == "sum":
return sum(values)
elif aggregation == "min":
return min(values)
elif aggregation == "max":
return max(values)
        elif aggregation == "count":
            return float(len(values))
else:
raise ValueError(f"Unknown aggregation: {aggregation}")
    def cleanup(self, retention_days: int = 30) -> int:
        """
        Remove old metrics beyond retention period.
        Args:
            retention_days: Number of days to retain
        Returns:
            Number of deleted events
        """
cutoff = time.time() - (retention_days * 86400)
with self._get_conn() as conn:
cursor = conn.execute("DELETE FROM events WHERE timestamp < ?", (cutoff,))
deleted_count = cursor.rowcount
return deleted_count
def is_enabled(self) -> bool:
"""Check if metrics collection is enabled"""
with self._get_conn() as conn:
cursor = conn.execute("SELECT value FROM metadata WHERE key = 'enabled'")
row = cursor.fetchone()
            return row is not None and row[0] == 'true'
def set_enabled(self, enabled: bool):
"""Enable or disable metrics collection"""
value = 'true' if enabled else 'false'
        with self._get_conn() as conn:
            # INSERT OR REPLACE also covers the case where the row was never seeded
            conn.execute("INSERT OR REPLACE INTO metadata (key, value) VALUES ('enabled', ?)", (value,))
def get_stats(self) -> Dict[str, Any]:
"""Get database statistics"""
with self._get_conn() as conn:
cursor = conn.execute("SELECT COUNT(*) FROM events")
total_events = cursor.fetchone()[0]
cursor = conn.execute("SELECT MIN(timestamp), MAX(timestamp) FROM events")
row = cursor.fetchone()
min_ts, max_ts = row[0], row[1]
cursor = conn.execute("SELECT COUNT(DISTINCT metric) FROM events")
unique_metrics = cursor.fetchone()[0]
cursor = conn.execute("SELECT COUNT(DISTINCT node_id) FROM events WHERE node_id IS NOT NULL")
unique_nodes = cursor.fetchone()[0]
return {
'total_events': total_events,
'unique_metrics': unique_metrics,
'unique_nodes': unique_nodes,
'earliest_event': min_ts,
'latest_event': max_ts,
                'time_range_hours': (max_ts - min_ts) / 3600 if min_ts is not None and max_ts is not None else 0,
'db_size_bytes': self.db_path.stat().st_size if self.db_path.exists() else 0
}
def _sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Remove sensitive information from metric data"""
sensitive_keys = ['api_key', 'token', 'password', 'secret', 'key']
sanitized = {}
for key, value in data.items():
# Redact sensitive keys
if any(sensitive in key.lower() for sensitive in sensitive_keys):
sanitized[key] = "<redacted>"
            # Truncate long strings to keep rows small
            elif isinstance(value, str) and len(value) > 1000:
                sanitized[key] = value[:1000] + "... (truncated)"
else:
sanitized[key] = value
return sanitized
# Singleton instance
_metrics_collector: Optional[MetricsCollector] = None
def get_metrics_collector(db_path: Optional[Path] = None) -> MetricsCollector:
"""Get or create global metrics collector instance"""
global _metrics_collector
if _metrics_collector is None:
if db_path is None:
from .core import get_context_bus_dir
db_path = get_context_bus_dir() / "metrics.db"
_metrics_collector = MetricsCollector(db_path)
return _metrics_collector
# Convenience functions
def record_metric(metric: str, data: Dict[str, Any], node_id: Optional[str] = None):
"""Convenience function to record a metric"""
collector = get_metrics_collector()
collector.record(metric, data, node_id)
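# Minimal smoke-test sketch (illustrative only; the temporary path is an
# assumption, not part of any Amicus MCP deployment layout).
if __name__ == "__main__":
    import tempfile
    with tempfile.TemporaryDirectory() as tmp:
        demo = MetricsCollector(Path(tmp) / "metrics.db")
        demo.record("task.completed", {"duration_ms": 125.0}, node_id="node-1")
        demo.record("task.completed", {"duration_ms": 250.0}, node_id="node-2")
        print("avg duration_ms:", demo.aggregate("task.completed", "duration_ms", "avg"))
        print("stats:", demo.get_stats())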