"""Implementation phase tools."""
import re
from typing import Any
from fastmcp.server.context import Context
from pathfinder_mcp.artifacts import ArtifactWriter
from pathfinder_mcp.context import ContextMonitor
from pathfinder_mcp.session import SessionManager
from pathfinder_mcp.state import Phase, PhaseState
def _extract_phases(plan_content: str) -> list[dict[str, Any]]:
    """Extract phase sections from plan content.

    A phase starts at a line of the form ``## Phase N`` (optionally followed
    by a colon and a title) and runs until the next phase heading, an
    ``## Implementation Details`` heading, or the end of the text.

    Args:
        plan_content: Markdown text of the plan.

    Returns:
        One dict per phase, in document order, with the keys:

        * ``number``: phase number as an int.
        * ``title``: text after the heading on the same line (may be empty).
        * ``content``: the section body with surrounding whitespace removed.
        * ``tasks``: list of ``{"done": bool, "text": str}`` built from the
          checkbox items (``- [ ]``, ``- [x]``, ``- [X]``; ``*`` and ``+``
          bullets are accepted too).
        * ``completed``: True only when the phase has at least one task and
          every task is checked.
    """
    # Headings are matched only at the start of a line (allowing up to three
    # leading spaces), so a "### Phase 1" sub-heading or a mid-sentence
    # mention such as "see ## Phase 2" neither starts nor ends a section.
    heading = r"^ {0,3}## "
    # ``[ \t]*`` (rather than ``\s*``) keeps an empty title from swallowing the
    # first line of the body; ``\n?`` lets a heading on the very last line match.
    phase_pattern = re.compile(
        heading
        + r"Phase (\d+):?[ \t]*([^\n]*)\n?(.*?)"
        + r"(?="
        + heading
        + r"Phase \d+|"
        + heading
        + r"Implementation Details|\Z)",
        re.DOTALL | re.MULTILINE,
    )
    task_pattern = re.compile(r"[-*+] \[([ xX])\] (.+)")

    phases: list[dict[str, Any]] = []
    for match in phase_pattern.finditer(plan_content):
        number, title, body = match.groups()
        content = body.strip()
        tasks = [
            # strip() drops a trailing "\r" left over from CRLF line endings.
            {"done": mark.lower() == "x", "text": text.strip()}
            for mark, text in task_pattern.findall(content)
        ]
        phases.append(
            {
                "number": int(number),
                "title": title.strip(),
                "content": content,
                "tasks": tasks,
                "completed": bool(tasks) and all(task["done"] for task in tasks),
            }
        )
    return phases
