"""MCP Python REPL Server - Main server implementation."""
import logging
import asyncio
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP
from .models import SessionInfo, ExecutionResult, InspectionResult, PackageInfo, FileInfo, EnvironmentVariable, FormatResult, LintResult, TestResult
from .session_manager import SessionManager
from .package_manager import PackageManager
from .file_handler import FileHandler
from .env_handler import EnvHandler
from .dev_tools import DevTools
# Logging: configure the root logger at INFO and keep a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastMCP application object; the tool coroutines below register on it
# through the @server.tool() decorator.
server = FastMCP("mcp-py3repl")

# Shared collaborators used by the tools. They are None at import time and
# are assigned real instances inside run().
session_manager: Optional[SessionManager] = None
package_manager: Optional[PackageManager] = None
file_handler: Optional[FileHandler] = None
env_handler: Optional[EnvHandler] = None
dev_tools: Optional[DevTools] = None
@server.tool()
async def python_execute(code: str, session_id: str | None = None) -> str:
    """Execute Python code in REPL session

    Delegates to ``session_manager.execute_code`` and renders the result as
    plain text.

    Args:
        code: Source text passed to the session manager for execution.
        session_id: Forwarded unchanged to the session manager; which
            session ``None`` selects is decided there.

    Returns:
        A report containing the captured output, the error text and the
        return value (each only when non-empty), followed by a footer with
        the execution time and the success flag.

    Raises:
        Exception: ``EXECUTION_ERROR: <reason>`` for any failure; the
            underlying exception is attached as ``__cause__``.
    """
    try:
        # Same style of guard the package/file/env tools use, so an
        # uninitialized server reports a readable reason.
        if session_manager is None:
            raise ValueError("Session manager not initialized")
        result = await session_manager.execute_code(code, session_id)

        # A stream that is None is treated like an empty one rather than
        # failing on .strip().
        captured_output = result.output or ""
        captured_error = result.error or ""

        sections = []
        if captured_output.strip():
            sections.append(f"Output:\n{captured_output}")
        if captured_error.strip():
            sections.append(f"Error:\n{captured_error}")
        if result.return_value:
            sections.append(f"Return value:\n{result.return_value}")
        if not sections:
            sections.append("Code executed successfully (no output)")

        footer = f"[Execution time: {result.execution_time:.3f}s, Success: {result.success}]"
        return "\n\n".join(sections) + f"\n\n{footer}"
    except Exception as e:
        logger.error("Execution error: %s", e)
        raise Exception(f"EXECUTION_ERROR: {e}") from e
@server.tool()
async def python_inspect(object_name: str, session_id: str | None = None) -> str:
    """Inspect Python objects, variables, or modules

    Looks up the session via ``session_manager.get_session`` and asks its
    executor to inspect ``object_name``.

    Args:
        object_name: Name handed to the executor's ``inspect_object``.
        session_id: Forwarded unchanged to ``get_session``.

    Returns:
        Blank-line separated sections: object name, type, and - when
        present - value, docstring, attributes, methods and source code.
        Attribute and method listings show at most ten names plus a count
        of the remainder.

    Raises:
        Exception: ``INSPECTION_ERROR: <reason>`` for any failure.
    """
    try:
        target_session = session_manager.get_session(session_id)
        info = await target_session.executor.inspect_object(object_name)

        sections = [f"Object: {info.name}", f"Type: {info.type}"]
        if info.value:
            sections.append(f"Value: {info.value}")
        if info.docstring:
            sections.append(f"Docstring:\n{info.docstring}")

        # Attributes and methods share one layout: first ten names, then a
        # note with how many were left out.
        for label, members in (("Attributes", info.attributes), ("Methods", info.methods)):
            if not members:
                continue
            entry = f"{label}: {', '.join(members[:10])}"
            if len(members) > 10:
                entry += f" (and {len(members) - 10} more)"
            sections.append(entry)

        if info.source:
            sections.append(f"Source code:\n{info.source}")
        return "\n\n".join(sections)
    except Exception as e:
        logger.error(f"Inspection error: {e}")
        raise Exception(f"INSPECTION_ERROR: {str(e)}")
