# test_data_exposure_heuristic.py
"""
Tests for the DataExposure heuristic tag.

Tests the new DataExposure heuristic tag functionality, including:
- Data exposure assessment logic
- Cross-tag consistency rules
- Risk escalation for high exposure operations
"""
import pytest
from katamari_mcp.acp.heuristics import (
HeuristicEngine,
HeuristicTags,
DataExposure,
RiskLevel,
Approval,
Testing
)
class TestDataExposureHeuristic:
    """Test suite for DataExposure heuristic functionality.

    Exercises three things: the ``DataExposure`` enum values, the exposure
    level returned by ``HeuristicEngine._assess_data_exposure`` for various
    operation dicts, and the tags returned by
    ``HeuristicEngine.evaluate_operation`` for those operations.

    Multi-input cases are parametrized so that each input is reported as its
    own test case; a failing input no longer hides the inputs that follow it.
    """

    @pytest.fixture
    def heuristic_engine(self):
        """Create heuristic engine instance for testing."""
        return HeuristicEngine()

    def test_data_exposure_enum_values(self):
        """Test DataExposure enum has correct values."""
        assert DataExposure.NONE.value == "none"
        assert DataExposure.LOCAL.value == "local"
        assert DataExposure.INTERNAL.value == "internal"
        assert DataExposure.EXTERNAL.value == "external"
        assert DataExposure.EXFILTRATION.value == "exfiltration"

    def test_assess_data_exposure_facter_capability(self, heuristic_engine):
        """Test facter capability gets EXFILTRATION exposure level."""
        operation = {
            "type": "execute",
            "target": "facter",
            "description": "Gather system information",
        }
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.EXFILTRATION

    @pytest.mark.parametrize(
        "target",
        ["system_info", "env", "processes", "network", "filesystem"],
    )
    def test_assess_data_exposure_system_info_capabilities(self, heuristic_engine, target):
        """Test various system info capabilities get EXFILTRATION level."""
        operation = {
            "type": "execute",
            "target": target,
            "description": "System information gathering",
        }
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.EXFILTRATION, f"Failed for target: {target}"

    @pytest.mark.parametrize(
        "description",
        [
            "system information gathering",
            "read environment variables",
            "list running processes",
            "perform network scan",
        ],
    )
    def test_assess_data_exposure_description_keywords(self, heuristic_engine, description):
        """Test description keywords trigger EXFILTRATION level."""
        # The target is deliberately neutral so only the description varies.
        operation = {
            "type": "execute",
            "target": "some_tool",
            "description": description,
        }
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.EXFILTRATION, (
            f"Failed for description: {description}"
        )

    @pytest.mark.parametrize(
        "operation",
        [
            {"type": "web_scrape", "target": "example.com"},
            {"type": "api_call", "target": "api_service"},
            {"type": "execute", "target": "web_search"},
            {"type": "execute", "target": "http_client"},
        ],
    )
    def test_assess_data_exposure_web_operations(self, heuristic_engine, operation):
        """Test web operations get EXTERNAL exposure level."""
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.EXTERNAL, f"Failed for operation: {operation}"

    @pytest.mark.parametrize("target", ["database", "cache", "logs", "config"])
    def test_assess_data_exposure_internal_systems(self, heuristic_engine, target):
        """Test internal system access gets INTERNAL exposure level."""
        operation = {
            "type": "execute",
            "target": target,
        }
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.INTERNAL, f"Failed for target: {target}"

    @pytest.mark.parametrize(
        "operation",
        [
            {"type": "read_file", "target": "some_file.txt"},
            {"type": "write_file", "target": "output.txt"},
            {"type": "list_dir", "target": "/tmp"},
            {"type": "execute", "target": "file_processor"},
        ],
    )
    def test_assess_data_exposure_file_operations(self, heuristic_engine, operation):
        """Test file operations get LOCAL exposure level."""
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.LOCAL, f"Failed for operation: {operation}"

    @pytest.mark.parametrize(
        "operation",
        [
            {"type": "analyze", "target": "data"},
            {"type": "validate", "target": "input"},
            {"type": "test", "target": "component"},
            {"type": "document", "target": "process"},
        ],
    )
    def test_assess_data_exposure_no_exposure(self, heuristic_engine, operation):
        """Test safe operations get NONE exposure level."""
        exposure = heuristic_engine._assess_data_exposure(operation)
        assert exposure == DataExposure.NONE, f"Failed for operation: {operation}"

    def test_heuristic_tags_includes_data_exposure(self, heuristic_engine):
        """Test HeuristicTags includes data_exposure field."""
        operation = {
            "type": "execute",
            "target": "facter",
            "description": "System information gathering",
        }
        tags = heuristic_engine.evaluate_operation(operation)
        assert hasattr(tags, 'data_exposure')
        assert tags.data_exposure == DataExposure.EXFILTRATION

    def test_heuristic_tags_to_dict_includes_data_exposure(self, heuristic_engine):
        """Test HeuristicTags.to_dict includes data_exposure."""
        operation = {
            "type": "execute",
            "target": "web_search",
            "description": "Search the web",
        }
        tags = heuristic_engine.evaluate_operation(operation)
        tags_dict = tags.to_dict()
        assert 'data_exposure' in tags_dict
        # The dict form is expected to carry the enum's string value.
        assert tags_dict['data_exposure'] == DataExposure.EXTERNAL.value

    def test_consistency_rules_exfiltration_escalates_risk(self, heuristic_engine):
        """Test EXFILTRATION exposure escalates risk to CRITICAL."""
        operation = {
            "type": "execute",
            "target": "facter",
            "description": "System information gathering",
        }
        tags = heuristic_engine.evaluate_operation(operation)
        # Should escalate to CRITICAL risk due to EXFILTRATION
        assert tags.risk == RiskLevel.CRITICAL
        assert tags.approval == Approval.MANUAL
        assert tags.testing == Testing.COMPREHENSIVE

    def test_consistency_rules_external_requires_manual_approval(self, heuristic_engine):
        """Test EXTERNAL exposure requires MANUAL approval."""
        operation = {
            "type": "web_scrape",
            "target": "example.com",
            "description": "Scrape website data",
        }
        tags = heuristic_engine.evaluate_operation(operation)
        # Should require MANUAL approval and COMPREHENSIVE testing
        assert tags.approval == Approval.MANUAL
        assert tags.testing == Testing.COMPREHENSIVE
        assert tags.data_exposure == DataExposure.EXTERNAL

    def test_low_exposure_operations_maintain_normal_approval(self, heuristic_engine):
        """Test low exposure operations maintain normal approval flow."""
        operation = {
            "type": "analyze",
            "target": "data",
            "description": "Analyze data patterns",
        }
        tags = heuristic_engine.evaluate_operation(operation)
        # Should maintain normal approval flow
        assert tags.data_exposure == DataExposure.NONE
        assert tags.approval == Approval.AUTO  # Low risk operation
        assert tags.testing == Testing.NONE

    def test_real_world_facter_example(self, heuristic_engine):
        """Test real-world facter capability example from user question."""
        facter_operation = {
            "type": "execute",
            "target": "facter",
            "description": "Expose local system details including hardware, OS, network config",
        }
        tags = heuristic_engine.evaluate_operation(facter_operation)
        # Should be flagged as high risk due to data exfiltration potential
        assert tags.data_exposure == DataExposure.EXFILTRATION
        assert tags.risk == RiskLevel.CRITICAL
        assert tags.approval == Approval.MANUAL
        assert tags.testing == Testing.COMPREHENSIVE
        # Verify it would be properly safeguarded
        assert not heuristic_engine.can_auto_approve(tags)
        assert heuristic_engine.needs_manual_approval(tags)