"""Development tools for MCP Python REPL server."""
import json
import logging
import re
import subprocess
import time
from pathlib import Path
from typing import Dict, Any, Optional, List
from .models import FormatResult, LintResult, TestResult
logger = logging.getLogger(__name__)
class DevTools:
"""Handles development tools like formatting, linting, and testing."""
    def __init__(self, project_dir: Optional[Path] = None):
"""Initialize DevTools with project directory.
Args:
project_dir: Project directory path. If None, uses current working directory.
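        Example (illustrative sketch; the project path below is hypothetical):

            from pathlib import Path
            dev_tools = DevTools(Path("/home/user/my_project"))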
"""
self.project_dir = (project_dir or Path.cwd()).resolve()
def _run_command(self, cmd: List[str], input_text: Optional[str] = None) -> Dict[str, Any]:
"""Run a command and return result.
Args:
cmd: Command and arguments as list
input_text: Optional input text to pass to command
Returns:
            Dictionary with the keys "command", "stdout", "stderr", "returncode",
            "success", and "execution_time" (plus "error" when the command cannot be run)
"""
try:
start_time = time.time()
# subprocess.run is synchronous - do NOT await it
result = subprocess.run(
cmd,
cwd=self.project_dir,
capture_output=True,
text=True,
input=input_text
)
execution_time = time.time() - start_time
return {
"command": " ".join(cmd),
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode,
"success": result.returncode == 0,
"execution_time": execution_time
}
except Exception as e:
logger.error(f"Command execution error: {e}")
return {
"command": " ".join(cmd),
"stdout": "",
"stderr": str(e),
"returncode": -1,
"success": False,
"execution_time": 0.0,
"error": str(e)
}
def _is_tool_available(self, tool: str) -> bool:
"""Check if a development tool is available.
Args:
tool: Tool name (e.g., 'black', 'ruff', 'pytest')
Returns:
True if tool is available, False otherwise
"""
try:
result = subprocess.run(
[tool, "--version"],
capture_output=True,
text=True,
cwd=self.project_dir
)
return result.returncode == 0
except (subprocess.SubprocessError, FileNotFoundError):
return False
async def format_file(self, file_path: str, formatter: str = "auto") -> FormatResult:
"""Format a Python file using black or ruff.
Args:
file_path: Path to the file to format
formatter: Formatter to use ("black", "ruff", or "auto")
Returns:
FormatResult object
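        Example (illustrative sketch; assumes black or ruff is installed,
        "src/app.py" is a hypothetical project-relative path, and dev_tools
        is a DevTools instance used from an async context):

            result = await dev_tools.format_file("src/app.py")
            if result.success and result.changed:
                print("File was reformatted")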
"""
try:
from .file_handler import FileHandler
# Initialize file handler
file_handler = FileHandler(self.project_dir)
# Read the file
file_info = await file_handler.read_file(file_path)
if not file_info.exists or file_info.error:
return FormatResult(
formatted_code="",
changed=False,
success=False,
error=f"Cannot read file: {file_info.error or 'File does not exist'}"
)
# Format the code
format_result = await self.format_code(file_info.content, formatter)
# If formatting was successful and code changed, write back to file
if format_result.success and format_result.changed:
write_result = await file_handler.write_file(file_path, format_result.formatted_code)
if write_result.error:
return FormatResult(
formatted_code=format_result.formatted_code,
changed=format_result.changed,
success=False,
error=f"Formatting succeeded but failed to write file: {write_result.error}"
)
return format_result
except Exception as e:
logger.error(f"File formatting error: {e}")
return FormatResult(
formatted_code="",
changed=False,
success=False,
error=str(e)
)
async def lint_file(self, file_path: str, fix: bool = False) -> LintResult:
"""Lint a Python file using ruff.
Args:
file_path: Path to the file to lint
fix: Whether to auto-fix issues
Returns:
LintResult object
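        Example (illustrative sketch; assumes ruff is installed, "src/app.py"
        is a hypothetical project-relative path, and dev_tools is a DevTools
        instance used from an async context):

            result = await dev_tools.lint_file("src/app.py", fix=True)
            for issue in result.errors + result.warnings:
                print(issue["line"], issue["rule"], issue["message"])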
"""
try:
from .file_handler import FileHandler
# Initialize file handler
file_handler = FileHandler(self.project_dir)
# Read the file
file_info = await file_handler.read_file(file_path)
if not file_info.exists or file_info.error:
return LintResult(
errors=[{
"rule": "FileError",
"message": f"Cannot read file: {file_info.error or 'File does not exist'}",
"line": 0,
"column": 0,
"severity": "error"
}],
warnings=[],
success=False
)
# Lint the code
lint_result = await self.lint_code(file_info.content, fix)
            # If fixing was requested and produced changes, write them back to the file
            if (fix and lint_result.success and lint_result.fixed_code
                    and lint_result.fixed_code != file_info.content):
                write_result = await file_handler.write_file(file_path, lint_result.fixed_code)
                if write_result.error:
                    # Surface the write failure alongside the lint results
                    return LintResult(
                        errors=[*lint_result.errors,
                                {"rule": "FileError",
                                 "message": f"Linting succeeded but failed to write file: {write_result.error}",
                                 "line": 0, "column": 0, "severity": "error"}],
                        warnings=lint_result.warnings,
                        fixed_code=lint_result.fixed_code,
                        success=False
                    )
return lint_result
except Exception as e:
logger.error(f"File linting error: {e}")
return LintResult(
errors=[{
"rule": "LintError",
"message": str(e),
"line": 0,
"column": 0,
"severity": "error"
}],
warnings=[],
success=False
)
async def format_code(self, code: str, formatter: str = "auto") -> FormatResult:
"""Format Python code using black or ruff.
Args:
code: Code to format
formatter: Formatter to use ("black", "ruff", or "auto")
Returns:
FormatResult object
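        Example (illustrative sketch; assumes black or ruff is installed and
        dev_tools is a DevTools instance used from an async context):

            result = await dev_tools.format_code("def f( x ):return x", formatter="black")
            if result.success and result.changed:
                print(result.formatted_code)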
"""
try:
# Determine which formatter to use
if formatter == "auto":
if self._is_tool_available("black"):
formatter = "black"
elif self._is_tool_available("ruff"):
formatter = "ruff"
else:
return FormatResult(
formatted_code=code,
changed=False,
success=False,
error="Neither black nor ruff is available"
)
# Format with chosen tool
if formatter == "black":
result = self._run_command(["black", "--code", code])
if result["success"]:
formatted_code = result["stdout"]
changed = formatted_code != code
return FormatResult(
formatted_code=formatted_code,
changed=changed,
success=True
)
else:
return FormatResult(
formatted_code=code,
changed=False,
success=False,
error=result["stderr"]
)
elif formatter == "ruff":
                # Ruff format mode; "-" makes ruff read the code from stdin
                result = self._run_command(["ruff", "format", "--stdin-filename", "temp.py", "-"], input_text=code)
if result["success"]:
formatted_code = result["stdout"]
changed = formatted_code != code
return FormatResult(
formatted_code=formatted_code,
changed=changed,
success=True
)
else:
return FormatResult(
formatted_code=code,
changed=False,
success=False,
error=result["stderr"]
)
else:
return FormatResult(
formatted_code=code,
changed=False,
success=False,
error=f"Unknown formatter: {formatter}"
)
except Exception as e:
logger.error(f"Code formatting error: {e}")
return FormatResult(
formatted_code=code,
changed=False,
success=False,
error=str(e)
)
async def lint_code(self, code: str, fix: bool = False) -> LintResult:
"""Lint Python code using ruff.
Args:
code: Code to lint
fix: Whether to auto-fix issues
Returns:
LintResult object
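        Example (illustrative sketch; assumes ruff is installed and dev_tools
        is a DevTools instance used from an async context):

            result = await dev_tools.lint_code("import os")
            for issue in result.errors + result.warnings:
                print(issue["rule"], issue["message"])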
"""
try:
            if not self._is_tool_available("ruff"):
                # Report the missing tool instead of returning an empty, unexplained failure
                return LintResult(
                    errors=[{"rule": "ToolMissing", "message": "ruff is not available",
                             "line": 0, "column": 0, "severity": "error"}],
                    warnings=[],
                    success=False
                )
            # Prepare ruff command; "-" makes ruff read the code from stdin,
            # and --stdin-filename names it for diagnostics
            cmd = ["ruff", "check", "-", "--stdin-filename", "temp.py", "--output-format", "json"]
if fix:
cmd.append("--fix")
result = self._run_command(cmd, input_text=code)
errors = []
warnings = []
fixed_code = None
# Parse ruff output
if result["stdout"].strip():
try:
ruff_output = json.loads(result["stdout"])
for issue in ruff_output:
issue_data = {
"rule": issue.get("code", "Unknown"),
"message": issue.get("message", "No message"),
"line": issue.get("location", {}).get("row", 0),
"column": issue.get("location", {}).get("column", 0),
"severity": issue.get("severity", "error")
}
if issue_data["severity"] == "error":
errors.append(issue_data)
else:
warnings.append(issue_data)
except json.JSONDecodeError:
# Fallback: parse text output
for line in result["stdout"].split('\n'):
if line.strip():
errors.append({
"rule": "ParseError",
"message": line.strip(),
"line": 0,
"column": 0,
"severity": "error"
})
            # When fixing via stdin, ruff does not write the fixed source back to a file,
            # so the fixed code is not directly available here. As a best-effort fallback,
            # re-run the formatter to return a formatted version of the code
            # (note: this applies formatting only, not lint fixes).
if fix and result["success"]:
# Try to get fixed code by running format
format_result = await self.format_code(code, "ruff")
if format_result.success:
fixed_code = format_result.formatted_code
            # Ruff exits non-zero when it reports issues; succeed if it exited cleanly or no errors were found
            return LintResult(
errors=errors,
warnings=warnings,
fixed_code=fixed_code,
success=result["success"] or len(errors) == 0
)
except Exception as e:
logger.error(f"Code linting error: {e}")
return LintResult(
errors=[{
"rule": "LintError",
"message": str(e),
"line": 0,
"column": 0,
"severity": "error"
}],
warnings=[],
success=False
)
async def run_tests(self, test_path: Optional[str] = None, pattern: Optional[str] = None,
verbose: bool = False) -> TestResult:
"""Execute tests using pytest.
Args:
test_path: Optional path to specific test file or directory
pattern: Optional test pattern to match
verbose: Whether to run in verbose mode
Returns:
TestResult object
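        Example (illustrative sketch; assumes pytest is installed, a "tests/"
        directory exists in the project, and dev_tools is a DevTools instance
        used from an async context):

            result = await dev_tools.run_tests("tests/", pattern="unit", verbose=True)
            print(f"{result.passed_tests}/{result.total_tests} tests passed")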
"""
try:
if not self._is_tool_available("pytest"):
return TestResult(
success=False,
total_tests=0,
passed_tests=0,
failed_tests=0,
skipped_tests=0,
output="pytest is not available",
error="pytest not found",
execution_time=0.0
)
# Build pytest command
cmd = ["pytest"]
if verbose:
cmd.append("-v")
if test_path:
cmd.append(test_path)
if pattern:
cmd.extend(["-k", pattern])
            # Keep tracebacks short and drop the header to simplify output parsing
            cmd.extend(["--tb=short", "--no-header"])
result = self._run_command(cmd)
# Parse pytest output to extract test counts
output = result["stdout"] + result["stderr"]
            # pytest's summary line (e.g. "== 2 failed, 5 passed, 1 skipped in 1.23s ==")
            # orders the outcome counts by result type, so parse each count
            # independently instead of relying on a fixed ordering
            passed_match = re.search(r"(\d+)\s+passed", output)
            passed_tests = int(passed_match.group(1)) if passed_match else 0
            failed_match = re.search(r"(\d+)\s+failed", output)
            failed_tests = int(failed_match.group(1)) if failed_match else 0
            skipped_match = re.search(r"(\d+)\s+skipped", output)
            skipped_tests = int(skipped_match.group(1)) if skipped_match else 0
            total_tests = passed_tests + failed_tests + skipped_tests
# Test success if no failures and at least some tests ran
test_success = failed_tests == 0 and total_tests > 0
return TestResult(
success=test_success,
total_tests=total_tests,
passed_tests=passed_tests,
failed_tests=failed_tests,
skipped_tests=skipped_tests,
output=result["stdout"],
error=result["stderr"],
execution_time=result["execution_time"]
)
except Exception as e:
logger.error(f"Test execution error: {e}")
return TestResult(
success=False,
total_tests=0,
passed_tests=0,
failed_tests=0,
skipped_tests=0,
output="",
error=str(e),
execution_time=0.0
)
def get_available_tools(self) -> Dict[str, bool]:
"""Get availability status of development tools.
Returns:
Dictionary mapping tool names to availability status
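        Example (illustrative):

            dev_tools = DevTools()
            if dev_tools.get_available_tools().get("ruff"):
                print("ruff is available")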
"""
tools = ["black", "ruff", "pytest"]
return {tool: self._is_tool_available(tool) for tool in tools}
def get_project_dev_info(self) -> Dict[str, Any]:
"""Get information about project development setup.
Returns:
Dictionary with development environment information
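        Example (illustrative):

            info = DevTools().get_project_dev_info()
            print(info["project_dir"], info["available_tools"])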
"""
try:
dev_files = [
"pyproject.toml", "setup.py", "setup.cfg",
"tox.ini", "pytest.ini", ".coveragerc",
"ruff.toml", ".pre-commit-config.yaml"
]
file_info = {}
for dev_file in dev_files:
file_path = self.project_dir / dev_file
file_info[dev_file] = {
"exists": file_path.exists(),
"path": str(file_path)
}
return {
"project_dir": str(self.project_dir),
"dev_files": file_info,
"available_tools": self.get_available_tools(),
"has_tests_dir": (self.project_dir / "tests").exists(),
"has_src_dir": (self.project_dir / "src").exists()
}
except Exception as e:
logger.error(f"Development info error: {e}")
return {
"project_dir": str(self.project_dir),
"error": str(e)
}