Skip to main content
Glama
test_policy.py51.1 kB
"""Unit tests for policy evaluation engine. CRITICAL: Tests for deny-before-allow precedence are the most important tests in this file. This precedence order must NEVER be violated. """ import pytest from src.policy import PolicyEngine class TestDenyBeforeAllowPrecedence: """CRITICAL: Tests for deny-before-allow precedence rules. These tests are the most important in the entire test suite. The precedence order MUST be: 1. Explicit deny rules 2. Explicit allow rules 3. Wildcard deny rules 4. Wildcard allow rules 5. Default policy """ def test_explicit_deny_overrides_wildcard_allow(self): """Test that explicit deny takes precedence over wildcard allow.""" rules = { "agents": { "test_agent": { "allow": { "servers": ["postgres"], "tools": {"postgres": ["*"]} # Wildcard allow ALL }, "deny": { "tools": {"postgres": ["drop_table"]} # Explicit deny } } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Should allow query (matches wildcard allow, no deny) assert engine.can_access_tool("test_agent", "postgres", "query") is True # Should deny drop_table (explicit deny overrides wildcard allow) assert engine.can_access_tool("test_agent", "postgres", "drop_table") is False def test_wildcard_deny_overrides_wildcard_allow(self): """Test that wildcard deny patterns take precedence over wildcard allow.""" rules = { "agents": { "test_agent": { "allow": { "servers": ["postgres"], "tools": {"postgres": ["*"]} # Wildcard allow }, "deny": { "tools": {"postgres": ["drop_*"]} # Pattern deny } } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Should allow query (matches wildcard allow, no deny) assert engine.can_access_tool("test_agent", "postgres", "query") is True # Should deny drop_table (matches deny pattern, even though wildcard allows) assert engine.can_access_tool("test_agent", "postgres", "drop_table") is False # Should deny drop_database (matches deny pattern) assert engine.can_access_tool("test_agent", "postgres", "drop_database") is False def test_explicit_deny_overrides_explicit_allow(self): """Test that explicit deny overrides explicit allow for same tool.""" rules = { "agents": { "test_agent": { "allow": { "servers": ["db"], "tools": {"db": ["dangerous_tool", "safe_tool"]} }, "deny": { "tools": {"db": ["dangerous_tool"]} # Also in allow } } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Should deny dangerous_tool (deny overrides allow) assert engine.can_access_tool("test_agent", "db", "dangerous_tool") is False # Should allow safe_tool (only in allow, not in deny) assert engine.can_access_tool("test_agent", "db", "safe_tool") is True def test_wildcard_deny_overrides_explicit_allow(self): """Test that wildcard deny (level 2) overrides explicit allow (level 3).""" rules = { "agents": { "test_agent": { "allow": { "servers": ["db"], "tools": {"db": ["delete_user", "delete_data", "get_user"]} }, "deny": { "tools": {"db": ["delete_*"]} # Pattern deny } } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Should DENY delete_user (wildcard deny wins over explicit allow) assert engine.can_access_tool("test_agent", "db", "delete_user") is False # Should DENY delete_data (wildcard deny wins over explicit allow) assert engine.can_access_tool("test_agent", "db", "delete_data") is False # Should ALLOW get_user (in allow list, doesn't match deny pattern) assert engine.can_access_tool("test_agent", "db", "get_user") is True # Should DENY delete_something_else (wildcard deny, not in explicit allow) assert engine.can_access_tool("test_agent", "db", "delete_something_else") is False def test_wildcard_deny_blocks_all_matching_tools(self): """Test that wildcard deny blocks all tools matching the pattern.""" # This tests that wildcard deny (level 2) overrides explicit allow (level 3) rules = { "agents": { "test_agent": { "allow": { "servers": ["db"], "tools": {"db": ["drop_old_data", "query"]} # Explicit allow for tools }, "deny": { "tools": {"db": ["drop_*"]} # Pattern deny } } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Should DENY drop_old_data (wildcard deny beats explicit allow) # This is level 2 (wildcard deny) vs level 3 (explicit allow) assert engine.can_access_tool("test_agent", "db", "drop_old_data") is False # Should DENY drop_table (matches wildcard deny, not in explicit allow) assert engine.can_access_tool("test_agent", "db", "drop_table") is False # Should ALLOW query (in explicit allow, doesn't match deny pattern) assert engine.can_access_tool("test_agent", "db", "query") is True def test_complex_precedence_scenario(self): """Test complex scenario with multiple precedence levels.""" rules = { "agents": { "backend": { "allow": { "servers": ["postgres"], "tools": { "postgres": ["*", "query", "read_data"] # Wildcard + explicit } }, "deny": { "tools": { "postgres": ["drop_*", "delete_all"] # Pattern + explicit } } } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Allowed by wildcard, not denied assert engine.can_access_tool("backend", "postgres", "insert_data") is True assert engine.can_access_tool("backend", "postgres", "query") is True # Denied by explicit deny assert engine.can_access_tool("backend", "postgres", "delete_all") is False # Denied by pattern assert engine.can_access_tool("backend", "postgres", "drop_table") is False assert engine.can_access_tool("backend", "postgres", "drop_index") is False class TestImplicitGrant: """Test cases for implicit tool grant behavior.""" def test_no_tool_rules_defaults_to_implicit_grant(self): """Test that no tool rules means implicit grant of all tools.""" rules = { "agents": { "test": { "allow": { "servers": ["db"] # No tools section - should grant all tools implicitly } } } } engine = PolicyEngine(rules) assert engine.can_access_server("test", "db") is True assert engine.can_access_tool("test", "db", "any_tool") is True assert engine.can_access_tool("test", "db", "another_tool") is True assert engine.can_access_tool("test", "db", "query") is True def test_explicit_tool_rules_override_implicit_grant(self): """Test that explicit tool rules narrow access from implicit grant.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": { "db": ["query", "list_tables"] } } } } } engine = PolicyEngine(rules) assert engine.can_access_server("test", "db") is True assert engine.can_access_tool("test", "db", "query") is True assert engine.can_access_tool("test", "db", "list_tables") is True assert engine.can_access_tool("test", "db", "drop_table") is False # Not in explicit list def test_wildcard_tools_grant_all_explicitly(self): """Test that explicit wildcard ['*'] grants all tools.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": { "db": ["*"] } } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "db", "any_tool") is True assert engine.can_access_tool("test", "db", "another_tool") is True def test_deny_tools_filters_implicit_grant(self): """Test that deny.tools filters tools from implicit grant.""" rules = { "agents": { "test": { "allow": { "servers": ["db"] # No tools - implicit grant all }, "deny": { "tools": { "db": ["drop_*", "delete_*"] } } } } } engine = PolicyEngine(rules) # Should allow most tools (implicit grant) assert engine.can_access_tool("test", "db", "query") is True assert engine.can_access_tool("test", "db", "insert") is True assert engine.can_access_tool("test", "db", "list_tables") is True # Should deny dangerous tools assert engine.can_access_tool("test", "db", "drop_table") is False assert engine.can_access_tool("test", "db", "drop_database") is False assert engine.can_access_tool("test", "db", "delete_user") is False def test_admin_wildcard_servers_grants_all_tools(self): """Test that admin with servers:['*'] gets all tools from all servers.""" rules = { "agents": { "admin": { "allow": { "servers": ["*"] # No tools section - should grant all tools from all servers } } } } engine = PolicyEngine(rules) # All servers accessible assert engine.can_access_server("admin", "playwright") is True assert engine.can_access_server("admin", "brave-search") is True assert engine.can_access_server("admin", "github") is True # All tools accessible (implicit grant) assert engine.can_access_tool("admin", "playwright", "browser_navigate") is True assert engine.can_access_tool("admin", "brave-search", "brave_web_search") is True assert engine.can_access_tool("admin", "github", "create_issue") is True def test_mixed_explicit_and_implicit_tool_grants(self): """Test combination of servers with explicit tool rules and implicit grants.""" rules = { "agents": { "test": { "allow": { "servers": ["db", "api", "filesystem"], "tools": { "db": ["query"], # Explicit restriction # "api" has no tools entry - implicit grant all "filesystem": ["read_*"] # Explicit restriction } } } } } engine = PolicyEngine(rules) # db: only query allowed assert engine.can_access_tool("test", "db", "query") is True assert engine.can_access_tool("test", "db", "insert") is False # api: all tools allowed (implicit) assert engine.can_access_tool("test", "api", "get_data") is True assert engine.can_access_tool("test", "api", "post_data") is True assert engine.can_access_tool("test", "api", "delete_data") is True # filesystem: only read_* pattern allowed assert engine.can_access_tool("test", "filesystem", "read_file") is True assert engine.can_access_tool("test", "filesystem", "read_directory") is True assert engine.can_access_tool("test", "filesystem", "write_file") is False def test_deny_with_implicit_grant_per_server(self): """Test that deny.tools are server-specific with implicit grant.""" rules = { "agents": { "test": { "allow": { "servers": ["db1", "db2"] # No tools - implicit grant for both servers }, "deny": { "tools": { "db1": ["drop_*"] # Only deny on db1 } } } } } engine = PolicyEngine(rules) # db1: implicit grant minus drop_* assert engine.can_access_tool("test", "db1", "query") is True assert engine.can_access_tool("test", "db1", "insert") is True assert engine.can_access_tool("test", "db1", "drop_table") is False # Denied # db2: full implicit grant (no deny rules) assert engine.can_access_tool("test", "db2", "query") is True assert engine.can_access_tool("test", "db2", "insert") is True assert engine.can_access_tool("test", "db2", "drop_table") is True # Allowed class TestWildcardPatternMatching: """Test cases for wildcard pattern matching functionality.""" def test_wildcard_star_matches_all(self): """Test that '*' pattern matches all tool names.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": {"api": ["*"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "api", "any_tool") is True assert engine.can_access_tool("test", "api", "another_tool") is True assert engine.can_access_tool("test", "api", "get_data") is True def test_prefix_wildcard(self): """Test that 'get_*' pattern matches tools starting with 'get_'.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": {"api": ["get_*"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "api", "get_user") is True assert engine.can_access_tool("test", "api", "get_data") is True assert engine.can_access_tool("test", "api", "get_all_records") is True assert engine.can_access_tool("test", "api", "set_user") is False assert engine.can_access_tool("test", "api", "user") is False def test_suffix_wildcard(self): """Test that '*_query' pattern matches tools ending with '_query'.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["*_query"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "db", "read_query") is True assert engine.can_access_tool("test", "db", "write_query") is True assert engine.can_access_tool("test", "db", "complex_search_query") is True assert engine.can_access_tool("test", "db", "query") is False assert engine.can_access_tool("test", "db", "query_builder") is False def test_multiple_patterns(self): """Test that multiple patterns can be specified.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": {"api": ["get_*", "list_*", "search_*"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "api", "get_user") is True assert engine.can_access_tool("test", "api", "list_items") is True assert engine.can_access_tool("test", "api", "search_data") is True assert engine.can_access_tool("test", "api", "delete_user") is False class TestServerAccess: """Test cases for server-level access control.""" def test_agent_not_in_rules_deny_default(self): """Test unknown agent with deny_on_missing_agent=true.""" rules = { "agents": { "known_agent": { "allow": {"servers": ["api"]} } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) assert engine.can_access_server("unknown_agent", "api") is False assert engine.can_access_server("unknown_agent", "any_server") is False def test_agent_not_in_rules_allow_default(self): """Test unknown agent with deny_on_missing_agent=false.""" rules = { "agents": { "known_agent": { "allow": {"servers": ["api"]} } }, "defaults": {"deny_on_missing_agent": False} } engine = PolicyEngine(rules) # Unknown agents allowed when default is permissive assert engine.can_access_server("unknown_agent", "api") is True def test_server_in_allow_list(self): """Test that server in allow list is accessible.""" rules = { "agents": { "test": { "allow": {"servers": ["postgres", "redis"]} } } } engine = PolicyEngine(rules) assert engine.can_access_server("test", "postgres") is True assert engine.can_access_server("test", "redis") is True assert engine.can_access_server("test", "mongodb") is False def test_server_in_deny_list(self): """Test that server in deny list is blocked.""" rules = { "agents": { "test": { "allow": {"servers": ["*"]}, "deny": {"servers": ["production_db"]} } } } engine = PolicyEngine(rules) assert engine.can_access_server("test", "dev_db") is True assert engine.can_access_server("test", "production_db") is False def test_wildcard_server_access(self): """Test that wildcard '*' allows all servers.""" rules = { "agents": { "admin": { "allow": {"servers": ["*"]} } } } engine = PolicyEngine(rules) assert engine.can_access_server("admin", "any_server") is True assert engine.can_access_server("admin", "another_server") is True def test_wildcard_server_deny(self): """Test that wildcard deny blocks all servers.""" rules = { "agents": { "restricted": { "deny": {"servers": ["*"]} } } } engine = PolicyEngine(rules) assert engine.can_access_server("restricted", "any_server") is False class TestToolAccess: """Test cases for tool-level access control.""" def test_server_access_required_for_tool_access(self): """Test that agent must have server access before tool access.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": { "postgres": ["*"] # Has tool rules but no server access } } } } } engine = PolicyEngine(rules) # Cannot access postgres tools without postgres server access assert engine.can_access_tool("test", "postgres", "query") is False def test_explicit_tool_allow(self): """Test explicit tool name in allow list.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["query", "read"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "db", "query") is True assert engine.can_access_tool("test", "db", "read") is True assert engine.can_access_tool("test", "db", "write") is False def test_explicit_tool_deny(self): """Test explicit tool name in deny list.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} }, "deny": { "tools": {"db": ["drop_table"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("test", "db", "query") is True assert engine.can_access_tool("test", "db", "drop_table") is False def test_pattern_tool_allow(self): """Test pattern matching in tool allow rules.""" rules = { "agents": { "readonly": { "allow": { "servers": ["db"], "tools": {"db": ["get_*", "list_*", "read_*"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("readonly", "db", "get_user") is True assert engine.can_access_tool("readonly", "db", "list_tables") is True assert engine.can_access_tool("readonly", "db", "read_data") is True assert engine.can_access_tool("readonly", "db", "write_data") is False def test_pattern_tool_deny(self): """Test pattern matching in tool deny rules.""" rules = { "agents": { "safe": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} }, "deny": { "tools": {"db": ["drop_*", "delete_*", "truncate_*"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_tool("safe", "db", "query") is True assert engine.can_access_tool("safe", "db", "drop_table") is False assert engine.can_access_tool("safe", "db", "delete_all") is False assert engine.can_access_tool("safe", "db", "truncate_table") is False class TestHelperMethods: """Test cases for helper methods.""" def test_get_allowed_servers_basic(self): """Test getting list of allowed servers.""" rules = { "agents": { "test": { "allow": {"servers": ["api", "db", "cache"]} } } } engine = PolicyEngine(rules) servers = engine.get_allowed_servers("test") assert set(servers) == {"api", "db", "cache"} def test_get_allowed_servers_wildcard(self): """Test that wildcard returns ['*'].""" rules = { "agents": { "admin": { "allow": {"servers": ["*"]} } } } engine = PolicyEngine(rules) servers = engine.get_allowed_servers("admin") assert servers == ["*"] def test_get_allowed_servers_with_deny(self): """Test that denied servers are filtered out.""" rules = { "agents": { "test": { "allow": {"servers": ["api", "db", "cache"]}, "deny": {"servers": ["cache"]} } } } engine = PolicyEngine(rules) servers = engine.get_allowed_servers("test") assert "api" in servers assert "db" in servers assert "cache" not in servers def test_get_allowed_servers_unknown_agent(self): """Test get_allowed_servers for unknown agent.""" rules = { "agents": {"known": {"allow": {"servers": ["api"]}}}, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) servers = engine.get_allowed_servers("unknown") assert servers == [] def test_get_allowed_tools_wildcard(self): """Test that wildcard tools returns '*'.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": {"api": ["*"]} } } } } engine = PolicyEngine(rules) tools = engine.get_allowed_tools("test", "api") assert tools == "*" def test_get_allowed_tools_list(self): """Test that specific tools return list.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": {"api": ["get_*", "list_*"]} } } } } engine = PolicyEngine(rules) tools = engine.get_allowed_tools("test", "api") assert isinstance(tools, list) assert "get_*" in tools assert "list_*" in tools def test_get_allowed_tools_no_server_access(self): """Test that no server access returns empty list.""" rules = { "agents": { "test": { "allow": { "servers": ["api"], "tools": {"db": ["*"]} } } } } engine = PolicyEngine(rules) tools = engine.get_allowed_tools("test", "db") assert tools == [] def test_get_policy_decision_reason_server_denied(self): """Test policy reason when server is denied.""" rules = { "agents": { "test": { "allow": {"servers": ["api"]}, "deny": {"servers": ["db"]} } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "db") assert "denied" in reason.lower() assert "db" in reason def test_get_policy_decision_reason_server_allowed(self): """Test policy reason when server is allowed.""" rules = { "agents": { "test": { "allow": {"servers": ["api"]} } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "api") assert "allowed" in reason.lower() assert "api" in reason def test_get_policy_decision_reason_tool_denied(self): """Test policy reason when tool is denied.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} }, "deny": { "tools": {"db": ["drop_table"]} } } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "db", "drop_table") assert "denied" in reason.lower() assert "drop_table" in reason def test_get_policy_decision_reason_tool_allowed(self): """Test policy reason when tool is allowed.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["query"]} } } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "db", "query") assert "allowed" in reason.lower() assert "query" in reason def test_get_policy_decision_reason_unknown_agent(self): """Test policy reason for unknown agent.""" rules = { "agents": {}, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("unknown", "api") assert "not found" in reason.lower() assert "unknown" in reason def test_get_policy_decision_reason_wildcard_server_allow(self): """Test policy reason when server allowed by wildcard.""" rules = { "agents": { "test": { "allow": {"servers": ["*"]} } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "any_server") assert "wildcard" in reason.lower() assert "*" in reason def test_get_policy_decision_reason_pattern_deny(self): """Test policy reason when tool denied by pattern.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} }, "deny": { "tools": {"db": ["drop_*"]} } } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "db", "drop_table") assert "denied by pattern" in reason.lower() assert "drop_*" in reason def test_get_policy_decision_reason_pattern_allow(self): """Test policy reason when tool allowed by pattern.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["get_*"]} } } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "db", "get_user") assert "allowed by pattern" in reason.lower() assert "get_*" in reason def test_get_policy_decision_reason_tool_not_allowed(self): """Test policy reason when tool is not in allowed list.""" rules = { "agents": { "test": { "allow": { "servers": ["db"], "tools": {"db": ["query"]} } } } } engine = PolicyEngine(rules) reason = engine.get_policy_decision_reason("test", "db", "write") assert "not in allowed list" in reason.lower() assert "write" in reason class TestPolicyReload: """Test cases for policy reload functionality.""" def test_reload_valid_rules(self): """Test successful reload with valid rules.""" initial_rules = { "agents": { "agent1": { "allow": {"servers": ["api"]} } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(initial_rules) # Verify initial state assert engine.can_access_server("agent1", "api") is True assert engine.can_access_server("agent2", "db") is False # Reload with new rules new_rules = { "agents": { "agent1": { "allow": {"servers": ["api", "db"]} }, "agent2": { "allow": {"servers": ["db"]} } }, "defaults": {"deny_on_missing_agent": True} } success, error = engine.reload(new_rules) # Verify reload succeeded assert success is True assert error is None # Verify new rules are active assert engine.can_access_server("agent1", "db") is True assert engine.can_access_server("agent2", "db") is True def test_reload_invalid_rules_no_change(self): """Test that invalid rules don't modify engine state.""" initial_rules = { "agents": { "agent1": { "allow": {"servers": ["api"]} } }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(initial_rules) # Attempt to reload with invalid rules (malformed structure) invalid_rules = { "agents": { "agent1": { "allow": {"servers": "not_a_list"} # Should be list } } } success, error = engine.reload(invalid_rules) # Verify reload failed assert success is False assert error is not None assert "Validation error" in error # Verify original rules still active assert engine.can_access_server("agent1", "api") is True def test_reload_with_agent_additions(self): """Test reload that adds new agents.""" initial_rules = { "agents": { "agent1": { "allow": {"servers": ["api"]} } } } engine = PolicyEngine(initial_rules) new_rules = { "agents": { "agent1": { "allow": {"servers": ["api"]} }, "agent2": { "allow": {"servers": ["db"]} }, "agent3": { "allow": {"servers": ["cache"]} } } } success, error = engine.reload(new_rules) assert success is True assert engine.can_access_server("agent2", "db") is True assert engine.can_access_server("agent3", "cache") is True def test_reload_with_agent_removals(self): """Test reload that removes agents.""" initial_rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}}, "agent2": {"allow": {"servers": ["db"]}}, "agent3": {"allow": {"servers": ["cache"]}} }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(initial_rules) new_rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}} }, "defaults": {"deny_on_missing_agent": True} } success, error = engine.reload(new_rules) assert success is True # Removed agents should be denied (if default is deny) assert engine.can_access_server("agent2", "db") is False assert engine.can_access_server("agent3", "cache") is False def test_reload_with_agent_modifications(self): """Test reload that modifies existing agent permissions.""" initial_rules = { "agents": { "agent1": { "allow": { "servers": ["api"], "tools": {"api": ["get_*"]} } } } } engine = PolicyEngine(initial_rules) # Verify initial permissions assert engine.can_access_tool("agent1", "api", "get_user") is True assert engine.can_access_tool("agent1", "api", "set_user") is False # Reload with modified permissions new_rules = { "agents": { "agent1": { "allow": { "servers": ["api"], "tools": {"api": ["*"]} # Now allow all tools } } } } success, error = engine.reload(new_rules) assert success is True # Verify new permissions assert engine.can_access_tool("agent1", "api", "get_user") is True assert engine.can_access_tool("agent1", "api", "set_user") is True def test_reload_with_defaults_change(self): """Test reload that changes default policy.""" initial_rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}} }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(initial_rules) # Unknown agent should be denied assert engine.can_access_server("unknown", "api") is False # Reload with permissive default new_rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}} }, "defaults": {"deny_on_missing_agent": False} } success, error = engine.reload(new_rules) assert success is True # Unknown agent should now be allowed assert engine.can_access_server("unknown", "api") is True def test_reload_empty_rules(self): """Test reload with empty agents section.""" initial_rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}} }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(initial_rules) # Reload with empty agents new_rules = { "agents": {}, "defaults": {"deny_on_missing_agent": True} } success, error = engine.reload(new_rules) assert success is True # All agents should now be denied assert engine.can_access_server("agent1", "api") is False def test_reload_no_changes(self): """Test reload with identical rules.""" rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}} }, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) # Reload with same rules success, error = engine.reload(rules) assert success is True assert error is None # Should still work the same assert engine.can_access_server("agent1", "api") is True def test_reload_invalid_wildcard_pattern(self): """Test reload with invalid wildcard patterns.""" initial_rules = { "agents": { "agent1": {"allow": {"servers": ["api"]}} } } engine = PolicyEngine(initial_rules) # Attempt reload with invalid pattern (multiple wildcards) invalid_rules = { "agents": { "agent1": { "allow": { "servers": ["api"], "tools": {"api": ["get_*_data"]} # Multiple wildcards not allowed } } } } success, error = engine.reload(invalid_rules) # Should fail validation assert success is False assert error is not None # Original rules should remain assert engine.can_access_server("agent1", "api") is True def test_reload_preserves_deny_before_allow(self): """Test that reload maintains deny-before-allow precedence.""" initial_rules = { "agents": { "agent1": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} } } } } engine = PolicyEngine(initial_rules) # Reload with deny rules added new_rules = { "agents": { "agent1": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} }, "deny": { "tools": {"db": ["drop_*"]} } } } } success, error = engine.reload(new_rules) assert success is True # Verify deny-before-allow is respected assert engine.can_access_tool("agent1", "db", "query") is True assert engine.can_access_tool("agent1", "db", "drop_table") is False class TestDefaultAgent: """Test cases for agent named 'default' - used in fallback chain.""" def test_default_agent_is_regular_agent(self): """Test that 'default' is treated as a regular agent name.""" rules = { "agents": { "default": { "allow": {"servers": ["api"]} }, "researcher": { "allow": {"servers": ["brave-search"]} } } } engine = PolicyEngine(rules) # 'default' should work like any other agent assert engine.can_access_server("default", "api") is True assert engine.can_access_server("default", "brave-search") is False assert engine.can_access_server("researcher", "brave-search") is True assert engine.can_access_server("researcher", "api") is False def test_default_agent_with_tool_permissions(self): """Test that policy evaluation works with agent_id='default'.""" rules = { "agents": { "default": { "allow": { "servers": ["db"], "tools": {"db": ["query", "read_*"]} }, "deny": { "tools": {"db": ["drop_*"]} } } } } engine = PolicyEngine(rules) # Test server access assert engine.can_access_server("default", "db") is True # Test explicit tool permissions assert engine.can_access_tool("default", "db", "query") is True # Test wildcard allow patterns assert engine.can_access_tool("default", "db", "read_data") is True assert engine.can_access_tool("default", "db", "read_users") is True # Test wildcard deny patterns assert engine.can_access_tool("default", "db", "drop_table") is False # Test tool not in allow list assert engine.can_access_tool("default", "db", "write") is False def test_default_agent_with_deny_before_allow(self): """Test that deny-before-allow precedence works for 'default' agent.""" rules = { "agents": { "default": { "allow": { "servers": ["db"], "tools": {"db": ["*"]} # Allow all }, "deny": { "tools": {"db": ["dangerous_op"]} # But deny this one } } } } engine = PolicyEngine(rules) # Should allow most tools assert engine.can_access_tool("default", "db", "query") is True assert engine.can_access_tool("default", "db", "read") is True # Should deny dangerous_op (explicit deny overrides wildcard allow) assert engine.can_access_tool("default", "db", "dangerous_op") is False def test_get_allowed_servers_for_default_agent(self): """Test helper method returns correct servers for 'default' agent.""" rules = { "agents": { "default": { "allow": {"servers": ["api", "db", "cache"]} } } } engine = PolicyEngine(rules) servers = engine.get_allowed_servers("default") assert set(servers) == {"api", "db", "cache"} def test_get_allowed_tools_for_default_agent(self): """Test helper method returns correct tools for 'default' agent.""" rules = { "agents": { "default": { "allow": { "servers": ["api"], "tools": {"api": ["get_*", "list_*"]} } } } } engine = PolicyEngine(rules) tools = engine.get_allowed_tools("default", "api") assert isinstance(tools, list) assert "get_*" in tools assert "list_*" in tools def test_get_policy_decision_reason_for_default_agent(self): """Test policy reason works correctly for 'default' agent.""" rules = { "agents": { "default": { "allow": { "servers": ["api"], "tools": {"api": ["query"]} } } } } engine = PolicyEngine(rules) # Test server access reason reason = engine.get_policy_decision_reason("default", "api") assert "allowed" in reason.lower() assert "api" in reason # Test tool access reason reason = engine.get_policy_decision_reason("default", "api", "query") assert "allowed" in reason.lower() assert "query" in reason def test_default_agent_coexists_with_other_agents(self): """Test that 'default' agent can coexist with other agents without conflicts.""" rules = { "agents": { "default": { "allow": {"servers": ["api"]} }, "researcher": { "allow": {"servers": ["brave-search"]} }, "backend": { "allow": {"servers": ["postgres"]} } } } engine = PolicyEngine(rules) # Each agent should have independent permissions assert engine.can_access_server("default", "api") is True assert engine.can_access_server("default", "brave-search") is False assert engine.can_access_server("default", "postgres") is False assert engine.can_access_server("researcher", "api") is False assert engine.can_access_server("researcher", "brave-search") is True assert engine.can_access_server("backend", "postgres") is True assert engine.can_access_server("backend", "api") is False def test_reload_with_default_agent(self): """Test that policy reload works correctly with 'default' agent.""" initial_rules = { "agents": { "default": { "allow": {"servers": ["api"]} } } } engine = PolicyEngine(initial_rules) # Verify initial state assert engine.can_access_server("default", "api") is True assert engine.can_access_server("default", "db") is False # Reload with updated permissions for 'default' new_rules = { "agents": { "default": { "allow": {"servers": ["api", "db"]} } } } success, error = engine.reload(new_rules) # Verify reload succeeded assert success is True assert error is None # Verify new rules are active assert engine.can_access_server("default", "api") is True assert engine.can_access_server("default", "db") is True class TestEdgeCases: """Test edge cases and boundary conditions.""" def test_empty_agents_section(self): """Test with no agents defined.""" rules = { "agents": {}, "defaults": {"deny_on_missing_agent": True} } engine = PolicyEngine(rules) assert engine.can_access_server("any_agent", "any_server") is False def test_no_defaults_section(self): """Test with no defaults section.""" rules = { "agents": { "test": { "allow": {"servers": ["api"]} } } } engine = PolicyEngine(rules) # Should default to deny for unknown agents assert engine.can_access_server("unknown", "api") is False def test_empty_allow_deny_sections(self): """Test with empty allow/deny sections.""" rules = { "agents": { "test": { "allow": {}, "deny": {} } } } engine = PolicyEngine(rules) assert engine.can_access_server("test", "any_server") is False def test_agent_with_only_deny_rules(self): """Test agent that only has deny rules.""" rules = { "agents": { "test": { "deny": {"servers": ["production"]} } } } engine = PolicyEngine(rules) # No allow rules means no access assert engine.can_access_server("test", "dev") is False assert engine.can_access_server("test", "production") is False def test_case_sensitive_matching(self): """Test that tool/server names are case-sensitive.""" rules = { "agents": { "test": { "allow": { "servers": ["API"], "tools": {"API": ["GetData"]} } } } } engine = PolicyEngine(rules) assert engine.can_access_server("test", "API") is True assert engine.can_access_server("test", "api") is False assert engine.can_access_tool("test", "API", "GetData") is True assert engine.can_access_tool("test", "API", "getdata") is False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/roddutra/agent-mcp-gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server