@server.tool()
async def session_create(session_name: str | None = None, working_directory: str | None = None) -> str:
    """Create a new Python REPL session

    Args:
        session_name: Forwarded to ``session_manager.create_session``.
        working_directory: Forwarded to ``create_session``; when given, its
            final path component is also used as the package name in the
            example import hints.

    Returns:
        A multi-line summary (terminated by a newline) with the session
        name and id, working directory, virtual-environment status, Python
        path, creation time and example import statements.

    Raises:
        Exception: ``SESSION_ERROR: <reason>`` for any failure; the
            underlying exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        session_id = await session_manager.create_session(session_name, working_directory)
        session = session_manager.get_session(session_id)
        session_info = session.to_session_info()

        lines = [
            f"Created session: {session_info.name} ({session_id})",
            f"Working directory: {session_info.working_directory}",
        ]

        # Show venv status
        if session.venv_path and session.venv_path.exists():
            lines.append(f"Virtual environment: ✅ ACTIVATED ({session.venv_path})")
            if session.venv_site_packages:
                lines.append(f"Python packages: {session.venv_site_packages}")
        else:
            lines.append("Virtual environment: ❌ NOT FOUND")
        lines.append(f"Python path: {session.project_python_path}")
        lines.append(f"Created at: {session_info.created_at.isoformat()}")

        # Helpful hint, separated from the facts above by one blank line
        lines.append("")
        lines.append("🔧 This session can now import from your project:")
        if working_directory:
            project_name = Path(working_directory).name
            lines.append(f" - from {project_name}.models import YourModel")
            lines.append(f" - from {project_name}.config import settings")
        else:
            lines.append(" - from your_module import YourClass")

        return "\n".join(lines) + "\n"
    except Exception as e:
        logger.error("Session creation error: %s", e)
        raise Exception(f"SESSION_ERROR: {e}") from e
@server.tool()
async def session_list() -> str:
    """List all active REPL sessions

    Returns:
        ``"No active sessions"`` when ``session_manager.list_sessions()`` is
        empty; otherwise a header followed by three lines per session
        (name/id/status, working directory, creation time). The session
        whose id equals ``session_manager.active_session_id`` is tagged
        ``(current)``.

    Raises:
        Exception: ``SESSION_ERROR: <reason>`` for any failure.
    """
    try:
        known_sessions = session_manager.list_sessions()
        if not known_sessions:
            return "No active sessions"

        report = ["Active sessions:"]
        for entry in known_sessions:
            if entry.active:
                status = "ACTIVE"
            else:
                status = "INACTIVE"
            is_current = entry.id == session_manager.active_session_id
            active_marker = " (current)" if is_current else ""
            report.extend(
                (
                    f"- {entry.name} ({entry.id}): {status}{active_marker}",
                    f" Working directory: {entry.working_directory}",
                    f" Created: {entry.created_at.isoformat()}",
                )
            )
        return "\n".join(report)
    except Exception as e:
        logger.error(f"Session list error: {e}")
        raise Exception(f"SESSION_ERROR: {str(e)}")
@server.tool()
async def session_switch(session_id: str) -> str:
    """Switch to a different REPL session

    Args:
        session_id: Identifier passed to ``session_manager.switch_session``.

    Returns:
        On success, the session's name, id and working directory; otherwise
        ``"Failed to switch to session <id>"``.

    Raises:
        Exception: ``SESSION_ERROR: <reason>`` for any failure; the
            underlying exception is attached as ``__cause__``.
    """
    try:
        # Report a refused switch before touching the session: the failure
        # message needs nothing from it, and looking up a session that
        # could not be switched to may itself fail.
        if not session_manager.switch_session(session_id):
            return f"Failed to switch to session {session_id}"

        session_info = session_manager.get_session(session_id).to_session_info()
        return (
            f"Switched to session: {session_info.name} ({session_id})\n"
            f"Working directory: {session_info.working_directory}"
        )
    except Exception as e:
        logger.error("Session switch error: %s", e)
        raise Exception(f"SESSION_ERROR: {e}") from e
@server.tool()
async def package_install(packages: list[str], dev: bool = False) -> str:
    """Install Python packages using uv

    Args:
        packages: Package specifiers passed to
            ``package_manager.install_packages``.
        dev: Forwarded to ``install_packages`` as its second argument.

    Returns:
        The result message prefixed with a success or failure icon,
        followed by the installation output or the error details when the
        result provides them.

    Raises:
        Exception: ``PACKAGE_ERROR: <reason>`` when the package manager is
            not initialized, uv is unavailable, or anything else fails; the
            underlying exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not package_manager:
            raise ValueError("Package manager not initialized")
        if not package_manager.is_uv_available():
            raise ValueError("uv is not available - please install uv first")

        result = await package_manager.install_packages(packages, dev)
        if result["success"]:
            output_text = f"✅ {result['message']}\n"
            if result.get("output"):
                output_text += f"\nInstallation output:\n{result['output']}"
        else:
            output_text = f"❌ {result['message']}\n"
            if result.get("error"):
                output_text += f"\nError details:\n{result['error']}"
        return output_text
    except Exception as e:
        logger.error("Package installation error: %s", e)
        raise Exception(f"PACKAGE_ERROR: {e}") from e
