Skip to main content
Glama
tool_policies.py7.29 kB
""" Políticas de herramientas para el MCP. Este módulo define qué herramientas están permitidas, requieren confirmación, o están bloqueadas por defecto. Usa la configuración de service.yaml para determinar las políticas específicas del servicio. Niveles de riesgo: - LOW: Operaciones cotidianas, permitidas sin restricción - MEDIUM: Operaciones que modifican datos, con logging - HIGH: Operaciones destructivas reversibles, requieren confirmación - CRITICAL: Operaciones destructivas irreversibles, bloqueadas por defecto """ from __future__ import annotations import os import re from enum import Enum from typing import Optional import structlog from .config import get_config logger = structlog.get_logger(__name__) class RiskLevel(Enum): """Niveles de riesgo para operaciones.""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class ToolAction(Enum): """Acciones posibles para una herramienta.""" ALLOW = "allow" ALLOW_WITH_LOGGING = "log" REQUIRE_CONFIRMATION = "confirm" BLOCK = "block" # ============================================================================= # Configuración por variables de entorno # ============================================================================= def _get_bool_env(name: str, default: bool = False) -> bool: """Obtiene una variable de entorno booleana.""" value = os.getenv(name, "").lower() if value in ("true", "1", "yes", "on"): return True if value in ("false", "0", "no", "off"): return False return default # Habilitar herramientas críticas (PELIGROSO - solo para desarrollo/testing) ENABLE_CRITICAL_TOOLS = _get_bool_env("ENABLE_CRITICAL_TOOLS", False) # Deshabilitar logging de operaciones (no recomendado en producción) DISABLE_OPERATION_LOGGING = _get_bool_env("DISABLE_OPERATION_LOGGING", False) # ============================================================================= # Funciones de política # ============================================================================= def _matches_pattern(tool_name: str, patterns: list[str]) -> bool: """Verifica si un tool_name coincide con alguno de los patrones.""" for pattern in patterns: if re.search(pattern, tool_name, re.IGNORECASE): return True return False def is_tool_blocked(tool_name: str) -> bool: """ Verifica si una herramienta está bloqueada. Args: tool_name: Nombre de la herramienta Returns: True si la herramienta está bloqueada """ if ENABLE_CRITICAL_TOOLS: return False config = get_config() return _matches_pattern(tool_name, config.policies.blocked_patterns) def get_tool_policy(tool_name: str) -> tuple[RiskLevel, ToolAction]: """ Obtiene la política para una herramienta. Args: tool_name: Nombre de la herramienta Returns: Tupla (nivel_de_riesgo, acción) """ config = get_config() # Verificar si está bloqueada if _matches_pattern(tool_name, config.policies.blocked_patterns): return (RiskLevel.CRITICAL, ToolAction.BLOCK) # Verificar si requiere confirmación if _matches_pattern(tool_name, config.policies.require_confirmation): return (RiskLevel.HIGH, ToolAction.REQUIRE_CONFIRMATION) # Inferir por el nombre de la operación tool_lower = tool_name.lower() # Operaciones GET son de bajo riesgo if "_get_" in tool_lower or tool_lower.startswith("get_"): return (RiskLevel.LOW, ToolAction.ALLOW) # Operaciones DELETE son de riesgo medio-alto if "_delete_" in tool_lower or tool_lower.startswith("delete_"): return (RiskLevel.MEDIUM, ToolAction.ALLOW_WITH_LOGGING) # Por defecto: riesgo bajo, permitido con logging return (RiskLevel.LOW, ToolAction.ALLOW_WITH_LOGGING) def requires_confirmation(tool_name: str) -> bool: """ Verifica si una herramienta requiere confirmación explícita. Args: tool_name: Nombre de la herramienta Returns: True si requiere parámetro confirm=true """ config = get_config() if _matches_pattern(tool_name, config.policies.require_confirmation): return True _, action = get_tool_policy(tool_name) return action == ToolAction.REQUIRE_CONFIRMATION def should_log_operation(tool_name: str) -> bool: """ Verifica si una operación debe ser logueada. Args: tool_name: Nombre de la herramienta Returns: True si la operación debe registrarse """ if DISABLE_OPERATION_LOGGING: return False config = get_config() if config.policies.audit_all: return True _, action = get_tool_policy(tool_name) return action in ( ToolAction.ALLOW_WITH_LOGGING, ToolAction.REQUIRE_CONFIRMATION, ) def get_blocked_tools_list() -> list[str]: """ Retorna la lista de patrones de herramientas bloqueadas. Returns: Lista de patrones de herramientas bloqueadas """ if ENABLE_CRITICAL_TOOLS: return [] config = get_config() return config.policies.blocked_patterns def log_tool_execution( tool_name: str, *, params: Optional[dict] = None, result_summary: Optional[str] = None, error: Optional[str] = None, ) -> None: """ Registra la ejecución de una herramienta para auditoría. Args: tool_name: Nombre de la herramienta ejecutada params: Parámetros (se filtran datos sensibles) result_summary: Resumen del resultado error: Error si ocurrió """ if DISABLE_OPERATION_LOGGING: return if not should_log_operation(tool_name): return risk, action = get_tool_policy(tool_name) safe_params = _filter_sensitive_params(params) if params else {} log_data = { "tool": tool_name, "risk_level": risk.value, "action": action.value, "params": safe_params, } if result_summary: log_data["result"] = result_summary if error: log_data["error"] = error logger.warning("tool_execution_failed", **log_data) else: logger.info("tool_executed", **log_data) def _filter_sensitive_params(params: dict) -> dict: """ Filtra parámetros sensibles para logging seguro. No registra: - Contenido completo de descripciones largas - Tokens o credenciales - Datos personales """ SENSITIVE_KEYS = {"key", "token", "password", "secret", "credential", "api_key", "apikey"} TRUNCATE_KEYS = {"desc", "description", "text", "body", "content"} MAX_VALUE_LENGTH = 100 filtered = {} for key, value in params.items(): key_lower = key.lower() if any(s in key_lower for s in SENSITIVE_KEYS): filtered[key] = "[REDACTED]" continue if any(s in key_lower for s in TRUNCATE_KEYS): if isinstance(value, str) and len(value) > MAX_VALUE_LENGTH: filtered[key] = value[:MAX_VALUE_LENGTH] + "..." continue if isinstance(value, str) and len(value) > MAX_VALUE_LENGTH: filtered[key] = value[:MAX_VALUE_LENGTH] + "..." else: filtered[key] = value return filtered

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jesusperezdeveloper/mcp_openapi_template'

If you have feedback or need assistance with the MCP directory API, please join our Discord server