Skip to main content
Glama

Katamari MCP Server

by ciphernaut
test_data_exposure_heuristic.py8.9 kB
""" Tests for DataExposure Heuristic Tag Tests the new DataExposure heuristic tag functionality including: - Data exposure assessment logic - Cross-tag consistency rules - Risk escalation for high exposure operations """ import pytest from katamari_mcp.acp.heuristics import ( HeuristicEngine, HeuristicTags, DataExposure, RiskLevel, Approval, Testing ) class TestDataExposureHeuristic: """Test suite for DataExposure heuristic functionality.""" @pytest.fixture def heuristic_engine(self): """Create heuristic engine instance for testing.""" return HeuristicEngine() def test_data_exposure_enum_values(self): """Test DataExposure enum has correct values.""" assert DataExposure.NONE.value == "none" assert DataExposure.LOCAL.value == "local" assert DataExposure.INTERNAL.value == "internal" assert DataExposure.EXTERNAL.value == "external" assert DataExposure.EXFILTRATION.value == "exfiltration" def test_assess_data_exposure_facter_capability(self, heuristic_engine): """Test facter capability gets EXFILTRATION exposure level.""" operation = { "type": "execute", "target": "facter", "description": "Gather system information" } exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.EXFILTRATION def test_assess_data_exposure_system_info_capabilities(self, heuristic_engine): """Test various system info capabilities get EXFILTRATION level.""" high_exposure_targets = [ "system_info", "env", "processes", "network", "filesystem" ] for target in high_exposure_targets: operation = { "type": "execute", "target": target, "description": "System information gathering" } exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.EXFILTRATION, f"Failed for target: {target}" def test_assess_data_exposure_description_keywords(self, heuristic_engine): """Test description keywords trigger EXFILTRATION level.""" high_exposure_descriptions = [ "system information gathering", "read environment variables", "list running processes", "perform network scan" ] for description in high_exposure_descriptions: operation = { "type": "execute", "target": "some_tool", "description": description } exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.EXFILTRATION, f"Failed for description: {description}" def test_assess_data_exposure_web_operations(self, heuristic_engine): """Test web operations get EXTERNAL exposure level.""" web_operations = [ {"type": "web_scrape", "target": "example.com"}, {"type": "api_call", "target": "api_service"}, {"type": "execute", "target": "web_search"}, {"type": "execute", "target": "http_client"}, ] for operation in web_operations: exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.EXTERNAL def test_assess_data_exposure_internal_systems(self, heuristic_engine): """Test internal system access gets INTERNAL exposure level.""" internal_targets = [ "database", "cache", "logs", "config" ] for target in internal_targets: operation = { "type": "execute", "target": target } exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.INTERNAL, f"Failed for target: {target}" def test_assess_data_exposure_file_operations(self, heuristic_engine): """Test file operations get LOCAL exposure level.""" file_operations = [ {"type": "read_file", "target": "some_file.txt"}, {"type": "write_file", "target": "output.txt"}, {"type": "list_dir", "target": "/tmp"}, {"type": "execute", "target": "file_processor"}, ] for operation in file_operations: exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.LOCAL def test_assess_data_exposure_no_exposure(self, heuristic_engine): """Test safe operations get NONE exposure level.""" safe_operations = [ {"type": "analyze", "target": "data"}, {"type": "validate", "target": "input"}, {"type": "test", "target": "component"}, {"type": "document", "target": "process"}, ] for operation in safe_operations: exposure = heuristic_engine._assess_data_exposure(operation) assert exposure == DataExposure.NONE def test_heuristic_tags_includes_data_exposure(self, heuristic_engine): """Test HeuristicTags includes data_exposure field.""" operation = { "type": "execute", "target": "facter", "description": "System information gathering" } tags = heuristic_engine.evaluate_operation(operation) assert hasattr(tags, 'data_exposure') assert tags.data_exposure == DataExposure.EXFILTRATION def test_heuristic_tags_to_dict_includes_data_exposure(self, heuristic_engine): """Test HeuristicTags.to_dict includes data_exposure.""" operation = { "type": "execute", "target": "web_search", "description": "Search the web" } tags = heuristic_engine.evaluate_operation(operation) tags_dict = tags.to_dict() assert 'data_exposure' in tags_dict assert tags_dict['data_exposure'] == DataExposure.EXTERNAL.value def test_consistency_rules_exfiltration_escalates_risk(self, heuristic_engine): """Test EXFILTRATION exposure escalates risk to CRITICAL.""" operation = { "type": "execute", "target": "facter", "description": "System information gathering" } tags = heuristic_engine.evaluate_operation(operation) # Should escalate to CRITICAL risk due to EXFILTRATION assert tags.risk == RiskLevel.CRITICAL assert tags.approval == Approval.MANUAL assert tags.testing == Testing.COMPREHENSIVE def test_consistency_rules_external_requires_manual_approval(self, heuristic_engine): """Test EXTERNAL exposure requires MANUAL approval.""" operation = { "type": "web_scrape", "target": "example.com", "description": "Scrape website data" } tags = heuristic_engine.evaluate_operation(operation) # Should require MANUAL approval and COMPREHENSIVE testing assert tags.approval == Approval.MANUAL assert tags.testing == Testing.COMPREHENSIVE assert tags.data_exposure == DataExposure.EXTERNAL def test_low_exposure_operations_maintain_normal_approval(self, heuristic_engine): """Test low exposure operations maintain normal approval flow.""" operation = { "type": "analyze", "target": "data", "description": "Analyze data patterns" } tags = heuristic_engine.evaluate_operation(operation) # Should maintain normal approval flow assert tags.data_exposure == DataExposure.NONE assert tags.approval == Approval.AUTO # Low risk operation assert tags.testing == Testing.NONE def test_real_world_facter_example(self, heuristic_engine): """Test real-world facter capability example from user question.""" facter_operation = { "type": "execute", "target": "facter", "description": "Expose local system details including hardware, OS, network config" } tags = heuristic_engine.evaluate_operation(facter_operation) # Should be flagged as high risk due to data exfiltration potential assert tags.data_exposure == DataExposure.EXFILTRATION assert tags.risk == RiskLevel.CRITICAL assert tags.approval == Approval.MANUAL assert tags.testing == Testing.COMPREHENSIVE # Verify it would be properly safeguarded assert not heuristic_engine.can_auto_approve(tags) assert heuristic_engine.needs_manual_approval(tags)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server