"""Tests for session manager."""
from pathfinder_mcp.session import (
SessionManager,
SessionSnapshot,
_sanitize_session_id,
)
from pathfinder_mcp.state import Phase
class TestSanitizeSessionId:
    """Checks on how ``_sanitize_session_id`` cleans up raw session IDs."""

    def test_normal_id_unchanged(self) -> None:
        """An already-clean ID is returned exactly as given."""
        plain_id = "my_session_123"
        assert _sanitize_session_id(plain_id) == plain_id

    def test_removes_path_separators(self) -> None:
        """Forward and backward slashes do not survive sanitization."""
        unix_style = _sanitize_session_id("../../../etc/passwd")
        assert "/" not in unix_style
        windows_style = _sanitize_session_id("..\\..\\windows")
        assert "\\" not in windows_style

    def test_removes_dangerous_chars(self) -> None:
        """Colon, asterisk and question mark are absent from the result."""
        cleaned = _sanitize_session_id("test:session*name?")
        for forbidden in (":", "*", "?"):
            assert forbidden not in cleaned

    def test_handles_empty_string(self) -> None:
        """An empty ID yields a value starting with ``session_``."""
        assert _sanitize_session_id("").startswith("session_")

    def test_truncates_long_ids(self) -> None:
        """A 100-character ID comes back at most 64 characters long."""
        oversized = "a" * 100
        assert len(_sanitize_session_id(oversized)) <= 64
class TestSessionManager:
    """Exercises SessionManager: directories, existence checks, snapshots."""

    def test_create_session_creates_directory(
        self, session_manager: SessionManager, sample_session_id: str
    ) -> None:
        """The path returned by create_session is an existing directory."""
        session_dir = session_manager.create_session(sample_session_id)
        assert session_dir.exists() and session_dir.is_dir()

    def test_session_exists_false_initially(
        self, session_manager: SessionManager, sample_session_id: str
    ) -> None:
        """A session that was never created is reported as absent."""
        already_there = session_manager.session_exists(sample_session_id)
        assert not already_there

    def test_session_exists_true_after_create(
        self, session_manager: SessionManager, sample_session_id: str
    ) -> None:
        """Once created, the session is reported as present."""
        session_manager.create_session(sample_session_id)
        now_there = session_manager.session_exists(sample_session_id)
        assert now_there

    def test_save_and_load_snapshot(
        self, session_manager: SessionManager, sample_session_id: str
    ) -> None:
        """A saved snapshot loads back with the same field values."""
        # One dict drives both construction and verification, so the
        # round-trip check covers exactly the fields that were written.
        fields = {
            "session_id": sample_session_id,
            "phase": Phase.RESEARCH,
            "task_description": "Test task",
            "context_tokens": 100,
        }
        session_manager.save_snapshot(sample_session_id, SessionSnapshot(**fields))

        restored = session_manager.load_snapshot(sample_session_id)
        assert restored is not None
        for attr_name, expected in fields.items():
            assert getattr(restored, attr_name) == expected

    def test_load_snapshot_returns_none_for_missing(
        self, session_manager: SessionManager
    ) -> None:
        """Loading a session that was never saved gives None."""
        assert session_manager.load_snapshot("nonexistent") is None

    def test_get_session_path_sanitizes_id(
        self, session_manager: SessionManager
    ) -> None:
        """The path built for a traversal-style ID contains no ``..``."""
        resolved = str(session_manager.get_session_path("../dangerous"))
        assert ".." not in resolved