@server.tool()
async def package_remove(packages: list[str]) -> str:
    """Remove Python packages using uv

    Args:
        packages: Package names passed to
            ``package_manager.remove_packages``.

    Returns:
        The result message prefixed with a success or failure icon,
        followed by the removal output or the error details when the result
        provides them.

    Raises:
        Exception: ``PACKAGE_ERROR: <reason>`` when the package manager is
            not initialized, uv is unavailable, or anything else fails; the
            underlying exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not package_manager:
            raise ValueError("Package manager not initialized")
        if not package_manager.is_uv_available():
            raise ValueError("uv is not available - please install uv first")

        result = await package_manager.remove_packages(packages)
        if result["success"]:
            output_text = f"✅ {result['message']}\n"
            if result.get("output"):
                output_text += f"\nRemoval output:\n{result['output']}"
        else:
            output_text = f"❌ {result['message']}\n"
            if result.get("error"):
                output_text += f"\nError details:\n{result['error']}"
        return output_text
    except Exception as e:
        logger.error("Package removal error: %s", e)
        raise Exception(f"PACKAGE_ERROR: {e}") from e
@server.tool()
async def package_list() -> str:
    """List installed packages in current environment

    Returns:
        A fallback sentence when ``package_manager.list_packages()`` yields
        nothing; otherwise one ``- name version`` line per package, ordered
        case-insensitively by name with development dependencies tagged
        ``(dev)``, followed by a total count.

    Raises:
        Exception: ``PACKAGE_ERROR: <reason>`` when the package manager is
            not initialized, uv is unavailable, or anything else fails.
    """
    try:
        if not package_manager:
            raise ValueError("Package manager not initialized")
        if not package_manager.is_uv_available():
            raise ValueError("uv is not available - please install uv first")

        found = await package_manager.list_packages()
        if not found:
            return "No packages installed or unable to retrieve package list"

        # Order by lower-cased name so the listing is stable between calls.
        ordered = list(found)
        ordered.sort(key=lambda pkg: pkg.name.lower())
        rows = [
            f"- {pkg.name} {pkg.version}{' (dev)' if pkg.dev_dependency else ''}"
            for pkg in ordered
        ]
        listing = "\n".join(["Installed packages:\n", *rows])
        return listing + f"\n\nTotal: {len(found)} packages"
    except Exception as e:
        logger.error(f"Package listing error: {e}")
        raise Exception(f"PACKAGE_ERROR: {str(e)}")
@server.tool()
async def file_read_project(file_path: str, encoding: str | None = None) -> str:
    """Read a file from the project directory with Python session context and encoding detection.
    Use built-in Read tool for general file reading - this is optimized for Python project files.

    Args:
        file_path: Path handed to ``file_handler.read_file``.
        encoding: Forwarded unchanged to ``read_file``.

    Returns:
        A report starting with the file path. If the file does not exist,
        only the error line follows. Otherwise the report lists size,
        encoding and any warning when available, then the content between
        two 40-dash rulers.

    Raises:
        Exception: ``FILE_ERROR: <reason>`` when the file handler is not
            initialized or anything else fails; the underlying exception is
            attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not file_handler:
            raise ValueError("File handler not initialized")
        file_info = await file_handler.read_file(file_path, encoding)

        output_parts = [f"📁 File: {file_info.path}"]
        if not file_info.exists:
            output_parts.append(f"❌ Error: {file_info.error}")
            return "\n".join(output_parts)

        output_parts.append("✅ File exists")
        if file_info.size is not None:
            output_parts.append(f"📏 Size: {file_info.size} bytes")
        if file_info.encoding:
            output_parts.append(f"🔤 Encoding: {file_info.encoding}")
        # The file exists here, so an error value is reported as a warning.
        if file_info.error:
            output_parts.append(f"⚠️ Warning: {file_info.error}")
        if file_info.content is not None:
            ruler = "-" * 40
            output_parts.append(f"\n📄 Content:\n{ruler}")
            output_parts.append(file_info.content)
            output_parts.append(ruler)
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("File read error: %s", e)
        raise Exception(f"FILE_ERROR: {e}") from e
