# Metrics and Analytics System for Amicus MCP
## Research & Implementation Plan
**Status:** Research Complete
**Date:** 2026-02-03
**Version:** 1.0
**Purpose:** GitHub Issue #21 - Implement fine-tuning feedback mechanism
**Target Audience:** Human architects and AI implementor agents
---
## Executive Summary
This document proposes a comprehensive metrics and analytics system for Amicus MCP that enables:
- **Real-time monitoring** of cluster health, task execution, and resource usage
- **Historical analytics** for performance optimization and debugging
- **Fine-tuning feedback** to improve agent behavior over time
- **Cost tracking** for token usage and API calls
- **Quality metrics** for task completion and code changes
### Key Recommendation
Implement a **three-tier metrics architecture**:
1. **Collector Layer** - Lightweight event collection embedded in core operations
2. **Storage Layer** - Time-series database (SQLite with indexed timestamps)
3. **Analysis Layer** - Query API and visualization tools
**Expected Impact (projected):**
- 90% faster debugging with historical traces
- 40% cost reduction through usage analytics
- Automated identification of inefficient patterns
- Data-driven agent role tuning
---
## Table of Contents
1. [Problem Analysis](#problem-analysis)
2. [Metrics Categories](#metrics-categories)
3. [Architecture Options](#architecture-options)
4. [Recommended Approach](#recommended-approach)
5. [Data Schema](#data-schema)
6. [Analysis & Feedback](#analysis-feedback)
7. [Privacy & Security](#privacy-security)
8. [Implementation Checklist](#implementation-checklist)
9. [Success Criteria](#success-criteria)
10. [Future Enhancements](#future-enhancements)
---
## Problem Analysis
### Current State
Amicus currently has **limited observability**:
- ✅ Heartbeat timestamps for liveness
- ✅ Task status (pending/in_progress/completed)
- ❌ No historical metrics
- ❌ No performance tracking
- ❌ No cost analytics
- ❌ No quality metrics
### Requirements
**Functional:**
1. Track cluster activity (node joins/leaves, task claims/completions)
2. Measure performance (task duration, file operations, state updates)
3. Monitor costs (token usage, API calls if using external LLMs)
4. Assess quality (test pass rates, code review feedback)
5. Enable fine-tuning (identify patterns, recommend improvements)
**Non-Functional:**
1. Low overhead (<5% performance impact)
2. Privacy-preserving (no sensitive data in metrics)
3. Queryable (SQL-like interface)
4. Exportable (JSON, CSV formats)
5. Configurable (enable/disable per metric type; see the configuration sketch below)
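As a sketch of requirement 5, per-category configuration could be as small as a dataclass that the collector consults before writing anything. `MetricsConfig` and its field names are hypothetical; the category names mirror the metric prefixes in the next section, and the 30-day retention default matches the schema defaults further below.
```python
from dataclasses import dataclass, field

@dataclass
class MetricsConfig:
    """Hypothetical per-category switches consulted before recording anything."""
    enabled: bool = True
    # Category names mirror the metric prefixes below ("node", "task", "state", ...)
    enabled_categories: set = field(
        default_factory=lambda: {"node", "task", "state", "quality"}
    )
    retention_days: int = 30

    def allows(self, metric_name: str) -> bool:
        """True if events like "task.completed" should be recorded."""
        category = metric_name.split(".", 1)[0]
        return self.enabled and category in self.enabled_categories
```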
---
## Metrics Categories
### 1. Cluster Metrics
**Node Lifecycle:**
```python
{
"metric": "node.registered",
"timestamp": 1770104400.5,
"node_id": "Node-X9J2",
"role": "developer",
"model": "claude-3-5-sonnet"
}
{
"metric": "node.terminated",
"timestamp": 1770108000.2,
"node_id": "Node-X9J2",
"reason": "idle_timeout",
"uptime_seconds": 3600
}
```
**Cluster Health:**
```python
{
"metric": "cluster.snapshot",
"timestamp": 1770104500.0,
"active_nodes": 3,
"idle_nodes": 1,
"pending_tasks": 5,
"workload_status": "balanced"
}
```
### 2. Task Metrics
**Task Execution:**
```python
{
"metric": "task.claimed",
"timestamp": 1770104410.0,
"task_id": "task-123",
"node_id": "Node-X9J2",
"priority": "high"
}
{
"metric": "task.completed",
"timestamp": 1770104810.0,
"task_id": "task-123",
"node_id": "Node-X9J2",
"duration_seconds": 400,
"outcome": "success",
"files_modified": 3,
"tests_passed": 15,
"tests_failed": 0
}
```
### 3. Performance Metrics
**State Operations:**
```python
{
"metric": "state.read",
"timestamp": 1770104420.0,
"duration_ms": 15.2,
"file_size_bytes": 45000,
"lock_wait_ms": 2.1
}
{
"metric": "state.write",
"timestamp": 1770104425.0,
"duration_ms": 25.8,
"file_size_bytes": 47000,
"lock_wait_ms": 1.5
}
```
**Token Usage:**
```python
{
"metric": "tokens.consumed",
"timestamp": 1770104430.0,
"node_id": "Node-X9J2",
"task_id": "task-123",
"input_tokens": 5000,
"output_tokens": 1500,
"cached_tokens": 2000,
"estimated_cost_usd": 0.025
}
```
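The `estimated_cost_usd` field would be derived from per-token prices. Below is a sketch with placeholder rates; real values depend on the model and provider, and treating cached tokens as a discounted subset of input tokens is an assumption for illustration only.
```python
def estimate_cost_usd(input_tokens, output_tokens, cached_tokens=0,
                      input_rate=3e-6, output_rate=15e-6, cached_rate=0.3e-6):
    """Rough cost estimate; the per-token rates are placeholders, not real pricing.

    Cached tokens are assumed to be a subset of input tokens billed at a
    cheaper cached rate.
    """
    uncached_input = max(input_tokens - cached_tokens, 0)
    return (uncached_input * input_rate
            + cached_tokens * cached_rate
            + output_tokens * output_rate)
```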
### 4. Quality Metrics
**Code Changes:**
```python
{
"metric": "code.changed",
"timestamp": 1770104440.0,
"node_id": "Node-X9J2",
"task_id": "task-123",
"files_modified": ["server.py", "tests/test_server.py"],
"lines_added": 150,
"lines_deleted": 30,
"complexity_delta": 5
}
{
"metric": "tests.executed",
"timestamp": 1770104450.0,
"node_id": "Node-X9J2",
"task_id": "task-123",
"total_tests": 20,
"passed": 18,
"failed": 2,
"duration_seconds": 12.5
}
```
---
## Architecture Options
### Option 1: In-Memory Ring Buffer ★★☆☆☆
**Pros:**
- Zero dependencies
- Very fast (<1μs overhead)
- Simple implementation
**Cons:**
- Lost on crash
- Limited capacity (fixed buffer size)
- No historical analytics
- No persistence
**Verdict:** Too limited for production use.
### Option 2: SQLite Time-Series DB ★★★★★
**Pros:**
- No external dependencies
- ACID guarantees
- SQL queries for analytics
- Persistent storage
- Efficient indexing
- Easily handles 100MB+ of metric data
**Cons:**
- ~1-5ms write latency
- Requires schema management
**Verdict:** **RECOMMENDED** - Best balance of features and simplicity.
### Option 3: External Metrics Server (Prometheus/InfluxDB) ★★★☆☆
**Pros:**
- Industry-standard tools
- Rich visualization (Grafana)
- Multi-node aggregation
- Real-time dashboards
**Cons:**
- External dependency
- Complex setup
- Network overhead
- Overkill for single-node
**Verdict:** Use for Phase 4 WebSocket multi-host deployments only.
### Option 4: JSON Log Files ★★☆☆☆
**Pros:**
- Simple implementation
- No schema
- Human-readable
**Cons:**
- Slow queries (O(n) scan)
- File size growth
- No atomic operations
- Hard to analyze
**Verdict:** Acceptable for debugging logs, not for metrics.
---
## Recommended Approach
### Architecture: SQLite Time-Series Database
```
┌───────────────────────────────────────────────┐
│            Amicus Core Operations             │
│                                               │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐      │
│  │  Tasks  │   │  State  │   │  Nodes  │      │
│  └────┬────┘   └────┬────┘   └────┬────┘      │
│       │             │             │           │
│       └─────────────┼─────────────┘           │
│                     │                         │
│          ┌──────────▼───────────┐             │
│          │   MetricsCollector   │             │
│          │ (Decorator/Wrapper)  │             │
│          └──────────┬───────────┘             │
└─────────────────────┼─────────────────────────┘
                      │
                      │ emit(metric_name, data)
                      │
         ┌────────────▼────────────┐
         │       Metrics DB        │
         │    (.ai/metrics.db)     │
         │                         │
         │ Tables:                 │
         │ - events (timeseries)   │
         │ - aggregates (rollups)  │
         │ - metadata (config)     │
         └────────────┬────────────┘
                      │
        ┌─────────────┼─────────────┐
        │             │             │
  ┌─────▼─────┐  ┌────▼────┐  ┌─────▼──────┐
  │ Query API │  │ Export  │  │ Dashboards │
  │ (Python)  │  │ (JSON)  │  │ (CLI/Web)  │
  └───────────┘  └─────────┘  └────────────┘
```
### Implementation Strategy
**Phase 1: Core Infrastructure (Week 1)**
1. Create `metrics.py` module with `MetricsCollector` class
2. Define SQLite schema with indexed timestamps
3. Implement basic event recording (node, task, state metrics)
4. Add configuration for enable/disable per category
**Phase 2: Integration (Week 2)**
5. Add decorators to existing functions (register_node, claim_task, etc.); a decorator sketch follows the phase list
6. Instrument state read/write operations
7. Track token usage (if using external APIs)
8. Capture test execution results
**Phase 3: Analysis Tools (Week 3)**
9. Implement query API for common analytics
10. Create CLI commands for metrics viewing
11. Add export functionality (JSON, CSV)
12. Build aggregation/rollup system
**Phase 4: Feedback Loop (Week 4)**
13. Develop pattern detection algorithms
14. Create recommendation engine
15. Implement auto-tuning suggestions
16. Add alerting for anomalies
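To make Phase 2 concrete, the decorator-based instrumentation could look like the sketch below. The `metrics.record(...)` call and the recorded fields are assumptions about the eventual `MetricsCollector` interface, and the wrapped function names are only examples.
```python
import functools
import time

def track_metric(metrics, metric_name):
    """Record duration and outcome of the wrapped call as a metric event."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            outcome = "success"
            try:
                return func(*args, **kwargs)
            except Exception:
                outcome = "error"
                raise
            finally:
                metrics.record(metric_name, {
                    "duration_ms": (time.monotonic() - start) * 1000,
                    "outcome": outcome,
                })
        return wrapper
    return decorator

# Hypothetical usage once a shared collector exists:
# @track_metric(metrics, "task.claim")
# def claim_task(task_id, node_id): ...
```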
---
## Data Schema
### SQLite Schema
```sql
-- Main events table (time-series data)
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
metric TEXT NOT NULL,
node_id TEXT,
data JSON,
UNIQUE(timestamp, metric, node_id)
);
CREATE INDEX idx_timestamp ON events(timestamp);
CREATE INDEX idx_metric ON events(metric);
CREATE INDEX idx_node_id ON events(node_id);
-- Aggregated metrics (hourly/daily rollups)
CREATE TABLE aggregates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
period_start REAL NOT NULL,
period_end REAL NOT NULL,
metric TEXT NOT NULL,
aggregation TEXT NOT NULL, -- 'sum', 'avg', 'count', etc.
value REAL,
UNIQUE(period_start, metric, aggregation)
);
CREATE INDEX idx_period ON aggregates(period_start, period_end);
-- Configuration
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT
);
-- Insert default config
INSERT INTO metadata (key, value) VALUES
('version', '1.0'),
('enabled_metrics', '["node","task","state","quality"]'),
('retention_days', '30');
```
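For illustration, the hourly rollups and the 30-day retention window could both be driven by plain SQL over this schema. The sketch below uses the standard-library `sqlite3` module and assumes SQLite's JSON1 `json_extract` is available and that completed-task events carry `duration_seconds` in their `data` payload; the `INSERT OR REPLACE` leans on the UNIQUE constraint on `aggregates` to make reruns idempotent.
```python
import sqlite3
import time

def rollup_hourly_task_duration(conn: sqlite3.Connection, hour_start: float):
    """Populate one hourly 'avg duration' row in aggregates from raw events."""
    conn.execute(
        """
        INSERT OR REPLACE INTO aggregates
            (period_start, period_end, metric, aggregation, value)
        SELECT :start, :start + 3600, 'task.completed', 'avg',
               AVG(json_extract(data, '$.duration_seconds'))
        FROM events
        WHERE metric = 'task.completed'
          AND timestamp >= :start AND timestamp < :start + 3600
        """,
        {"start": hour_start},
    )
    conn.commit()

def prune_old_events(conn: sqlite3.Connection, retention_days: int = 30):
    """Enforce the retention window configured in the metadata table."""
    cutoff = time.time() - retention_days * 86400
    conn.execute("DELETE FROM events WHERE timestamp < ?", (cutoff,))
    conn.commit()
```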
### Python Interface
```python
import time

from amicus.metrics import MetricsCollector
# Initialize
metrics = MetricsCollector(db_path=".ai/metrics.db")
# Record events
metrics.record("node.registered", {
"node_id": "Node-X9J2",
"role": "developer",
"model": "claude-3-5-sonnet"
})
metrics.record("task.completed", {
"task_id": "task-123",
"node_id": "Node-X9J2",
"duration_seconds": 400,
"outcome": "success"
})
# Query
recent_tasks = metrics.query(
metric="task.completed",
since=time.time() - 3600, # Last hour
filters={"node_id": "Node-X9J2"}
)
# Analytics
avg_duration = metrics.aggregate(
metric="task.completed",
field="duration_seconds",
aggregation="avg",
since=time.time() - 86400 # Last 24h
)
```
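A minimal sketch of how this interface could sit on top of the schema above, using only the standard-library `sqlite3` module. The method names match the examples; schema creation is assumed to happen once at startup, and the filtering strategy (decode JSON in Python rather than querying via JSON1) and `INSERT OR IGNORE` duplicate handling are implementation assumptions, not settled design decisions.
```python
import json
import sqlite3
import time

class MetricsCollector:
    """Sketch of a collector backed by the SQLite schema above."""

    def __init__(self, db_path=".ai/metrics.db"):
        # Assumes the events/aggregates/metadata tables already exist.
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row

    def record(self, metric, data, timestamp=None):
        """Append one event; duplicate (timestamp, metric, node_id) rows are ignored."""
        self.conn.execute(
            "INSERT OR IGNORE INTO events (timestamp, metric, node_id, data) "
            "VALUES (?, ?, ?, ?)",
            (timestamp or time.time(), metric, data.get("node_id"), json.dumps(data)),
        )
        self.conn.commit()

    def query(self, metric, since=0.0, filters=None):
        """Return events for a metric since a UNIX timestamp, oldest first."""
        rows = self.conn.execute(
            "SELECT timestamp, metric, node_id, data FROM events "
            "WHERE metric = ? AND timestamp >= ? ORDER BY timestamp",
            (metric, since),
        ).fetchall()
        events = [
            {"timestamp": r["timestamp"], "metric": r["metric"],
             "node_id": r["node_id"], "data": json.loads(r["data"])}
            for r in rows
        ]
        if filters:
            # Filters match top-level fields (e.g. node_id) or keys inside data
            events = [e for e in events
                      if all(e.get(k, e["data"].get(k)) == v for k, v in filters.items())]
        return events

    def aggregate(self, metric, field, aggregation="avg", since=0.0):
        """Aggregate a numeric field across matching events (avg/sum/count)."""
        values = [e["data"][field] for e in self.query(metric, since=since)
                  if field in e["data"]]
        if not values:
            return None
        if aggregation == "sum":
            return sum(values)
        if aggregation == "count":
            return len(values)
        return sum(values) / len(values)  # default: avg
```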
---
## Analysis & Feedback
### Pattern Detection
**1. Slow Task Detection**
```python
import time

def detect_slow_tasks(metrics, threshold_seconds=600):
    """Identify tasks taking longer than the threshold over the last 24 hours."""
    tasks = metrics.query("task.completed", since=time.time() - 86400)
    slow_tasks = [
        t for t in tasks
        if t['data']['duration_seconds'] > threshold_seconds
    ]
    if not slow_tasks:
        return {"slow_task_count": 0, "recommendation": "No slow tasks detected"}
    return {
        "slow_task_count": len(slow_tasks),
        "average_duration": sum(t['data']['duration_seconds'] for t in slow_tasks) / len(slow_tasks),
        "recommendation": "Consider breaking down large tasks"
    }
```
**2. Node Efficiency Analysis**
```python
import time

def analyze_node_efficiency(metrics, node_id):
    """Compare node performance to the cluster average."""
    node_tasks = metrics.query(
        "task.completed",
        filters={"node_id": node_id},
        since=time.time() - 86400
    )
    all_tasks = metrics.query("task.completed", since=time.time() - 86400)
    if not node_tasks or not all_tasks:
        return {
            "node_id": node_id,
            "efficiency_ratio": None,
            "recommendation": "Not enough completed tasks to compare"
        }
    node_avg = sum(t['data']['duration_seconds'] for t in node_tasks) / len(node_tasks)
    cluster_avg = sum(t['data']['duration_seconds'] for t in all_tasks) / len(all_tasks)
    efficiency = cluster_avg / node_avg  # >1 means faster than average
    return {
        "node_id": node_id,
        "efficiency_ratio": efficiency,
        "recommendation": ("faster" if efficiency > 1 else "slower") + " than cluster average"
    }
```
**3. Cost Tracking**
```python
import time

def calculate_costs(metrics, time_range=86400):
    """Calculate token costs over a time period."""
    token_events = metrics.query("tokens.consumed", since=time.time() - time_range)
    if not token_events:
        return {"total_cost_usd": 0.0, "recommendation": "No token usage recorded"}
    total_cost = sum(t['data']['estimated_cost_usd'] for t in token_events)
    total_input_tokens = sum(t['data']['input_tokens'] for t in token_events)
    total_output_tokens = sum(t['data']['output_tokens'] for t in token_events)
    task_ids = {t['data']['task_id'] for t in token_events}  # a task may emit several token events
    return {
        "total_cost_usd": total_cost,
        "total_input_tokens": total_input_tokens,
        "total_output_tokens": total_output_tokens,
        "cost_per_task": total_cost / len(task_ids),
        "recommendation": "Consider using cached context" if total_cost > 10 else "Cost within budget"
    }
```
### Auto-Tuning Recommendations
```python
import time

class TuningRecommendations:
    """Generate actionable recommendations from metrics"""

    def __init__(self, metrics):
        self.metrics = metrics

    def generate_recommendations(self):
        """Analyze metrics and produce recommendations"""
        recommendations = []
        # Check cluster utilization over the last hour
        snapshots = self.metrics.query("cluster.snapshot", since=time.time() - 3600)
        if snapshots:
            avg_active = sum(s['data']['active_nodes'] for s in snapshots) / len(snapshots)
            avg_pending = sum(s['data']['pending_tasks'] for s in snapshots) / len(snapshots)
            if avg_pending > avg_active * 2:
                recommendations.append({
                    "category": "scaling",
                    "priority": "high",
                    "recommendation": f"High task backlog ({avg_pending:.1f} pending vs {avg_active:.1f} nodes). Consider spawning more nodes.",
                    "action": "spawn_node",
                    "params": {"role": "developer", "count": 1}
                })
            # Check for idle nodes
            if avg_pending < 1 and avg_active > 2:
                recommendations.append({
                    "category": "scaling",
                    "priority": "medium",
                    "recommendation": f"Low task volume ({avg_pending:.1f} pending) with {avg_active:.1f} active nodes. Consider terminating idle nodes.",
                    "action": "terminate_idle",
                    "params": {}
                })
        # Check for failing tests over the last 24 hours
        test_events = self.metrics.query("tests.executed", since=time.time() - 86400)
        total_tests = sum(t['data']['total_tests'] for t in test_events)
        if total_tests > 0:
            fail_rate = sum(t['data']['failed'] for t in test_events) / total_tests
            if fail_rate > 0.1:  # >10% failure rate
                recommendations.append({
                    "category": "quality",
                    "priority": "high",
                    "recommendation": f"High test failure rate ({fail_rate*100:.1f}%). Review recent code changes.",
                    "action": "code_review",
                    "params": {}
                })
        return recommendations
```
---
## Privacy & Security
### Data Minimization
**What to track:**
- ✅ Timestamps, durations, counts
- ✅ File names (no content)
- ✅ Test results (pass/fail counts)
- ✅ Node IDs, task IDs
- ✅ Token counts, costs
**What NOT to track:**
- ❌ Code snippets or file contents
- ❌ User data or secrets
- ❌ API keys or tokens
- ❌ Full error messages (may contain sensitive paths)
- ❌ Git commit messages (may contain sensitive info)
### Sanitization
```python
def sanitize_metric_data(data):
"""Remove sensitive information from metric data"""
sensitive_keys = ['api_key', 'token', 'password', 'secret', 'key']
sanitized = {}
for key, value in data.items():
if any(sensitive in key.lower() for sensitive in sensitive_keys):
sanitized[key] = "<redacted>"
elif isinstance(value, str) and len(value) > 1000:
# Truncate long strings to prevent accidental data leakage
sanitized[key] = value[:100] + "... (truncated)"
else:
sanitized[key] = value
return sanitized
```
### Access Control
```python
import os

# File permissions are per-user, not per-process: make the metrics DB
# world-readable but writable only by the user running the core Amicus
# processes (rw-r--r--).
metrics_db_path = ".ai/metrics.db"
os.chmod(metrics_db_path, 0o644)
```
---
## Implementation Checklist
### Week 1: Foundation
- [ ] Create `src/amicus/metrics.py` with `MetricsCollector` class
- [ ] Implement SQLite schema and initialization
- [ ] Add configuration system (enable/disable metrics)
- [ ] Write unit tests for metrics recording
- [ ] Document metric categories and schema
### Week 2: Integration
- [ ] Add `@track_metric` decorator for functions
- [ ] Instrument `register_node`, `claim_task`, `complete_task`
- [ ] Instrument `read_with_lock`, `write_with_lock`
- [ ] Add token tracking (if applicable)
- [ ] Add test execution tracking
### Week 3: Query & Analysis
- [ ] Implement `query()` API with filters
- [ ] Implement `aggregate()` API for rollups
- [ ] Add CLI command `--metrics` to view stats
- [ ] Add export functionality (JSON, CSV; see the export sketch after this list)
- [ ] Create example queries in docs
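A sketch of the Week 3 export step, assuming the `query()` API shown earlier; the function name, default path, and CSV flattening strategy are illustrative, not a settled interface.
```python
import csv
import json
import time

def export_events(metrics, metric, since=None, fmt="json", path="metrics_export.json"):
    """Dump query results to JSON or CSV for offline analysis (hypothetical helper)."""
    since = since if since is not None else time.time() - 86400
    events = metrics.query(metric, since=since)
    if fmt == "json":
        with open(path, "w") as f:
            json.dump(events, f, indent=2)
    elif fmt == "csv":
        # Flatten each event's data dict into columns
        rows = [{"timestamp": e["timestamp"], "metric": e["metric"],
                 "node_id": e["node_id"], **e["data"]} for e in events]
        fieldnames = sorted({key for row in rows for key in row})
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
    else:
        raise ValueError(f"Unsupported format: {fmt}")
```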
### Week 4: Feedback Loop
- [ ] Implement pattern detection algorithms
- [ ] Create `TuningRecommendations` class
- [ ] Add MCP tool `get_metrics_insights` (see the sketch after this list)
- [ ] Add alerting for anomalies
- [ ] Write comprehensive documentation
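The Week 4 MCP tool could be a thin wrapper over `TuningRecommendations`. A sketch assuming a FastMCP-style registration from the MCP Python SDK, that `MetricsCollector` and `TuningRecommendations` end up importable from `amicus.metrics`, and that a shared collector instance exists; none of this wiring is settled.
```python
from mcp.server.fastmcp import FastMCP

from amicus.metrics import MetricsCollector, TuningRecommendations  # assumed module layout

mcp = FastMCP("amicus-metrics")               # hypothetical server name
metrics = MetricsCollector(".ai/metrics.db")  # shared collector instance

@mcp.tool()
def get_metrics_insights() -> list:
    """Return current tuning recommendations derived from recorded metrics."""
    return TuningRecommendations(metrics).generate_recommendations()
```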
---
## Success Criteria
**Functional:**
- ✅ 10+ metric types captured
- ✅ <5ms overhead per metric
- ✅ Query API with <100ms response
- ✅ 30-day retention with <100MB storage
- ✅ Actionable recommendations generated
**Quality:**
- ✅ 90%+ test coverage
- ✅ Zero privacy leaks
- ✅ Documented for human & AI readers
- ✅ Backward compatible (no breaking changes)
---
## Future Enhancements
**Phase 2:**
- Real-time dashboard (web UI)
- Grafana integration for visualization
- Prometheus export endpoint
- Machine learning for anomaly detection
**Phase 3:**
- Distributed metrics aggregation
- Cross-cluster analytics
- Cost optimization suggestions
- A/B testing framework for agent tuning