import json
import os
import time
from pathlib import Path
import pytest
from unittest.mock import MagicMock, patch
import portalocker
from amicus import core, config
from amicus import server as server_module
# Create a SimpleNamespace to access underlying functions from FunctionTool wrappers
from types import SimpleNamespace
server = SimpleNamespace(
    **{
        # ``.fn`` is the plain callable held by each tool wrapper.
        tool_name: getattr(server_module, tool_name).fn
        for tool_name in ("update_state", "read_state", "toggle_tracking")
    }
)
def test_get_context_bus_dir(temp_context_dir):
    """The resolved context bus directory should be the fixture's directory."""
    resolved_dir = core.get_context_bus_dir()
    assert resolved_dir == temp_context_dir
def test_ensure_directory_exists(temp_context_dir):
    """``ensure_directory_exists`` should create a directory that is missing."""
    new_dir = temp_context_dir / "subdir"

    core.ensure_directory_exists(new_dir)

    # The path must both exist and be a directory (not a regular file).
    assert new_dir.exists() and new_dir.is_dir()
def test_ensure_gitignore(temp_context_dir):
    """``ensure_gitignore`` should add the ``.ai/`` entry exactly once."""
    ai_dir = temp_context_dir / ".ai"
    ai_dir.mkdir()
    gitignore_path = temp_context_dir / ".gitignore"

    # First call: the .gitignore next to the directory gains the entry.
    core.ensure_gitignore(ai_dir)
    assert gitignore_path.exists()
    assert ".ai/" in gitignore_path.read_text()

    # Second call: the entry must not be duplicated.
    core.ensure_gitignore(ai_dir)
    occurrences = gitignore_path.read_text().count(".ai/")
    assert occurrences == 1
def test_atomic_write(temp_context_dir):
    """``atomic_write`` should persist the data as JSON and leave no temp file."""
    payload = {"test": "data"}
    destination = temp_context_dir / "test.json"

    core.atomic_write(destination, payload)

    assert destination.exists()
    with open(destination) as handle:
        written = json.load(handle)
    assert written == payload

    # The ``.tmp`` sibling must be gone once the write has completed.
    leftover_tmp = temp_context_dir / "test.json.tmp"
    assert not leftover_tmp.exists()
def test_update_and_read_state(temp_context_dir):
    """Round-trip a state update through the ``update_state``/``read_state`` tools."""
    # Start from a known tracking state.
    core.set_tracking_enabled(True)

    summary = "Test Summary"
    outcome = server.update_state(
        summary,
        [{"task": "Test Steps", "status": "TODO"}],
        ["file1.py", "file2.py"],
    )
    assert "successfully" in outcome

    rendered = server.read_state()
    # The summary, the task text and an active file must all be rendered.
    for expected in (summary, "Test Steps", "file1.py"):
        assert expected in rendered
def test_toggle_tracking(temp_context_dir):
    """Test enabling/disabling tracking via the ``toggle_tracking`` tool.

    While tracking is off, ``update_state`` must report that tracking is
    disabled; once tracking is back on, the update must succeed.
    """
    server.toggle_tracking(False)
    try:
        assert not core.is_tracking_enabled()
        # An update attempted while tracking is off should be refused.
        result = server.update_state("S", [], [])
        assert "Tracking is disabled" in result
    finally:
        # Always re-enable, even if an assertion above failed, so a failure
        # here cannot leave tracking switched off for subsequent tests.
        server.toggle_tracking(True)

    assert core.is_tracking_enabled()
    result = server.update_state("S", [], [])
    assert "successfully" in result
def test_ask_user_flag(temp_context_dir):
    """Test the elicitation pattern.

    An update made with ``ask_user=True`` should cause the next read to
    contain the human-input banner.
    """
    # Enable tracking explicitly, as the other update/read tests in this
    # module do, so the outcome does not depend on state left by earlier tests.
    core.set_tracking_enabled(True)

    server.update_state("S", [], [], ask_user=True)
    state = server.read_state()
    assert "🚨 PREVIOUS AGENT REQUESTED HUMAN INPUT" in state