@server.tool()
async def file_write_project(file_path: str, content: str, encoding: str | None = None) -> str:
    """Write content to a file in the project directory with encoding detection and safety checks.
    Use built-in Write tool for general file writing - this provides additional safety and metadata.

    Args:
        file_path: Path handed to ``file_handler.write_file``.
        content: Text to write; also echoed back (in full for up to five
            lines, otherwise only the first three lines).
        encoding: Forwarded unchanged to ``write_file``.

    Returns:
        A report starting with the file path, followed either by the error
        reported by the file handler or by a success line with size,
        encoding and a content preview.

    Raises:
        Exception: ``FILE_ERROR: <reason>`` when the file handler is not
            initialized or anything else fails; the underlying exception is
            attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not file_handler:
            raise ValueError("File handler not initialized")
        file_info = await file_handler.write_file(file_path, content, encoding)

        output_parts = [f"📁 File: {file_info.path}"]
        if file_info.error:
            output_parts.append(f"❌ Error: {file_info.error}")
        else:
            output_parts.append("✅ File written successfully")
            if file_info.size is not None:
                output_parts.append(f"📏 Size: {file_info.size} bytes")
            if file_info.encoding:
                output_parts.append(f"🔤 Encoding: {file_info.encoding}")

            # Echo the content for confirmation; long content is cut to its
            # first three lines.
            lines = content.split("\n")
            if len(lines) > 5:
                preview = "\n".join(lines[:3])
                output_parts.append(
                    f"\n📄 Content preview (first 3 lines):\n{preview}\n... ({len(lines)} total lines)"
                )
            else:
                output_parts.append(f"\n📄 Content ({len(lines)} lines):\n{content}")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("File write error: %s", e)
        raise Exception(f"FILE_ERROR: {e}") from e
@server.tool()
async def load_env_file(env_file_path: str = ".env") -> str:
    """Load environment variables from .env file

    Args:
        env_file_path: Path handed to ``env_handler.load_env_file``.

    Returns:
        A report starting with the requested path. On success it lists the
        result message, resolved path, number of variables loaded and their
        names (when provided); on failure the message and, if present, the
        path.

    Raises:
        Exception: ``ENV_ERROR: <reason>`` when the environment handler is
            not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not env_handler:
            raise ValueError("Environment handler not initialized")
        result = await env_handler.load_env_file(env_file_path)

        output_parts = [f"📄 Environment File: {env_file_path}"]
        if result["success"]:
            output_parts.append(f"✅ {result['message']}")
            output_parts.append(f"📍 Path: {result['path']}")
            output_parts.append(f"📊 Variables loaded: {result['loaded_count']}")
            if result.get("variables"):
                output_parts.append(f"📋 Variables: {', '.join(result['variables'])}")
        else:
            output_parts.append(f"❌ {result['message']}")
            if result.get("path"):
                output_parts.append(f"📍 Path: {result['path']}")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("Environment file load error: %s", e)
        raise Exception(f"ENV_ERROR: {e}") from e
