"""MCP server exposing TL DPS analysis as tools."""
from __future__ import annotations
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import FastMCP
from dps_logs.event_store import DuckDBEventStore, EventStoreError
from server import DEFAULT_SAMPLE_PATH, analyze_logs_with_events
app = FastMCP("tl-dps-mcp")
EVENT_STORE = DuckDBEventStore()
ALLOWED_BUCKET_SECONDS = (1, 2, 5, 10, 15, 30, 60)
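# Canonical column order for per-run summary rows; also the positional
# fallback used by _rows_to_runs_dicts when a result lacks column names.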
RUNS_COLUMNS = [
    "run_id",
    "total_hits",
    "total_damage",
    "duration_seconds",
    "dps",
    "crit_rate_pct",
    "last_ts",
]
def _resolve_log_path(log_dir: Optional[str]) -> Path:
    if log_dir:
        return Path(log_dir).expanduser()
    return DEFAULT_SAMPLE_PATH


def _clamp_int(value: int, *, min_value: int, max_value: int) -> int:
    return max(min_value, min(max_value, value))


def _coerce_bucket_seconds(value: int) -> int:
    # Fall back to the 5-second default instead of erroring on odd input.
    return value if value in ALLOWED_BUCKET_SECONDS else 5


def _quote(value: str) -> str:
    """Escape single quotes for safe embedding in SQL string literals."""
    return value.replace("'", "''")
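

# Shared per-run summary SELECT used by both get_analysis_packet and
# _build_analysis_packet; {limit} is filled with a clamped positive integer
# before execution.
_RUNS_SQL_TEMPLATE = (
    "SELECT run_id, COUNT(*) AS total_hits, SUM(damage) AS total_damage, "
    "ROUND(CAST(EXTRACT(EPOCH FROM (MAX(ts) - MIN(ts))) AS DOUBLE), 2) AS duration_seconds, "
    "ROUND(SUM(damage) / NULLIF(CAST(EXTRACT(EPOCH FROM (MAX(ts) - MIN(ts))) AS DOUBLE), 0), 1) AS dps, "
    "ROUND(100.0 * SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS crit_rate_pct, "
    "MAX(ts) AS last_ts "
    "FROM events WHERE event_type = 'DamageDone' AND damage > 0 "
    "GROUP BY run_id ORDER BY last_ts DESC LIMIT {limit}"
)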
@app.tool()
async def analyze_dps_logs(
    log_dir: Optional[str] = None,
    limit_runs: Optional[int] = None,
) -> Dict[str, Any]:
    """Return the DPS summary payload for TL logs."""
    if limit_runs is not None and limit_runs <= 0:
        raise ValueError("limit_runs must be positive when provided")
    input_path = _resolve_log_path(log_dir)
    payload, event_rows = analyze_logs_with_events(
        input_path,
        limit_runs=limit_runs,
        progress_callback=_emit_progress,
    )
    _emit_progress("Inserting events into DuckDB…")
    EVENT_STORE.refresh(event_rows)
    _emit_progress("DuckDB refresh complete")
    return payload
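# analyze_dps_logs example (illustrative; the log directory is hypothetical):
#   analyze_dps_logs(log_dir="~/TLLogs", limit_runs=5)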
@app.tool()
async def get_events_schema() -> Dict[str, Any]:
    """Return the DuckDB events schema exposed for ad-hoc queries."""
    return EVENT_STORE.get_schema_payload()


@app.tool()
async def query_dps(sql: str) -> Dict[str, Any]:
    """Execute a safe SELECT-only query against the in-memory events table."""
    try:
        result = EVENT_STORE.query(sql)
    except EventStoreError as exc:  # pragma: no cover - surfaced to caller
        raise ValueError(str(exc)) from exc
    return result.to_payload()
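# query_dps example (illustrative; columns follow the events schema used by
# the templates below):
#   query_dps(
#       "SELECT skill_name, SUM(damage) AS total_damage FROM events "
#       "WHERE event_type = 'DamageDone' GROUP BY skill_name "
#       "ORDER BY total_damage DESC LIMIT 5"
#   )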
@app.tool()
async def get_analysis_packet(
    run_id: Any = "last",
    last_n_runs: int = 10,
    top_k_skills: int = 10,
    bucket_seconds: int = 5,
) -> Dict[str, Any]:
    """Return a coach-ready analysis packet using only safe SELECT templates."""
    last_n_runs = _clamp_int(int(last_n_runs), min_value=1, max_value=50)
    top_k_skills = _clamp_int(int(top_k_skills), min_value=1, max_value=50)
    bucket_seconds = _coerce_bucket_seconds(int(bucket_seconds))
    runs_sql = _RUNS_SQL_TEMPLATE.format(limit=last_n_runs)
    runs_payload = _run_query(runs_sql)
    selected_run_id = _select_run_id(run_id, runs_payload)
    if not selected_run_id:
        raise ValueError("No runs available to build analysis packet.")
    return _build_analysis_packet(
        selected_run_id,
        last_n_runs=last_n_runs,
        top_k_skills=top_k_skills,
        bucket_seconds=bucket_seconds,
    )
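# get_analysis_packet example (illustrative): packet for the most recent run
# with 10-second timeline buckets, compared against up to 9 prior runs:
#   get_analysis_packet(run_id="last", last_n_runs=10, bucket_seconds=10)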
def main() -> None:
    app.run()


def _emit_progress(message: str) -> None:
    # Progress goes to stderr so it never corrupts the MCP stdio transport.
    if not message:
        return
    print(message, file=sys.stderr, flush=True)
# ---------------------------------------------------------------------------
# Internal helpers
def _run_query(sql: str) -> Dict[str, Any]:
    result = EVENT_STORE.query(sql)
    return result.to_payload()
def _rows_to_runs_dicts(payload: Any) -> List[Dict[str, Any]]:
    """Coerce a runs payload (columns/rows or list of dicts) to a list of dicts."""
    if isinstance(payload, list):  # already a list of dicts
        return [
            {key: row.get(key) for key in RUNS_COLUMNS}
            for row in payload
            if isinstance(row, dict)
        ]
    rows = (payload or {}).get("rows") or []
    columns = (payload or {}).get("columns") or []

    def _row_to_dict(row: Any) -> Dict[str, Any]:
        if isinstance(row, dict):
            return {key: row.get(key) for key in RUNS_COLUMNS}
        mapping: Dict[str, Any] = {}
        for idx, col_name in enumerate(columns):
            mapping[col_name] = row[idx] if isinstance(row, (list, tuple)) and idx < len(row) else None
        # Fall back to positional mapping if column names are missing or incomplete.
        for idx, key in enumerate(RUNS_COLUMNS):
            if key not in mapping:
                mapping[key] = row[idx] if isinstance(row, (list, tuple)) and idx < len(row) else None
        return {key: mapping.get(key) for key in RUNS_COLUMNS}

    return [_row_to_dict(row) for row in rows]
def _select_run_id(run_spec: Any, runs_payload: Dict[str, Any]) -> Optional[str]:
    """Resolve a run spec ("last" or a 1-based index into recent runs) to a run_id."""
    rows = runs_payload.get("rows") or []
    if not rows:
        return None
    # Default to the most recent run.
    if run_spec is None or str(run_spec).lower() == "last":
        return rows[0][0]
    try:
        idx = int(run_spec)
    except (TypeError, ValueError):
        return rows[0][0]
    idx = max(1, idx)
    if idx <= len(rows):
        return rows[idx - 1][0]
    return rows[0][0]
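# _select_run_id resolution examples (illustrative): "last" or None picks the
# most recent run; run_spec=2 picks the second most recent; unparsable input
# falls back to the most recent run.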
def _compute_windows(run_id_sql: str, bucket_seconds: int) -> Dict[str, Any]:
    """Compute early/late/top damage windows for a run."""
    # Bucket the run's damage into fight-relative time windows.
    timeline_sql = (
        "WITH fight_start AS ("
        " SELECT MIN(ts) AS t0 FROM events WHERE event_type='DamageDone' AND damage>0 AND run_id='{run_id}'"
        ") "
        "SELECT "
        " CAST(FLOOR(EXTRACT(EPOCH FROM (e.ts - f.t0)) / {bucket}) * {bucket} AS INTEGER) AS bucket_s, "
        " SUM(e.damage) AS damage "
        "FROM events e, fight_start f "
        "WHERE e.event_type='DamageDone' AND e.damage>0 AND e.run_id='{run_id}' "
        "GROUP BY bucket_s ORDER BY bucket_s ASC"
    ).format(bucket=bucket_seconds, run_id=run_id_sql)
    timeline_result = _run_query(timeline_sql)
    timeline_rows = timeline_result.get("rows") or []
    if not timeline_rows:
        return {
            "early_window": {},
            "late_window": {},
            "top_damage_windows": [],
        }
    # Early window: the first 60 seconds of the fight.
    early_buckets = [r for r in timeline_rows if len(r) > 0 and r[0] < 60]
    early_damage = sum(r[1] for r in early_buckets if len(r) > 1)
    # Late window: the final 60 seconds of the fight.
    max_time = max(r[0] for r in timeline_rows if len(r) > 0)
    late_start = max(0, max_time - 60)
    late_buckets = [r for r in timeline_rows if len(r) > 0 and r[0] >= late_start]
    late_damage = sum(r[1] for r in late_buckets if len(r) > 1)
    # Top 3 buckets by damage, each annotated with its top three skills.
    sorted_buckets = sorted(timeline_rows, key=lambda r: r[1] if len(r) > 1 else 0, reverse=True)
    top_windows = []
    for bucket in sorted_buckets[:3]:
        bucket_s = bucket[0] if len(bucket) > 0 else 0
        damage = bucket[1] if len(bucket) > 1 else 0
        bucket_skills_sql = (
            "WITH fight_start AS ("
            " SELECT MIN(ts) AS t0 FROM events WHERE event_type='DamageDone' AND damage>0 AND run_id='{run_id}'"
            ") "
            "SELECT e.skill_name, SUM(e.damage) AS damage "
            "FROM events e, fight_start f "
            "WHERE e.event_type='DamageDone' AND e.damage>0 AND e.run_id='{run_id}' "
            " AND CAST(FLOOR(EXTRACT(EPOCH FROM (e.ts - f.t0)) / {bucket}) * {bucket} AS INTEGER) = {bucket_s} "
            "GROUP BY e.skill_name ORDER BY damage DESC LIMIT 3"
        ).format(run_id=run_id_sql, bucket=bucket_seconds, bucket_s=bucket_s)
        skills_result = _run_query(bucket_skills_sql)
        top_skills = [row[0] for row in (skills_result.get("rows") or [])]
        top_windows.append({
            "start_s": bucket_s,
            "end_s": bucket_s + bucket_seconds,
            "damage": damage,
            "top_skills": top_skills,
        })
    return {
        "early_window": {
            "start_s": 0,
            "end_s": 60,
            "damage": early_damage,
            "top_skills": [],  # per-skill breakdown could be added if needed
        },
        "late_window": {
            "start_s": late_start,
            "end_s": max_time + bucket_seconds,
            "damage": late_damage,
            "top_skills": [],
        },
        "top_damage_windows": top_windows,
    }
def _compute_actions(
    skill_deltas: Dict[str, Any],
    skill_efficiency: Dict[str, Any],
) -> Dict[str, Any]:
    """Compute top action levers from skill deltas and efficiency."""
    delta_rows = skill_deltas.get("rows") or []
    eff_rows = skill_efficiency.get("rows") or []
    levers = []
    # Lever 1: biggest fall-off, i.e. any skill whose damage share dropped by
    # more than 2 percentage points versus the prior-run average.
    falloffs = [r for r in delta_rows if r[3] < -2.0]
    if falloffs:
        skill = falloffs[0][0]
        delta_share = falloffs[0][3]
        levers.append(f"Restore {skill} usage ({delta_share:.1f}pp fall-off)")
    # Lever 2: the lowest-efficiency skill (rows are sorted by avg damage per hit).
    if eff_rows and len(eff_rows) > 1:
        low_eff = eff_rows[-1][0]
        levers.append(f"Optimize or drop {low_eff} (low efficiency)")
    # Lever 3: a standing consistency reminder.
    levers.append("Standardize rotation for consistency")
    return {"top_levers": levers}
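# _compute_actions output shape (skill names here are invented examples):
#   {"top_levers": ["Restore Fireball usage (-5.7pp fall-off)",
#                   "Optimize or drop Quick Shot (low efficiency)",
#                   "Standardize rotation for consistency"]}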
def _compute_skill_deltas(run_id_sql: str, last_n_runs: int) -> Dict[str, Any]:
    """Compare the selected run's skill mix against the average of prior runs."""
    # Total damage for the selected run, used to compute per-skill damage share.
    last_run_total_sql = (
        "SELECT SUM(damage) FROM events "
        "WHERE event_type='DamageDone' AND damage>0 AND run_id='{run_id}'"
    ).format(run_id=run_id_sql)
    last_total_result = _run_query(last_run_total_sql)
    last_total_damage = float(last_total_result["rows"][0][0] or 0) if last_total_result.get("rows") else 0.0
    # Per-skill totals for the selected run.
    last_skills_sql = (
        "SELECT skill_name, SUM(damage) AS damage, COUNT(*) AS hits, "
        "ROUND(100.0 * SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS crit_rate_pct "
        "FROM events WHERE event_type='DamageDone' AND damage>0 AND run_id='{run_id}' "
        "GROUP BY skill_name"
    ).format(run_id=run_id_sql)
    last_skills_result = _run_query(last_skills_sql)
    # Prior runs: the most recent N runs, excluding the selected one.
    prior_runs_sql = (
        "SELECT run_id FROM ("
        " SELECT run_id, MAX(ts) AS last_ts FROM events "
        " WHERE event_type='DamageDone' AND damage>0 "
        " GROUP BY run_id ORDER BY last_ts DESC LIMIT {limit}"
        ") WHERE run_id != '{current_run}'"
    ).format(limit=last_n_runs, current_run=run_id_sql)
    prior_runs_result = _run_query(prior_runs_sql)
    prior_run_ids = [row[0] for row in (prior_runs_result.get("rows") or [])]
    if not prior_run_ids:
        return {"columns": [], "rows": [], "notes": ["No prior runs for comparison"]}
    prior_ids_list = ",".join(f"'{_quote(rid)}'" for rid in prior_run_ids)
    # Per-skill averages across the prior runs, including per-run damage share.
    prior_skills_sql = (
        "SELECT skill_name, "
        "AVG(run_damage) AS avg_damage, AVG(run_hits) AS avg_hits, "
        "AVG(run_crit_rate) AS avg_crit_rate, AVG(run_damage_share) AS avg_damage_share "
        "FROM ("
        " SELECT run_id, skill_name, SUM(damage) AS run_damage, COUNT(*) AS run_hits, "
        " ROUND(100.0 * SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS run_crit_rate, "
        " 100.0 * SUM(damage) / NULLIF((SELECT SUM(damage) FROM events e2 WHERE e2.run_id=e.run_id AND e2.event_type='DamageDone' AND e2.damage>0), 0) AS run_damage_share "
        " FROM events e WHERE event_type='DamageDone' AND damage>0 AND run_id IN ({prior_ids}) "
        " GROUP BY run_id, skill_name"
        ") GROUP BY skill_name"
    ).format(prior_ids=prior_ids_list)
    prior_skills_result = _run_query(prior_skills_sql)
    # Index both result sets by skill name before computing deltas.
    last_skills_map = {}
    for row in (last_skills_result.get("rows") or []):
        skill, dmg, hits, crit = row[0], float(row[1] or 0), int(row[2] or 0), float(row[3] or 0)
        share = round((dmg * 100.0) / last_total_damage, 2) if last_total_damage > 0 else 0.0
        last_skills_map[skill] = {"damage": dmg, "hits": hits, "crit_rate": crit, "share": share}
    prior_skills_map = {}
    for row in (prior_skills_result.get("rows") or []):
        skill = row[0]
        prior_skills_map[skill] = {
            "avg_damage": float(row[1] or 0),
            "avg_hits": float(row[2] or 0),
            "avg_crit_rate": float(row[3] or 0),
            "avg_share": float(row[4] or 0),
        }
    all_skills = set(last_skills_map) | set(prior_skills_map)
    delta_rows = []
    for skill in all_skills:
        last = last_skills_map.get(skill, {"damage": 0, "hits": 0, "crit_rate": 0, "share": 0})
        prior = prior_skills_map.get(skill, {"avg_damage": 0, "avg_hits": 0, "avg_crit_rate": 0, "avg_share": 0})
        delta_share = round(last["share"] - prior["avg_share"], 2)
        delta_hits = round(last["hits"] - prior["avg_hits"])
        delta_crit = round(last["crit_rate"] - prior["avg_crit_rate"], 1)
        delta_rows.append([
            skill,
            last["share"],
            prior["avg_share"],
            delta_share,
            last["hits"],
            int(prior["avg_hits"]),
            delta_hits,
            last["crit_rate"],
            prior["avg_crit_rate"],
            delta_crit,
        ])
    # Sort by biggest negative share delta so fall-offs come first.
    delta_rows.sort(key=lambda r: r[3])
    return {
        "columns": [
            "skill_name", "last_share_pct", "prior_avg_share_pct", "delta_share_pp",
            "last_hits", "prior_avg_hits", "delta_hits",
            "last_crit_rate_pct", "prior_avg_crit_rate_pct", "delta_crit_pp",
        ],
        "rows": delta_rows[:10],  # ten biggest deltas
    }
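# _compute_skill_deltas row shape, matching the columns above (values invented):
#   ["Fireball", 18.5, 24.2, -5.7, 42, 55, -13, 31.0, 28.4, 2.6]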
def _build_analysis_packet(
    run_id: str,
    *,
    last_n_runs: int,
    top_k_skills: int,
    bucket_seconds: int,
) -> Dict[str, Any]:
    notes: List[str] = []
    run_id_sql = _quote(run_id)
    runs_sql = _RUNS_SQL_TEMPLATE.format(limit=last_n_runs)
    runs_payload = _rows_to_runs_dicts(_run_query(runs_sql))
    run_summary_sql = (
        "SELECT run_id, COUNT(*) AS total_hits, SUM(damage) AS total_damage, "
        "ROUND(CAST(EXTRACT(EPOCH FROM (MAX(ts) - MIN(ts))) AS DOUBLE), 2) AS duration_seconds, "
        "ROUND(SUM(damage) / NULLIF(CAST(EXTRACT(EPOCH FROM (MAX(ts) - MIN(ts))) AS DOUBLE), 0), 1) AS dps, "
        "ROUND(100.0 * SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS crit_rate_pct "
        "FROM events WHERE event_type = 'DamageDone' AND damage > 0 AND run_id = '{run_id}' "
        "GROUP BY run_id"
    ).format(run_id=run_id_sql)
    run_summary = _run_query(run_summary_sql)
    total_damage = 0.0
    if run_summary.get("rows"):
        try:
            total_damage = float(run_summary["rows"][0][2] or 0)
        except (TypeError, ValueError, IndexError):
            total_damage = 0.0
    else:
        notes.append("No damage events found for selected run")
    top_skills_sql = (
        "SELECT skill_name, SUM(damage) AS total_damage, COUNT(*) AS total_hits, "
        "ROUND(AVG(damage), 2) AS avg_damage, "
        "ROUND(100.0 * SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS crit_rate_pct "
        "FROM events WHERE event_type='DamageDone' AND damage>0 AND run_id = '{run_id}' "
        "GROUP BY skill_name ORDER BY total_damage DESC LIMIT {limit}"
    ).format(run_id=run_id_sql, limit=top_k_skills)
    top_skills = _run_query(top_skills_sql)
    # Append each skill's share of the run's total damage as an extra column.
    if total_damage > 0 and top_skills.get("rows"):
        updated_rows = []
        for row in top_skills["rows"]:
            dmg = float(row[1] or 0)
            share = round((dmg * 100.0) / total_damage, 2)
            updated_rows.append(list(row) + [share])
        top_skills["columns"] = list(top_skills.get("columns") or []) + ["damage_share_pct"]
        top_skills["rows"] = updated_rows
    skill_eff_sql = (
        "SELECT skill_name, COUNT(*) AS total_hits, SUM(damage) AS total_damage, "
        "ROUND(AVG(damage), 2) AS avg_damage_per_hit, "
        "SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) AS crit_hits, "
        "ROUND(100.0 * SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS crit_rate_pct "
        "FROM events WHERE event_type='DamageDone' AND damage>0 AND run_id = '{run_id}' "
        "GROUP BY skill_name ORDER BY avg_damage_per_hit DESC LIMIT {limit}"
    ).format(run_id=run_id_sql, limit=top_k_skills)
    skill_efficiency = _run_query(skill_eff_sql)
    # Timeline with fight-relative bucket times.
    timeline_sql = (
        "WITH fight_start AS ("
        " SELECT MIN(ts) AS t0 FROM events WHERE event_type='DamageDone' AND damage>0 AND run_id='{run_id}'"
        ") "
        "SELECT "
        " CAST(FLOOR(EXTRACT(EPOCH FROM (e.ts - f.t0)) / {bucket}) * {bucket} AS INTEGER) AS bucket_s, "
        " COUNT(*) AS hits, "
        " SUM(CASE WHEN e.is_crit THEN 1 ELSE 0 END) AS crit_hits, "
        " ROUND(100.0 * SUM(CASE WHEN e.is_crit THEN 1 ELSE 0 END) / COUNT(*), 1) AS crit_rate_pct, "
        " SUM(e.damage) AS damage "
        "FROM events e, fight_start f "
        "WHERE e.event_type='DamageDone' AND e.damage>0 AND e.run_id='{run_id}' "
        "GROUP BY bucket_s ORDER BY bucket_s ASC LIMIT 200"
    ).format(bucket=bucket_seconds, run_id=run_id_sql)
    timeline = _run_query(timeline_sql)
    # Skill deltas: compare the selected run against the prior-run average.
    skill_deltas = _compute_skill_deltas(run_id_sql, last_n_runs)
    # Windows analysis: early/late/top damage windows.
    windows = _compute_windows(run_id_sql, bucket_seconds)
    # Action levers derived from deltas and efficiency.
    actions = _compute_actions(skill_deltas, skill_efficiency)
    packet = {
        "meta": {
            "run_id": run_id,
            "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "limits": {
                "last_n_runs": last_n_runs,
                "top_k_skills": top_k_skills,
                "bucket_seconds": bucket_seconds,
            },
        },
        "run_summary": run_summary,
        "runs_last_n": runs_payload,
        "top_skills": top_skills,
        "skill_efficiency": skill_efficiency,
        "timeline": timeline,
        "skill_deltas": skill_deltas,
        "windows": windows,
        "actions": actions,
        "notes": notes,
    }
    return packet


if __name__ == "__main__":
    main()