"""Unit tests for experiment manager."""
import asyncio
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
import yaml
from wassden.lib.experiment import (
ExperimentConfig,
ExperimentStatus,
ExperimentType,
OutputFormat,
)
from wassden.lib.experiment_manager import (
ConfigurationError,
ExecutionError,
ExperimentManager,
ExperimentManagerError,
)
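# Module-level marker: applies the "dev" mark to every test collected from this file.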
pytestmark = pytest.mark.dev
@pytest.mark.dev
class TestExperimentManagerExecution:
"""Test experiment execution management functionality."""
def test_create_experiment_result(self):
"""Test creation of experiment result objects."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=150,
output_format=[OutputFormat.JSON],
)
result = manager.create_experiment_result(config)
# Verify result properties
assert result.experiment_id is not None
assert len(result.experiment_id) > 10 # UUID-like length
assert result.config == config
assert result.status == ExperimentStatus.PENDING
assert result.duration_seconds == 0.0
assert result.metadata == {}
def test_update_experiment_status(self):
"""Test experiment status updates."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.EARS_COVERAGE,
timeout_seconds=120,
memory_limit_mb=75,
output_format=[OutputFormat.YAML],
)
result = manager.create_experiment_result(config)
experiment_id = result.experiment_id
# Test status progression
manager.update_experiment_status(experiment_id, ExperimentStatus.RUNNING)
updated_result = manager.get_experiment_result(experiment_id)
assert updated_result.status == ExperimentStatus.RUNNING
# Test completion with metadata
test_metadata = {"results": {"accuracy": 0.95}, "time_taken": 2.5}
manager.update_experiment_status(experiment_id, ExperimentStatus.COMPLETED, metadata=test_metadata)
final_result = manager.get_experiment_result(experiment_id)
assert final_result.status == ExperimentStatus.COMPLETED
assert final_result.metadata == test_metadata
assert final_result.duration_seconds > 0 # Should be automatically calculated
def test_update_experiment_status_with_error(self):
"""Test experiment status update with error message."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.COMPARATIVE,
timeout_seconds=180,
memory_limit_mb=200,
output_format=[OutputFormat.JSON],
)
result = manager.create_experiment_result(config)
experiment_id = result.experiment_id
# Test failure with error message
error_message = "Input file not found"
manager.update_experiment_status(experiment_id, ExperimentStatus.FAILED, error_message=error_message)
failed_result = manager.get_experiment_result(experiment_id)
assert failed_result.status == ExperimentStatus.FAILED
assert failed_result.error_message == error_message
"""Test experiment manager exception classes."""
def test_experiment_manager_error_inheritance(self):
"""Test ExperimentManagerError is base exception."""
error = ExperimentManagerError("test message")
assert str(error) == "test message"
assert isinstance(error, Exception)
def test_configuration_error_inheritance(self):
"""Test ConfigurationError inherits from ExperimentManagerError."""
error = ConfigurationError("config error")
assert str(error) == "config error"
assert isinstance(error, ExperimentManagerError)
def test_execution_error_inheritance(self):
"""Test ExecutionError inherits from ExperimentManagerError."""
error = ExecutionError("execution error")
assert str(error) == "execution error"
assert isinstance(error, ExperimentManagerError)
@pytest.mark.dev
class TestExperimentManagerAdvanced:
"""Test advanced ExperimentManager functionality."""
def test_list_active_experiments_and_cleanup(self):
"""Test active experiment listing and cleanup functionality."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Create multiple experiments with different statuses
configs = [
ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
),
ExperimentConfig(
experiment_type=ExperimentType.EARS_COVERAGE,
timeout_seconds=600,
memory_limit_mb=200,
output_format=[OutputFormat.CSV],
),
ExperimentConfig(
experiment_type=ExperimentType.LANGUAGE_DETECTION,
timeout_seconds=450,
memory_limit_mb=150,
output_format=[OutputFormat.YAML],
),
]
results = []
for config in configs:
result = manager.create_experiment_result(config)
results.append(result)
# Verify all experiments are initially active
active_experiments = manager.list_active_experiments()
assert len(active_experiments) == 3
# Update statuses to different states
manager.update_experiment_status(results[0].experiment_id, ExperimentStatus.COMPLETED)
manager.update_experiment_status(
results[1].experiment_id, ExperimentStatus.FAILED, error_message="Test error"
)
manager.update_experiment_status(results[2].experiment_id, ExperimentStatus.RUNNING)
# Test cleanup - should remove completed and failed
cleaned_count = manager.cleanup_completed_experiments()
assert cleaned_count == 2 # Completed and failed experiments removed
# Verify only running experiment remains
remaining_experiments = manager.list_active_experiments()
assert len(remaining_experiments) == 1
assert remaining_experiments[0].status == ExperimentStatus.RUNNING
assert remaining_experiments[0].experiment_id == results[2].experiment_id
def test_experiment_not_found_error_handling(self):
"""Test error handling for non-existent experiments."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
non_existent_id = "non-existent-experiment-id"
# Test get_experiment_result with non-existent ID
result = manager.get_experiment_result(non_existent_id)
assert result is None
# Test update_experiment_status with non-existent ID
with pytest.raises(ExecutionError, match="Experiment .* not found"):
manager.update_experiment_status(non_existent_id, ExperimentStatus.COMPLETED)
def test_create_default_config_for_all_experiment_types(self):
"""Test default config creation for all experiment types."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Test each experiment type
experiment_types = [
ExperimentType.PERFORMANCE,
ExperimentType.EARS_COVERAGE,
ExperimentType.LANGUAGE_DETECTION,
ExperimentType.COMPARATIVE,
]
for exp_type in experiment_types:
config = manager.create_default_config(exp_type)
# Verify basic properties
assert config.experiment_type == exp_type
assert config.timeout_seconds == 600 # Default value
assert config.memory_limit_mb == 100 # Default value
assert OutputFormat.JSON in config.output_format # Default format
assert isinstance(config.parameters, dict)
# Test that config can be saved and loaded
config_name = f"test_default_{exp_type.value}"
saved_path = manager.save_config(config, config_name)
assert saved_path.exists()
loaded_config = manager.load_config(config_name)
assert loaded_config.experiment_type == exp_type
@pytest.mark.dev
class TestExperimentManagerInit:
"""Test ExperimentManager initialization."""
def test_init_default_config_dir(self, tmp_path):
"""Test initialization with default config directory."""
with patch("wassden.lib.experiment_manager.Path.cwd", return_value=tmp_path):
manager = ExperimentManager()
expected_dir = tmp_path / ".wassden" / "experiments"
assert manager.config_dir == expected_dir
# Directory is not created until needed
assert not manager.config_dir.exists()
def test_init_custom_config_dir(self):
"""Test initialization with custom config directory."""
with tempfile.TemporaryDirectory() as temp_dir:
custom_dir = Path(temp_dir) / "custom_experiments"
manager = ExperimentManager(config_dir=custom_dir)
assert manager.config_dir == custom_dir
# Directory is not created until needed
assert not manager.config_dir.exists()
def test_init_active_experiments_empty(self, tmp_path):
"""Test initialization with empty active experiments."""
with patch("wassden.lib.experiment_manager.Path.cwd", return_value=tmp_path):
manager = ExperimentManager()
assert manager._active_experiments == {}
def test_ensure_config_dir_creates_directory(self, tmp_path):
"""Test that _ensure_config_dir creates the directory when needed."""
config_dir = tmp_path / "test_config"
manager = ExperimentManager(config_dir=config_dir)
# Directory doesn't exist initially
assert not config_dir.exists()
# Call _ensure_config_dir
manager._ensure_config_dir()
# Directory should now exist
assert config_dir.exists()
assert config_dir.is_dir()
@pytest.mark.dev
class TestExperimentManagerSaveConfig:
"""Test save_config method with real file operations."""
def test_save_config_success(self):
"""Test successful config saving with actual file verification."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=150,
output_format=[OutputFormat.JSON],
parameters={"measurement_rounds": 5, "warmup": True},
)
config_path = manager.save_config(config, "test_config")
# Verify file was created at correct location
expected_path = manager.config_dir / "test_config.yaml"
assert config_path == expected_path
assert config_path.exists()
assert config_path.is_file()
# Verify content is valid YAML and matches configuration
with config_path.open() as f:
saved_data = yaml.safe_load(f)
# Verify all fields are correctly saved
assert saved_data["experiment_type"] == "performance"
assert saved_data["timeout_seconds"] == 300
assert saved_data["memory_limit_mb"] == 150
assert saved_data["output_format"] == ["json"]
assert saved_data["parameters"] == {"measurement_rounds": 5, "warmup": True}
# Verify file can be loaded back into ExperimentConfig
reloaded_config = ExperimentConfig(**saved_data)
assert reloaded_config.experiment_type == config.experiment_type
assert reloaded_config.parameters == config.parameters
def test_save_config_file_permissions(self):
"""Test that saved config files have correct permissions."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.EARS_COVERAGE,
timeout_seconds=120,
memory_limit_mb=75,
output_format=[OutputFormat.YAML],
)
config_path = manager.save_config(config, "permission_test")
# Verify file is readable and writable
assert config_path.exists()
            assert (config_path.stat().st_mode & 0o600) == 0o600  # Owner has read and write permission
# Verify content can be read
content = config_path.read_text()
assert "experiment_type: ears_coverage" in content
def test_save_config_overwrites_existing(self):
"""Test that save_config overwrites existing config."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Save first config
config1 = ExperimentConfig(
experiment_type=ExperimentType.EARS_COVERAGE,
timeout_seconds=100,
memory_limit_mb=50,
output_format=[OutputFormat.YAML],
)
manager.save_config(config1, "duplicate_test")
# Save second config with same name
config2 = ExperimentConfig(
experiment_type=ExperimentType.LANGUAGE_DETECTION,
timeout_seconds=200,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
)
config_path = manager.save_config(config2, "duplicate_test")
# Verify latest config was saved
with config_path.open() as f:
saved_data = yaml.safe_load(f)
assert saved_data["experiment_type"] == "language_detection"
assert saved_data["timeout_seconds"] == 200
@pytest.mark.dev
class TestExperimentManagerLoadConfig:
"""Test load_config method."""
def test_load_config_success(self):
"""Test successful config loading."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Create config file
config_data = {
"experiment_type": "comparative",
"timeout_seconds": 400,
"memory_limit_mb": 200,
"output_format": ["json", "yaml"],
}
config_path = manager.config_dir / "load_test.yaml"
with config_path.open("w") as f:
yaml.dump(config_data, f)
# Load config
loaded_config = manager.load_config("load_test")
assert loaded_config.experiment_type == ExperimentType.COMPARATIVE
assert loaded_config.timeout_seconds == 400
assert loaded_config.memory_limit_mb == 200
# Note: ExperimentConfig.output_format only accepts one format at a time in a list
assert OutputFormat.JSON in loaded_config.output_format
def test_load_config_file_not_found(self):
"""Test loading non-existent config raises ConfigurationError."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
with pytest.raises(ConfigurationError, match="Configuration file .* not found"):
manager.load_config("nonexistent")
@pytest.mark.dev
class TestExperimentManagerCreateDefaultConfig:
"""Test create_default_config method."""
def test_create_default_config_performance(self, tmp_path):
"""Test creating default performance config."""
manager = ExperimentManager(config_dir=tmp_path)
config = manager.create_default_config(ExperimentType.PERFORMANCE)
assert config.experiment_type == ExperimentType.PERFORMANCE
assert config.timeout_seconds == 600
assert config.memory_limit_mb == 100
assert OutputFormat.JSON in config.output_format
def test_create_default_config_ears_coverage(self, tmp_path):
"""Test creating default EARS coverage config."""
manager = ExperimentManager(config_dir=tmp_path)
config = manager.create_default_config(ExperimentType.EARS_COVERAGE)
assert config.experiment_type == ExperimentType.EARS_COVERAGE
assert config.timeout_seconds == 600
assert config.memory_limit_mb == 100
assert OutputFormat.JSON in config.output_format
def test_create_default_config_language_detection(self, tmp_path):
"""Test creating default language detection config."""
manager = ExperimentManager(config_dir=tmp_path)
config = manager.create_default_config(ExperimentType.LANGUAGE_DETECTION)
assert config.experiment_type == ExperimentType.LANGUAGE_DETECTION
assert config.timeout_seconds == 600
assert config.memory_limit_mb == 100
assert OutputFormat.JSON in config.output_format
def test_create_default_config_comparative(self, tmp_path):
"""Test creating default comparative config."""
manager = ExperimentManager(config_dir=tmp_path)
config = manager.create_default_config(ExperimentType.COMPARATIVE)
assert config.experiment_type == ExperimentType.COMPARATIVE
assert config.timeout_seconds == 600
assert config.memory_limit_mb == 100
assert OutputFormat.JSON in config.output_format
@pytest.mark.dev
class TestExperimentManagerListConfigs:
"""Test list_configs method."""
def test_list_configs_empty(self):
"""Test listing configs when directory is empty."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
configs = manager.list_configs()
assert configs == []
def test_list_configs_nonexistent_directory(self):
"""Test listing configs when config directory doesn't exist."""
with tempfile.TemporaryDirectory() as temp_dir:
# Use a non-existent subdirectory
nonexistent_dir = Path(temp_dir) / "nonexistent"
manager = ExperimentManager(config_dir=nonexistent_dir)
configs = manager.list_configs()
assert configs == []
def test_list_configs_with_files(self):
"""Test listing configs with multiple files."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Create test config files
config_names = ["config1", "config2", "config3"]
for name in config_names:
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
)
manager.save_config(config, name)
configs = manager.list_configs()
assert len(configs) == 3
assert all(name in configs for name in config_names)
def test_list_configs_ignores_non_yaml(self):
"""Test that list_configs ignores non-YAML files."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Create YAML config
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
)
manager.save_config(config, "valid_config")
# Create non-YAML files
(manager.config_dir / "readme.txt").write_text("Not a config")
(manager.config_dir / "config.json").write_text("{}")
configs = manager.list_configs()
assert configs == ["valid_config"]
@pytest.mark.dev
class TestExperimentManagerBasicFunctionality:
"""Test basic experiment manager functionality that is implemented."""
def test_save_and_load_config_integration(self):
"""Test saving and loading config integration."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
# Create and save config
config = manager.create_default_config(ExperimentType.PERFORMANCE)
config.timeout_seconds = 300
config.memory_limit_mb = 150
manager.save_config(config, "integration_test")
# Load and verify
loaded_config = manager.load_config("integration_test")
assert loaded_config.experiment_type == ExperimentType.PERFORMANCE
assert loaded_config.timeout_seconds == 300
assert loaded_config.memory_limit_mb == 150
@pytest.mark.dev
class TestExperimentManagerAsyncExecution:
"""Test asynchronous experiment execution functionality."""
@pytest.mark.asyncio
async def test_run_experiment_basic_workflow(self):
"""Test basic run_experiment workflow."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
parameters={"test_documents": ["test.py"]},
)
# Execute experiment
result = await manager.run_experiment(config)
# Verify complete workflow
assert result.status == ExperimentStatus.COMPLETED
assert result.experiment_id in manager._active_experiments
assert "experiment_type" in result.metadata
assert result.metadata["experiment_type"] == "performance"
assert "parameters" in result.metadata
assert result.metadata["parameters"] == config.parameters
assert "output_formats" in result.metadata
assert result.metadata["output_formats"] == ["json"]
@pytest.mark.asyncio
async def test_run_experiment_all_types(self):
"""Test run_experiment with all experiment types."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
test_cases = [
(ExperimentType.PERFORMANCE, {"test_documents": ["perf.py"]}),
(ExperimentType.EARS_COVERAGE, {"test_documents": ["ears.py"]}),
(ExperimentType.LANGUAGE_DETECTION, {"test_documents": ["lang.py"]}),
(ExperimentType.COMPARATIVE, {"test_documents": ["comp.py"], "baseline_experiment_id": "baseline-123"}),
]
for exp_type, params in test_cases:
config = ExperimentConfig(
experiment_type=exp_type,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
parameters=params,
)
result = await manager.run_experiment(config)
assert result.status == ExperimentStatus.COMPLETED
assert result.metadata["experiment_type"] == exp_type.value
assert result.metadata["parameters"] == params
@pytest.mark.asyncio
async def test_run_experiment_multiple_output_formats(self):
"""Test run_experiment with multiple output formats."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON, OutputFormat.CSV, OutputFormat.YAML],
parameters={"test_documents": ["test.py"]},
)
result = await manager.run_experiment(config)
assert result.status == ExperimentStatus.COMPLETED
expected_formats = ["json", "csv", "yaml"]
assert result.metadata["output_formats"] == expected_formats
@pytest.mark.asyncio
async def test_run_experiment_error_handling(self):
"""Test run_experiment error handling."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
parameters={"test_documents": ["test.py"]},
)
# Mock update_experiment_status to raise an exception
original_update = manager.update_experiment_status
call_count = 0
def mock_update(exp_id, status, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 2: # Second call (COMPLETED status)
raise RuntimeError("Simulated update error")
return original_update(exp_id, status, **kwargs)
manager.update_experiment_status = mock_update
with pytest.raises(ExecutionError, match="Experiment execution failed"):
await manager.run_experiment(config)
# Verify experiment was marked as failed
experiment_id = next(iter(manager._active_experiments.keys()))
failed_result = manager._active_experiments[experiment_id]
assert failed_result.status == ExperimentStatus.FAILED
assert "Simulated update error" in failed_result.error_message
@pytest.mark.asyncio
async def test_run_experiment_status_transitions(self):
"""Test experiment status transitions during execution."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
config = ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
parameters={"test_documents": ["test.py"]},
)
# Track status changes
status_changes = []
original_update = manager.update_experiment_status
def track_status(exp_id, status, **kwargs):
status_changes.append((exp_id, status))
return original_update(exp_id, status, **kwargs)
manager.update_experiment_status = track_status
result = await manager.run_experiment(config)
            # Verify the status updates issued after creation: RUNNING, then COMPLETED
            # (PENDING is the initial status set when the result is created, not via an update)
assert len(status_changes) == 2
assert status_changes[0][1] == ExperimentStatus.RUNNING
assert status_changes[1][1] == ExperimentStatus.COMPLETED
assert result.status == ExperimentStatus.COMPLETED
@pytest.mark.asyncio
async def test_run_experiment_concurrent_execution(self):
"""Test multiple concurrent experiment executions."""
with tempfile.TemporaryDirectory() as temp_dir:
manager = ExperimentManager(config_dir=Path(temp_dir))
configs = [
ExperimentConfig(
experiment_type=ExperimentType.PERFORMANCE,
timeout_seconds=300,
memory_limit_mb=100,
output_format=[OutputFormat.JSON],
parameters={"test_documents": [f"test{i}.py"]},
)
for i in range(3)
]
# Execute all experiments concurrently
tasks = [manager.run_experiment(config) for config in configs]
results = await asyncio.gather(*tasks)
# Verify all experiments completed successfully
assert len(results) == 3
for result in results:
assert result.status == ExperimentStatus.COMPLETED
# Verify all have unique IDs
experiment_ids = [result.experiment_id for result in results]
assert len(set(experiment_ids)) == 3
# Verify all are tracked in manager
assert len(manager._active_experiments) == 3