# utils.py (16.1 kB)
"""
Utility functions for the MCP debug tool.
Includes:
- Safe path resolution and validation
- Execution limits and constants
- Logging helpers
"""
import logging
import time
from pathlib import Path
from typing import Any
# Execution guard constants.
# NOTE(review): in the code visible here only MAX_DEPTH and MAX_REPR_LENGTH
# are referenced (by safe_repr / truncate_string); the other limits are
# presumably enforced by callers elsewhere - confirm before changing them.
DEFAULT_TIMEOUT_SECONDS = 20
MAX_OUTPUT_BYTES = 32 * 1024  # 32 KiB
MAX_DEPTH = 2  # default max_depth of safe_repr()
MAX_CONTAINER_ITEMS = 50
MAX_REPR_LENGTH = 256  # default length cap in truncate_string() and safe_repr()
MAX_ARGS_COUNT = 20
MAX_ARG_LENGTH = 512
MAX_ENV_ENTRIES = 50
MAX_ENV_KEY_LENGTH = 64
MAX_ENV_VALUE_LENGTH = 1024
def resolve_workspace_path(workspace_root: Path | str, relative_path: str) -> Path:
"""
Resolve a project-relative path safely within the workspace.
Args:
workspace_root: Absolute path to workspace root (Path or str)
relative_path: Project-relative path string or absolute path
Returns:
Absolute resolved path
Raises:
ValueError: If path escapes workspace or doesn't exist
Note:
If relative_path is an absolute path, it will be used directly
and the workspace root will be updated to the nearest parent directory
containing a Python project (with .git, pyproject.toml, etc.)
"""
# Convert to Path if string
workspace_root = Path(workspace_root) if isinstance(workspace_root, str) else workspace_root
if not workspace_root.is_absolute():
raise ValueError(f"Workspace root must be absolute: {workspace_root}")
# Resolve both paths to handle symlinks consistently
workspace_root = workspace_root.resolve()
# Check if relative_path is actually an absolute path
path_obj = Path(relative_path)
if path_obj.is_absolute():
# Use absolute path directly
resolved = path_obj.resolve()
# No workspace validation for absolute paths
return resolved
# Resolve the path relative to workspace
resolved = (workspace_root / relative_path).resolve()
# Ensure it's within workspace
try:
resolved.relative_to(workspace_root)
except ValueError as e:
raise ValueError(
f"Path {relative_path} escapes workspace {workspace_root}"
) from e
return resolved
def get_executable_lines(file_path: Path) -> set[int]:
    """
    Collect the line numbers of a Python file that carry bytecode.

    The file is compiled and every code object it contains (module,
    functions, classes, comprehensions, ...) is scanned with
    dis.findlinestarts. Blank lines and comments never appear in the result.

    Args:
        file_path: Absolute path to Python file

    Returns:
        Set of 1-based line numbers with executable code; an empty set when
        the file cannot be read or compiled
    """
    import dis

    try:
        with open(file_path, encoding='utf-8', errors='ignore') as handle:
            text = handle.read()
        module_code = compile(text, str(file_path), 'exec')

        found: set[int] = set()
        # Explicit work list instead of recursion: each entry is a code
        # object whose line table still has to be scanned.
        pending = [module_code]
        while pending:
            current = pending.pop()
            try:
                # Falsy line numbers (0 / None) are synthetic entries, not
                # real source lines, so they are skipped.
                found.update(
                    lineno for _, lineno in dis.findlinestarts(current) if lineno
                )
            except Exception:
                pass
            # Nested code objects live in the constants of their parent.
            pending.extend(
                const for const in current.co_consts if hasattr(const, 'co_code')
            )
        return found
    except Exception:
        # Covers syntax errors as well as unreadable files: best effort only.
        return set()
