Skip to main content
Glama
utils.py16.1 kB
""" Utility functions for the MCP debug tool. Includes: - Safe path resolution and validation - Execution limits and constants - Logging helpers """ import logging import time from pathlib import Path from typing import Any # Execution guard constants DEFAULT_TIMEOUT_SECONDS = 20 MAX_OUTPUT_BYTES = 32 * 1024 # 32KB MAX_DEPTH = 2 MAX_CONTAINER_ITEMS = 50 MAX_REPR_LENGTH = 256 MAX_ARGS_COUNT = 20 MAX_ARG_LENGTH = 512 MAX_ENV_ENTRIES = 50 MAX_ENV_KEY_LENGTH = 64 MAX_ENV_VALUE_LENGTH = 1024 def resolve_workspace_path(workspace_root: Path | str, relative_path: str) -> Path: """ Resolve a project-relative path safely within the workspace. Args: workspace_root: Absolute path to workspace root (Path or str) relative_path: Project-relative path string or absolute path Returns: Absolute resolved path Raises: ValueError: If path escapes workspace or doesn't exist Note: If relative_path is an absolute path, it will be used directly and the workspace root will be updated to the nearest parent directory containing a Python project (with .git, pyproject.toml, etc.) """ # Convert to Path if string workspace_root = Path(workspace_root) if isinstance(workspace_root, str) else workspace_root if not workspace_root.is_absolute(): raise ValueError(f"Workspace root must be absolute: {workspace_root}") # Resolve both paths to handle symlinks consistently workspace_root = workspace_root.resolve() # Check if relative_path is actually an absolute path path_obj = Path(relative_path) if path_obj.is_absolute(): # Use absolute path directly resolved = path_obj.resolve() # No workspace validation for absolute paths return resolved # Resolve the path relative to workspace resolved = (workspace_root / relative_path).resolve() # Ensure it's within workspace try: resolved.relative_to(workspace_root) except ValueError as e: raise ValueError( f"Path {relative_path} escapes workspace {workspace_root}" ) from e return resolved def get_executable_lines(file_path: Path) -> set[int]: """ Get set of line numbers that contain executable code. Uses the dis module to find lines with bytecode instructions. This excludes blank lines, comments, and non-executable statements. Args: file_path: Absolute path to Python file Returns: Set of 1-based line numbers with executable code """ import dis try: with open(file_path, encoding='utf-8', errors='ignore') as f: source = f.read() # Compile the source code code = compile(source, str(file_path), 'exec') # Get line numbers from bytecode executable_lines = set() def extract_lines(code_obj): """Recursively extract line numbers from code object.""" # Get line numbers from this code object try: for _, lineno in dis.findlinestarts(code_obj): if lineno: executable_lines.add(lineno) except Exception: pass # Recursively process nested code objects (functions, classes) for const in code_obj.co_consts: if hasattr(const, 'co_code'): extract_lines(const) extract_lines(code) return executable_lines except SyntaxError: # If file has syntax errors, return empty set return set() except Exception: # For any other error, return empty set return set() def validate_file_and_line(file_path: Path, line: int, check_executable: bool = True) -> None: """ Validate that a file exists and line number is within range. Optionally checks if the line contains executable code. Args: file_path: Absolute path to Python file line: 1-based line number check_executable: If True, verify line contains executable code Raises: FileNotFoundError: If file doesn't exist ValueError: If line is out of range or not executable """ if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if not file_path.is_file(): raise ValueError(f"Not a file: {file_path}") if line < 1: raise ValueError(f"Line number must be >= 1, got {line}") # Count lines in file with open(file_path, encoding='utf-8', errors='ignore') as f: line_count = sum(1 for _ in f) if line > line_count: raise ValueError(f"Line {line} exceeds file length ({line_count} lines)") # Check if line is executable if check_executable: executable_lines = get_executable_lines(file_path) if executable_lines and line not in executable_lines: # Find nearest executable line for helpful error message nearest = find_nearest_executable_line(line, executable_lines) if nearest: raise ValueError( f"Line {line} does not contain executable code. " f"Nearest executable line: {nearest}" ) else: raise ValueError(f"Line {line} does not contain executable code.") def find_nearest_executable_line(target: int, executable_lines: set[int]) -> int | None: """ Find the nearest executable line to a target line. Args: target: Target line number executable_lines: Set of executable line numbers Returns: Nearest executable line number, or None if no executable lines """ if not executable_lines: return None # Find the closest line closest = min(executable_lines, key=lambda x: abs(x - target)) return closest def truncate_string(s: str, max_length: int = MAX_REPR_LENGTH) -> tuple[str, bool]: """ Truncate a string if it exceeds max length. Args: s: String to truncate max_length: Maximum allowed length Returns: Tuple of (truncated_string, was_truncated) """ if len(s) <= max_length: return s, False return s[:max_length] + "...", True def safe_repr(obj: Any, depth: int = 0, max_depth: int = MAX_DEPTH) -> tuple[str, bool]: """ Get safe string representation of object with depth and length limits. CRITICAL: This function is import-safe and will NOT trigger any imports or __repr__ methods that might cause import side effects. Args: obj: Object to represent depth: Current recursion depth max_depth: Maximum recursion depth Returns: Tuple of (repr_string, was_truncated) """ if depth >= max_depth: return f"<max depth reached: {type(obj).__name__}>", True try: obj_type = type(obj) obj_type_name = obj_type.__name__ # Handle None first if obj is None: return "None", False # Handle simple built-in types that are safe to repr if isinstance(obj, bool): return "True" if obj else "False", False if isinstance(obj, int): s = str(obj) if len(s) > MAX_REPR_LENGTH: return s[:MAX_REPR_LENGTH] + "...", True return s, False if isinstance(obj, float): s = str(obj) if len(s) > MAX_REPR_LENGTH: return s[:MAX_REPR_LENGTH] + "...", True return s, False if isinstance(obj, str): # Safely repr string without calling custom __repr__ if len(obj) <= MAX_REPR_LENGTH: return repr(obj), False else: truncated = obj[:MAX_REPR_LENGTH] return repr(truncated) + "...", True if isinstance(obj, bytes): if len(obj) <= MAX_REPR_LENGTH // 2: return repr(obj), False else: truncated = obj[:MAX_REPR_LENGTH // 2] return repr(truncated) + b"...".decode(), True # Special handling for Path objects (common in debugging) from pathlib import Path if isinstance(obj, Path): import os try: path_str = os.fspath(obj) if len(path_str) > MAX_REPR_LENGTH: return f"Path('{path_str[:MAX_REPR_LENGTH]}...')", True return f"Path('{path_str}')", False except Exception: try: if hasattr(obj, 'parts') and obj.parts: path_str = '/'.join(obj.parts) return f"Path('{path_str[:MAX_REPR_LENGTH]}')", True except Exception: pass return f"<Path object at {hex(id(obj))}>", True # For collections, show type and length only (don't iterate or call __repr__) if isinstance(obj, (list, tuple)): try: length = len(obj) container_type = "list" if isinstance(obj, list) else "tuple" if length == 0: return "[]" if isinstance(obj, list) else "()", False return f"<{container_type} with {length} items>", False except Exception: return f"<{obj_type_name} at {hex(id(obj))}>", True if isinstance(obj, dict): try: length = len(obj) if length == 0: return "{}", False return f"<dict with {length} items>", False except Exception: return f"<dict at {hex(id(obj))}>", True if isinstance(obj, set): try: length = len(obj) if length == 0: return "set()", False return f"<set with {length} items>", False except Exception: return f"<set at {hex(id(obj))}>", True # For modules, show module name and file (safe, no imports) if obj_type_name == 'module': try: mod_name = getattr(obj, '__name__', '<unknown>') mod_file = getattr(obj, '__file__', None) if mod_file: return f"<module '{mod_name}' from '{mod_file}'>", False return f"<module '{mod_name}'>", False except Exception: return f"<module at {hex(id(obj))}>", True # For classes, show the class name if obj_type_name == 'type': try: class_name = obj.__name__ class_module = getattr(obj, '__module__', None) if class_module and class_module != 'builtins': return f"<class '{class_module}.{class_name}'>", False return f"<class '{class_name}'>", False except Exception: return f"<class at {hex(id(obj))}>", True # For functions and methods if obj_type_name in ('function', 'method', 'builtin_function_or_method'): try: func_name = getattr(obj, '__name__', '<lambda>') func_module = getattr(obj, '__module__', None) if func_module: return f"<{obj_type_name} {func_module}.{func_name}>", False return f"<{obj_type_name} {func_name}>", False except Exception: return f"<{obj_type_name} at {hex(id(obj))}>", True # For all other objects, use type name and id only # DO NOT call repr() or any __repr__ method as it might trigger imports try: obj_module = getattr(obj_type, '__module__', None) if obj_module and obj_module != 'builtins': return f"<{obj_module}.{obj_type_name} object at {hex(id(obj))}>", False return f"<{obj_type_name} object at {hex(id(obj))}>", False except Exception: return f"<object at {hex(id(obj))}>", True except Exception as e: # Absolute fallback - should never happen try: return f"<repr failed: {type(e).__name__}>", True except: return "<repr failed>", True # Logging configuration def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger: """ Set up logger for the module. Args: name: Logger name level: Logging level Returns: Configured logger """ logger = logging.getLogger(name) logger.setLevel(level) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) handler.setFormatter(formatter) logger.addHandler(handler) return logger def log_request(logger: logging.Logger, method: str, params: dict) -> None: """ Log incoming request. Args: logger: Logger instance method: Method name params: Request parameters """ logger.info(f"Request: {method} with params: {params}") def log_response(logger: logging.Logger, method: str, duration_ms: float, success: bool) -> None: """ Log response with timing. Args: logger: Logger instance method: Method name duration_ms: Duration in milliseconds success: Whether request succeeded """ status = "success" if success else "error" logger.info(f"Response: {method} completed in {duration_ms:.2f}ms - {status}") class Timer: """Context manager for timing operations.""" def __init__(self): self.start_time = 0.0 self.elapsed_ms = 0.0 def __enter__(self): self.start_time = time.time() return self def __exit__(self, *args): self.elapsed_ms = (time.time() - self.start_time) * 1000 def format_traceback(exc_type, exc_value, exc_tb, max_frames: int = 10, max_length: int = 2048) -> str: """ Format exception traceback with frame and length limits. Args: exc_type: Exception type exc_value: Exception value exc_tb: Traceback object max_frames: Maximum number of frames to include max_length: Maximum total length of traceback string Returns: Formatted and truncated traceback string """ import traceback # Format the traceback lines = traceback.format_exception(exc_type, exc_value, exc_tb, limit=max_frames) full_traceback = "".join(lines) # Truncate if too long if len(full_traceback) > max_length: return full_traceback[:max_length] + "\n... (truncated)" return full_traceback def find_python_interpreter(workspace_root: Path | str) -> str | None: """ Find the Python interpreter for the target workspace. Searches for virtual environments in common locations and returns the path to the Python executable if found. Args: workspace_root: Root directory of the workspace (Path or str) Returns: Absolute path to Python interpreter, or None if no venv found Priority order: 1. .venv/bin/python (or .venv/Scripts/python.exe on Windows) 2. venv/bin/python 3. .env/bin/python """ import platform # Convert to Path if string workspace_root = Path(workspace_root) if isinstance(workspace_root, str) else workspace_root is_windows = platform.system() == "Windows" python_name = "python.exe" if is_windows else "python" bin_dir = "Scripts" if is_windows else "bin" # Common virtual environment locations venv_paths = [ workspace_root / ".venv", workspace_root / "venv", workspace_root / ".env", ] for venv_path in venv_paths: if venv_path.exists() and venv_path.is_dir(): python_exe = venv_path / bin_dir / python_name if python_exe.exists(): return str(python_exe) # No virtual environment found return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaina3/Debug-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server