import json
import os
from pathlib import Path
import pytest
from amicus.config import PathValidator, ConfigManager as Config
from amicus.server import (
is_safe_path as _is_safe_path,
execute_safe_command as _execute_safe_command,
broadcast_message as _broadcast_message,
read_state as _read_state,
update_state as _update_state
)
from amicus.core import set_tracking_enabled
from amicus.security import SensitiveDataScanner
# Access the underlying functions from FunctionTool wrappers
is_safe_path = _is_safe_path.fn
execute_safe_command = _execute_safe_command.fn
broadcast_message = _broadcast_message.fn
read_state = _read_state.fn
update_state = _update_state.fn
# --- Scanner Tests ---
def test_scanner_detection():
scanner = SensitiveDataScanner()
# Test cases
text_with_key = "My API key is api_key='1234567890123456'"
findings = scanner.scan(text_with_key)
assert len(findings) == 1
assert findings[0][0] == "Generic API Key"
assert "..." in findings[0][1] # Verify redaction in report
clean_text = "Just some normal text."
assert len(scanner.scan(clean_text)) == 0
def test_scanner_sanitization():
scanner = SensitiveDataScanner()
text = "Key: sk_live_test_api_key_placeholder_12345"
sanitized = scanner.sanitize(text)
assert "sk_live_" not in sanitized
assert "[REDACTED]" in sanitized
# --- Sandboxing & Whitelisting Tests ---
def test_path_validator():
root = Path("/tmp/amicus_test").resolve()
validator = PathValidator(root)
assert validator.is_safe("/tmp/amicus_test/file.txt")
assert validator.is_safe("/tmp/amicus_test/subdir/file.txt")
assert not validator.is_safe("/tmp/other_dir/file.txt")
assert not validator.is_safe("/tmp/amicus_test/../other_dir/file.txt")
def test_config_defaults(temp_context_dir):
config = Config(temp_context_dir)
assert "root_dir" in config.data
assert "command_whitelist" in config.data
def test_is_safe_path_tool(temp_context_dir, monkeypatch):
monkeypatch.setenv("CONTEXT_BUS_DIR", str(temp_context_dir))
cwd = str(Path.cwd())
assert is_safe_path(os.path.join(cwd, "test.txt"))
assert not is_safe_path("/etc/passwd")
def test_execute_safe_command_tool(temp_context_dir, monkeypatch):
monkeypatch.setenv("CONTEXT_BUS_DIR", str(temp_context_dir))
# Whitelisted command
result = execute_safe_command("echo hello")
assert "hello" in result
# Blocked command
result = execute_safe_command("rm -rf /")
assert "not in the whitelist" in result
# Injection Attempt (should print literal, not execute)
result = execute_safe_command("echo injection; echo pwned")
assert "injection; echo pwned" in result
# Complex but safe
result = execute_safe_command("echo 'hello world'")
assert "hello world" in result
def test_broadcast_message(temp_context_dir, monkeypatch):
monkeypatch.setenv("CONTEXT_BUS_DIR", str(temp_context_dir))
set_tracking_enabled(True)
update_state("S", [{"task": "N", "status": "TODO"}], [])
broadcast_message("Hello Agents")
state = read_state()
assert "Hello Agents" in state
def test_update_state_with_messages(temp_context_dir, monkeypatch):
monkeypatch.setenv("CONTEXT_BUS_DIR", str(temp_context_dir))
set_tracking_enabled(True)
update_state("S", [{"task": "N", "status": "TODO"}], [], messages=["Initial message"])
state = read_state()
assert "Initial message" in state