def test_lock_timeout_configuration(temp_context_dir, monkeypatch):
    """Test that the lock timeout can be configured via the environment."""
    # monkeypatch undoes this change automatically at test teardown, so no
    # manual cleanup of the variable is needed afterwards.
    monkeypatch.setenv("AMICUS_LOCK_TIMEOUT", "0.1")

    # Build a fresh ConfigManager after the variable is set so that it is
    # constructed with the override present in the environment.
    fresh_config = config.ConfigManager()
    assert fresh_config.get("lock_timeout") == 0.1
def test_lock_acquisition(temp_context_dir):
    """After an update, the lock file can be locked again within the timeout."""
    lock_path = core.get_lock_file()

    # Perform a write through the tool first.
    server.update_state("S", [], [])

    # Take the lock ourselves and release it immediately; entering the
    # context manager is the whole check.
    reacquired = portalocker.Lock(lock_path, "w", timeout=1)
    with reacquired:
        pass
def test_read_with_shared_lock(temp_context_dir):
    """Test reading the state file with ``core.read_with_lock``."""
    # Enable tracking explicitly, as the other update/read tests in this
    # module do, so the state write below does not depend on test ordering.
    core.set_tracking_enabled(True)

    # Create state
    server.update_state("S", [], [])
    state_file = core.get_state_file()

    # The stored summary should be readable back through the locked read.
    data = core.read_with_lock(state_file)
    assert data["summary"] == "S"
def test_config_manager_defaults(temp_context_dir):
    """A ConfigManager without overrides should expose the expected defaults."""
    manager = config.ConfigManager(temp_context_dir)

    # Tracking defaults to off (identity check against ``False``).
    assert manager.get("tracking_enabled") is False

    whitelist = manager.get("command_whitelist")
    assert "ls" in whitelist
def test_config_manager_workspace_override(temp_context_dir):
    """Test that a workspace ``.amicus/config.json`` overrides ``lock_timeout``."""
    workspace_config = temp_context_dir / ".amicus" / "config.json"
    # Tolerate a pre-existing ``.amicus`` directory instead of raising
    # FileExistsError if something created it before this point.
    workspace_config.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding keeps the written file independent of the locale.
    workspace_config.write_text(
        json.dumps({"lock_timeout": 99.0}), encoding="utf-8"
    )

    cm = config.ConfigManager(temp_context_dir)
    assert cm.get("lock_timeout") == 99.0
def test_unsafe_path_rejection(temp_context_dir):
    """Test that unsafe paths in ``active_files`` are rejected."""
    # Enable tracking explicitly, as the other update/read tests in this
    # module do; with tracking off the update would be refused for a
    # different reason and the assertions below would fail misleadingly.
    core.set_tracking_enabled(True)

    result = server.update_state("S", [], ["/etc/passwd"])
    assert "Error: Path" in result
    assert "outside the project root" in result

    # Verify state was NOT updated
    state = server.read_state()
    assert "/etc/passwd" not in state
def test_model_info_update_and_read(temp_context_dir):
    """``model_info`` passed to an update is rendered, and replaced by a later one."""
    core.set_tracking_enabled(True)

    # First update: the supplied model details appear in the rendered state.
    first_model = {"name": "gemini-1.5-pro", "strength": "high"}
    server.update_state("S", [], [], model_info=first_model)
    rendered = server.read_state()
    assert "**Node Model:** gemini-1.5-pro (Strength: high)" in rendered

    # Second update: the new details replace the old ones entirely.
    second_model = {"name": "claude-opus", "strength": "medium"}
    server.update_state("S2", [], [], model_info=second_model)
    rendered = server.read_state()
    assert "**Node Model:** claude-opus (Strength: medium)" in rendered
    assert "gemini-1.5-pro" not in rendered
def test_model_info_persists_if_not_provided(temp_context_dir):
    """An update that omits ``model_info`` should keep the earlier model details."""
    core.set_tracking_enabled(True)

    seeded_model = {"name": "gemini-1.5-flash", "strength": "low"}
    server.update_state("Initial", [], [], model_info=seeded_model)

    # Follow-up update deliberately leaves model_info out.
    server.update_state("Subsequent", [], [])

    rendered = server.read_state()
    assert "**Node Model:** gemini-1.5-flash (Strength: low)" in rendered