"""Context compaction tool."""
import json
from typing import Any
from pathfinder_mcp.artifacts import ArtifactWriter
from pathfinder_mcp.context import ContextMonitor
from pathfinder_mcp.session import SessionManager
from pathfinder_mcp.state import PhaseState
def compact_context(
session_id: str,
*,
session_manager: SessionManager,
artifact_writer: ArtifactWriter,
context_monitor: ContextMonitor,
sessions: dict[str, PhaseState],
) -> dict[str, Any]:
"""Compress session context into summary artifacts.
Creates:
- session_summary.md: Markdown summary
- session_state.json: Structured state (updated)
Args:
session_id: Session ID
session_manager: Session manager instance
artifact_writer: Artifact writer instance
context_monitor: Context monitor instance
sessions: Active sessions dict
Returns:
Paths to compressed artifacts
"""
# Validate session
state = sessions.get(session_id)
if not state:
return {"error": "Session not found", "code": "SESSION_NOT_FOUND"}
# Gather all artifacts
research = artifact_writer.read_artifact(session_id, "research.md") or ""
plan = artifact_writer.read_artifact(session_id, "plan.md") or ""
progress = artifact_writer.read_artifact(session_id, "progress.md") or ""
# Create markdown summary
summary_md = f"""# Session Summary: {session_id}
## Current Phase
{state.current_phase.value.upper()}
## Research Summary
{_summarize_section(research, max_lines=20)}
## Plan Summary
{_summarize_section(plan, max_lines=30)}
## Progress Summary
{_summarize_section(progress, max_lines=20)}
## Context Stats
- Previous tokens: {context_monitor.current_tokens}
- Compacted at: {context_monitor.get_utilization():.1f}% utilization
"""
# Write summary
session_path = session_manager.get_session_path(session_id)
summary_path = session_path / "session_summary.md"
summary_path.write_text(summary_md)
# Create structured JSON state
state_json = {
"session_id": session_id,
"phase": state.current_phase.value,
"artifacts": {
"research": bool(research),
"plan": bool(plan),
"progress": bool(progress),
},
"key_findings": _extract_key_points(research),
"plan_phases": _extract_phase_titles(plan),
"completed_work": _extract_completed_items(progress),
"compaction_stats": {
"pre_compaction_tokens": context_monitor.current_tokens,
"pre_compaction_utilization": context_monitor.get_utilization(),
},
}
# Update session state JSON
state_path = session_path / "session_state.json"
state_path.write_text(json.dumps(state_json, indent=2))
# Reset context monitor (summary is much smaller)
summary_tokens = context_monitor.count_tokens(summary_md)
context_monitor.reset(summary_tokens)
return {
"session_id": session_id,
"summary_path": str(summary_path),
"state_path": str(state_path),
"pre_compaction_tokens": state_json["compaction_stats"][
"pre_compaction_tokens"
],
"post_compaction_tokens": summary_tokens,
"context": context_monitor.get_status(),
"message": "Context compacted. Load session_summary.md to continue.",
}
def _summarize_section(content: str, max_lines: int = 20) -> str:
"""Extract key lines from a section."""
if not content:
return "_No content_"
lines = content.strip().split("\n")
# Keep headers and important lines
important = []
for line in lines:
stripped = line.strip()
# Keep headers
if stripped.startswith("#"):
important.append(stripped)
# Keep task items
elif stripped.startswith("- ["):
important.append(stripped)
# Keep key lines (not empty, not just whitespace)
elif stripped and len(important) < max_lines:
important.append(stripped)
if len(important) >= max_lines:
break
return "\n".join(important) if important else "_Content truncated_"
def _extract_key_points(research: str) -> list[str]:
"""Extract key findings from research."""
points: list[str] = []
for line in research.split("\n"):
line = line.strip()
# Look for bullet points or numbered items
if line.startswith("- ") or line.startswith("* "):
points.append(line[2:])
elif len(line) > 2 and line[0].isdigit() and line[1] in ".)":
points.append(line[2:].strip())
return points[:10] # Limit to 10 key points
def _extract_phase_titles(plan: str) -> list[str]:
"""Extract phase titles from plan."""
import re
titles: list[str] = []
for match in re.finditer(r"## Phase (\d+):?\s*(.+)", plan):
titles.append(f"Phase {match.group(1)}: {match.group(2).strip()}")
return titles
def _extract_completed_items(progress: str) -> list[str]:
"""Extract completed items from progress."""
completed: list[str] = []
for line in progress.split("\n"):
if "- [x]" in line.lower():
# Extract just the task text
text = line.split("]", 1)[-1].strip()
completed.append(text)
return completed[:20] # Limit