async def implement_phase(  # noqa: C901
    session_id: str,
    phase_number: int | None = None,
    ctx: Context | None = None,
    *,
    session_manager: SessionManager,
    artifact_writer: ArtifactWriter,
    context_monitor: ContextMonitor,
    sessions: dict[str, PhaseState],
) -> dict[str, Any]:
    """Execute an implementation phase.

    Reads ``plan.md`` for the session, picks the requested phase (or the first
    phase not yet completed), optionally asks the user for confirmation when
    the session is still in the plan phase, moves the session to the implement
    phase, and appends an "In Progress" entry to the progress artifact.

    Args:
        session_id: Session ID
        phase_number: Specific phase to execute (or next uncompleted)
        ctx: FastMCP context for elicitation
        session_manager: Session manager instance (not used by this function)
        artifact_writer: Artifact writer instance
        context_monitor: Context monitor instance
        sessions: Active sessions dict; updated in place on phase transition

    Returns:
        On success, the phase details and tasks to execute. Otherwise a dict
        with ``error`` and ``code`` (``SESSION_NOT_FOUND``, ``INVALID_PHASE``,
        ``MISSING_PLAN``, ``NO_PHASES`` or ``PHASE_NOT_FOUND``), a
        ``cancelled`` payload when the user declines, or an
        ``all_phases_completed`` status when nothing is left to do.
    """
    import logging  # local import keeps this function self-contained

    logger = logging.getLogger(__name__)

    # Validate session
    state = sessions.get(session_id)
    if not state:
        return {"error": "Session not found", "code": "SESSION_NOT_FOUND"}

    # Validate current phase (must be PLAN or IMPLEMENT)
    if state.is_research:
        return {
            "error": "Cannot implement before planning. Use start_plan first.",
            "code": "INVALID_PHASE",
        }

    # Load plan
    plan_content = artifact_writer.read_artifact(session_id, "plan.md")
    if not plan_content:
        return {"error": "Plan not found", "code": "MISSING_PLAN"}

    # Extract phases from plan
    phases = _extract_phases(plan_content)
    if not phases:
        return {
            "error": "No phases found in plan",
            "code": "NO_PHASES",
            "hint": "Plan should have sections like '## Phase 1: Setup'",
        }

    # Determine which phase to execute
    if phase_number is not None:
        # Specific phase requested
        target_phase = next((p for p in phases if p["number"] == phase_number), None)
        if target_phase is None:
            return {
                "error": f"Phase {phase_number} not found",
                "code": "PHASE_NOT_FOUND",
                "available_phases": [p["number"] for p in phases],
            }
    else:
        # Find next uncompleted phase
        target_phase = next((p for p in phases if not p["completed"]), None)
        if target_phase is None:
            return {
                "session_id": session_id,
                "status": "all_phases_completed",
                "message": "All implementation phases are complete!",
                "phases": phases,
            }

    # Elicit confirmation if transitioning from PLAN to IMPLEMENT
    if state.is_plan and ctx is not None:
        try:
            result = await ctx.elicit(
                (
                    f"Ready to begin implementation Phase {target_phase['number']}: "
                    f"{target_phase['title']}?"
                ),
                response_type=["yes", "no"],
            )
            if result.action != "accept" or result.data == "no":
                return {
                    "session_id": session_id,
                    "phase": "plan",
                    "message": "Implementation cancelled. Review plan first.",
                    "cancelled": True,
                }
        except Exception:
            # Confirmation is best-effort: when elicitation fails (e.g. the
            # client does not support it) we still proceed, but record why
            # instead of discarding the error silently.
            logger.debug(
                "Elicitation failed for session %s; proceeding without confirmation",
                session_id,
                exc_info=True,
            )

    # Transition to IMPLEMENT if in PLAN phase
    # NOTE(review): the new state is only stored in ``sessions``; nothing here
    # hands it to ``session_manager`` - confirm the caller persists it.
    if state.is_plan:
        sessions[session_id] = state.transition_to(Phase.IMPLEMENT)

    # Update progress artifact
    header = (
        f"\n## Phase {target_phase['number']}: {target_phase['title']}\n"
        "**Status**: In Progress\n"
        "### Tasks\n"
    )
    task_lines = [
        f"- [{'x' if task['done'] else ' '}] {task['text']}\n"
        for task in target_phase["tasks"]
    ]
    progress_entry = header + "".join(task_lines)
    artifact_writer.write_progress(session_id, progress_entry)

    # Track context
    context_monitor.add_message(progress_entry)

    return {
        "session_id": session_id,
        "phase": "implement",
        "executing_phase": {
            "number": target_phase["number"],
            "title": target_phase["title"],
            "tasks": target_phase["tasks"],
        },
        "total_phases": len(phases),
        "completed_phases": sum(1 for p in phases if p["completed"]),
        "context": context_monitor.get_status(),
        "message": (
            f"Executing Phase {target_phase['number']}. "
            "Complete tasks and call implement_phase again for next phase."
        ),
    }
def complete_phase(
    session_id: str,
    phase_number: int,
    notes: str = "",
    *,
    session_manager: SessionManager,
    artifact_writer: ArtifactWriter,
    context_monitor: ContextMonitor,
    sessions: dict[str, PhaseState],
) -> dict[str, Any]:
    """Mark a phase as completed and update progress.

    Appends a "Phase N Completed" entry to the session's progress artifact and
    feeds the same text to the context monitor. The plan itself is not
    modified here.

    Args:
        session_id: Session ID
        phase_number: Phase number to mark complete
        notes: Optional completion notes; blank notes fall back to a default
            message
        session_manager: Session manager instance (not used by this function)
        artifact_writer: Artifact writer instance
        context_monitor: Context monitor instance
        sessions: Active sessions dict

    Returns:
        Updated phase status, or a dict with ``error`` and ``code``
        (``SESSION_NOT_FOUND`` or ``INVALID_PHASE``) when the session is
        unknown or not in the implementation phase.
    """
    # Validate session
    state = sessions.get(session_id)
    if not state:
        return {"error": "Session not found", "code": "SESSION_NOT_FOUND"}
    if not state.is_implement:
        return {
            "error": "Not in implementation phase",
            "code": "INVALID_PHASE",
        }

    # Whitespace-only (or None) notes are treated the same as no notes.
    note_text = (notes or "").strip() or "Phase completed successfully."

    # Update progress
    # The blank line before "---" matters: directly under a text line Markdown
    # reads "---" as a setext heading underline, which would turn the notes
    # into a heading instead of drawing a horizontal rule.
    completion_entry = (
        f"\n### Phase {phase_number} Completed\n"
        "\n"
        f"{note_text}\n"
        "\n"
        "---\n"
    )
    artifact_writer.write_progress(session_id, completion_entry)

    # Track context
    context_monitor.add_message(completion_entry)

    return {
        "session_id": session_id,
        "phase_completed": phase_number,
        "context": context_monitor.get_status(),
        "message": (
            f"Phase {phase_number} marked complete. "
            "Call implement_phase for next phase."
        ),
    }