@server.tool()
async def set_env_var(name: str, value: str, session_specific: bool = True) -> str:
    """Set an environment variable for the session

    Args:
        name: Variable name passed to ``env_handler.set_env_var``.
        value: Variable value passed to ``env_handler.set_env_var``.
        session_specific: Forwarded unchanged; the scope shown in the
            report is taken from the handler's result, not from this
            argument.

    Returns:
        A report starting with the variable name, followed on success by
        the result message, the stored value and the scope
        (``Session-specific`` or ``Global``), or on failure by the message.

    Raises:
        Exception: ``ENV_ERROR: <reason>`` when the environment handler is
            not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not env_handler:
            raise ValueError("Environment handler not initialized")
        result = await env_handler.set_env_var(name, value, session_specific)

        output_parts = [f"🔧 Environment Variable: {name}"]
        if result["success"]:
            output_parts.append(f"✅ {result['message']}")
            output_parts.append(f"💾 Value: {result['value']}")
            scope = "Session-specific" if result["session_specific"] else "Global"
            output_parts.append(f"🎯 Scope: {scope}")
        else:
            output_parts.append(f"❌ {result['message']}")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("Environment variable set error: %s", e)
        raise Exception(f"ENV_ERROR: {e}") from e
# Substrings that mark a variable name as sensitive (compared in lower case).
_SENSITIVE_ENV_NAME_PARTS = ("password", "secret", "key", "token")


def _masked_env_value(var):
    """Return ``var.value``, or ``"***"`` when ``var.name`` contains a sensitive substring."""
    lowered_name = var.name.lower()
    if any(part in lowered_name for part in _SENSITIVE_ENV_NAME_PARTS):
        return "***"
    return var.value


@server.tool()
async def list_env_vars(include_system: bool = False) -> str:
    """List current environment variables

    Args:
        include_system: Forwarded unchanged to
            ``env_handler.list_env_vars``.

    Returns:
        A fixed sentence when no variables are returned; otherwise the
        variables grouped into session-specific and global/project
        sections, one ``name = value`` bullet each, followed by a total.
        Values of variables whose name contains ``password``, ``secret``,
        ``key`` or ``token`` (case-insensitive) are shown as ``***``.

    Raises:
        Exception: ``ENV_ERROR: <reason>`` when the environment handler is
            not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not env_handler:
            raise ValueError("Environment handler not initialized")
        env_vars = await env_handler.list_env_vars(include_system)
        if not env_vars:
            return "📋 No environment variables found"

        session_vars = [var for var in env_vars if var.session_specific]
        global_vars = [var for var in env_vars if not var.session_specific]

        output_parts = ["🌍 Environment Variables:"]
        groups = (
            ("\n🎯 Session-specific variables:", session_vars),
            ("\n🌐 Global/Project variables:", global_vars),
        )
        for heading, group in groups:
            if group:
                output_parts.append(heading)
                output_parts.extend(
                    f" • {var.name} = {_masked_env_value(var)}" for var in group
                )
        output_parts.append(f"\n📊 Total: {len(env_vars)} variables")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("Environment variable list error: %s", e)
        raise Exception(f"ENV_ERROR: {e}") from e
