"""Deterministic SQL-first local coach powered by llama-cpp."""
from __future__ import annotations
import json
import os
from uuid import uuid4
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
from .constants import (
COACH_CONTEXT_RECENT_RUNS,
COACH_CONTEXT_TOP_SKILLS,
COACH_MAX_RESPONSE_TOKENS,
COACH_MAX_TOOL_CALLS,
COACH_REPEAT_PENALTY,
COACH_STOP_SEQUENCES,
COACH_SYSTEM_PROMPT,
COACH_TEMPERATURE,
COACH_TOP_P,
)
from .model_manager import LocalModelManager
# Signature of the SQL execution hook injected by the host application:
# takes one SQL string, returns a result dict (e.g. columns/rows).
QueryCallback = Callable[[str], Dict[str, Any]]
# Deterministic query used when the model produces no usable plan/answer:
# top-5 skills by total damage with hit and crit counts for the whole log.
FALLBACK_SQL = (
    "SELECT skill_name, SUM(damage) AS total_damage, COUNT(*) AS total_hits,"
    " SUM(CASE WHEN is_crit THEN 1 ELSE 0 END) AS crit_hits"
    " FROM events WHERE event_type = 'DamageDone'"
    " GROUP BY skill_name ORDER BY total_damage DESC LIMIT 5"
)
class SQLCoach:
"""Plans a single SQL call (at most) and summarizes the result."""
def __init__(
self,
model_manager: LocalModelManager,
*,
max_tool_calls: int = COACH_MAX_TOOL_CALLS,
max_tokens: int = COACH_MAX_RESPONSE_TOKENS,
temperature: float = COACH_TEMPERATURE,
top_p: float = COACH_TOP_P,
repeat_penalty: float = COACH_REPEAT_PENALTY,
stop_sequences: Sequence[str] = COACH_STOP_SEQUENCES,
) -> None:
self._model_manager = model_manager
self._max_tool_calls = max_tool_calls
self._max_tokens = max_tokens
self._temperature = temperature
self._top_p = top_p
self._repeat_penalty = repeat_penalty
self._stop_sequences = list(stop_sequences)
# Stable session ID for trace tracking
self._coach_session_id = uuid4().hex
# State for conversational follow-ups
self._state: Dict[str, Any] = {
"last_intent": None,
"last_run_id": None,
"last_focus_skill": None,
"last_skill_deltas": None,
}
    def answer(
        self,
        question: str,
        payload: Dict[str, Any],
        schema: Dict[str, Any],
        query_callback: QueryCallback,
        *,
        analysis_callback: Callable[[], Dict[str, Any]] | None = None,
    ) -> Tuple[str, List[Dict[str, Any]]]:
        """Answer *question* and return ``(response_text, tool_trace)``.

        GREETING/META intents are answered without any tool calls or model
        inference.  Data questions prefer the deterministic analysis-packet
        path via *analysis_callback*; the legacy model-planned SQL path runs
        when DPSCOACH_USE_LEGACY_SQL=1 or no analysis callback is supplied.
        Every tool invocation is appended to the returned trace list.
        """
        tool_trace: List[Dict[str, Any]] = []
        # Detect intent and route accordingly
        intent = self._detect_intent(question)
        # GREETING intent: no tool calls, no model inference
        if intent == "GREETING":
            response = (
                "Hello! I'm DPS Coach (Qwen2.5-7B Q4_K_M). "
                "Ask me about your combat performance—e.g., 'Improve DPS', "
                "'Why did crits drop?', 'Crit rate trend?', or 'Which skill to focus?'."
            )
            return response, tool_trace
        # META intent: capability card, no tool calls
        if intent == "META":
            return self._capability_card(), tool_trace
        # DATA intents need analysis packet
        use_legacy = os.getenv("DPSCOACH_USE_LEGACY_SQL") == "1"
        if not use_legacy and analysis_callback is not None:
            packet = analysis_callback()
            tool_trace.append(self._format_analysis_trace(packet))
            # Update state for conversational follow-ups
            meta = packet.get("meta") or {}
            self._state["last_run_id"] = meta.get("run_id")
            # Route based on specialized intent; every branch records
            # last_intent so the next question can be interpreted in context.
            if intent == "FOLLOWUP_WHY":
                rendered = self._render_followup_why(packet, question)
                self._state["last_intent"] = "FOLLOWUP_WHY"
            elif intent == "ACTION_PLAN":
                rendered = self._render_action_plan(packet, question)
                self._state["last_intent"] = "ACTION_PLAN"
            elif intent == "SPIKE_ANALYSIS":
                rendered = self._render_spike_analysis(packet, question)
                self._state["last_intent"] = "SPIKE_ANALYSIS"
            elif intent == "FRONTLOAD":
                rendered = self._render_frontload(packet, question)
                self._state["last_intent"] = "FRONTLOAD"
            elif intent == "FOLLOWUP_MORE":
                rendered = self._render_followup_more(packet, question)
                self._state["last_intent"] = "FOLLOWUP_MORE"
            elif intent == "SKILL_DELTA":
                rendered = self._render_skill_delta(packet, question)
                # Store state for follow-ups
                self._state["last_intent"] = "SKILL_DELTA"
            elif intent == "CRIT_BUCKET_TREND":
                rendered = self._render_crit_bucket_trend(packet, question)
                self._state["last_intent"] = "CRIT_BUCKET_TREND"
            elif intent == "SKILLS":
                rendered = self._render_skills_focus(packet, question)
                self._state["last_intent"] = "SKILLS"
            elif intent == "RUNS":
                rendered = self._render_runs_analysis(packet, question)
                self._state["last_intent"] = "RUNS"
            elif intent == "REPORT":
                rendered = self._render_full_report(packet, question)
                self._state["last_intent"] = "REPORT"
            else:  # DEFAULT
                rendered = self._render_from_packet(packet, question)
                self._state["last_intent"] = "DEFAULT"
            return rendered, tool_trace
        # Legacy SQL path (unchanged): ask the model to plan at most one SQL
        # call, execute it, then ask the model to summarize the result.
        context_json = self._context_json(payload, schema)
        plan_messages = self._plan_messages(question, context_json)
        plan_response = self._call_model(plan_messages)
        plan_intent, detail = self._parse_plan(plan_response)
        # The model may answer directly without needing a query.
        if plan_intent == "answer" and detail:
            return detail, tool_trace
        if plan_intent == "sql" and detail and self._max_tool_calls > 0:
            sql = detail
            result = query_callback(sql)
            trace_entry = self._format_trace(sql, result)
            trace_entry["coach_session_id"] = self._coach_session_id
            tool_trace.append(trace_entry)
            answer_messages = self._answer_messages(question, context_json, sql, result)
            answer_raw = self._call_model(answer_messages)
            answer_text = self._parse_answer(answer_raw)
            if answer_text:
                return answer_text, tool_trace
            # Model summary was malformed; fall back to a templated summary
            # of the result we already fetched.
            fallback = self._fallback_from_result(result, reason="Model response missing ANSWER prefix.")
            return fallback, tool_trace
        # Planning failed entirely: run the deterministic fallback query.
        fallback_result = query_callback(FALLBACK_SQL)
        trace_entry = self._format_trace(FALLBACK_SQL, fallback_result, fallback=True)
        trace_entry["coach_session_id"] = self._coach_session_id
        tool_trace.append(trace_entry)
        return self._fallback_from_result(fallback_result, reason="Using deterministic fallback."), tool_trace
