"""Unit tests for Scribe MCP tools."""
from __future__ import annotations
import asyncio
import json
import shutil
import uuid
from dataclasses import replace
from pathlib import Path
import pytest
import sys
# Add the MCP_SPINE directory to Python path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from scribe_mcp import server
from scribe_mcp.config.settings import settings
from scribe_mcp.state.manager import StateManager
from scribe_mcp.storage.sqlite import SQLiteStorage
from scribe_mcp.tools import (
append_entry,
generate_doc_templates,
get_project,
list_projects,
read_recent,
rotate_log,
set_project,
)
from scribe_mcp.tools.project_utils import slugify_project_name
def run(coro):
"""Execute an async coroutine from a synchronous test."""
return asyncio.run(coro)
@pytest.fixture
def isolated_state(tmp_path, monkeypatch):
"""Provide an isolated StateManager and assign it to the server module."""
manager = StateManager(path=tmp_path / "state.json")
monkeypatch.setattr(server, "state_manager", manager, raising=False)
if getattr(server, "storage_backend", None):
run(server.storage_backend.close())
storage = SQLiteStorage(tmp_path / "scribe.db")
run(storage.setup())
monkeypatch.setattr(server, "storage_backend", storage, raising=False)
append_entry._RATE_TRACKER.clear()
append_entry._RATE_LOCKS.clear()
# Clean up audit trail files for test isolation
audit_dir = Path(__file__).parent.parent / "scribe_mcp" / "state"
if audit_dir.exists():
for audit_file in audit_dir.glob("rotation_audit_*.json"):
# Only remove test-related audit files (those with test patterns)
if any(pattern in audit_file.name for pattern in [
"test-", "test_", "-test", "_test", "performance", "metadata",
"integrity", "hash-chain", "history", "invalid-metadata", "enhanced-rotation"
]):
try:
audit_file.unlink()
except Exception:
pass # Ignore cleanup errors
return manager
@pytest.fixture
def project_root(tmp_path):
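"""Create a unique scratch directory under the configured project root and remove it after the test."""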
root = settings.project_root / "tmp_tests" / str(uuid.uuid4())
root.mkdir(parents=True, exist_ok=True)
yield root
if root.exists():
shutil.rmtree(root)
def test_set_and_get_project_roundtrip(isolated_state, project_root):
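"""Set a project and verify it round-trips through get_project with canonical doc paths."""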
root = project_root
result = run(
set_project.set_project(
name="test-project",
root=str(root),
defaults={"emoji": "✅", "agent": "Tester"},
)
)
assert result["ok"]
assert len(result["generated"]) >= 1
active = run(get_project.get_project())
assert active["ok"]
project = active["project"]
assert project["name"] == "test-project"
docs_dir = root / "docs" / "dev_plans" / slugify_project_name("test-project")
assert project["progress_log"] == str((docs_dir / "PROGRESS_LOG.md").resolve())
assert project["docs"]["architecture"].endswith("ARCHITECTURE_GUIDE.md")
assert active["recent_projects"][0] == "test-project"
def test_append_and_read_recent(isolated_state, project_root):
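"""Append an entry, confirm it lands in the log file, and surface it via read_recent and list_projects."""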
root = project_root
run(set_project.set_project("log-test", str(root)))
append_result = run(
append_entry.append_entry(
message="Recorded unit test entry",
status="info",
meta={"scope": "unit-test"},
)
)
assert append_result["ok"]
written_line = append_result["written_line"]
lines = [line for line in Path(append_result["path"]).read_text(encoding="utf-8").splitlines() if line.strip()]
assert lines[-1] == written_line
recent = run(read_recent.read_recent(n=5))
assert recent["ok"]
# Check if any entry contains "unit-test" in the message or meta field
found_unit_test = False
for entry in recent["entries"]:
# Handle both dict entries (from DB) and string entries (from file)
if isinstance(entry, dict):
if "unit-test" in str(entry.get("message", "")) or "unit-test" in str(entry.get("meta", "")):
found_unit_test = True
break
else:
# String entry from file
if "unit-test" in str(entry):
found_unit_test = True
break
assert found_unit_test, f"No entry containing 'unit-test' found in entries: {recent['entries']}"
projects = run(list_projects.list_projects())
assert projects["ok"]
assert any(p["name"] == "log-test" for p in projects["projects"])
assert projects["recent_projects"][0] == "log-test"
def test_append_entry_uses_slugified_log_path(isolated_state, project_root):
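"""Ensure append_entry writes to the slugified docs/dev_plans/<slug>/PROGRESS_LOG.md path."""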
root = project_root
project_name = "IMPLEMENTATION TESTING"
slug = slugify_project_name(project_name)
run(set_project.set_project(project_name, str(root)))
canonical_dir = (root / "docs" / "dev_plans" / slug).resolve()
log_path = (canonical_dir / "PROGRESS_LOG.md").resolve()
assert log_path.exists()
append_result = run(
append_entry.append_entry(
message="Verifying slugified path usage",
status="success",
)
)
assert append_result["ok"]
assert Path(append_result["path"]).resolve() == log_path
assert "Verifying slugified path usage" in log_path.read_text(encoding="utf-8")
def test_rotate_log_creates_archive(isolated_state, project_root):
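"""Rotate the log and confirm a non-empty archive file is produced."""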
root = project_root
run(set_project.set_project("rotate-test", str(root)))
run(append_entry.append_entry(message="Before rotation"))
result = run(rotate_log.rotate_log(suffix="test", confirm=True))
assert result["ok"]
archive_path = Path(result["archived_to"])
assert archive_path.exists()
assert archive_path.read_text(encoding="utf-8")
def test_generate_doc_templates_renders_files(tmp_path, isolated_state):
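"""Render all four doc templates and verify no unexpanded placeholders remain."""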
project_name = "UnitTestDocs"
target_dir = tmp_path / "docs" / "dev_plans" / slugify_project_name(project_name)
if target_dir.exists():
shutil.rmtree(target_dir)
try:
result = run(
generate_doc_templates.generate_doc_templates(
project_name=project_name,
author="QA",
base_dir=str(tmp_path),
)
)
assert result["ok"]
for filename in (
"ARCHITECTURE_GUIDE.md",
"PHASE_PLAN.md",
"CHECKLIST.md",
"PROGRESS_LOG.md",
):
path = target_dir / filename
assert path.exists()
content = path.read_text(encoding="utf-8")
assert "{{" not in content
finally:
if target_dir.exists():
shutil.rmtree(target_dir)
def test_rate_limit_blocks_excess_entries(monkeypatch, isolated_state, project_root):
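"""With a one-entry rate limit, the second append should be rejected with a retry hint."""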
root = project_root
run(set_project.set_project("rate-limit", str(root)))
patched_settings = replace(
settings,
log_rate_limit_count=1,
log_rate_limit_window=3600,
)
monkeypatch.setattr(append_entry, "settings", patched_settings, raising=False)
append_entry._RATE_TRACKER.clear()
append_entry._RATE_LOCKS.clear()
first = run(
append_entry.append_entry(
message="Initial entry",
status="info",
)
)
assert first["ok"]
second = run(
append_entry.append_entry(
message="Should be limited",
status="info",
)
)
assert not second["ok"]
assert "rate limit" in second["error"].lower()
assert second["retry_after_seconds"] >= 1
def test_log_rotation_triggers_when_max_bytes_reached(monkeypatch, isolated_state, project_root):
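"""An oversized log should be rotated into an archive on the next append."""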
root = project_root
run(set_project.set_project("rotation-limit", str(root)))
# First, create a large log file that exceeds the threshold
result = run(append_entry.append_entry(
message="Initial large entry that exceeds max bytes threshold when combined with metadata" * 10,
status="info",
meta={"test": "large" * 20}
))
assert result["ok"]
log_path = Path(result["path"])
# Manually patch the log to be larger than threshold
initial_content = log_path.read_text(encoding="utf-8")
large_content = initial_content + "\n" + "Large content to exceed max bytes" * 100
log_path.write_text(large_content, encoding="utf-8")
# Now set a very low threshold and patch settings
patched_settings = replace(
settings,
log_max_bytes=100,  # tiny threshold so the padded log above already exceeds it
)
monkeypatch.setattr(append_entry, "settings", patched_settings, raising=False)
append_entry._RATE_TRACKER.clear()
append_entry._RATE_LOCKS.clear()
# Add another entry - this should trigger rotation
result = run(
append_entry.append_entry(
message="Entry that should trigger rotation",
status="info",
)
)
assert result["ok"]
# Check for archive files
archives = list(log_path.parent.glob(f"{log_path.name}.*.md"))
assert archives, "Expected rotated archive file to be created"
class TestEnhancedRotationEngine:
"""Integration tests for enhanced rotation engine with Phase 0 utilities."""
def test_enhanced_rotation_with_integrity(self, isolated_state, project_root):
"""Test enhanced rotation with SHA-256 integrity verification."""
# Set up project
root = project_root
result = run(
set_project.set_project(
name="enhanced-rotation-test",
root=str(root),
defaults={"emoji": "🧪", "agent": "TestAgent"},
)
)
assert result["ok"]
# Add test entries
run(
append_entry.append_entry(
message="Test entry 1 before rotation",
status="info",
meta={"phase": "1", "test": "true"}
)
)
run(
append_entry.append_entry(
message="Test entry 2 before rotation",
status="success",
meta={"phase": "1", "test": "true"}
)
)
# Test dry run rotation
dry_run_result = run(
rotate_log.rotate_log(dry_run=True)
)
assert dry_run_result["ok"]
assert dry_run_result["dry_run"] is True
assert "rotation_id" in dry_run_result
assert "file_hash" in dry_run_result
assert "entry_count" in dry_run_result
assert "sequence_number" in dry_run_result
# Test actual enhanced rotation
rotation_result = run(
rotate_log.rotate_log(suffix="test-enhanced", confirm=True)
)
if not rotation_result["ok"]:
print(f"Rotation failed with error: {rotation_result.get('error', 'Unknown error')}")
print(f"Full rotation result: {rotation_result}")
assert rotation_result["ok"]
print(f"✅ Rotation successful!")
print(f" Archive path: {rotation_result.get('archive_path', 'N/A')}")
print(f" Archive hash: {rotation_result.get('archive_hash', 'N/A')}")
print(f" Entry count: {rotation_result.get('entry_count', 'N/A')}")
assert rotation_result["rotation_completed"] is True
assert "rotation_id" in rotation_result
assert "archive_path" in rotation_result
assert "archive_hash" in rotation_result
assert "entry_count" in rotation_result
assert "rotation_duration_seconds" in rotation_result
assert rotation_result["integrity_verified"] is True
assert rotation_result["audit_trail_stored"] is True
assert rotation_result["state_updated"] is True
# Verify archive file exists
archive_path = Path(rotation_result["archive_path"])
assert archive_path.exists()
# Verify new progress log was created
active = run(get_project.get_project())
assert active["ok"]
new_log_path = Path(active["project"]["progress_log"])
assert new_log_path.exists()
assert new_log_path != archive_path
def test_rotation_with_custom_metadata(self, isolated_state, project_root):
"""Test rotation with custom metadata."""
# Set up project
root = project_root
run(
set_project.set_project(
name="metadata-test",
root=str(root),
)
)
# Add test entry
run(
append_entry.append_entry(
message="Test entry for metadata rotation",
status="info"
)
)
# Test rotation with custom metadata
custom_metadata = {"environment": "test", "version": "1.0", "test_run": True}
rotation_result = run(
rotate_log.rotate_log(
suffix="metadata-test",
custom_metadata=json.dumps(custom_metadata),
confirm=True
)
)
assert rotation_result["ok"]
assert rotation_result["rotation_completed"] is True
def test_rotation_with_invalid_metadata(self, isolated_state, project_root):
"""Test rotation with invalid JSON metadata."""
# Set up project
root = project_root
run(
set_project.set_project(
name="invalid-metadata-test",
root=str(root),
)
)
# Test rotation with invalid JSON
rotation_result = run(
rotate_log.rotate_log(
custom_metadata="{'invalid': json structure"
)
)
assert rotation_result["ok"] is False
assert "Invalid JSON" in rotation_result["error"]
def test_rotation_hash_chain_tracking(self, isolated_state, project_root):
"""Test hash chain tracking across multiple rotations."""
# Set up project
root = project_root
run(
set_project.set_project(
name="hash-chain-test",
root=str(root),
)
)
# First rotation
run(
append_entry.append_entry(
message="Entry before first rotation",
status="info"
)
)
rotation_1 = run(rotate_log.rotate_log(suffix="rotation-1", confirm=True))
assert rotation_1["ok"]
hash_1 = rotation_1["archive_hash"]
sequence_1 = rotation_1["sequence_number"]
# Add entries and second rotation
run(
append_entry.append_entry(
message="Entry between rotations",
status="info"
)
)
rotation_2 = run(rotate_log.rotate_log(suffix="rotation-2", confirm=True))
assert rotation_2["ok"]
hash_2 = rotation_2["archive_hash"]
sequence_2 = rotation_2["sequence_number"]
# Verify hash chain
assert sequence_2 == sequence_1 + 1
assert hash_2 != hash_1
def test_rotation_integrity_verification(self, isolated_state, project_root):
"""Test rotation integrity verification."""
# Set up project
root = project_root
run(
set_project.set_project(
name="integrity-test",
root=str(root),
)
)
# Add test entry and rotate
run(
append_entry.append_entry(
message="Test entry for integrity verification",
status="info"
)
)
rotation_result = run(rotate_log.rotate_log(confirm=True))
assert rotation_result["ok"]
# Test integrity verification
verification_result = run(
rotate_log.verify_rotation_integrity(rotation_result["rotation_id"])
)
if not verification_result["ok"]:
print(f"❌ Integrity verification failed: {verification_result.get('error', 'Unknown error')}")
print(f" Rotation ID: {rotation_result['rotation_id']}")
assert verification_result["ok"]
assert verification_result["integrity_valid"] is True
assert verification_result["project"] == "integrity-test"
def test_rotation_history_tracking(self, isolated_state, project_root):
"""Test rotation history tracking."""
# Set up project with unique name to avoid audit file conflicts
unique_id = str(uuid.uuid4())[:8]
project_name = f"history-test-{unique_id}"
root = project_root
run(
set_project.set_project(
name=project_name,
root=str(root),
)
)
# Perform multiple rotations
for i in range(3):
run(
append_entry.append_entry(
message=f"Entry before rotation {i+1}",
status="info"
)
)
rotation_result = run(rotate_log.rotate_log(suffix=f"rotation-{i+1}", confirm=True))
assert rotation_result["ok"]
# Test rotation history
history_result = run(rotate_log.get_rotation_history(limit=5))
if not history_result["ok"]:
print(f"❌ History tracking failed: {history_result.get('error', 'Unknown error')}")
assert history_result["ok"]
assert history_result["project"] == project_name
assert history_result["rotation_count"] == 3
assert len(history_result["rotations"]) == 3
def test_rotation_error_handling(self, isolated_state, project_root, monkeypatch):
"""Test rotation error handling."""
# Test with no project configured: mock all project discovery methods
from scribe_mcp.tools import project_utils
# Mock environment variable to prevent fallback project discovery
monkeypatch.delenv("SCRIBE_DEFAULT_PROJECT", raising=False)
# Mock load_project_config to return None (no project configuration found)
def mock_load_project_config(project_name=None):
return None
monkeypatch.setattr(project_utils, "load_project_config", mock_load_project_config)
# Create a completely fresh state manager with no project data
fresh_state_path = project_root / "fresh_state.json"
fresh_state_manager = StateManager(path=fresh_state_path)
# Temporarily replace the server's state manager
original_state_manager = server.state_manager
server.state_manager = fresh_state_manager
try:
error_result = run(rotate_log.rotate_log())
assert error_result["ok"] is False
assert "No project configured" in error_result["error"]
finally:
# Restore original state manager
server.state_manager = original_state_manager
# Set up project but don't create log file
root = project_root
run(
set_project.set_project(
name="error-test",
root=str(root),
)
)
# Manually delete progress log to test error handling
active = run(get_project.get_project())
log_path = Path(active["project"]["progress_log"])
if log_path.exists():
log_path.unlink()
error_result = run(rotate_log.rotate_log())
assert error_result["ok"] is False
assert "not found" in error_result["error"]
def test_rotation_performance_monitoring(self, isolated_state, project_root):
"""Test rotation performance monitoring."""
# Set up project
root = project_root
run(
set_project.set_project(
name="performance-test",
root=str(root),
)
)
# Add some test entries
for i in range(5):
run(
append_entry.append_entry(
message=f"Performance test entry {i+1}",
status="info"
)
)
# Test rotation with performance monitoring
rotation_result = run(rotate_log.rotate_log(confirm=True))
assert rotation_result["ok"]
assert "rotation_duration_seconds" in rotation_result
assert rotation_result["rotation_duration_seconds"] >= 0
assert rotation_result["integrity_verified"] is True