open_recent.py (8.93 kB)
"""open_recent tool implementation.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional import fnmatch import logging import os import time from ..config import WorkspacesConfig, ensure_tool_allowed from ..security import normalize_workspace_path, path_in_workspace LOGGER = logging.getLogger(__name__) TOOL_NAME = "open_recent" @dataclass class OpenRecentRequest: workspace_id: str rel_path: Optional[str] = None count: Optional[int] = None extensions: List[str] = field(default_factory=list) include_globs: List[str] = field(default_factory=list) exclude_globs: List[str] = field(default_factory=list) since: Optional[str] = None follow_symlinks: bool = False @classmethod def from_dict(cls, data: Dict[str, object]) -> "OpenRecentRequest": if "workspace_id" not in data: raise ValueError("workspace_id is required") def _list(name: str) -> List[str]: value = data.get(name) if value is None: return [] if isinstance(value, list): return [str(item) for item in value] raise ValueError(f"{name} must be a list of strings") count = data.get("count") if count is not None and (not isinstance(count, int) or count <= 0): raise ValueError("count must be a positive integer") since = data.get("since") if since is not None and not isinstance(since, str): raise ValueError("since must be an ISO8601 string") return cls( workspace_id=str(data["workspace_id"]), rel_path=str(data.get("rel_path")) if data.get("rel_path") is not None else None, count=count if isinstance(count, int) else None, extensions=_list("extensions"), include_globs=_list("include_globs"), exclude_globs=_list("exclude_globs"), since=since if isinstance(since, str) else None, follow_symlinks=bool(data.get("follow_symlinks", False)), ) @dataclass class RecentFile: path: str abs_path: str mtime: str bytes: int @dataclass class OpenRecentData: files: List[RecentFile] = field(default_factory=list) total_scanned: int = 0 def to_dict(self) -> Dict[str, object]: return { "files": [file.__dict__ for file in self.files], "total_scanned": self.total_scanned, } @dataclass class OpenRecentResponse: ok: bool data: OpenRecentData warnings: List[str] = field(default_factory=list) metrics: Dict[str, int] = field(default_factory=dict) error: Optional[Dict[str, object]] = None def to_dict(self) -> Dict[str, object]: payload = { "ok": self.ok, "data": self.data.to_dict(), "warnings": self.warnings, "metrics": self.metrics, } if self.error is not None: payload["error"] = self.error return payload def _normalize_patterns(patterns: List[str]) -> List[str]: return [pattern for pattern in patterns if pattern] def _matches_any(patterns: List[str], path: str) -> bool: if not patterns: return False candidates = [path, f"./{path}"] if not path.endswith("/"): candidates.extend([f"{path}/", f"./{path}/"]) for pattern in patterns: for candidate in candidates: if fnmatch.fnmatch(candidate, pattern): return True return False def _parse_since(value: Optional[str]) -> Optional[datetime]: if not value: return None sanitized = value.rstrip() if sanitized.endswith("Z"): sanitized = sanitized[:-1] + "+00:00" try: parsed = datetime.fromisoformat(sanitized) except ValueError as exc: raise ValueError(f"Invalid ISO8601 timestamp: {value}") from exc if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc) def execute(request: OpenRecentRequest, config: WorkspacesConfig) -> OpenRecentResponse: start = 
def execute(request: OpenRecentRequest, config: WorkspacesConfig) -> OpenRecentResponse:
    start = time.perf_counter()
    data = OpenRecentData()
    try:
        workspace = config.get_workspace(request.workspace_id)
    except KeyError as exc:
        elapsed = int((time.perf_counter() - start) * 1000)
        return OpenRecentResponse(
            ok=False,
            data=data,
            warnings=[],
            metrics={"elapsed_ms": elapsed},
            error={"type": "workspace_not_found", "message": str(exc)},
        )
    try:
        ensure_tool_allowed(workspace, TOOL_NAME)
    except PermissionError as exc:
        elapsed = int((time.perf_counter() - start) * 1000)
        return OpenRecentResponse(
            ok=False,
            data=data,
            warnings=["Tool is not allowed for this workspace"],
            metrics={"elapsed_ms": elapsed},
            error={"type": "tool_not_allowed", "message": str(exc)},
        )
    validation = normalize_workspace_path(
        workspace.path, Path(request.rel_path) if request.rel_path else None
    )
    if not validation.ok or validation.path is None:
        elapsed = int((time.perf_counter() - start) * 1000)
        reason = validation.reason or "Invalid path"
        return OpenRecentResponse(
            ok=False,
            data=data,
            warnings=[reason],
            metrics={"elapsed_ms": elapsed},
            error={"type": "path_error", "message": reason},
        )
    base_path = validation.path
    include_patterns = _normalize_patterns(request.include_globs)
    exclude_patterns = _normalize_patterns(list(workspace.excludes) + request.exclude_globs)
    extensions = {ext.lower() for ext in request.extensions if ext}
    try:
        since_dt = _parse_since(request.since)
    except ValueError as exc:
        elapsed = int((time.perf_counter() - start) * 1000)
        return OpenRecentResponse(
            ok=False,
            data=data,
            warnings=[],
            metrics={"elapsed_ms": elapsed},
            error={"type": "invalid_request", "message": str(exc)},
        )
    limit = request.count or getattr(config.limits, "recent_files_count", 50)
    files: List[RecentFile] = []
    total_scanned = 0
    fs_walk_count = 0
    # Hoisted out of the walk loop: the workspace root never changes.
    workspace_root = workspace.path.resolve()
    for root, dirs, filenames in os.walk(base_path, followlinks=request.follow_symlinks):
        fs_walk_count += 1
        root_path = Path(root)
        # Filter directories in-place using exclude patterns so that os.walk
        # never descends into excluded subtrees.
        for idx in range(len(dirs) - 1, -1, -1):
            dir_name = dirs[idx]
            abs_dir = root_path / dir_name
            try:
                rel_dir = abs_dir.resolve().relative_to(workspace_root).as_posix()
            except ValueError:
                rel_dir = os.path.relpath(abs_dir, workspace_root)
            if _matches_any(exclude_patterns, rel_dir):
                dirs.pop(idx)
        for name in filenames:
            abs_file = root_path / name
            try:
                rel_path = abs_file.resolve().relative_to(workspace_root).as_posix()
            except ValueError:
                rel_path = os.path.relpath(abs_file, workspace_root)
            total_scanned += 1
            if _matches_any(exclude_patterns, rel_path):
                continue
            if include_patterns and not _matches_any(include_patterns, rel_path):
                continue
            if extensions and Path(name).suffix.lower() not in extensions:
                continue
            if not path_in_workspace(workspace.path, abs_file, follow_symlinks=request.follow_symlinks):
                continue
            try:
                stat_result = abs_file.stat()
            except OSError:
                continue
            mtime = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc)
            if since_dt and mtime < since_dt:
                continue
            files.append(
                RecentFile(
                    path=rel_path,
                    abs_path=str(abs_file),
                    mtime=mtime.isoformat(),
                    bytes=int(stat_result.st_size),
                )
            )
    # All mtimes are UTC ISO8601 strings, which sort chronologically as text.
    files.sort(key=lambda item: item.mtime, reverse=True)
    if limit and len(files) > limit:
        files = files[:limit]
    data.files = files
    data.total_scanned = total_scanned
    elapsed = int((time.perf_counter() - start) * 1000)
    metrics = {"elapsed_ms": elapsed, "fs_walk_count": fs_walk_count}
    return OpenRecentResponse(
        ok=True,
        data=data,
        warnings=[],
        metrics=metrics,
    )
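# Illustrative only: the JSON-ready shape produced by
# OpenRecentResponse.to_dict() on success (all values are examples).
#
#     {
#         "ok": true,
#         "data": {
#             "files": [
#                 {
#                     "path": "notes/todo.md",
#                     "abs_path": "/workspaces/docs/notes/todo.md",
#                     "mtime": "2024-05-01T12:00:00+00:00",
#                     "bytes": 512
#                 }
#             ],
#             "total_scanned": 42
#         },
#         "warnings": [],
#         "metrics": {"elapsed_ms": 3, "fs_walk_count": 5}
#     }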
def execute_from_cli(args: Dict[str, object], config: WorkspacesConfig) -> OpenRecentResponse:
    request = OpenRecentRequest.from_dict(args)
    return execute(request, config)
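# Minimal usage sketch (illustrative only). How WorkspacesConfig is built
# lives in ..config and is not shown in this file, so `load_config` below is
# a hypothetical stand-in for whatever the host application actually does.
#
#     config = load_config()  # hypothetical helper, not defined here
#     response = execute_from_cli(
#         {
#             "workspace_id": "docs",
#             "extensions": [".md"],
#             "since": "2024-05-01T00:00:00Z",
#             "count": 10,
#         },
#         config,
#     )
#     print(response.to_dict())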