def _detect_intent(self, question: str) -> str:
"""Detect question intent to route to specialized renderer.
Returns one of: GREETING, META, FOLLOWUP_WHY, ACTION_PLAN, SPIKE_ANALYSIS,
FRONTLOAD, CRIT_BUCKET_TREND, SKILL_DELTA, SKILLS, RUNS, REPORT, DEFAULT.
"""
text = (question or "").strip().lower()
if not text:
return "DEFAULT"
# GREETING: friendly test messages
greeting_keywords = ("hello", "hi", "test", "hey", "greetings")
if any(text.startswith(kw) or text == kw for kw in greeting_keywords):
return "GREETING"
# META: help/capability questions
meta_keywords = (
"help",
"what can you do",
"how does this work",
"examples",
"capabilities",
"how to",
)
if any(text.startswith(kw) or text == kw for kw in meta_keywords):
return "META"
# FOLLOWUP_WHY: follow-up about top fall-off skill
followup_keywords = ("why", "that skill", "top fall-off", "biggest", "explain")
if self._state.get("last_intent") == "SKILL_DELTA" and any(kw in text for kw in followup_keywords):
if "skill" in text or "fall" in text or "drop" in text:
return "FOLLOWUP_WHY"
# ACTION_PLAN: top 3 changes request
if "action plan" in text:
return "ACTION_PLAN"
action_keywords = ("top 3", "three changes", "3 changes", "what should i")
if any(kw in text for kw in action_keywords) and ("change" in text or "next" in text or "improve" in text):
return "ACTION_PLAN"
# SPIKE_ANALYSIS: damage spike detection
spike_keywords = ("spike", "burst", "peak damage", "highest damage")
if any(kw in text for kw in spike_keywords) and "damage" in text:
return "SPIKE_ANALYSIS"
# FRONTLOAD: front-load damage analysis
frontload_keywords = ("front-load", "frontload", "opener", "opening", "reorder rotation")
if any(kw in text for kw in frontload_keywords) or ("early" in text and "rotation" in text):
return "FRONTLOAD"
# FOLLOWUP_MORE: request for additional insights
more_keywords = ("anything else", "what else", "more insights", "any other insights", "more recommendations")
if any(kw in text for kw in more_keywords):
return "FOLLOWUP_MORE"
# SKILL_DELTA: skill fall-offs or declines
delta_keywords = ("fell off", "dropped", "decline", "down", "worse", "degraded")
if "skill" in text and any(kw in text for kw in delta_keywords):
return "SKILL_DELTA"
# REPORT: full coaching report
report_keywords = ("report", "full review", "audit", "coach me", "full analysis")
if any(kw in text for kw in report_keywords):
return "REPORT"
# CRIT_BUCKET_TREND: crit rate analysis by time bucket
crit_keywords = ("crit", "crit rate", "trend", "bucket")
if any(kw in text for kw in crit_keywords):
return "CRIT_BUCKET_TREND"
# SKILLS: top skills or damage by skill
skills_keywords = ("skill", "damage", "top", "which skill", "rotation")
if any(kw in text for kw in skills_keywords):
return "SKILLS"
# RUNS: run comparison or best run
runs_keywords = ("run", "best run", "improve", "better", "previous")
if any(kw in text for kw in runs_keywords):
return "RUNS"
# DEFAULT: generic summary
return "DEFAULT"
@staticmethod
def _is_meta_question(question: str) -> bool:
"""Legacy compatibility; prefer _detect_intent."""
return SQLCoach._detect_intent(question) == "META"
@staticmethod
def _safe_float(value: Any) -> Optional[float]:
try:
return float(value)
except (TypeError, ValueError):
return None
@staticmethod
def _coerce_runs_list(runs_last_n: Any) -> List[Dict[str, Any]]:
"""Normalize runs_last_n to list[dict] with stable keys."""
expected = [
"run_id",
"total_hits",
"total_damage",
"duration_seconds",
"dps",
"crit_rate_pct",
"last_ts",
]
if isinstance(runs_last_n, list):
return [
{k: row.get(k) for k in expected}
for row in runs_last_n
if isinstance(row, dict)
]
if isinstance(runs_last_n, dict):
rows = runs_last_n.get("rows") or []
columns = runs_last_n.get("columns") or []
def _row_to_dict(row: Any) -> Dict[str, Any]:
if isinstance(row, dict):
return {k: row.get(k) for k in expected}
mapping: Dict[str, Any] = {}
for idx, col_name in enumerate(columns):
mapping[col_name] = row[idx] if isinstance(row, (list, tuple)) and idx < len(row) else None
positional = [
("run_id", 0),
("total_hits", 1),
("total_damage", 2),
("duration_seconds", 3),
("dps", 4),
("crit_rate_pct", 5),
("last_ts", 6),
]
for key, idx in positional:
if key not in mapping:
mapping[key] = row[idx] if isinstance(row, (list, tuple)) and idx < len(row) else None
return {k: mapping.get(k) for k in expected}
return [_row_to_dict(r) for r in rows]
return []
@staticmethod
def _capability_card() -> str:
return (
"DPS Coach (Qwen2.5-7B Q4_K_M)\n"
"Capability card:\n"
"Capabilities:\n"
"- Parses TL combat logs via safe MCP templates (no direct SQL).\n"
"- Summarizes runs, skills, crits, timeline buckets, and top actions.\n"
"- Use prompts like: 'Improve DPS', 'Why did crits drop?', 'Best run today?'.\n"
"- Legacy SQL mode available with DPSCOACH_USE_LEGACY_SQL=1 for debugging."
)
    def _render_from_packet(self, packet: Dict[str, Any], question: str) -> str:
        """Generic DEFAULT-intent renderer: insights/evidence/actions sections.

        Everything is computed deterministically from the analysis packet;
        no model inference is involved.
        """
        meta = packet.get("meta") or {}
        limits = meta.get("limits") or {}
        run_id = meta.get("run_id", "?")
        run_summary = packet.get("run_summary") or {}
        runs_last_n = packet.get("runs_last_n") or []
        top_skills = packet.get("top_skills") or {}
        skill_eff = packet.get("skill_efficiency") or {}
        timeline = packet.get("timeline") or {}
        notes = packet.get("notes") or []
        # Each section body comes from a dedicated builder helper.
        insights = self._build_insights(run_summary, top_skills, skill_eff, timeline)
        evidence_lines = self._build_evidence(run_summary, runs_last_n, top_skills, timeline, limits)
        actions = self._build_actions(top_skills, skill_eff)
        next_qs = self._build_next_questions(question)
        preview = self._render_table(top_skills, max_rows=5)
        # Empty/falsy notes are dropped so we never print "- Note: ".
        notes_text = "\n".join(f"- Note: {item}" for item in notes if item)
        sections = [
            "Insights:",
            *insights,
            "",
            "Evidence:",
            *evidence_lines,
            "",
            "Actions:",
            *actions,
            "",
            "Next questions:",
            *next_qs,
            "",
            "Top skills preview (run_id={}):".format(run_id),
            preview,
        ]
        # Only append the notes section when there is something to show.
        if notes_text:
            sections.extend(["", notes_text])
        return "\n".join(sections)
    def _render_crit_bucket_trend(self, packet: Dict[str, Any], question: str) -> str:
        """Analyze crit rates across time buckets with noise control.

        Buckets with fewer than MIN_HITS_FOR_BUCKET hits are treated as
        noise; early-vs-late averages within TREND_THRESHOLD_PP points are
        reported as stable.
        """
        MIN_HITS_FOR_BUCKET = 10
        TREND_THRESHOLD_PP = 2.0
        timeline = packet.get("timeline") or {}
        run_summary = packet.get("run_summary") or {}
        meta = packet.get("meta") or {}
        rows = run_summary.get("rows") or []
        # run_summary row layout (positional): run_id, hits, damage,
        # duration, dps, crit_rate_pct — presumably fixed upstream; verify.
        run_row = rows[0] if rows else []
        run_id = run_row[0] if len(run_row) > 0 else "?"
        overall_crit_pct = run_row[5] if len(run_row) > 5 else None
        overall_crit_str = self._format_number(overall_crit_pct)
        timeline_rows = timeline.get("rows") or []
        if not timeline_rows:
            return f"Run {run_id}: No timeline data available. Overall crit rate: {overall_crit_str}%."
        # Parse timeline: bucket_s, hits, crit_hits, crit_rate_pct, damage
        bucket_data = []
        for row in timeline_rows:
            bucket_s = row[0] if len(row) > 0 else 0
            hits = row[1] if len(row) > 1 else 0
            crit_hits = row[2] if len(row) > 2 else 0
            crit_rate_pct = row[3] if len(row) > 3 else 0.0
            damage = row[4] if len(row) > 4 else 0
            bucket_data.append({
                "bucket_s": bucket_s,
                "hits": hits,
                "crit_hits": crit_hits,
                "crit_rate_pct": float(crit_rate_pct) if crit_rate_pct else 0.0,
                "damage": damage,
            })
        # Filter significant buckets only (enough hits to be meaningful)
        significant = [b for b in bucket_data if b["hits"] >= MIN_HITS_FOR_BUCKET]
        if not significant:
            return (
                f"Run {run_id}: All buckets have <{MIN_HITS_FOR_BUCKET} hits (noise). "
                f"Overall crit rate: {overall_crit_str}%."
            )
        # Analyze trend: compare the first vs last 10 significant buckets
        # (or front/back halves when fewer than 10 exist).
        early = significant[:10] if len(significant) >= 10 else significant[:len(significant)//2]
        late = significant[-10:] if len(significant) >= 10 else significant[len(significant)//2:]
        early_avg = sum(b["crit_rate_pct"] for b in early) / len(early) if early else 0.0
        late_avg = sum(b["crit_rate_pct"] for b in late) / len(late) if late else 0.0
        overall_crit_numeric = float(overall_crit_pct) if overall_crit_pct is not None else 0.0
        delta = late_avg - early_avg
        if abs(delta) < TREND_THRESHOLD_PP:
            trend_dir = "→ stable"
            trend_desc = f"{trend_dir} (early: {early_avg:.1f}% ≈ late: {late_avg:.1f}%)"
        elif delta > 0:
            trend_dir = "↑ rising"
            trend_desc = f"{trend_dir} (early: {early_avg:.1f}% → late: {late_avg:.1f}%, +{delta:.1f}pp)"
        else:
            trend_dir = "↓ falling"
            trend_desc = f"{trend_dir} (early: {early_avg:.1f}% → late: {late_avg:.1f}%, {delta:.1f}pp)"
        peak_bucket = max(significant, key=lambda x: x["crit_rate_pct"]) if significant else None
        peak_info = (
            f"Peak crit: {self._format_mm_ss(peak_bucket['bucket_s'])} "
            f"({peak_bucket['crit_rate_pct']}%, {peak_bucket['crit_hits']}/{peak_bucket['hits']} hits)"
            if peak_bucket
            else "No peak detected"
        )
        # Sample at most 12 evenly spaced buckets so the table stays short
        # instead of ending with a "+61 more"-style truncation.
        sample_interval = max(1, len(significant) // 12)
        sampled = significant[::sample_interval][:12]
        # Build table preview; markers compare each bucket to the overall
        # rate with a ±1pp dead zone.
        table_lines = [" Time | Hits | Crits | Crit% | Trend"]
        table_lines.append("-" * 45)
        for b in sampled:
            trend_marker = "▲" if b["crit_rate_pct"] > overall_crit_numeric + 1.0 else "▼" if b["crit_rate_pct"] < overall_crit_numeric - 1.0 else "─"
            table_lines.append(
                f"{self._format_mm_ss(b['bucket_s']):>6} | {b['hits']:>4} | {b['crit_hits']:>5} | {b['crit_rate_pct']:>5.1f}% | {trend_marker}"
            )
        sections = [
            "Crit Rate Timeline:",
            f"- Run {run_id}: overall {overall_crit_str}% crits, {len(timeline_rows)} buckets ({len(significant)} significant).",
            f"- Trend: {trend_desc}",
            f"- {peak_info}",
            "",
            "Bucket Details:",
            "\n".join(table_lines),
            "",
            "Next Questions:",
            "- Which skill drove the crit spike at peak time?",
            "- Any buff uptime gaps in falling phases?",
            "- How does early rotation differ from late?",
        ]
        return "\n".join(sections)
    def _render_skills_focus(self, packet: Dict[str, Any], question: str) -> str:
        """Analyze skill damage and efficiency for the SKILLS intent."""
        top_skills = packet.get("top_skills") or {}
        skill_eff = packet.get("skill_efficiency") or {}
        run_summary = packet.get("run_summary") or {}
        meta = packet.get("meta") or {}
        rows = run_summary.get("rows") or []
        run_row = rows[0] if rows else []
        run_id = run_row[0] if len(run_row) > 0 else "?"
        total_dmg = self._format_number(run_row[2] if len(run_row) > 2 else None)
        skill_rows = top_skills.get("rows") or []
        eff_rows = skill_eff.get("rows") or []
        # Build skill damage table (top 8 rows).
        # top_skills row layout (positional): name, damage, hits, avg_hit,
        # <unused idx 4>, share_pct — assumed from usage below; verify.
        skill_table = ["Skill | Damage | Hits | Avg Hit | % of Run"]
        skill_table.append("-" * 50)
        for row in skill_rows[:8]:
            name = row[0] if len(row) > 0 else "?"
            damage = self._format_number(row[1] if len(row) > 1 else 0)
            hits = row[2] if len(row) > 2 else 0
            avg = self._format_number(row[3] if len(row) > 3 else 0)
            pct = self._format_number(row[5] if len(row) > 5 else 0)
            skill_table.append(f"{name:>20} | {damage:>10} | {hits:>4} | {avg:>7} | {pct:>6}%")
        # Top performer highlights: first rows are assumed pre-sorted by
        # damage / avg-hit respectively.
        top_dmg_skill = skill_rows[0][0] if skill_rows else "?"
        top_avg_skill = eff_rows[0][0] if eff_rows else "?"
        insights = [
            f"- Top damage: {top_dmg_skill} (highest share of run).",
            f"- Best efficiency: {top_avg_skill} (highest avg hit per cast).",
        ]
        if len(skill_rows) > 1:
            insights.append(f"- Rotation backbone: {top_dmg_skill} + {skill_rows[1][0]} combo.")
        actions = [
            f"- Lean into {top_dmg_skill} in burst windows.",
            f"- Use {top_avg_skill} for spike phases.",
            "- Tighten rotation timing to maximize skill uptime.",
        ]
        sections = [
            "Skill Focus & Efficiency:",
            f"- Run {run_id}: {total_dmg} total damage across {len(skill_rows)} skills.",
            *insights,
            "",
            "Top Skills by Damage:",
            "\n".join(skill_table),
            "",
            "Recommended Actions:",
            *actions,
            "",
            "Next Questions:",
            "- Which skill had the most misses or downtime?",
            "- Should I reorder rotation to front-load damage?",
            "- How do my top skills compare to class benchmarks?",
        ]
        return "\n".join(sections)
    def _render_runs_analysis(self, packet: Dict[str, Any], question: str) -> str:
        """Analyze run performance trends for the RUNS intent."""
        runs_last_n = packet.get("runs_last_n") or []
        run_summary = packet.get("run_summary") or {}
        meta = packet.get("meta") or {}
        rows = run_summary.get("rows") or []
        run_row = rows[0] if rows else []
        # Keep both numeric (for comparisons) and formatted (for display).
        current_dps_numeric = self._safe_float(run_row[4] if len(run_row) > 4 else None)
        current_dps = self._format_number(run_row[4] if len(run_row) > 4 else None)
        current_duration = self._format_number(run_row[3] if len(run_row) > 3 else None)
        run_rows = self._coerce_runs_list(runs_last_n)
        if not run_rows:
            return f"Run analysis: No historical runs found. Current run DPS: {current_dps}."
        # Extract DPS from each run using dict keys; drop missing/zero values
        # so averages are not dragged down by empty runs.
        dps_values = [self._safe_float(row.get("dps")) for row in run_rows]
        dps_values = [v for v in dps_values if v is not None and v > 0]
        if not dps_values:
            return "Run analysis: No DPS data in historical runs."
        avg_dps = sum(dps_values) / len(dps_values)
        max_dps = max(dps_values)
        min_dps = min(dps_values)
        current_vs_avg = (
            "↑ above average" if current_dps_numeric and current_dps_numeric > avg_dps else "↓ below average"
        )
        # Build run table (last 8 runs): Duration(s), DPS, Total Damage, Crit%
        run_table = ["Run # | Duration(s) | DPS | Total Dmg | Crit%"]
        run_table.append("-" * 60)
        for idx, row in enumerate(run_rows[-8:], 1):
            # NOTE(review): run_id is looked up but not shown — rows are
            # numbered by idx in the table.
            run_id = row.get("run_id") or "?"
            duration = self._format_number(row.get("duration_seconds"))
            dps_val = self._safe_float(row.get("dps")) or 0
            total_dmg = self._format_number(row.get("total_damage"))
            crit_pct = self._format_number(row.get("crit_rate_pct"))
            # Best run wins the star even when max == min.
            status = "★ best" if dps_val == max_dps else "…" if dps_val == min_dps else "✓"
            run_table.append(
                f"{idx:>5} | {duration:>10} | {int(dps_val):>7,} | {total_dmg:>11} | {crit_pct:>5} {status}"
            )
        insights = [
            f"- Current run: {current_dps} DPS over {current_duration}s, {current_vs_avg}.",
            f"- Best run by DPS: {int(max_dps):,} DPS; worst: {int(min_dps):,} DPS.",
            f"- Variability: {round((max_dps - min_dps) / avg_dps * 100, 1)}% spread.",
        ]
        actions = [
            f"- Replicate the {self._format_number(max_dps)} DPS run rotation.",
            "- Identify what caused low DPS runs and avoid.",
            "- Focus on consistent positioning and skill timing.",
        ]
        sections = [
            "Run Performance Trends:",
            f"- Analyzed {len(run_rows)} recent runs; average DPS: {self._format_number(avg_dps)}.",
            *insights,
            "",
            "Recent Run History:",
            "\n".join(run_table),
            "",
            "Improvement Strategy:",
            *actions,
            "",
            "Next Questions:",
            "- What made my best run succeed?",
            "- Which skills fell off in low DPS runs?",
            "- How can I reduce DPS variability?",
        ]
        return "\n".join(sections)
    def _render_skill_delta(self, packet: Dict[str, Any], question: str) -> str:
        """Analyze skill fall-offs vs prior runs (SKILL_DELTA intent).

        skill_deltas row layout (positional, assumed from usage — verify):
        0 skill, 1 last_share, 2 prior_share, 3 delta_share_pp,
        4 last_hits, 5 prior_hits, 6 delta_hits,
        7 last_crit, 8 prior_crit, 9 delta_crit_pp.
        """
        skill_deltas = packet.get("skill_deltas") or {}
        run_summary = packet.get("run_summary") or {}
        meta = packet.get("meta") or {}
        rows = run_summary.get("rows") or []
        run_row = rows[0] if rows else []
        run_id = run_row[0] if len(run_row) > 0 else "?"
        delta_rows = skill_deltas.get("rows") or []
        notes = skill_deltas.get("notes") or []
        # A note from the packet (e.g. "only one run recorded") pre-empts
        # the whole analysis.
        if notes:
            return f"Run {run_id}: {notes[0]}"
        if not delta_rows:
            return f"Run {run_id}: No skill delta data available."
        # Split by direction with a ±1pp dead zone.
        falloffs = [r for r in delta_rows if r[3] < -1.0]  # delta_share_pp < -1.0
        improvements = [r for r in delta_rows if r[3] > 1.0]  # delta_share_pp > 1.0
        sections = [
            "Skill Performance vs Prior Runs:",
            f"- Run {run_id} compared to average of prior runs.",
            "",
        ]
        if falloffs:
            sections.append("Biggest Fall-Offs:")
            for row in falloffs[:5]:
                skill = row[0]
                delta_share = row[3]
                delta_hits = row[6]
                delta_crit = row[9]
                sections.append(
                    f"- {skill}: {delta_share:+.1f}pp share ({delta_hits:+d} hits, {delta_crit:+.1f}pp crit)"
                )
            sections.append("")
        if improvements:
            sections.append("Improvements:")
            for row in improvements[:3]:
                skill = row[0]
                delta_share = row[3]
                sections.append(f"- {skill}: {delta_share:+.1f}pp share (stronger this run)")
            sections.append("")
        if not falloffs and not improvements:
            sections.append("- No significant skill deltas (±1pp threshold).")
            sections.append("")
        # Store top fall-off skill so a "why did that skill drop?" follow-up
        # (FOLLOWUP_WHY) can be answered in context.
        if falloffs:
            self._state["last_focus_skill"] = falloffs[0][0]
            self._state["last_skill_deltas"] = falloffs[0]
        # Detail table (first 8 rows)
        sections.append("Skill Delta Detail:")
        table_lines = ["Skill | Last% | Prior% | Δ Share | Δ Hits | Δ Crit"]
        table_lines.append("-" * 60)
        for row in delta_rows[:8]:
            skill = row[0]
            last_share = row[1]
            prior_share = row[2]
            delta_share = row[3]
            delta_hits = row[6]
            delta_crit = row[9]
            table_lines.append(
                f"{skill:>20} | {last_share:>5.1f}% | {prior_share:>6.1f}% | {delta_share:>7.1f}pp | {delta_hits:>6d} | {delta_crit:>6.1f}pp"
            )
        sections.append("\n".join(table_lines))
        sections.append("")
        sections.extend([
            "Next Questions:",
            "- Why did the top fall-off skill lose damage share?",
            "- Did rotation timing change compared to prior runs?",
            "- Should I drop low-performing skills from rotation?",
        ])
        return "\n".join(sections)
def _render_full_report(self, packet: Dict[str, Any], question: str) -> str:
"""Full coaching report with strengths, leaks, and action plan."""
run_summary = packet.get("run_summary") or {}
runs_last_n = packet.get("runs_last_n") or []
top_skills = packet.get("top_skills") or {}
skill_eff = packet.get("skill_efficiency") or {}
timeline = packet.get("timeline") or {}
skill_deltas = packet.get("skill_deltas") or {}
meta = packet.get("meta") or {}
rows = run_summary.get("rows") or []
run_row = rows[0] if rows else []
run_id = run_row[0] if len(run_row) > 0 else "?"
total_hits = run_row[1] if len(run_row) > 1 else 0
total_damage = self._format_number(run_row[2] if len(run_row) > 2 else None)
duration = self._format_number(run_row[3] if len(run_row) > 3 else None)
dps = self._format_number(run_row[4] if len(run_row) > 4 else None)
crit_rate = self._format_number(run_row[5] if len(run_row) > 5 else None)
# Consistency check
run_rows = self._coerce_runs_list(runs_last_n)
dps_values = [self._safe_float(row.get("dps")) for row in run_rows if row.get("dps") is not None]
dps_values = [v for v in dps_values if v is not None]
avg_dps = sum(dps_values) / len(dps_values) if dps_values else 0
max_dps = max(dps_values) if dps_values else 0
min_dps = min(dps_values) if dps_values else 0
variability = round((max_dps - min_dps) / avg_dps * 100, 1) if avg_dps > 0 else 0
current_dps_numeric = float(dps.replace(",", "")) if dps != "–" else 0
# Strengths
strengths = []
if current_dps_numeric > avg_dps:
strengths.append(f"Above-average DPS ({dps} vs avg {self._format_number(avg_dps)})")
if float(crit_rate) > 25:
strengths.append(f"Strong crit rate ({crit_rate}%)")
skill_rows = top_skills.get("rows") or []
if skill_rows and len(skill_rows) > 0:
top_skill = skill_rows[0][0]
top_share = skill_rows[0][5] if len(skill_rows[0]) > 5 else 0
if top_share > 40:
strengths.append(f"{top_skill} carries damage ({self._format_number(top_share)}% share)")
if not strengths:
strengths.append("Consistent rotation maintained")
# Leaks
leaks = []
if variability > 15:
leaks.append(f"High DPS variability ({variability}% spread across runs)")
if float(crit_rate) < 20:
leaks.append(f"Low crit rate ({crit_rate}%)")
# Check for fall-offs
delta_rows = skill_deltas.get("rows") or []
falloffs = [r for r in delta_rows if r[3] < -2.0] # delta > -2pp
if falloffs:
leak_skill = falloffs[0][0]
leak_delta = falloffs[0][3]
leaks.append(f"{leak_skill} fell off ({leak_delta:.1f}pp vs prior runs)")
# Check timeline momentum
timeline_rows = timeline.get("rows") or []
if len(timeline_rows) >= 3:
first_bucket_dmg = timeline_rows[0][4] if len(timeline_rows[0]) > 4 else 0
last_bucket_dmg = timeline_rows[-1][4] if len(timeline_rows[-1]) > 4 else 0
median_dmg = sorted([r[4] for r in timeline_rows if len(r) > 4])[len(timeline_rows)//2] if timeline_rows else 0
if last_bucket_dmg < median_dmg * 0.7:
leaks.append(f"Momentum drop-off (last bucket {self._format_number(last_bucket_dmg)} vs median {self._format_number(median_dmg)})")
if not leaks:
leaks.append("No major leaks detected")
# Top 3 changes
changes = []
if falloffs:
changes.append(f"Restore {falloffs[0][0]} usage (biggest fall-off)")
if float(crit_rate) < 25:
changes.append("Boost crit uptime with buffs/positioning")
if variability > 15:
changes.append("Standardize opener and rotation for consistency")
if len(skill_rows) > 3:
low_skill = skill_rows[-1][0] if skill_rows else "?"
changes.append(f"Drop or optimize {low_skill} (lowest damage share)")
while len(changes) < 3:
changes.append("Maintain current rotation patterns")
sections = [
f"Full Coaching Report: Run {run_id}",
"",
"Summary Stats:",
f"- Total Damage: {total_damage} over {duration}s ({dps} DPS)",
f"- Hits: {total_hits}, Crit Rate: {crit_rate}%",
f"- Consistency: {len(run_rows)} runs analyzed, {variability}% DPS spread",
"",
"Strengths (Data-Backed):",
]
sections.extend(f"- {s}" for s in strengths[:3])
sections.append("")
sections.append("Leaks (Data-Backed):")
sections.extend(f"- {leak}" for leak in leaks[:3])
sections.append("")
sections.append("Top 3 Changes for Next Run:")
sections.extend(f"{i+1}. {change}" for i, change in enumerate(changes[:3]))
sections.append("")
sections.extend([
"Next Questions:",
"- How do I fix the biggest leak?",
"- Which skills should I prioritize in opener?",
"- What caused my best DPS run to succeed?",
])
return "\n".join(sections)
    def _render_followup_why(self, packet: Dict[str, Any], question: str) -> str:
        """Explain why the top fall-off skill lost damage share.

        Uses the same positional skill_deltas row layout as
        _render_skill_delta (share / hits / crit triplets plus deltas).
        """
        skill_deltas = packet.get("skill_deltas") or {}
        run_summary = packet.get("run_summary") or {}
        rows = run_summary.get("rows") or []
        run_row = rows[0] if rows else []
        run_id = run_row[0] if len(run_row) > 0 else "?"
        delta_rows = skill_deltas.get("rows") or []
        if not delta_rows:
            return f"Run {run_id}: No skill delta data to analyze."
        # Get top fall-off skill (most negative delta); relies on rows being
        # pre-sorted by delta upstream.
        falloffs = [r for r in delta_rows if r[3] < -1.0]
        if not falloffs:
            return f"Run {run_id}: No significant skill fall-offs detected (all deltas > -1.0pp)."
        focus = falloffs[0]
        # Unpack positional fields for readability below.
        skill_name = focus[0]
        last_share = focus[1]
        prior_share = focus[2]
        delta_share = focus[3]
        last_hits = focus[4]
        prior_hits = focus[5]
        delta_hits = focus[6]
        last_crit = focus[7]
        prior_crit = focus[8]
        delta_crit = focus[9]
        # Remember the subject so further follow-ups stay on topic.
        self._state["last_focus_skill"] = skill_name
        sections = [
            f"Why did {skill_name} fall off?",
            f"- Run {run_id} vs prior runs comparison:",
            "",
            "Delta Analysis:",
            f"- Damage share: {last_share:.1f}% (last) vs {prior_share:.1f}% (prior avg) = {delta_share:.1f}pp drop",
            f"- Hit count: {last_hits} (last) vs {self._format_number(prior_hits)} (prior avg) = {delta_hits:+d} hits",
            f"- Crit rate: {last_crit:.1f}% (last) vs {prior_crit:.1f}% (prior avg) = {delta_crit:.1f}pp change",
            "",
            "Plausible Causes (Hypotheses):",
            "1. Rotation timing changed—skill used less frequently or later in fight",
            "2. Uptime gap—skill on cooldown during high-damage windows",
            "",
            "Recommendation:",
            f"- Review opener and mid-fight rotation to ensure {skill_name} usage matches prior runs",
            f"- Check if {skill_name} cooldown aligns with buff windows",
            "",
            "Next Questions:",
            "- Where did my damage spike and why?",
            "- Should I reorder rotation to front-load damage?",
            "- Give me top 3 changes next run to increase DPS",
        ]
        return "\n".join(sections)
    def _render_action_plan(self, packet: Dict[str, Any], question: str) -> str:
        """Provide top 3 actionable changes backed by packet numbers."""
        skill_deltas = packet.get("skill_deltas") or {}
        skill_eff = packet.get("skill_efficiency") or {}
        # NOTE(review): default here is {} while other renderers use [];
        # harmless because _coerce_runs_list accepts both shapes.
        runs_last_n = packet.get("runs_last_n") or {}
        top_skills = packet.get("top_skills") or {}
        run_summary = packet.get("run_summary") or {}
        actions_data = packet.get("actions") or {}
        rows = run_summary.get("rows") or []
        run_row = rows[0] if rows else []
        run_id = run_row[0] if len(run_row) > 0 else "?"
        # Extract data for action plan
        delta_rows = skill_deltas.get("rows") or []
        eff_rows = skill_eff.get("rows") or []
        run_rows = self._coerce_runs_list(runs_last_n)
        skill_rows = top_skills.get("rows") or []
        # DPS variability across recent runs (max-min spread vs average).
        dps_values = [self._safe_float(row.get("dps")) for row in run_rows if row.get("dps") is not None]
        dps_values = [v for v in dps_values if v is not None]
        avg_dps = sum(dps_values) / len(dps_values) if dps_values else 0
        max_dps = max(dps_values) if dps_values else 0
        min_dps = min(dps_values) if dps_values else 0
        variability = round((max_dps - min_dps) / avg_dps * 100, 1) if avg_dps > 0 else 0
        changes = []
        # Change 1: Restore biggest fall-off (delta_share_pp below -2pp)
        falloffs = [r for r in delta_rows if r[3] < -2.0]
        if falloffs:
            skill = falloffs[0][0]
            delta_share = falloffs[0][3]
            delta_hits = falloffs[0][6]
            changes.append(
                f"Restore {skill} usage: fell {delta_share:.1f}pp share ({delta_hits:+d} hits vs prior runs)"
            )
        # Change 2: Boost efficiency — call out the lowest avg-damage skill
        if eff_rows and len(eff_rows) > 1:
            low_eff_skill = eff_rows[-1][0]
            low_eff_avg = eff_rows[-1][1]
            changes.append(
                f"Drop or combo {low_eff_skill}: only {self._format_number(low_eff_avg)} avg dmg/hit"
            )
        # Change 3: Consistency, or double down on the top carry skill
        if variability > 15:
            changes.append(
                f"Standardize rotation: {variability}% DPS spread indicates inconsistent execution"
            )
        elif skill_rows and len(skill_rows) > 3:
            top_skill = skill_rows[0][0]
            top_share = skill_rows[0][5] if len(skill_rows[0]) > 5 else 0
            changes.append(
                f"Prioritize {top_skill}: already carries {self._format_number(top_share)}% damage share"
            )
        # Fill to exactly 3 items
        while len(changes) < 3:
            changes.append("Maintain current rotation and improve positioning")
        sections = [
            f"Top 3 Changes for Next Run (Run {run_id}):",
            "",
        ]
        sections.extend(f"{i+1}. {change}" for i, change in enumerate(changes[:3]))
        sections.append("")
        sections.extend([
            "Next Questions:",
            "- Should I reorder rotation to front-load damage?",
            "- Where did my damage spike and why?",
            "- How do I fix the biggest leak?",
        ])
        return "\n".join(sections)
def _render_spike_analysis(self, packet: Dict[str, Any], question: str) -> str:
    """Analyze damage spikes and attribute to skills.

    Renders the top damage windows of the latest run — taken from the
    packet's precomputed ``windows`` section when present, otherwise
    approximated from the raw timeline buckets — followed by overall
    top-skill context, a canned insight block, and follow-up questions.
    """
    timeline = packet.get("timeline") or {}
    top_skills = packet.get("top_skills") or {}
    run_summary = packet.get("run_summary") or {}
    windows_data = packet.get("windows") or {}
    rows = run_summary.get("rows") or []
    run_row = rows[0] if rows else []
    # Summary row layout assumed: column 0 is the run id — TODO confirm schema.
    run_id = run_row[0] if len(run_row) > 0 else "?"
    timeline_rows = timeline.get("rows") or []
    if not timeline_rows:
        return f"Run {run_id}: No timeline data for spike analysis."
    # Find top damage windows
    top_windows = windows_data.get("top_damage_windows") or []
    if not top_windows:
        # Fallback: compute from timeline.
        # Bucket damage is assumed at index 4; take the three biggest buckets
        # and present each as a pseudo-window.
        sorted_buckets = sorted(timeline_rows, key=lambda r: r[4] if len(r) > 4 else 0, reverse=True)
        top_windows = []
        for bucket in sorted_buckets[:3]:
            bucket_s = bucket[0] if len(bucket) > 0 else 0
            damage = bucket[4] if len(bucket) > 4 else 0
            top_windows.append({
                "start_s": bucket_s,
                "end_s": bucket_s + 5,  # presumably 5s buckets — verify against packet limits
                "damage": damage,
                "top_skills": []  # No per-bucket skill breakdown in basic timeline
            })
    sections = [
        f"Damage Spike Analysis (Run {run_id}):",
        "",
        "Top Damage Windows:",
    ]
    for i, window in enumerate(top_windows[:3], 1):
        start_s = window.get("start_s", 0)
        end_s = window.get("end_s", start_s + 5)
        damage = window.get("damage", 0)
        skills = window.get("top_skills", [])
        sections.append(
            f"{i}. {self._format_mm_ss(start_s)}–{self._format_mm_ss(end_s)}: "
            f"{self._format_number(damage)} damage"
        )
        if skills:
            skill_list = ", ".join(skills[:3])
            sections.append(f" Driven by: {skill_list}")
        sections.append("")
    # Overall top skills context
    skill_rows = top_skills.get("rows") or []
    if skill_rows:
        sections.append("Overall Top Skills:")
        for row in skill_rows[:3]:
            skill = row[0]
            # Damage share expected at index 5; guarded for shorter rows.
            share = row[5] if len(row) > 5 else 0
            sections.append(f"- {skill}: {self._format_number(share)}% damage share")
        sections.append("")
    sections.extend([
        "Insight:",
        "- Spikes likely caused by burst window skill combos or buff stacking",
        "- Compare spike timing to cooldown rotations and buff durations",
        "",
        "Next Questions:",
        "- Should I reorder rotation to front-load damage?",
        "- Which skill drove the crit spike at peak time?",
        "- Give me top 3 changes next run to increase DPS",
    ])
    return "\n".join(sections)
def _render_followup_more(self, packet: Dict[str, Any], question: str) -> str:
    """Provide one additional insight + one action from packet.
    Prioritize: (a) biggest spike window OR (b) downtime gap OR (c) next skill delta.
    """
    run_summary = packet.get("run_summary") or {}
    windows = packet.get("windows") or {}
    skill_deltas = packet.get("skill_deltas") or {}
    timeline = packet.get("timeline") or {}
    rows = run_summary.get("rows") or []
    run_row = rows[0] if rows else []
    # NOTE(review): run_id is computed but never rendered below — dead local?
    run_id = run_row[0] if len(run_row) > 0 else "?"
    sections = ["Additional Insight:"]
    # Priority 1: Biggest spike window
    spike_windows = windows.get("top_damage_windows") or []
    if spike_windows:
        spike = spike_windows[0]
        start_s = spike.get("start_s", 0)
        end_s = spike.get("end_s", 0)
        damage = spike.get("damage", 0)
        skills = spike.get("top_skills", [])
        sections.append(f"- Biggest spike: {self._format_mm_ss(start_s)}–{self._format_mm_ss(end_s)} ({damage:,} dmg, led by {', '.join(skills[:2])})")
    else:
        # Priority 2: Downtime gap (biggest drop in timeline damage)
        timeline_rows = timeline.get("rows") or []
        if len(timeline_rows) >= 2:
            # Per-bucket damage assumed at index 4; missing cells count as 0.
            damages = [row[4] if len(row) > 4 else 0 for row in timeline_rows]
            drops = [(i, damages[i-1] - damages[i]) for i in range(1, len(damages))]
            if drops:
                max_drop_idx, max_drop = max(drops, key=lambda x: x[1])
                bucket_s = timeline_rows[max_drop_idx][0] if len(timeline_rows[max_drop_idx]) > 0 else max_drop_idx * 5
                sections.append(f"- Biggest downtime gap: {int(max_drop):,} dmg drop at {bucket_s}s mark")
    # Priority 3: Next biggest skill delta
    delta_rows = skill_deltas.get("rows") or []
    if delta_rows and len(delta_rows) > 1:
        # Delta rows assumed as [skill, ?, ?, share_delta_pp, ...] — TODO confirm.
        falloffs = sorted([r for r in delta_rows if r[3] < -1.0], key=lambda x: x[3])
        if falloffs:
            skill = falloffs[0][0]
            delta_share = falloffs[0][3]
            sections.append(f"- Secondary fall-off: {skill} dropped {delta_share:.1f}pp share (consider rotating earlier)")
    sections.extend(["", "Next Step:", "- Review the run VOD at the identified spike/gap to refine positioning"])
    return "\n".join(sections)
def _render_frontload(self, packet: Dict[str, Any], question: str) -> str:
    """Analyze early damage vs overall and suggest opener reorder.

    Compares damage dealt in the first 60 seconds against the run total and
    emits one of three canned recommendations based on the early share:
    <35% low, >45% strong, otherwise balanced.
    """
    timeline = packet.get("timeline") or {}
    top_skills = packet.get("top_skills") or {}
    run_summary = packet.get("run_summary") or {}
    windows_data = packet.get("windows") or {}
    rows = run_summary.get("rows") or []
    run_row = rows[0] if rows else []
    # Summary row layout assumed: [run_id, ?, total_damage, ...] — TODO confirm.
    run_id = run_row[0] if len(run_row) > 0 else "?"
    total_damage = run_row[2] if len(run_row) > 2 else 0
    timeline_rows = timeline.get("rows") or []
    if not timeline_rows:
        return f"Run {run_id}: No timeline data for front-load analysis."
    # Compute early window (0-60s) damage
    early_window = windows_data.get("early_window") or {}
    if not early_window:
        # Fallback: compute from timeline.
        # Buckets whose start time (index 0) is < 60s; damage assumed at index 4.
        early_buckets = [r for r in timeline_rows if (r[0] if len(r) > 0 else 999) < 60]
        early_damage = sum(r[4] for r in early_buckets if len(r) > 4)
        early_window = {
            "start_s": 0,
            "end_s": 60,
            "damage": early_damage,
            "top_skills": []  # basic timeline has no per-bucket skill breakdown
        }
    early_damage = early_window.get("damage", 0)
    early_share = (early_damage / total_damage * 100) if total_damage > 0 else 0
    early_skills = early_window.get("top_skills", [])
    # Overall context
    skill_rows = top_skills.get("rows") or []
    overall_top_skill = skill_rows[0][0] if skill_rows else "?"
    sections = [
        f"Front-Load Analysis (Run {run_id}):",
        "",
        "Early Damage (0–60s):",
        f"- Dealt {self._format_number(early_damage)} damage ({early_share:.1f}% of total)",
        f"- Overall total: {self._format_number(total_damage)} damage",
        "",
    ]
    if early_skills:
        sections.append("Early Window Top Skills:")
        sections.extend(f"- {skill}" for skill in early_skills[:3])
        sections.append("")
    # Recommendation: thresholds are <35% (low), >45% (strong), else balanced.
    if early_share < 35:
        sections.append("Recommendation:")
        sections.append(f"- ⚠️ Early damage is LOW ({early_share:.1f}% share)")
        sections.append(f"- Reorder opener to prioritize {overall_top_skill} and high-damage skills")
        sections.append("- Front-load burst during buff windows (0–30s)")
    elif early_share > 45:
        sections.append("Recommendation:")
        sections.append(f"- ✓ Strong front-load ({early_share:.1f}% share in first 60s)")
        sections.append("- Maintain current opener sequence")
    else:
        sections.append("Recommendation:")
        sections.append(f"- Balanced damage distribution ({early_share:.1f}% early share)")
        sections.append("- Consider slight opener optimization for faster ramp")
    sections.append("")
    sections.extend([
        "Next Questions:",
        "- Where did my damage spike and why?",
        "- Give me top 3 changes next run to increase DPS",
        "- Which skills should I prioritize in opener?",
    ])
    return "\n".join(sections)
def _format_mm_ss(self, seconds: int) -> str:
"""Format seconds as mm:ss."""
mins = seconds // 60
secs = seconds % 60
return f"{mins:02d}:{secs:02d}"
def _build_insights(
self,
run_summary: Dict[str, Any],
top_skills: Dict[str, Any],
skill_eff: Dict[str, Any],
timeline: Dict[str, Any],
) -> List[str]:
rows = run_summary.get("rows") or []
run_row = rows[0] if rows else []
run_id = run_row[0] if len(run_row) > 0 else "?"
total_damage = self._format_number(run_row[2] if len(run_row) > 2 else None)
dps = self._format_number(run_row[4] if len(run_row) > 4 else None)
crit = self._format_number(run_row[5] if len(run_row) > 5 else None)
top_skill = top_skills.get("rows", [["?", 0]])[0][0] if top_skills.get("rows") else "?"
eff_rows = skill_eff.get("rows") or []
high_avg = eff_rows[0][0] if eff_rows else top_skill
burst = timeline.get("rows")[-1][2] if timeline.get("rows") else None
insights = [
f"- Run {run_id} dealt {total_damage} dmg at {dps} DPS with {crit}% crits.",
f"- Top skill: {top_skill}; highest avg hit: {high_avg}.",
]
if burst is not None:
insights.append(f"- Latest bucket damage: {self._format_number(burst)} (momentum indicator).")
if top_skills.get("rows") and len(top_skills["rows"]) > 3:
share = top_skills["rows"][0][5] if len(top_skills["rows"][0]) > 5 else None
insights.append(
f"- Damage is front-loaded: {top_skill} holds {self._format_number(share)}% of run damage."
)
while len(insights) < 3:
insights.append("- Stable rotation detected over analyzed runs.")
return insights[:6]
def _build_evidence(
    self,
    run_summary: Dict[str, Any],
    runs_last_n: Dict[str, Any],
    top_skills: Dict[str, Any],
    timeline: Dict[str, Any],
    limits: Dict[str, Any],
) -> List[str]:
    """Summarize how much data backed the insights (counts plus headline stats)."""
    run_count = len(self._coerce_runs_list(runs_last_n))
    bucket_count = len(timeline.get("rows") or [])
    skill_count = len(top_skills.get("rows") or [])
    evidence = [
        f"- MCP packet: last_n_runs={limits.get('last_n_runs', '?')}, top_k={limits.get('top_k_skills', '?')}, bucket={limits.get('bucket_seconds', '?')}s.",
        f"- Runs analyzed: {run_count}; timeline buckets: {bucket_count}; skills scored: {skill_count}.",
    ]
    summary_rows = run_summary.get("rows") or []
    if summary_rows:
        first = summary_rows[0]
        dps = first[4] if len(first) > 4 else None
        duration = first[3] if len(first) > 3 else None
        crit = first[5] if len(first) > 5 else None
        # Only emit the stats bullet when DPS is actually present.
        if dps is not None:
            evidence.append(
                f"- Run DPS: {self._format_number(dps)} over {self._format_number(duration)}s; crit {self._format_number(crit)}%."
            )
    return evidence
def _build_actions(self, top_skills: Dict[str, Any], skill_eff: Dict[str, Any]) -> List[str]:
actions: List[str] = []
top_rows = top_skills.get("rows") or []
eff_rows = skill_eff.get("rows") or []
if top_rows:
actions.append(f"- Lean into {top_rows[0][0]} (highest damage share).")
if eff_rows:
actions.append(f"- Prioritize {eff_rows[0][0]} for burst windows (best avg hit).")
if len(top_rows) > 1:
actions.append(f"- Tighten rotation between {top_rows[0][0]} and {top_rows[1][0]} for steadier DPS.")
while len(actions) < 2:
actions.append("- Keep crit uptime stable; avoid downtime between pulls.")
return actions[:4]
@staticmethod
def _build_next_questions(question: str) -> List[str]:
"""Curated follow-up questions (never echo user's question)."""
return [
"- Which skill fell off compared to prior runs?",
"- How do crit rates trend across fight time?",
"- What made my best DPS run succeed?",
]
def _format_analysis_trace(self, packet: Dict[str, Any]) -> Dict[str, Any]:
meta = packet.get("meta") or {}
limits = meta.get("limits") or {}
runs_last_n = packet.get("runs_last_n") or []
top_skills = packet.get("top_skills") or {}
timeline = packet.get("timeline") or {}
return {
"route": "DATA",
"tool": "get_analysis_packet",
"coach_session_id": self._coach_session_id,
"run_id": meta.get("run_id", "last"),
"top_k": limits.get("top_k_skills", 10),
"bucket_seconds": limits.get("bucket_seconds", 5),
"counts": {
"runs_last_n": self._count_rows(runs_last_n),
"top_skills_last_run": self._count_rows(top_skills),
"timeline_buckets_last_run": self._count_rows(timeline),
},
}
# ------------------------------------------------------------------ prompt helpers
def _plan_messages(self, question: str, context_json: str) -> List[Dict[str, str]]:
    """Build the chat messages asking the model to plan a SQL call or answer directly."""
    instructions = (
        "Decide next step. Respond with exactly one line: either `SQL: <single SELECT query>`"
        " or `ANSWER: <concise explanation>`. SQL must query the events table, stay read-only,"
        " and remain under 20 lines."
    )
    prompt = f"Context:\n{context_json}\n\n{instructions}\n\nQuestion: {question}"
    return [
        {"role": "system", "content": COACH_SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
def _answer_messages(
    self,
    question: str,
    context_json: str,
    sql: str,
    result: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Build the chat messages asking the model to summarize an executed query."""
    parts = [
        f"Context:\n{context_json}",
        f"Executed SQL:\n{sql}",
        f"Result table:\n{self._render_table(result)}",
        "Respond with `ANSWER: <short explanation>` referencing the numbers above."
        f"\n\nQuestion: {question}",
    ]
    return [
        {"role": "system", "content": COACH_SYSTEM_PROMPT},
        {"role": "user", "content": "\n\n".join(parts)},
    ]
def _call_model(self, messages: List[Dict[str, str]]) -> str:
response = self._model_manager.generate(
messages,
max_tokens=self._max_tokens,
temperature=self._temperature,
top_p=self._top_p,
repeat_penalty=self._repeat_penalty,
stop=self._stop_sequences,
)
return response.strip()
# ------------------------------------------------------------------ parsing helpers
@staticmethod
def _count_rows(payload: Any) -> int:
"""Robust row counter for dict-with-rows or list payloads."""
if isinstance(payload, list):
return len(payload)
if isinstance(payload, dict):
return len(payload.get("rows") or [])
return 0
@staticmethod
def _parse_plan(plan_response: str) -> Tuple[str, str | None]:
text = plan_response.strip()
upper = text.upper()
if upper.startswith("SQL:"):
sql = text[4:].strip()
if sql and (sql.lower().startswith("select") or sql.lower().startswith("with")):
return "sql", sql
return "invalid", None
if upper.startswith("ANSWER:"):
answer = text[7:].strip()
return ("answer", answer or None)
return "invalid", None
@staticmethod
def _parse_answer(answer_response: str) -> str | None:
trimmed = answer_response.strip()
if trimmed.upper().startswith("ANSWER:"):
answer = trimmed[7:].strip()
return answer or None
return None
# ------------------------------------------------------------------ context helpers
def _context_json(self, payload: Dict[str, Any], schema: Dict[str, Any]) -> str:
    """Serialize a compact JSON context block for the planner prompt."""
    summary = payload.get("summary") or {}
    # Keep only the headline aggregates, in a stable key order.
    aggregates = {
        key: summary.get(key)
        for key in (
            "total_runs",
            "total_damage",
            "overall_dps",
            "overall_dpm",
            "overall_crit_rate_pct",
            "overall_heavy_rate_pct",
        )
    }
    recent_runs = [
        {
            "run_id": run.get("run_id"),
            "dps": run.get("dps"),
            "dpm": run.get("dpm"),
            "duration_seconds": run.get("duration_seconds"),
            "crit_rate_pct": run.get("crit_rate_pct"),
        }
        for run in list(payload.get("runs") or [])[-COACH_CONTEXT_RECENT_RUNS:]
    ]
    skills = summary.get("top_skills_by_damage") or summary.get("top_skills") or []
    columns = schema.get("columns") if isinstance(schema, dict) else None
    column_names = [col.get("name") for col in columns or [] if isinstance(col, dict)]
    compact = {
        "aggregates": aggregates,
        "recent_runs": recent_runs,
        "top_skills_by_damage": skills[:COACH_CONTEXT_TOP_SKILLS],
        "schema": column_names,
    }
    return json.dumps(compact, indent=2)
# ------------------------------------------------------------------ formatting helpers
@staticmethod
def _render_table(result: Dict[str, Any], max_rows: int = 10) -> str:
columns = result.get("columns") or []
rows = result.get("rows") or []
if not columns:
return "(no columns)"
headers = [str(col) for col in columns]
row_text: List[List[str]] = []
for row in rows[:max_rows]:
row_text.append([SQLCoach._format_cell(value) for value in row])
widths = [len(header) for header in headers]
for row in row_text:
for idx, cell in enumerate(row):
widths[idx] = max(widths[idx], len(cell))
lines = [
" | ".join(header.ljust(widths[idx]) for idx, header in enumerate(headers)),
"-+-".join("-" * width for width in widths),
]
for row in row_text:
lines.append(" | ".join(cell.ljust(widths[idx]) for idx, cell in enumerate(row)))
remaining = max(0, len(rows) - len(row_text))
if remaining:
lines.append(f"(+{remaining} more rows)")
return "\n".join(lines)
@staticmethod
def _format_cell(value: Any) -> str:
if value is None:
return "NULL"
if isinstance(value, float):
return f"{value:.3f}"
return str(value)
@staticmethod
def _format_number(value: Any) -> str:
if value is None:
return "–"
try:
num = float(value)
except Exception:
return str(value)
if abs(num - int(num)) < 1e-6:
return f"{int(round(num)):,}"
return f"{num:.2f}"
@staticmethod
def _format_trace(sql: str, result: Dict[str, Any], fallback: bool = False) -> Dict[str, Any]:
rows = result.get("rows") or []
columns = result.get("columns") or []
return {
"tool_name": "query_dps",
"fallback": fallback,
"sql": sql.strip(),
"rows_returned": len(rows),
"columns": columns,
}
@staticmethod
def _fallback_from_result(result: Dict[str, Any], *, reason: str) -> str:
preview = SQLCoach._render_table(result)
return f"{reason}\n{preview}"
__all__ = ["SQLCoach"]