def validate_file_and_line(file_path: Path, line: int, check_executable: bool = True) -> None:
    """
    Check that a file exists and that a line number points inside it.

    When check_executable is set, the line must additionally be one of the
    lines reported by get_executable_lines(). If that helper reports no
    lines at all (for example because the file does not compile), the
    executable check is skipped.

    Args:
        file_path: Absolute path to Python file
        line: 1-based line number
        check_executable: If True, verify line contains executable code

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If the path is not a file, or line is out of range
            or not executable
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    if not file_path.is_file():
        raise ValueError(f"Not a file: {file_path}")
    if line < 1:
        raise ValueError(f"Line number must be >= 1, got {line}")

    # Stream the file so only a counter is kept in memory.
    total_lines = 0
    with open(file_path, encoding='utf-8', errors='ignore') as handle:
        for _ in handle:
            total_lines += 1
    if line > total_lines:
        raise ValueError(f"Line {line} exceeds file length ({total_lines} lines)")

    if not check_executable:
        return

    candidates = get_executable_lines(file_path)
    if not candidates or line in candidates:
        return

    # Offer the closest executable line as a hint in the error message.
    hint = find_nearest_executable_line(line, candidates)
    if not hint:
        raise ValueError(f"Line {line} does not contain executable code.")
    raise ValueError(
        f"Line {line} does not contain executable code. "
        f"Nearest executable line: {hint}"
    )
def find_nearest_executable_line(target: int, executable_lines: set[int]) -> int | None:
"""
Find the nearest executable line to a target line.
Args:
target: Target line number
executable_lines: Set of executable line numbers
Returns:
Nearest executable line number, or None if no executable lines
"""
if not executable_lines:
return None
# Find the closest line
closest = min(executable_lines, key=lambda x: abs(x - target))
return closest
def truncate_string(s: str, max_length: int = MAX_REPR_LENGTH) -> tuple[str, bool]:
    """
    Cut a string down to max_length characters, marking the cut with "...".

    The "..." marker is appended after the kept prefix, so a truncated
    result is three characters longer than max_length.

    Args:
        s: String to truncate
        max_length: Maximum allowed length

    Returns:
        Tuple of (truncated_string, was_truncated)
    """
    too_long = len(s) > max_length
    clipped = s[:max_length] + "..." if too_long else s
    return clipped, too_long
def safe_repr(obj: Any, depth: int = 0, max_depth: int = MAX_DEPTH) -> tuple[str, bool]:
    """
    Get safe string representation of object with depth and length limits.

    CRITICAL: This function must not run user-defined __repr__/__str__/
    __len__ code, because such code may have side effects (including
    imports). Built-in scalar and container types are therefore rendered
    through the *base type's* own methods (e.g. ``str.__repr__(obj)``), so
    overrides on subclasses are never invoked. Everything else is described
    by type name and id only.

    The function does not recurse into containers: they are summarised as
    ``<list with N items>`` etc. ``depth`` is only compared against
    ``max_depth`` on entry.

    Args:
        obj: Object to represent
        depth: Current recursion depth
        max_depth: Maximum recursion depth

    Returns:
        Tuple of (repr_string, was_truncated)
    """
    if depth >= max_depth:
        return f"<max depth reached: {type(obj).__name__}>", True

    def clip(text: str) -> tuple[str, bool]:
        """Cut text to MAX_REPR_LENGTH characters, flagging truncation."""
        if len(text) > MAX_REPR_LENGTH:
            return text[:MAX_REPR_LENGTH] + "...", True
        return text, False

    try:
        obj_type = type(obj)
        obj_type_name = obj_type.__name__

        # Handle None first
        if obj is None:
            return "None", False

        # bool must be tested before int because bool is a subclass of int.
        if isinstance(obj, bool):
            return ("True" if obj else "False"), False

        if isinstance(obj, int):
            try:
                digits = int.__repr__(obj)
            except ValueError:
                # Very large ints exceed the interpreter's int -> str digit
                # limit; describe them by size instead of failing.
                return f"<int with {int.bit_length(obj)} bits>", True
            return clip(digits)

        if isinstance(obj, float):
            return clip(float.__repr__(obj))

        if isinstance(obj, str):
            # Base-type methods only: a str subclass may override
            # __repr__/__len__/__getitem__.
            if str.__len__(obj) <= MAX_REPR_LENGTH:
                return str.__repr__(obj), False
            head = str.__getitem__(obj, slice(0, MAX_REPR_LENGTH))
            return str.__repr__(head) + "...", True

        if isinstance(obj, bytes):
            # bytes get half the budget of str.
            byte_limit = MAX_REPR_LENGTH // 2
            if bytes.__len__(obj) <= byte_limit:
                return bytes.__repr__(obj), False
            head_bytes = bytes.__getitem__(obj, slice(0, byte_limit))
            return bytes.__repr__(head_bytes) + "...", True

        # Special handling for Path objects (common in debugging)
        from pathlib import Path
        if isinstance(obj, Path):
            import os
            try:
                path_str = os.fspath(obj)
                if len(path_str) > MAX_REPR_LENGTH:
                    return f"Path('{path_str[:MAX_REPR_LENGTH]}...')", True
                return f"Path('{path_str}')", False
            except Exception:
                try:
                    if hasattr(obj, 'parts') and obj.parts:
                        path_str = '/'.join(obj.parts)
                        return f"Path('{path_str[:MAX_REPR_LENGTH]}')", True
                except Exception:
                    pass
                return f"<Path object at {hex(id(obj))}>", True

        # For collections, show type and length only (never iterate, never
        # call __repr__). The length is read through the base type so a
        # subclass's __len__ override is not executed.
        builtin_containers = (
            (list, "[]"),
            (tuple, "()"),
            (dict, "{}"),
            (set, "set()"),
            (frozenset, "frozenset()"),
        )
        for base, empty_literal in builtin_containers:
            if isinstance(obj, base):
                length = base.__len__(obj)
                if length == 0:
                    return empty_literal, False
                return f"<{base.__name__} with {length} items>", False

        # For modules, show module name and file (safe, no imports).
        # Matching on the exact type name is kept from the original code, so
        # module subclasses fall through to the generic description below.
        if obj_type_name == 'module':
            try:
                mod_name = getattr(obj, '__name__', '<unknown>')
                mod_file = getattr(obj, '__file__', None)
                if mod_file:
                    return f"<module '{mod_name}' from '{mod_file}'>", False
                return f"<module '{mod_name}'>", False
            except Exception:
                return f"<module at {hex(id(obj))}>", True

        # For classes, show the class name. isinstance() also recognises
        # classes created by a metaclass (ABCs, Enums, ...), whose type name
        # is not literally 'type'.
        if isinstance(obj, type):
            try:
                class_name = obj.__name__
                class_module = getattr(obj, '__module__', None)
                if class_module and class_module != 'builtins':
                    return f"<class '{class_module}.{class_name}'>", False
                return f"<class '{class_name}'>", False
            except Exception:
                return f"<class at {hex(id(obj))}>", True

        # For functions and methods
        if obj_type_name in ('function', 'method', 'builtin_function_or_method'):
            try:
                func_name = getattr(obj, '__name__', '<lambda>')
                func_module = getattr(obj, '__module__', None)
                if func_module:
                    return f"<{obj_type_name} {func_module}.{func_name}>", False
                return f"<{obj_type_name} {func_name}>", False
            except Exception:
                return f"<{obj_type_name} at {hex(id(obj))}>", True

        # For all other objects, use type name and id only.
        # DO NOT call repr() or any __repr__ method as it might trigger imports.
        try:
            obj_module = getattr(obj_type, '__module__', None)
            if obj_module and obj_module != 'builtins':
                return f"<{obj_module}.{obj_type_name} object at {hex(id(obj))}>", False
            return f"<{obj_type_name} object at {hex(id(obj))}>", False
        except Exception:
            return f"<object at {hex(id(obj))}>", True
    except Exception as e:
        # Absolute fallback - should never happen
        try:
            return f"<repr failed: {type(e).__name__}>", True
        except Exception:
            return "<repr failed>", True
# Logging configuration
def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """
    Fetch the named logger, set its level and make sure it has a handler.

    A stream handler with a timestamped format is attached only when the
    logger has no handlers yet, so calling this repeatedly for the same
    name does not add more handlers. The level is applied on every call.

    Args:
        name: Logger name
        level: Logging level

    Returns:
        Configured logger
    """
    log = logging.getLogger(name)
    log.setLevel(level)

    if log.handlers:
        # Already configured by an earlier call (or by someone else).
        return log

    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    log.addHandler(stream)
    return log
def log_request(logger: logging.Logger, method: str, params: dict) -> None:
    """
    Log incoming request at INFO level.

    The message is passed to the logger as a format string plus arguments,
    so ``params`` is only converted to text when INFO is actually enabled
    for ``logger``.

    Args:
        logger: Logger instance
        method: Method name
        params: Request parameters
    """
    logger.info("Request: %s with params: %s", method, params)
def log_response(logger: logging.Logger, method: str, duration_ms: float, success: bool) -> None:
    """
    Log response with timing at INFO level.

    The duration is rendered with two decimal places. Formatting is left to
    the logging framework (format string plus arguments), so no string is
    built when INFO is disabled for ``logger``.

    Args:
        logger: Logger instance
        method: Method name
        duration_ms: Duration in milliseconds
        success: Whether request succeeded
    """
    status = "success" if success else "error"
    logger.info("Response: %s completed in %.2fms - %s", method, duration_ms, status)
class Timer:
    """
    Context manager for timing operations.

    After the ``with`` block, ``elapsed_ms`` holds the duration of the block
    in milliseconds. The duration is measured with ``time.perf_counter()``,
    so it is unaffected by wall-clock adjustments. ``start_time`` still
    records the wall-clock timestamp (``time.time()``) at which the block
    was entered.

    Exceptions raised inside the block are not suppressed; ``elapsed_ms``
    is set in that case as well.
    """

    def __init__(self):
        self.start_time = 0.0  # wall-clock timestamp of the last __enter__
        self.elapsed_ms = 0.0  # duration of the last completed block, in ms
        self._perf_start = 0.0  # perf_counter reading taken in __enter__

    def __enter__(self):
        self.start_time = time.time()
        self._perf_start = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.elapsed_ms = (time.perf_counter() - self._perf_start) * 1000
def format_traceback(exc_type, exc_value, exc_tb, max_frames: int = 10, max_length: int = 2048) -> str:
    """
    Render an exception as text, bounded in frame count and total length.

    Args:
        exc_type: Exception type
        exc_value: Exception value
        exc_tb: Traceback object
        max_frames: Passed to traceback.format_exception as ``limit``
        max_length: Maximum number of characters kept from the rendered text

    Returns:
        The rendered traceback; when it is longer than max_length, the first
        max_length characters followed by a "... (truncated)" marker line
    """
    import traceback

    rendered = "".join(
        traceback.format_exception(exc_type, exc_value, exc_tb, limit=max_frames)
    )
    if len(rendered) <= max_length:
        return rendered
    # The marker is appended after the cut, so the result is slightly
    # longer than max_length.
    return rendered[:max_length] + "\n... (truncated)"
def find_python_interpreter(workspace_root: Path | str) -> str | None:
"""
Find the Python interpreter for the target workspace.
Searches for virtual environments in common locations and returns
the path to the Python executable if found.
Args:
workspace_root: Root directory of the workspace (Path or str)
Returns:
Absolute path to Python interpreter, or None if no venv found
Priority order:
1. .venv/bin/python (or .venv/Scripts/python.exe on Windows)
2. venv/bin/python
3. .env/bin/python
"""
import platform
# Convert to Path if string
workspace_root = Path(workspace_root) if isinstance(workspace_root, str) else workspace_root
is_windows = platform.system() == "Windows"
python_name = "python.exe" if is_windows else "python"
bin_dir = "Scripts" if is_windows else "bin"
# Common virtual environment locations
venv_paths = [
workspace_root / ".venv",
workspace_root / "venv",
workspace_root / ".env",
]
for venv_path in venv_paths:
if venv_path.exists() and venv_path.is_dir():
python_exe = venv_path / bin_dir / python_name
if python_exe.exists():
return str(python_exe)
# No virtual environment found
return None