@server.tool()
async def create_env_template(settings_class: str, output_path: str = ".env.example") -> str:
    """Generate .env.example from pydantic Settings model

    Args:
        settings_class: Identifier of the settings class, passed to
            ``env_handler.create_env_template``.
        output_path: Destination path passed to ``create_env_template``.

    Returns:
        A report starting with the output path. On success it lists the
        result message, path, settings class, number of fields processed
        and the field names (when provided); on failure the message and any
        error detail.

    Raises:
        Exception: ``ENV_ERROR: <reason>`` when the environment handler is
            not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not env_handler:
            raise ValueError("Environment handler not initialized")
        result = await env_handler.create_env_template(settings_class, output_path)

        output_parts = [f"📄 Environment Template: {output_path}"]
        if result["success"]:
            output_parts.append(f"✅ {result['message']}")
            output_parts.append(f"📍 Path: {result['path']}")
            output_parts.append(f"🏗️ Settings class: {result['settings_class']}")
            output_parts.append(f"📊 Fields processed: {result['field_count']}")
            if result.get("fields"):
                output_parts.append(f"📋 Fields: {', '.join(result['fields'])}")
        else:
            output_parts.append(f"❌ {result['message']}")
            if result.get("error"):
                output_parts.append(f"🔍 Error: {result['error']}")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("Environment template creation error: %s", e)
        raise Exception(f"ENV_ERROR: {e}") from e
@server.tool()
async def format_file(file_path: str, formatter: str = "auto") -> str:
    """Format a Python file using black or ruff

    Args:
        file_path: Path handed to ``dev_tools.format_file``.
        formatter: Formatter selector forwarded unchanged to
            ``dev_tools.format_file``; also echoed in the report header.

    Returns:
        A report with the formatter and file path, then either a success
        line plus whether the file was modified, or a failure line plus the
        error detail when one is provided.

    Raises:
        Exception: ``FORMAT_ERROR: <reason>`` when the development tools
            are not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not dev_tools:
            raise ValueError("Development tools not initialized")
        result = await dev_tools.format_file(file_path, formatter)

        output_parts = [
            f"🎨 File Formatting ({formatter})",
            f"📁 File: {file_path}",
        ]
        if result.success:
            output_parts.append("✅ Formatting successful")
            if result.changed:
                output_parts.append("📝 File was modified")
            else:
                output_parts.append("📄 File was already properly formatted")
        else:
            output_parts.append("❌ Formatting failed")
            if result.error:
                output_parts.append(f"🔍 Error: {result.error}")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("File formatting error: %s", e)
        raise Exception(f"FORMAT_ERROR: {e}") from e
def _lint_entry_lines(entries):
    """Render lint findings (mappings with 'line', 'message' and 'rule') as bullet lines."""
    rendered = []
    for entry in entries:
        # A non-positive line number is shown as a file-level finding.
        location = f"Line {entry['line']}" if entry["line"] > 0 else "General"
        rendered.append(f" • {location}: {entry['message']} ({entry['rule']})")
    return rendered


