"""Base handler for phase operations."""
from abc import ABC, abstractmethod
from typing import Any
from pathfinder_mcp.artifacts import ArtifactWriter
from pathfinder_mcp.context import ContextMonitor
from pathfinder_mcp.session import SessionManager
from pathfinder_mcp.state import Phase, PhaseState
class BaseHandler(ABC):
    """Abstract base class for workflow phase handlers.

    Holds the shared collaborators passed to ``__init__`` and provides
    session lookup plus validation helpers that report problems as error
    dictionaries instead of raising. Concrete handlers subclass this and
    implement :meth:`execute`.
    """

    phase: Phase  # Must be set by subclass

    def __init__(
        self,
        session_manager: SessionManager,
        artifact_writer: ArtifactWriter,
        context_monitor: ContextMonitor,
        sessions: dict[str, PhaseState],
    ):
        """Initialize handler with shared resources.

        Args:
            session_manager: Session persistence manager
            artifact_writer: Artifact file writer
            context_monitor: Token usage monitor
            sessions: Active sessions dictionary, keyed by session ID. The
                mapping is stored by reference (not copied), so changes
                made through :meth:`set_session` are visible to every
                holder of the same dict.
        """
        self.session_manager = session_manager
        self.artifact_writer = artifact_writer
        self.context_monitor = context_monitor
        self.sessions = sessions

    @staticmethod
    def _session_not_found() -> dict[str, Any]:
        """Build the error payload for an unknown session ID.

        A fresh dict is returned on every call so callers may mutate the
        result without affecting later errors.
        """
        return {"error": "Session not found", "code": "SESSION_NOT_FOUND"}

    def get_session(self, session_id: str) -> PhaseState | None:
        """Get session state by ID, or None if the ID is not registered."""
        return self.sessions.get(session_id)

    def set_session(self, session_id: str, state: PhaseState) -> None:
        """Store (or overwrite) the state registered under ``session_id``."""
        self.sessions[session_id] = state

    def validate_session(self, session_id: str) -> dict[str, Any] | None:
        """Validate session exists.

        Args:
            session_id: Session ID to look up

        Returns:
            Error dict if invalid, None if valid
        """
        if session_id not in self.sessions:
            return self._session_not_found()
        return None

    def validate_phase(
        self, session_id: str, allowed_phases: list[Phase]
    ) -> dict[str, Any] | None:
        """Validate session is in an allowed phase.

        Args:
            session_id: Session ID to look up
            allowed_phases: Phases in which the operation is permitted

        Returns:
            Error dict if invalid, None if valid. The error is
            ``SESSION_NOT_FOUND`` when the session is unknown, otherwise
            ``INVALID_PHASE`` with the current and allowed phase values.
        """
        state = self.get_session(session_id)
        # Compare against None explicitly: a state object that happens to
        # evaluate as falsy is still an existing session and must not be
        # reported as missing.
        if state is None:
            return self._session_not_found()

        current = state.current_phase
        if current not in allowed_phases:
            return {
                "error": f"Invalid phase: {current.value}",
                "code": "INVALID_PHASE",
                "current_phase": current.value,
                "allowed_phases": [p.value for p in allowed_phases],
            }
        return None

    @abstractmethod
    async def execute(self, session_id: str, **kwargs: Any) -> dict[str, Any]:
        """Execute the handler's main operation.

        Args:
            session_id: Session ID
            **kwargs: Handler-specific arguments

        Returns:
            Result dictionary
        """

    def get_context_status(self) -> dict[str, Any]:
        """Return whatever the context monitor's ``get_status()`` reports."""
        return self.context_monitor.get_status()