Skip to main content
Glama
policy.py19.3 kB
"""Policy evaluation engine for Agent MCP Gateway. This module implements the core policy engine that evaluates agent permissions against configured rules. It enforces a strict deny-before-allow precedence and supports wildcard pattern matching for flexible rule definitions. Precedence Order (CRITICAL - DO NOT CHANGE): 1. Explicit deny rules (specific tool names) 2. Wildcard deny rules (patterns like drop_*) 3. Explicit allow rules (specific tool names) 4. Wildcard allow rules (patterns like get_* or *) 5. Implicit grant (if server allowed but no tool rules specified) 6. Default policy (deny) """ import fnmatch import logging import threading from typing import Literal, Optional logger = logging.getLogger(__name__) class PolicyEngine: """Evaluates agent permissions against configured rules. This class implements the security policy evaluation logic for the gateway. It determines whether agents can access specific servers and tools based on configurable allow/deny rules with wildcard pattern support. """ def __init__(self, rules: dict): """Initialize policy engine with rules dictionary. Args: rules: Gateway rules configuration with structure: { "agents": { "agent_id": { "allow": {"servers": [...], "tools": {...}}, "deny": {"servers": [...], "tools": {...}} } }, "defaults": {"deny_on_missing_agent": bool} } """ self.rules = rules self.agents = rules.get("agents", {}) self.defaults = rules.get("defaults", {}) self._lock = threading.RLock() # Reentrant lock for nested calls def can_access_server(self, agent_id: str, server: str) -> bool: """Check if agent can access a server. An agent can access a server if: - The agent exists in the rules - The server is in the agent's allow.servers list (or "*" is present) - The server is not in the agent's deny.servers list Args: agent_id: Agent identifier server: Server name Returns: True if agent can access server, False otherwise """ with self._lock: # Check if agent exists in rules if agent_id not in self.agents: # Unknown agent - check default policy return not self.defaults.get("deny_on_missing_agent", True) agent_rules = self.agents[agent_id] # Check deny rules first (deny takes precedence) deny_servers = agent_rules.get("deny", {}).get("servers", []) if server in deny_servers or "*" in deny_servers: return False # Check for wildcard deny patterns for pattern in deny_servers: if self._matches_pattern(server, pattern): return False # Check allow rules allow_servers = agent_rules.get("allow", {}).get("servers", []) # Explicit allow or wildcard allow if server in allow_servers or "*" in allow_servers: return True # Check for wildcard allow patterns for pattern in allow_servers: if self._matches_pattern(server, pattern): return True # Not explicitly allowed return False def can_access_tool(self, agent_id: str, server: str, tool: str) -> bool: """Check if agent can access a specific tool. Applies deny-before-allow precedence: 1. Explicit deny rules (specific tool names) 2. Wildcard deny rules (patterns like drop_*) 3. Explicit allow rules (specific tool names) 4. Wildcard allow rules (patterns like get_* or *) 5. Implicit grant (if server allowed but no tool rules specified) 6. Default policy (deny) Args: agent_id: Agent identifier server: Server name tool: Tool name Returns: True if agent can access tool, False otherwise """ with self._lock: # First, agent must have access to the server if not self.can_access_server(agent_id, server): return False # Check if agent exists in rules if agent_id not in self.agents: # Unknown agent but has server access - check default policy return not self.defaults.get("deny_on_missing_agent", True) agent_rules = self.agents[agent_id] # Get tool rules for this server deny_tools = agent_rules.get("deny", {}).get("tools", {}).get(server, []) allow_tools = agent_rules.get("allow", {}).get("tools", {}).get(server, []) # Separate explicit rules from wildcard patterns explicit_deny = [] wildcard_deny = [] explicit_allow = [] wildcard_allow = [] for rule in deny_tools: if "*" in rule: wildcard_deny.append(rule) else: explicit_deny.append(rule) for rule in allow_tools: if "*" in rule: wildcard_allow.append(rule) else: explicit_allow.append(rule) # Apply precedence order (CRITICAL - DO NOT CHANGE) # 1. Explicit deny rules if tool in explicit_deny: return False # 2. Wildcard deny rules for pattern in wildcard_deny: if self._matches_pattern(tool, pattern): return False # 3. Explicit allow rules if tool in explicit_allow: return True # 4. Wildcard allow rules for pattern in wildcard_allow: if self._matches_pattern(tool, pattern): return True # 5. Implicit grant - if server allowed but no tool rules specified if not allow_tools: return True # 6. Default policy - if no rules match, deny return False def get_allowed_servers(self, agent_id: str) -> list[str]: """Get list of servers this agent can access. Note: This returns the configured server list, not all possible servers. If wildcard "*" is present, returns ["*"] to indicate all servers allowed. Args: agent_id: Agent identifier Returns: List of server names the agent can access, or ["*"] for wildcard """ with self._lock: # Check if agent exists in rules if agent_id not in self.agents: # Unknown agent - check default policy if self.defaults.get("deny_on_missing_agent", True): return [] else: # If not denying unknown agents, return empty list # (caller should interpret this as "depends on what servers exist") return [] agent_rules = self.agents[agent_id] allow_servers = agent_rules.get("allow", {}).get("servers", []) deny_servers = agent_rules.get("deny", {}).get("servers", []) # If wildcard allow and no wildcard deny, return wildcard if "*" in allow_servers and "*" not in deny_servers: return ["*"] # Filter out denied servers allowed = [] for server in allow_servers: if server == "*": continue # Check if this server is denied is_denied = False if server in deny_servers: is_denied = True else: # Check wildcard deny patterns for pattern in deny_servers: if self._matches_pattern(server, pattern): is_denied = True break if not is_denied: allowed.append(server) return allowed def get_allowed_tools(self, agent_id: str, server: str) -> list[str] | Literal["*"]: """Get list of allowed tools for agent on server. Returns either a list of specific tool names or "*" to indicate all tools are allowed (subject to deny rules being checked at access time). Args: agent_id: Agent identifier server: Server name Returns: List of tool names or "*" for wildcard access """ with self._lock: # Agent must have server access first if not self.can_access_server(agent_id, server): return [] # Check if agent exists in rules if agent_id not in self.agents: # Unknown agent but has server access if not self.defaults.get("deny_on_missing_agent", True): return "*" return [] agent_rules = self.agents[agent_id] allow_tools = agent_rules.get("allow", {}).get("tools", {}).get(server, []) # If wildcard allow, return "*" if "*" in allow_tools: return "*" # Return list of allowed tools (including patterns) return allow_tools def get_policy_decision_reason(self, agent_id: str, server: str, tool: str | None = None) -> str: """Get human-readable reason for policy decision. Provides clear explanation of why access was allowed or denied, useful for debugging and audit logs. Args: agent_id: Agent identifier server: Server name tool: Optional tool name Returns: String explaining why access was allowed/denied """ with self._lock: # Check if agent exists if agent_id not in self.agents: if self.defaults.get("deny_on_missing_agent", True): return f"Agent '{agent_id}' not found in rules; default policy denies access" else: return f"Agent '{agent_id}' not found in rules; default policy allows access" agent_rules = self.agents[agent_id] # Check server access deny_servers = agent_rules.get("deny", {}).get("servers", []) allow_servers = agent_rules.get("allow", {}).get("servers", []) # Check explicit server deny if server in deny_servers: return f"Server '{server}' explicitly denied for agent '{agent_id}'" # Check wildcard server deny for pattern in deny_servers: if self._matches_pattern(server, pattern): return f"Server '{server}' denied by pattern '{pattern}' for agent '{agent_id}'" # Check server allow server_allowed = False server_allow_reason = "" if server in allow_servers: server_allowed = True server_allow_reason = f"Server '{server}' explicitly allowed" elif "*" in allow_servers: server_allowed = True server_allow_reason = "Server allowed by wildcard '*'" else: # Check wildcard patterns for pattern in allow_servers: if self._matches_pattern(server, pattern): server_allowed = True server_allow_reason = f"Server '{server}' allowed by pattern '{pattern}'" break if not server_allowed: return f"Server '{server}' not in allowed list for agent '{agent_id}'" # If no tool specified, return server access reason if tool is None: return server_allow_reason # Check tool access deny_tools = agent_rules.get("deny", {}).get("tools", {}).get(server, []) allow_tools = agent_rules.get("allow", {}).get("tools", {}).get(server, []) # Check explicit tool deny if tool in deny_tools: return f"Tool '{tool}' explicitly denied for agent '{agent_id}' on server '{server}'" # Check explicit tool allow if tool in allow_tools: return f"Tool '{tool}' explicitly allowed for agent '{agent_id}' on server '{server}'" # Check wildcard deny patterns for pattern in deny_tools: if "*" in pattern and self._matches_pattern(tool, pattern): return f"Tool '{tool}' denied by pattern '{pattern}' for agent '{agent_id}' on server '{server}'" # Check wildcard allow patterns for pattern in allow_tools: if "*" in pattern and self._matches_pattern(tool, pattern): return f"Tool '{tool}' allowed by pattern '{pattern}' for agent '{agent_id}' on server '{server}'" # No matching rule return f"Tool '{tool}' not in allowed list for agent '{agent_id}' on server '{server}'" def _matches_pattern(self, name: str, pattern: str) -> bool: """Check if name matches wildcard pattern. Uses glob-style pattern matching: - * matches any sequence of characters - ? matches any single character - [seq] matches any character in seq - [!seq] matches any character not in seq Args: name: String to match pattern: Pattern with wildcards (*, get_*, etc.) Returns: True if name matches pattern """ return fnmatch.fnmatch(name, pattern) def _compute_rule_diff(self, old_rules: dict, new_rules: dict) -> dict[str, list[str]]: """Compute differences between old and new rules. Args: old_rules: Current rules dictionary new_rules: New rules dictionary Returns: Dictionary with keys 'added', 'removed', 'modified' containing lists of agent IDs """ old_agents = set(old_rules.get("agents", {}).keys()) new_agents = set(new_rules.get("agents", {}).keys()) added = sorted(new_agents - old_agents) removed = sorted(old_agents - new_agents) # Check for modified agents (agents present in both but with different configs) modified = [] for agent_id in sorted(old_agents & new_agents): old_config = old_rules.get("agents", {}).get(agent_id) new_config = new_rules.get("agents", {}).get(agent_id) if old_config != new_config: modified.append(agent_id) # Check if defaults changed defaults_changed = old_rules.get("defaults") != new_rules.get("defaults") return { "added": added, "removed": removed, "modified": modified, "defaults_changed": defaults_changed } def reload(self, new_rules: dict) -> tuple[bool, Optional[str]]: """Reload policy rules with validation and atomic swap. This method validates new rules before applying them. If validation fails, the current rules remain unchanged. If validation succeeds, rules are atomically swapped to the new configuration. Thread-safety: This method is thread-safe and uses an internal lock to prevent race conditions during reload operations. Args: new_rules: New gateway rules configuration with structure: { "agents": { "agent_id": { "allow": {"servers": [...], "tools": {...}}, "deny": {"servers": [...], "tools": {...}} } }, "defaults": {"deny_on_missing_agent": bool} } Returns: Tuple of (success, error_message): - (True, None) if reload successful - (False, error_message) if validation failed Example: >>> engine = PolicyEngine(old_rules) >>> success, error = engine.reload(new_rules) >>> if success: ... print("Rules reloaded successfully") ... else: ... print(f"Reload failed: {error}") """ with self._lock: logger.info("PolicyEngine reload initiated") # Import validation function to avoid circular dependency at module level from src.config import validate_gateway_rules # Validate new rules structure valid, error_msg = validate_gateway_rules(new_rules) if not valid: logger.error(f"PolicyEngine reload failed: Validation error: {error_msg}") return False, f"Validation error: {error_msg}" logger.info("PolicyEngine reload: Validation passed") # Store old rules for potential rollback old_rules = self.rules try: # Compute diff for logging diff = self._compute_rule_diff(old_rules, new_rules) # Log changes if diff["added"]: logger.info(f"PolicyEngine reload: Added agents: {', '.join(diff['added'])}") if diff["removed"]: logger.info(f"PolicyEngine reload: Removed agents: {', '.join(diff['removed'])}") if diff["modified"]: logger.info(f"PolicyEngine reload: Modified agents: {', '.join(diff['modified'])}") if diff["defaults_changed"]: logger.info("PolicyEngine reload: Default policy changed") # Summarize changes total_changes = len(diff["added"]) + len(diff["removed"]) + len(diff["modified"]) if total_changes == 0 and not diff["defaults_changed"]: logger.info("PolicyEngine reload: No changes detected in rules") else: logger.info( f"PolicyEngine reload: Rules updated - " f"{len(diff['added'])} agents added, " f"{len(diff['removed'])} removed, " f"{len(diff['modified'])} modified" ) # Atomic swap: Update internal state self.rules = new_rules self.agents = new_rules.get("agents", {}) self.defaults = new_rules.get("defaults", {}) logger.info("PolicyEngine reload complete") return True, None except Exception as e: # Rollback on any error during swap logger.error(f"PolicyEngine reload failed: Unexpected error during swap: {e}") self.rules = old_rules self.agents = old_rules.get("agents", {}) self.defaults = old_rules.get("defaults", {}) return False, f"Unexpected error during reload: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/roddutra/agent-mcp-gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server