@server.tool()
async def lint_file(file_path: str, fix: bool = False) -> str:
    """Lint a Python file using ruff

    Args:
        file_path: Path handed to ``dev_tools.lint_file``.
        fix: Forwarded unchanged to ``dev_tools.lint_file``; when true the
            header notes auto-fix mode and applied fixes are reported if
            the result carries fixed code.

    Returns:
        A report with a header, a status line, the errors and warnings (one
        bullet each with line, message and rule), an auto-fix note when
        applicable and a totals line when any issue was found.

    Raises:
        Exception: ``LINT_ERROR: <reason>`` when the development tools are
            not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    try:
        if not dev_tools:
            raise ValueError("Development tools not initialized")
        result = await dev_tools.lint_file(file_path, fix)

        # Append the mode suffix only when it applies, so the header has no
        # trailing space in plain lint mode.
        header = f"🔍 File Linting: {file_path}"
        if fix:
            header += " (with auto-fix)"
        output_parts = [header]

        if not result.success:
            output_parts.append("❌ Linting found issues")
        elif not result.errors and not result.warnings:
            output_parts.append("✅ No issues found - code looks clean!")
        else:
            output_parts.append("✅ Linting completed")

        if result.errors:
            output_parts.append(f"\n🚨 Errors ({len(result.errors)}):")
            output_parts.extend(_lint_entry_lines(result.errors))
        if result.warnings:
            output_parts.append(f"\n⚠️ Warnings ({len(result.warnings)}):")
            output_parts.extend(_lint_entry_lines(result.warnings))

        # Show fixed code status if auto-fix was applied
        if fix and result.fixed_code:
            output_parts.append(f"\n🔧 Auto-fixes applied to {file_path}")

        total_issues = len(result.errors) + len(result.warnings)
        if total_issues > 0:
            output_parts.append(
                f"\n📊 Total issues: {total_issues} "
                f"({len(result.errors)} errors, {len(result.warnings)} warnings)"
            )
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("Code linting error: %s", e)
        raise Exception(f"LINT_ERROR: {e}") from e
@server.tool()
async def run_tests(test_path: str | None = None, pattern: str | None = None, verbose: bool = False) -> str:
    """Execute tests using pytest

    Args:
        test_path: Forwarded to ``dev_tools.run_tests``; echoed in the
            report when given.
        pattern: Forwarded to ``dev_tools.run_tests``; echoed in the report
            when given.
        verbose: Forwarded unchanged to ``dev_tools.run_tests``.

    Returns:
        A report with the overall verdict, test counts and duration (when
        at least one test ran), the test output truncated to 2000
        characters, and any error text.

    Raises:
        Exception: ``TEST_ERROR: <reason>`` when the development tools are
            not initialized or anything else fails; the underlying
            exception is attached as ``__cause__``.
    """
    # NOTE(review): the icon characters in this function were re-entered
    # because the previous literals were mis-decoded; confirm the glyph
    # choices against upstream.
    max_output_chars = 2000  # keeps the report readable for long test logs
    try:
        if not dev_tools:
            raise ValueError("Development tools not initialized")
        result = await dev_tools.run_tests(test_path, pattern, verbose)

        output_parts = ["🧪 Test Execution"]
        if test_path:
            output_parts.append(f"📁 Path: {test_path}")
        if pattern:
            output_parts.append(f"🔍 Pattern: {pattern}")

        if result.success:
            output_parts.append("✅ All tests passed!")
        elif result.total_tests == 0:
            output_parts.append("❌ No tests found or pytest not available")
        else:
            output_parts.append("❌ Some tests failed")

        # Test statistics
        if result.total_tests > 0:
            output_parts.append("\n📊 Test Results:")
            output_parts.append(f" • Total: {result.total_tests}")
            output_parts.append(f" • Passed: {result.passed_tests} ✅")
            if result.failed_tests > 0:
                output_parts.append(f" • Failed: {result.failed_tests} ❌")
            if result.skipped_tests > 0:
                output_parts.append(f" • Skipped: {result.skipped_tests} ⏭️")
            output_parts.append(f" • Duration: {result.execution_time:.2f}s ⏱️")

        # Show test output between two 40-dash rulers
        if result.output:
            ruler = "-" * 40
            test_output = result.output
            if len(test_output) > max_output_chars:
                test_output = test_output[:max_output_chars] + "\n... (output truncated)"
            output_parts.append(f"\n📋 Test Output:\n{ruler}")
            output_parts.append(test_output)
            output_parts.append(ruler)

        # Show errors if any
        if result.error:
            output_parts.append(f"\n🔍 Errors:\n{result.error}")
        return "\n".join(output_parts)
    except Exception as e:
        logger.error("Test execution error: %s", e)
        raise Exception(f"TEST_ERROR: {e}") from e
# Main entry point
def run():
    """Initialize and run the MCP server.

    Instantiates the five module-level managers, creates a session named
    "default" (a failure there is logged and does not stop startup), and
    then serves over the stdio transport via ``server.run``.
    """
    global session_manager, package_manager, file_handler, env_handler, dev_tools

    logger.info("Starting MCP Python REPL Server...")

    # Populate the module-level collaborators the tool functions rely on.
    session_manager = SessionManager()
    package_manager = PackageManager()
    file_handler = FileHandler()
    env_handler = EnvHandler()
    dev_tools = DevTools()

    async def initialize():
        # create_session is a coroutine, so it needs an async context.
        try:
            created_id = await session_manager.create_session("default")
            logger.info(f"Created default session: {created_id}")
        except Exception as exc:
            logger.error(f"Failed to create default session: {exc}")

    # The default session is set up before the server starts serving.
    asyncio.run(initialize())

    logger.info("MCP server ready")
    server.run(transport="stdio")


if __name__ == "__main__":
    run()