Skip to main content
Glama
policy.py (7.38 kB)
""" Authorization policy engine for MCP tools """ import re import logging from dataclasses import dataclass from enum import Enum from typing import Any logger = logging.getLogger(__name__) class PolicyAction(Enum): ALLOW = "allow" DENY = "deny" @dataclass class ToolPolicy: """Policy rule for MCP tool access""" tool_pattern: str # regex pattern for tool names required_scopes: list[str] action: PolicyAction = PolicyAction.ALLOW conditions: dict[str, Any] | None = None def matches_tool(self, tool_name: str) -> bool: """Check if the policy applies to given tool""" return bool(re.match(self.tool_pattern, tool_name)) def evaluate_scopes(self, user_scopes: list[str]) -> bool: """Check if user has required scopes""" return all(scope in user_scopes for scope in self.required_scopes) class PolicyEngine: """Authorization policy engine for Turkish legal database tools""" def __init__(self): self.policies: list[ToolPolicy] = [] self.default_action = PolicyAction.DENY def add_policy(self, policy: ToolPolicy): """Add a policy rule""" self.policies.append(policy) logger.debug(f"Added policy: {policy.tool_pattern} -> {policy.required_scopes}") def add_tool_scope_policy( self, tool_pattern: str, required_scopes: str | list[str], action: PolicyAction = PolicyAction.ALLOW, ): """Convenience method to add tool-scope policy""" if isinstance(required_scopes, str): required_scopes = [required_scopes] policy = ToolPolicy( tool_pattern=tool_pattern, required_scopes=required_scopes, action=action ) self.add_policy(policy) def authorize_tool_call( self, tool_name: str, user_scopes: list[str], user_claims: dict[str, Any] | None = None, ) -> tuple[bool, str | None]: """ Authorize a tool call Returns: (authorized: bool, reason: Optional[str]) """ logger.debug(f"Authorizing tool '{tool_name}' for user with scopes: {user_scopes}") matching_policies = [ policy for policy in self.policies if policy.matches_tool(tool_name) ] if not matching_policies: if self.default_action == PolicyAction.ALLOW: 
logger.debug(f"No policies found for '{tool_name}', allowing by default") return True, None else: logger.warning(f"No policies found for '{tool_name}', denying by default") return False, f"No policy found for tool '{tool_name}', default deny" # Check for explicit deny policies first for policy in matching_policies: if policy.action == PolicyAction.DENY: if policy.evaluate_scopes(user_scopes): logger.warning(f"Explicit deny policy matched for '{tool_name}'") return False, f"Explicit deny policy for tool '{tool_name}'" # Check allow policies allow_policies = [ p for p in matching_policies if p.action == PolicyAction.ALLOW ] if not allow_policies: logger.warning(f"No allow policies found for '{tool_name}'") return False, f"No allow policies found for tool '{tool_name}'" for policy in allow_policies: if policy.evaluate_scopes(user_scopes): if self._evaluate_conditions(policy.conditions, user_claims): logger.debug(f"Authorization granted for '{tool_name}'") return True, None logger.warning(f"Insufficient scopes for '{tool_name}'. 
Required: {[p.required_scopes for p in allow_policies]}, User has: {user_scopes}") return False, f"Insufficient scopes for tool '{tool_name}'" def _evaluate_conditions( self, conditions: dict[str, Any] | None, user_claims: dict[str, Any] | None, ) -> bool: """Evaluate additional policy conditions""" if not conditions: return True if not user_claims: logger.debug("No user claims provided, conditions evaluation failed") return False for key, expected_value in conditions.items(): user_value = user_claims.get(key) if isinstance(expected_value, list): if user_value not in expected_value: logger.debug(f"Condition failed: {key} = {user_value} not in {expected_value}") return False elif user_value != expected_value: logger.debug(f"Condition failed: {key} = {user_value} != {expected_value}") return False return True def get_allowed_tools(self, user_scopes: list[str]) -> list[str]: """Get list of tool patterns user is allowed to call""" allowed_tools = [] for policy in self.policies: if policy.action == PolicyAction.ALLOW and policy.evaluate_scopes( user_scopes ): allowed_tools.append(policy.tool_pattern) return allowed_tools def create_turkish_legal_policies() -> PolicyEngine: """Create policy set for Turkish legal database MCP server""" engine = PolicyEngine() # Administrative tools (full access) engine.add_tool_scope_policy(".*", ["mcp:tools:admin"]) # Search tools - require read access engine.add_tool_scope_policy("search.*", ["mcp:tools:read"]) # Fetch/get document tools - require read access engine.add_tool_scope_policy("get_.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("fetch.*", ["mcp:tools:read"]) # Specific Turkish legal database tools engine.add_tool_scope_policy("search_yargitay.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_danistay.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_anayasa.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_rekabet.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_kik.*", 
["mcp:tools:read"]) engine.add_tool_scope_policy("search_emsal.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_uyusmazlik.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_sayistay.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_.*_bedesten", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_yerel_hukuk.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_istinaf_hukuk.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("search_kyb.*", ["mcp:tools:read"]) # Document retrieval tools engine.add_tool_scope_policy("get_.*_document.*", ["mcp:tools:read"]) engine.add_tool_scope_policy("get_.*_markdown", ["mcp:tools:read"]) # Write operations (if any future tools need them) engine.add_tool_scope_policy("create_.*", ["mcp:tools:write"]) engine.add_tool_scope_policy("update_.*", ["mcp:tools:write"]) engine.add_tool_scope_policy("delete_.*", ["mcp:tools:write"]) logger.info("Created Turkish legal database policy engine") return engine def create_default_policies() -> PolicyEngine: """Create a default policy set for MCP servers (backwards compatibility)""" return create_turkish_legal_policies()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/saidsurucu/yargi-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.