Skip to main content
Glama
path_utils.py19.3 kB
"""Path helpers for history and artifact storage defaults.""" from __future__ import annotations import logging import os from pathlib import Path from igloo_mcp.mcp.exceptions import MCPValidationError logger = logging.getLogger(__name__) DEFAULT_HISTORY_PATH = Path("logs/doc.jsonl") DEFAULT_ARTIFACT_ROOT = Path("logs/artifacts") DEFAULT_CACHE_SUBDIR = Path("cache") DEFAULT_REPORTS_SUBDIR = Path("reports") DEFAULT_CATALOG_SUBDIR = Path("catalogs") # Safe root directories for path validation (security boundary) SAFE_ROOTS = [ Path.home().resolve(), # User's home directory (~/.igloo_mcp/) Path.cwd().resolve(), # Current working directory (project) ] def _validate_path_safety(resolved: Path, context: str) -> bool: """Validate that a resolved path is within safe boundaries. This prevents path traversal attacks where malicious environment variables could point to sensitive system files like /etc/passwd. Args: resolved: The resolved absolute path to validate context: Description of what this path is for (e.g., "reports root") Returns: True if path is within safe boundaries, False otherwise """ for safe_root in SAFE_ROOTS: try: resolved.relative_to(safe_root) return True # Path is within a safe root except ValueError: continue # Try next safe root # Path is outside all safe roots - security violation logger.warning( f"SECURITY: Rejected {context} path outside safe boundaries: {resolved}", extra={ "rejected_path": str(resolved), "safe_roots": [str(r) for r in SAFE_ROOTS], "context": context, }, ) return False def _get_log_scope() -> str: """Get log scope from environment (global|repo).""" return os.environ.get("IGLOO_MCP_LOG_SCOPE", "global").lower() def _is_namespaced_logs() -> bool: """Check if namespaced logs are enabled.""" val = os.environ.get("IGLOO_MCP_NAMESPACED_LOGS", "false").lower() return val in ("true", "1", "yes", "on") def get_global_base() -> Path: """Return global base directory (~/.igloo_mcp).""" return Path.home() / ".igloo_mcp" def apply_namespacing(subpath: Path) -> Path: """Apply namespacing if enabled (logs/igloo_mcp/... instead of logs/...). Args: subpath: Relative path to modify (e.g., Path("logs/doc.jsonl")) Returns: Modified path with igloo_mcp inserted if namespacing enabled """ if _is_namespaced_logs(): # Replace logs/ with logs/igloo_mcp/ parts = list(subpath.parts) if parts and parts[0] == "logs": parts.insert(1, "igloo_mcp") subpath = Path(*parts) return subpath def _iter_candidate_roots(start: Path) -> list[Path]: """Return candidate repo roots walking up from *start*.""" if not start.is_absolute(): start = start.resolve() candidates = [start] candidates.extend(start.parents) return candidates def find_repo_root(start: Path | None = None) -> Path: """Best-effort detection of the repository root. Walks upward from *start* (default: current working directory) until a directory containing a ``.git`` entry is found. Falls back to *start* if no explicit marker is detected. """ start_path = start or Path.cwd() for candidate in _iter_candidate_roots(start_path): if (candidate / ".git").exists(): return candidate return start_path def _resolve_with_repo_root(raw: str, repo_root: Path) -> Path: path = Path(raw).expanduser() if path.is_absolute(): return path return (repo_root / path).resolve() def resolve_history_path(raw: str | None = None, *, start: Path | None = None) -> Path: """Return the desired path to the JSONL history file. Precedence: 1. Explicit env path (IGLOO_MCP_QUERY_HISTORY) if raw is None 2. Scope/namespacing defaults if no explicit path 3. Fallback to repo-based defaults Args: raw: Explicit path override (takes highest precedence) start: Starting directory for repo root detection Returns: Resolved path to history file """ # Explicit path takes precedence (back-compat) candidate = raw if raw is not None else os.environ.get("IGLOO_MCP_QUERY_HISTORY") if candidate: repo_root = find_repo_root(start=start) return _resolve_with_repo_root(candidate, repo_root) # Scope-based resolution scope = _get_log_scope() if scope == "global": base = get_global_base() subpath = apply_namespacing(DEFAULT_HISTORY_PATH) return (base / subpath).resolve() # repo scope repo_root = find_repo_root(start=start) subpath = apply_namespacing(DEFAULT_HISTORY_PATH) return (repo_root / subpath).resolve() def resolve_artifact_root(raw: str | None = None, *, start: Path | None = None) -> Path: """Return the root directory for artifacts (queries/results/meta). Precedence: 1. Explicit env path (IGLOO_MCP_ARTIFACT_ROOT) if raw is None 2. Scope/namespacing defaults if no explicit path 3. Fallback to repo-based defaults Args: raw: Explicit path override (takes highest precedence) start: Starting directory for repo root detection Returns: Resolved path to artifact root directory """ # Explicit path takes precedence (back-compat) candidate = raw if raw is not None else os.environ.get("IGLOO_MCP_ARTIFACT_ROOT") if candidate: repo_root = find_repo_root(start=start) return _resolve_with_repo_root(candidate, repo_root) # Scope-based resolution scope = _get_log_scope() if scope == "global": base = get_global_base() subpath = apply_namespacing(DEFAULT_ARTIFACT_ROOT) return (base / subpath).resolve() # repo scope repo_root = find_repo_root(start=start) subpath = apply_namespacing(DEFAULT_ARTIFACT_ROOT) return (repo_root / subpath).resolve() def resolve_cache_root( raw: str | None = None, *, start: Path | None = None, artifact_root: Path | None = None, ) -> Path: """Return the root directory for cached query results. Precedence: 1. Explicit env path (IGLOO_MCP_CACHE_ROOT) if raw is None 2. Derived from artifact_root if provided 3. Scope/namespacing defaults if no explicit path 4. Fallback to repo-based defaults Args: raw: Explicit path override (takes highest precedence) start: Starting directory for repo root detection artifact_root: Optional artifact root (cache is subdirectory) Returns: Resolved path to cache root directory """ # Explicit path takes precedence (back-compat) candidate = raw if raw is not None else os.environ.get("IGLOO_MCP_CACHE_ROOT") if candidate: repo_root = find_repo_root(start=start) return _resolve_with_repo_root(candidate, repo_root) # Derive from artifact_root if provided if artifact_root is not None: return (artifact_root / DEFAULT_CACHE_SUBDIR).resolve() # Scope-based resolution (inherits from artifact_root resolution) artifact_path = resolve_artifact_root(start=start) return (artifact_path / DEFAULT_CACHE_SUBDIR).resolve() def resolve_reports_root( raw: str | None = None, *, start: Path | None = None, ) -> Path: """Return the root directory for living reports. Precedence: 1. Explicit env path (IGLOO_MCP_REPORTS_ROOT) if raw is None 2. Derived from instance-specific history/artifact paths (for bundling) 3. Scope/namespacing defaults if no explicit path 4. Fallback to repo-based defaults Args: raw: Explicit path override (takes highest precedence) start: Starting directory for repo root detection Returns: Resolved path to reports root directory """ # Explicit path takes precedence candidate = raw if raw is not None else os.environ.get("IGLOO_MCP_REPORTS_ROOT") if candidate: repo_root = find_repo_root(start=start) return _resolve_with_repo_root(candidate, repo_root) # Try to derive from instance-specific paths (for bundling with history/artifacts) # Check if history or artifacts paths indicate an instance-specific directory history_path = os.environ.get("IGLOO_MCP_QUERY_HISTORY") artifact_path = os.environ.get("IGLOO_MCP_ARTIFACT_ROOT") for instance_path in [history_path, artifact_path]: if instance_path: try: # Expand and resolve the path expanded = Path(instance_path).expanduser() if expanded.is_absolute() and ("logs" in expanded.parts or "artifacts" in expanded.parts): # Extract the base directory (parent of logs/artifacts) # E.g., ~/.igloo-mcp-experimental/logs/query_history.jsonl -> ~/.igloo-mcp-experimental # Find the base igloo-mcp directory parts = expanded.parts for i, part in enumerate(parts): if part in ["logs", "artifacts"]: base_parts = parts[:i] if base_parts: base_name = base_parts[-1] if base_name.startswith(".igloo-mcp") or base_name.startswith(".igloo_mcp"): base_path = Path(*base_parts) candidate_path = (base_path / DEFAULT_REPORTS_SUBDIR).resolve() # Security: Validate path is within safe boundaries if _validate_path_safety(candidate_path, "reports root"): return candidate_path # Path rejected, continue to fallback except (ValueError, OSError): pass # Invalid path, continue to fallback # Scope-based resolution (fallback) scope = _get_log_scope() if scope == "global": base = get_global_base() return (base / DEFAULT_REPORTS_SUBDIR).resolve() # repo scope repo_root = find_repo_root(start=start) return (repo_root / DEFAULT_REPORTS_SUBDIR).resolve() def resolve_catalog_root( raw: str | None = None, *, start: Path | None = None, ) -> Path: """Return the root directory for catalog storage. Precedence: 1. Explicit env path (IGLOO_MCP_CATALOG_ROOT) if raw is None 2. Derived from instance-specific history/artifact paths (for bundling) 3. Scope/namespacing defaults if no explicit path 4. Fallback to repo-based defaults Args: raw: Explicit path override (takes highest precedence) start: Starting directory for repo root detection Returns: Resolved path to catalog root directory """ # Explicit path takes precedence candidate = raw if raw is not None else os.environ.get("IGLOO_MCP_CATALOG_ROOT") if candidate: repo_root = find_repo_root(start=start) return _resolve_with_repo_root(candidate, repo_root) # Try to derive from instance-specific paths (for bundling with history/artifacts) # Check if history or artifacts paths indicate an instance-specific directory history_path = os.environ.get("IGLOO_MCP_QUERY_HISTORY") artifact_path = os.environ.get("IGLOO_MCP_ARTIFACT_ROOT") for instance_path in [history_path, artifact_path]: if instance_path: try: # Expand and resolve the path for catalog expanded = Path(instance_path).expanduser() if expanded.is_absolute() and ("logs" in expanded.parts or "artifacts" in expanded.parts): # Extract the base directory (parent of logs/artifacts) # Find the base igloo-mcp directory parts = expanded.parts for i, part in enumerate(parts): if part in ["logs", "artifacts"]: base_parts = parts[:i] if base_parts: base_name = base_parts[-1] if base_name.startswith(".igloo-mcp") or base_name.startswith(".igloo_mcp"): base_path = Path(*base_parts) candidate_path = (base_path / DEFAULT_CATALOG_SUBDIR).resolve() # Security: Validate path is within safe boundaries if _validate_path_safety(candidate_path, "catalog root"): return candidate_path # Path rejected, continue to fallback except (ValueError, OSError): pass # Invalid path, continue to fallback # Scope-based resolution (fallback) scope = _get_log_scope() if scope == "global": base = get_global_base() return (base / DEFAULT_CATALOG_SUBDIR).resolve() # repo scope repo_root = find_repo_root(start=start) return (repo_root / DEFAULT_CATALOG_SUBDIR).resolve() def resolve_catalog_path( database: str | None = None, account_scope: bool = False, *, catalog_root: Path | None = None, start: Path | None = None, ) -> Path: """Return the catalog directory path for a specific database or account. Args: database: Database name (None for current database) account_scope: Whether this is an account-wide catalog catalog_root: Optional catalog root directory (defaults to resolved root) start: Starting directory for repo root detection Returns: Resolved path to catalog directory: - ~/.igloo_mcp/catalogs/account/ for account-wide catalogs - ~/.igloo_mcp/catalogs/{database}/ for database-specific catalogs - ~/.igloo_mcp/catalogs/current/ for current database (when database is None) """ if catalog_root is None: catalog_root = resolve_catalog_root(start=start) if account_scope: # Account-wide catalogs go to catalogs/account/ return (catalog_root / "account").resolve() if database: # Database-specific catalogs go to catalogs/{database}/ # Sanitize database name for filesystem safety safe_db_name = database.replace("/", "_").replace("\\", "_") return (catalog_root / safe_db_name).resolve() # Current database (unknown) goes to catalogs/current/ return (catalog_root / "current").resolve() def validate_safe_path( path: str | Path, *, reject_parent_dirs: bool = True, reject_absolute: bool = False, base_dir: Path | None = None, ) -> Path: """Validate and sanitize a file path to prevent path traversal attacks. This function ensures that: - Paths do not contain parent directory references (..) when reject_parent_dirs=True - Paths are relative when reject_absolute=True - Paths stay within base_dir when provided - Paths do not contain null bytes or other dangerous characters Args: path: Path string or Path object to validate reject_parent_dirs: If True, reject paths containing '..' (default: True) reject_absolute: If True, reject absolute paths (default: False) base_dir: Optional base directory to restrict paths within Returns: Validated and resolved Path object Raises: MCPValidationError: If path is unsafe or invalid Examples: >>> validate_safe_path("./data_catalogue") Path('data_catalogue') >>> validate_safe_path("../etc/passwd", reject_parent_dirs=True) MCPValidationError: Path contains parent directory references >>> validate_safe_path("/etc/passwd", reject_absolute=True) MCPValidationError: Absolute paths are not allowed """ # Convert to Path object path_obj = Path(path) if isinstance(path, str) else path # Check for null bytes (path traversal indicator) path_str = str(path_obj) if "\x00" in path_str: raise MCPValidationError( "Path contains null bytes which are not allowed", validation_errors=[f"Invalid path: {path_str}"], hints=["Remove null bytes from the path"], ) # Expand user directory (~) try: path_obj = path_obj.expanduser() except (ValueError, RuntimeError) as e: raise MCPValidationError( f"Invalid path format: {e!s}", validation_errors=[f"Path expansion failed: {path_str}"], hints=["Check that the path is properly formatted"], ) from e # Reject absolute paths if requested if reject_absolute and path_obj.is_absolute(): raise MCPValidationError( "Absolute paths are not allowed", validation_errors=[f"Path is absolute: {path_str}"], hints=["Use a relative path instead"], ) # Check for parent directory references if reject_parent_dirs: # Check string representation for '..' if ".." in path_str: raise MCPValidationError( "Path contains parent directory references (..) which are not allowed", validation_errors=[f"Path contains '..': {path_str}"], hints=[ "Remove '..' from the path", "Use a relative path within the current directory", ], ) # Also check resolved path to catch symlink-based traversal try: resolved = path_obj.resolve() # Check if resolved path goes outside base_dir if base_dir is not None: base_resolved = base_dir.resolve() try: resolved.relative_to(base_resolved) except ValueError: raise MCPValidationError( "Path resolves outside the allowed base directory", validation_errors=[ f"Path: {path_str}", f"Resolved to: {resolved}", f"Base directory: {base_resolved}", ], hints=["Ensure the path stays within the base directory"], ) from None except (OSError, ValueError) as e: # Path might not exist yet, which is okay for validation # But if it's a clear traversal attempt, we should catch it if ".." in str(e).lower() or "parent" in str(e).lower(): raise MCPValidationError( f"Path validation failed: {e!s}", validation_errors=[f"Path: {path_str}"], hints=["Check that the path is valid and does not contain '..'"], ) from e # Normalize the path (remove redundant separators, etc.) try: normalized = path_obj.resolve() if path_obj.is_absolute() else path_obj return normalized except (OSError, ValueError): # If path doesn't exist, return the normalized path object anyway # (it will be created later) return path_obj

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server