# DevEnvInfoServer
#!/usr/bin/env python3
"""
DevEnv MCP Server
A comprehensive Model Context Protocol (MCP) server that gathers detailed information
about the development environment to provide context for LLMs.
"""
import asyncio
import json
import logging
import os
import platform
import re
import shutil
import socket
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
try:
import psutil
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])
import psutil
try:
from mcp.server.fastmcp import FastMCP, Context
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "mcp[cli]"])
from mcp.server.fastmcp import FastMCP, Context
# Set up logging once at import time. StreamHandler() with no argument writes
# to sys.stderr, so log lines do not end up on stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
# Module-wide logger used by the helpers, collectors and tools in this file.
logger = logging.getLogger("devenv-mcp")
# Define a class to track progress
@dataclass
class ProgressTracker:
    """Running tally of how many scan steps have been counted out of a total.

    The fields are plain mutable attributes; nothing here enforces
    ``completed <= total``.
    """

    total: int = 0          # number of steps expected, set by reset()
    completed: int = 0      # steps counted so far via update()
    current_task: str = ""  # label passed to the most recent update()

    def reset(self, total: int) -> None:
        """Start a fresh count that expects *total* steps."""
        self.total, self.completed, self.current_task = total, 0, ""

    def update(self, task: str, increment: int = 1) -> None:
        """Record *task* as the current step, add *increment*, and log the tally."""
        self.completed = self.completed + increment
        self.current_task = task
        logger.info(f"Progress: [{self.completed}/{self.total}] {task}")
# Global progress tracker: the single module-level instance that
# TaskTracker.__enter__ advances and scan_system resets before a scan.
progress = ProgressTracker()
# Define a context manager for task tracking
class TaskTracker:
    """Context manager that logs a task's start, its outcome and its duration.

    Entering the block also advances the module-level ``progress`` tracker by
    one step. Exceptions raised inside the ``with`` body are logged and then
    propagate to the caller unchanged.
    """

    def __init__(self, task_name: str):
        self.task_name = task_name

    def __enter__(self):
        logger.info(f"Starting: {self.task_name}")
        progress.update(self.task_name)
        # Timestamp taken last so the logging above is not part of the duration.
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.time() - self.start_time
        if not exc_type:
            logger.info(f"Completed: {self.task_name} in {elapsed:.2f}s")
        else:
            logger.error(f"Failed: {self.task_name} after {elapsed:.2f}s - {exc_val}")
        # Returning False tells Python not to suppress the exception.
        return False
# Create a helper function to safely run commands
async def run_command(cmd: Union[str, List[str]], shell: bool = False, timeout: int = 30) -> Tuple[str, str, int]:
    """Run a command and return ``(stdout, stderr, returncode)``.

    This helper never raises: a command that cannot be started, or any other
    failure while running it, is reported as
    ``("", "Failed to execute command: ...", -1)`` and a timeout as
    ``("", "Command timed out after N seconds", -1)``.

    Args:
        cmd: The command, either as a single string or as an argv list.
        shell: When true the command is interpreted by the system shell
            (an argv list is quoted and joined first). When false the command
            is executed directly without a shell; a string is split on
            whitespace to form argv.
        timeout: Maximum number of seconds to wait for the command to finish.

    Returns:
        A tuple of decoded stdout, decoded stderr and the exit status.
        Undecodable bytes are replaced rather than raising.
    """
    import shlex  # function-scope import: shlex is not imported at module level

    pipe = asyncio.subprocess.PIPE
    try:
        if shell:
            if not isinstance(cmd, str):
                # Quote each argument so the shell sees the same argv boundaries.
                cmd = subprocess.list2cmdline(cmd) if os.name == "nt" else shlex.join(cmd)
            process = await asyncio.create_subprocess_shell(cmd, stdout=pipe, stderr=pipe)
        else:
            # create_subprocess_shell() rejects shell=False, so the non-shell
            # path must go through create_subprocess_exec() with a real argv.
            argv = cmd.split() if isinstance(cmd, str) else list(cmd)
            process = await asyncio.create_subprocess_exec(*argv, stdout=pipe, stderr=pipe)

        try:
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
        except asyncio.TimeoutError:
            try:
                process.kill()
            except OSError:
                # The process exited between the timeout and the kill.
                pass
            # Reap the child so it does not linger as a zombie.
            await process.wait()
            return "", f"Command timed out after {timeout} seconds", -1

        return (
            stdout.decode('utf-8', errors='replace'),
            stderr.decode('utf-8', errors='replace'),
            process.returncode,
        )
    except Exception as e:
        return "", f"Failed to execute command: {str(e)}", -1
def safe_execute(func, default_value: Any = None) -> Any:
    """Call *func* with no arguments, returning *default_value* if it raises.

    Args:
        func: Zero-argument callable to invoke.
        default_value: Value returned when *func* raises an ``Exception``.

    Returns:
        Whatever *func* returns, or *default_value* on failure. The failure
        is logged as a warning.
    """
    try:
        return func()
    except Exception as e:
        # Not every callable has __name__ (functools.partial objects and other
        # callable instances do not); fall back to repr so that building the
        # log message cannot itself raise.
        name = getattr(func, "__name__", None) or repr(func)
        logger.warning(f"Error executing {name}: {str(e)}")
        return default_value
# Create the MCP server instance (no lifespan handler is passed, to keep things
# simple). The @mcp.tool() decorators further down are applied through it.
mcp = FastMCP("DevEnv Scanner")
# ----- System Information Collectors -----
def _merge_colon_pairs(info: Dict[str, Any], text: str, prefix: str, underscore_spaces: bool = False) -> None:
    """Copy every ``key: value`` line of *text* into *info* as ``<prefix><key>``.

    Keys are stripped and lower-cased; with *underscore_spaces* their spaces
    become underscores. Lines without a colon are ignored.
    """
    for line in text.splitlines():
        key, sep, value = line.partition(":")
        if not sep:
            continue
        key = key.strip().lower()
        if underscore_spaces:
            key = key.replace(" ", "_")
        info[f"{prefix}{key}"] = value.strip()


def _parse_os_release(lines) -> Dict[str, str]:
    """Parse os-release style ``KEY=value`` lines into ``os_<key>`` entries.

    Blank lines and ``#`` comments are skipped, and one pair of surrounding
    single or double quotes is removed from each value.
    """
    parsed: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        parsed[f"os_{key.strip().lower()}"] = value
    return parsed


async def collect_os_info() -> Dict[str, Any]:
    """Collect OS version information.

    Returns:
        A dict with the generic ``platform`` fields plus platform-specific
        extras: ``distro_*`` (``lsb_release -a``) and ``os_*``
        (``/etc/os-release``) on Linux, ``macos_*`` (``sw_vers``) on macOS and
        ``win_*`` (``systeminfo``) on Windows. The extras are best-effort and
        simply absent when the underlying command or file is unavailable.
    """
    with TaskTracker("Collecting OS Information"):
        system = platform.system()
        info = {
            "system": system,
            "release": platform.release(),
            "version": platform.version(),
            "machine": platform.machine(),
            "architecture": platform.architecture(),
            "node": platform.node(),
            "platform": platform.platform(),
            "processor": platform.processor(),
        }

        if system == "Linux":
            try:
                stdout, _stderr, returncode = await run_command("lsb_release -a", shell=True)
                if returncode == 0:
                    # Keys keep their spaces here (e.g. "distro_distributor id").
                    _merge_colon_pairs(info, stdout, "distro_")

                if os.path.exists("/etc/os-release"):
                    with open("/etc/os-release", "r", encoding="utf-8", errors="replace") as f:
                        info.update(_parse_os_release(f))
            except Exception as e:
                logger.warning(f"Error collecting Linux distro info: {str(e)}")

        if system == "Darwin":
            stdout, _stderr, returncode = await run_command("sw_vers", shell=True)
            if returncode == 0:
                _merge_colon_pairs(info, stdout, "macos_", underscore_spaces=True)

        if system == "Windows":
            try:
                stdout, _stderr, returncode = await run_command("systeminfo", shell=True)
                if returncode == 0:
                    _merge_colon_pairs(info, stdout, "win_", underscore_spaces=True)
            except Exception as e:
                logger.warning(f"Error collecting Windows system info: {str(e)}")

        return info
async def collect_hardware_info() -> Dict[str, Any]:
    """Collect hardware details.

    Returns:
        A dict with CPU counts, CPU frequency (or ``None``), total and
        available memory in bytes, the disk partitions (``None`` if they could
        not be listed), per-mountpoint disk usage, and ``cpu_model`` when the
        platform-specific lookup succeeds.
    """
    def _cpu_frequency() -> Optional[Dict[str, Any]]:
        freq = psutil.cpu_freq()
        return freq._asdict() if freq else None

    with TaskTracker("Collecting Hardware Information"):
        memory = psutil.virtual_memory()
        # List partitions once, guarded, and reuse the result for the usage loop.
        partitions = safe_execute(psutil.disk_partitions)

        info = {
            "cpu_count_physical": psutil.cpu_count(logical=False),
            "cpu_count_logical": psutil.cpu_count(logical=True),
            "cpu_freq": safe_execute(_cpu_frequency),
            "memory_total": memory.total,
            "memory_available": memory.available,
            "disk_partitions": None if partitions is None else [p._asdict() for p in partitions],
            "disk_usage": {},
        }

        # Best effort: a mountpoint that cannot be queried is skipped.
        for partition in partitions or []:
            try:
                info["disk_usage"][partition.mountpoint] = psutil.disk_usage(partition.mountpoint)._asdict()
            except Exception as e:
                logger.debug(f"Skipping disk usage for {partition.mountpoint}: {str(e)}")

        system = platform.system()
        if system == "Linux":
            try:
                with open("/proc/cpuinfo", "r") as f:
                    for line in f:
                        if "model name" in line and ":" in line:
                            info["cpu_model"] = line.split(":", 1)[1].strip()
                            break
            except OSError as e:
                logger.debug(f"Could not read /proc/cpuinfo: {str(e)}")
        elif system == "Darwin":
            stdout, _stderr, returncode = await run_command("sysctl -n machdep.cpu.brand_string", shell=True)
            if returncode == 0:
                info["cpu_model"] = stdout.strip()
        elif system == "Windows":
            stdout, _stderr, returncode = await run_command("wmic cpu get name", shell=True)
            if returncode == 0:
                # First non-blank line is the "Name" header, the second is the model.
                rows = [row.strip() for row in stdout.splitlines() if row.strip()]
                if len(rows) > 1:
                    info["cpu_model"] = rows[1]

        return info
def _python_executable_names() -> List[str]:
    """Return the executable base names probed in each PATH directory."""
    names = ["python", "python3"]
    # Versioned interpreters are installed as "python3.X" (e.g. python3.11).
    names.extend(f"python3.{minor}" for minor in range(6, 15))  # 3.6 .. 3.14
    return names


async def collect_python_versions() -> Dict[str, Any]:
    """Collect information about installed Python versions.

    Returns:
        A dict describing the running interpreter (``current_version``,
        ``current_executable``, ``implementation``) plus ``installed_versions``:
        one ``{"version", "implementation", "path"}`` entry for every Python
        executable found on PATH or under ``~/.pyenv/versions`` that answered
        the version probe.
    """
    # One-liner run by each candidate interpreter; prints "version,implementation,path".
    probe_script = (
        "import sys, platform; print(f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro},{platform.python_implementation()},{sys.executable}')"
    )

    def _is_executable_file(path: str) -> bool:
        return os.path.isfile(path) and os.access(path, os.X_OK)

    with TaskTracker("Collecting Python Versions"):
        info = {
            "current_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
            "current_executable": sys.executable,
            "implementation": platform.python_implementation(),
            "installed_versions": [],
        }

        python_executables: Set[str] = set()
        names = _python_executable_names()

        for directory in os.environ.get("PATH", "").split(os.pathsep):
            if not directory:
                # An empty PATH entry would make os.path.join probe the cwd.
                continue
            for name in names:
                for suffix in ("", ".exe"):
                    candidate = os.path.join(directory, f"{name}{suffix}")
                    if _is_executable_file(candidate):
                        python_executables.add(candidate)

        # Check for pyenv installations
        pyenv_root = os.path.expanduser("~/.pyenv/versions")
        if os.path.isdir(pyenv_root):
            for version_dir in os.listdir(pyenv_root):
                for suffix in ("", ".exe"):
                    candidate = os.path.join(pyenv_root, version_dir, "bin", f"python{suffix}")
                    if _is_executable_file(candidate):
                        python_executables.add(candidate)

        # Sorted so the report order is stable between runs.
        for executable in sorted(python_executables):
            stdout, _stderr, returncode = await run_command([executable, "-c", probe_script])
            output = stdout.strip()
            if returncode != 0 or not output:
                continue
            try:
                version, implementation, path = output.split(",", 2)
            except ValueError as e:
                logger.warning(f"Error checking Python version for {executable}: {str(e)}")
                continue
            info["installed_versions"].append({
                "version": version,
                "implementation": implementation,
                "path": path,
            })

        return info
def _describe_interfaces(addresses_by_interface) -> List[Dict[str, Any]]:
    """Turn a ``{interface: [address namedtuple, ...]}`` mapping into plain dicts."""
    return [
        {"interface": name, "addresses": [address._asdict() for address in addresses]}
        for name, addresses in addresses_by_interface.items()
    ]


async def collect_network_config() -> Dict[str, Any]:
    """Collect network configuration information.

    Returns:
        A dict with ``hostname``, ``interfaces`` (every address of every
        interface), ``connections``, ``active_interfaces`` (IPv4 addresses
        only) and, when it can be determined for the platform, ``dns_servers``.
        Lookups that fail are logged and yield empty lists.
    """
    with TaskTracker("Collecting Network Configuration"):
        # Query the interfaces once; both interface views are derived from it.
        addresses_by_interface = safe_execute(psutil.net_if_addrs, {}) or {}

        info = {
            "hostname": socket.gethostname(),
            # net_if_addrs() maps each name to a *list* of addresses, so the
            # namedtuple conversion has to happen per address, not per value.
            "interfaces": _describe_interfaces(addresses_by_interface),
            "connections": safe_execute(lambda: [c._asdict() for c in psutil.net_connections()], []),
            "active_interfaces": [
                {
                    "interface": interface,
                    "address": address.address,
                    "netmask": address.netmask,
                }
                for interface, addresses in addresses_by_interface.items()
                for address in addresses
                if address.family == socket.AF_INET  # IPv4
            ],
        }

        # Try to get DNS configuration
        system = platform.system()
        if system == "Linux":
            try:
                if os.path.exists("/etc/resolv.conf"):
                    nameservers = []
                    with open("/etc/resolv.conf", "r") as f:
                        for line in f:
                            fields = line.split()
                            # Require an address so a bare "nameserver" line is skipped.
                            if len(fields) >= 2 and fields[0] == "nameserver":
                                nameservers.append(fields[1])
                    info["dns_servers"] = nameservers
            except Exception as e:
                logger.warning(f"Error collecting DNS information: {str(e)}")
        elif system == "Darwin":
            stdout, _stderr, returncode = await run_command("scutil --dns", shell=True)
            if returncode == 0:
                nameservers = []
                for line in stdout.splitlines():
                    if "nameserver" in line and "[" in line and "]" in line:
                        server = line.split("[")[1].split("]")[0].strip()
                        if server:
                            nameservers.append(server)
                info["dns_servers"] = nameservers
        elif system == "Windows":
            stdout, _stderr, returncode = await run_command("ipconfig /all", shell=True)
            if returncode == 0:
                nameservers = []
                for line in stdout.splitlines():
                    if "DNS Servers" in line and ":" in line:
                        server = line.split(":", 1)[1].strip()
                        if server:
                            nameservers.append(server)
                info["dns_servers"] = nameservers

        return info
async def collect_installed_programs() -> Dict[str, Any]:
    """Report which package managers respond and which programs are installed.

    Returns:
        ``{"package_managers": {name: version output}, "programs": [...]}``
        where each program is ``{"name": ..., "version": ...}``. The program
        list comes from ``wmic`` on Windows, ``/Applications`` plus Homebrew on
        macOS, and ``dpkg-query`` / ``rpm`` on Linux.
    """
    manager_probes = (
        ("brew", "brew --version"),
        ("npm", "npm --version"),
        ("pip", "pip --version"),
        ("pip3", "pip3 --version"),
        ("yarn", "yarn --version"),
        ("uv", "uv --version"),
        ("cargo", "cargo --version"),
        ("go", "go version"),
        ("apt", "apt --version"),
        ("yum", "yum --version"),
        ("dnf", "dnf --version"),
        ("pacman", "pacman --version"),
        ("conda", "conda --version"),
    )

    with TaskTracker("Collecting Installed Programs"):
        found_managers: Dict[str, str] = {}
        programs: List[Dict[str, str]] = []
        info = {
            "package_managers": found_managers,
            "programs": programs,
        }

        # A manager counts as present when its version command exits with 0.
        for manager, probe in manager_probes:
            out, _err, rc = await run_command(probe, shell=True)
            if rc == 0:
                found_managers[manager] = out.strip()

        system = platform.system()

        if system == "Windows":
            try:
                out, _err, rc = await run_command("wmic product get name,version", shell=True)
                # Everything after the header row is "<name>   <version>".
                rows = out.strip().split("\n")[1:] if rc == 0 else []
                for row in rows:
                    pieces = row.strip().rsplit(" ", 1)
                    if len(pieces) < 2:
                        continue
                    title, release = pieces[0].strip(), pieces[1].strip()
                    if title:
                        programs.append({"name": title, "version": release})
            except Exception as e:
                logger.warning(f"Error collecting Windows installed programs: {str(e)}")

        elif system == "Darwin":
            try:
                out, _err, rc = await run_command("ls -la /Applications", shell=True)
                if rc == 0:
                    for row in out.splitlines():
                        if ".app" not in row:
                            continue
                        columns = row.split()
                        # Column 9 onwards of `ls -la` output is the entry name.
                        if len(columns) >= 9:
                            programs.append({"name": " ".join(columns[8:]), "version": "Unknown"})

                # Homebrew packages are listed only when brew answered above.
                if "brew" in found_managers:
                    out, _err, rc = await run_command("brew list --versions", shell=True)
                    if rc == 0:
                        for row in out.splitlines():
                            words = row.strip().split()
                            if len(words) >= 2:
                                programs.append({"name": f"brew:{words[0]}", "version": " ".join(words[1:])})
            except Exception as e:
                logger.warning(f"Error collecting macOS installed programs: {str(e)}")

        elif system == "Linux":
            try:
                # Debian/Ubuntu: rows starting with "ii" are installed packages.
                out, _err, rc = await run_command("dpkg-query -l", shell=True)
                if rc == 0:
                    for row in out.strip().split("\n"):
                        if not row.startswith("ii"):
                            continue
                        columns = row.split()
                        if len(columns) >= 3:
                            programs.append({"name": f"deb:{columns[1]}", "version": columns[2]})

                # Fedora/CentOS/RHEL: one "<name> <version>" pair per line.
                out, _err, rc = await run_command("rpm -qa --queryformat '%{NAME} %{VERSION}\\n'", shell=True)
                if rc == 0:
                    for row in out.splitlines():
                        if " " not in row:
                            continue
                        package, release = row.split(" ", 1)
                        programs.append({"name": f"rpm:{package}", "version": release})
            except Exception as e:
                logger.warning(f"Error collecting Linux installed programs: {str(e)}")

        return info
async def collect_compilers() -> Dict[str, Any]:
    """Probe for common compilers and interpreters by running their version commands.

    Returns:
        ``{"compilers": {...}, "interpreters": {...}}`` mapping each tool whose
        command exited with status 0 to its stripped version output (stdout,
        or stderr when stdout is empty).
    """
    compiler_probes = (
        ("gcc", "gcc --version"),
        ("g++", "g++ --version"),
        ("clang", "clang --version"),
        ("clang++", "clang++ --version"),
        ("javac", "javac -version"),
        ("rustc", "rustc --version"),
        ("dotnet", "dotnet --version"),
        ("swiftc", "swiftc --version"),
    )
    interpreter_probes = (
        ("node", "node --version"),
        ("ruby", "ruby --version"),
        ("perl", "perl --version"),
        ("php", "php --version"),
        ("python", "python --version"),
        ("python3", "python3 --version"),
        ("R", "R --version"),
        ("julia", "julia --version"),
        ("lua", "lua -v"),
    )

    async def _probe_all(probes) -> Dict[str, str]:
        # Run the probes one after another, keeping only those that succeed.
        detected: Dict[str, str] = {}
        for tool, command_line in probes:
            out, err, rc = await run_command(command_line, shell=True)
            if rc != 0:
                continue
            detected[tool] = (out or err).strip()
        return detected

    with TaskTracker("Collecting Compiler Information"):
        compilers = await _probe_all(compiler_probes)
        interpreters = await _probe_all(interpreter_probes)
        return {
            "compilers": compilers,
            "interpreters": interpreters,
        }
async def collect_container_info() -> Dict[str, Any]:
    """Detect Docker, Kubernetes (kubectl) and Podman and gather basic details.

    Returns:
        A dict with ``docker``, ``kubernetes`` and ``podman`` entries. Each has
        an ``installed`` flag (the executable is on PATH) and a ``version``;
        Docker also lists the raw ``docker ps`` / ``docker images`` rows
        without their header, and Kubernetes the current context.
    """
    async def _stdout_if_ok(command: str) -> Optional[str]:
        # Stripped stdout of a command that exited with 0, otherwise None.
        out, _err, rc = await run_command(command, shell=True)
        return out.strip() if rc == 0 else None

    with TaskTracker("Collecting Container Information"):
        docker = {
            "installed": False,
            "version": None,
            "containers": [],
            "images": [],
        }
        kubernetes = {
            "installed": False,
            "version": None,
            "context": None,
        }
        podman = {
            "installed": False,
            "version": None,
        }

        if shutil.which("docker"):
            docker["installed"] = True
            version = await _stdout_if_ok("docker --version")
            if version is not None:
                docker["version"] = version
            for key, command in (("containers", "docker ps"), ("images", "docker images")):
                listing = await _stdout_if_ok(command)
                if listing is not None:
                    # Drop the header row; the rest is kept verbatim.
                    docker[key].extend(listing.split("\n")[1:])

        if shutil.which("kubectl"):
            kubernetes["installed"] = True
            for key, command in (
                ("version", "kubectl version --client"),
                ("context", "kubectl config current-context"),
            ):
                value = await _stdout_if_ok(command)
                if value is not None:
                    kubernetes[key] = value

        if shutil.which("podman"):
            podman["installed"] = True
            version = await _stdout_if_ok("podman --version")
            if version is not None:
                podman["version"] = version

        return {
            "docker": docker,
            "kubernetes": kubernetes,
            "podman": podman,
        }
def _parse_wmic_value_output(text: str) -> List[Dict[str, str]]:
    """Split ``wmic ... /value`` output (``Key=Value`` lines) into one dict per record.

    Keys are lower-cased. A new record starts whenever a key repeats, so the
    parser does not depend on how the blank separator lines are rendered.
    """
    records: List[Dict[str, str]] = []
    current: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip().lower()
        if key in current:
            records.append(current)
            current = {}
        current[key] = value.strip()
    if current:
        records.append(current)
    return records


async def collect_gpu_info() -> Dict[str, Any]:
    """Collect information about GPUs and CUDA.

    Returns:
        A dict with ``gpus`` (a list of per-GPU dicts, each with at least
        ``name``), ``cuda`` (``installed``, ``version`` and, when available,
        the raw ``nvidia_smi`` output) and ``opencl`` (always reported as not
        installed; nothing here probes for it).
    """
    with TaskTracker("Collecting GPU Information"):
        info = {
            "gpus": [],
            "cuda": {
                "installed": False,
                "version": None,
            },
            "opencl": {
                "installed": False,
                "version": None,
            },
        }

        system = platform.system()
        if system == "Windows":
            try:
                # /value yields unambiguous Key=Value lines. The default table
                # layout cannot be split on whitespace because GPU names and
                # processor names contain spaces themselves.
                stdout, _stderr, returncode = await run_command(
                    "wmic path win32_VideoController get Name,DriverVersion,VideoProcessor /value",
                    shell=True,
                )
                if returncode == 0:
                    for record in _parse_wmic_value_output(stdout):
                        name = record.get("name")
                        if not name:
                            continue
                        info["gpus"].append({
                            "name": name,
                            "driver_version": record.get("driverversion") or None,
                            "processor": record.get("videoprocessor") or None,
                        })
            except Exception as e:
                logger.warning(f"Error collecting Windows GPU information: {str(e)}")
        elif system == "Linux":
            try:
                # Try to get information from lspci
                stdout, _stderr, returncode = await run_command("lspci | grep -i 'vga\\|3d\\|display'", shell=True)
                if returncode == 0:
                    for line in stdout.splitlines():
                        if ":" in line:
                            info["gpus"].append({"name": line.split(":", 1)[1].strip()})

                # Try to get NVIDIA-specific information
                stdout, _stderr, returncode = await run_command("nvidia-smi -L", shell=True)
                if returncode == 0:
                    for line in stdout.splitlines():
                        if "GPU " in line:
                            info["gpus"].append({"name": line.strip(), "type": "nvidia"})
            except Exception as e:
                logger.warning(f"Error collecting Linux GPU information: {str(e)}")
        elif system == "Darwin":
            try:
                stdout, _stderr, returncode = await run_command("system_profiler SPDisplaysDataType", shell=True)
                if returncode == 0:
                    current_gpu = None
                    for line in stdout.splitlines():
                        line = line.strip()
                        if "Chipset Model" in line and ":" in line:
                            # Each "Chipset Model" line starts a new GPU entry.
                            current_gpu = {"name": line.split(":", 1)[1].strip()}
                            info["gpus"].append(current_gpu)
                        elif current_gpu and ":" in line:
                            key, value = line.split(":", 1)
                            current_gpu[key.strip().lower().replace(" ", "_")] = value.strip()
            except Exception as e:
                logger.warning(f"Error collecting macOS GPU information: {str(e)}")

        # Check for CUDA
        if shutil.which("nvcc"):
            info["cuda"]["installed"] = True
            stdout, _stderr, returncode = await run_command("nvcc --version", shell=True)
            if returncode == 0:
                for line in stdout.splitlines():
                    if "release" in line:
                        info["cuda"]["version"] = line.strip()

        # Check for nvidia-smi
        if shutil.which("nvidia-smi"):
            stdout, _stderr, returncode = await run_command("nvidia-smi", shell=True)
            if returncode == 0:
                info["cuda"]["nvidia_smi"] = stdout.strip()

        return info
async def collect_dev_processes() -> Dict[str, Any]:
    """List running processes that look development-related.

    A process qualifies when any pattern in ``dev_patterns`` is found (regex
    search, case-insensitive via lower-casing) in its name or command line.

    Returns:
        A dict with ``dev_processes`` (pid, name, cmdline, username of every
        match) and the de-duplicated process names classified as ``editors``,
        ``servers`` and ``databases``.
    """
    dev_patterns = [
        # Editors
        r"code", r"vscode", r"atom", r"sublime_text", r"vim", r"nvim", r"gvim",
        r"emacs", r"pycharm", r"intellij", r"webstorm", r"phpstorm", r"goland",
        # Servers
        r"nginx", r"apache", r"httpd", r"node", r"flask", r"django", r"gunicorn",
        r"uwsgi", r"webpack", r"vite", r"parcel", r"nodemon", r"pm2",
        # Databases
        r"mysql", r"postgres", r"mongodb", r"redis", r"sqlite", r"mariadb",
        # Dev tools
        r"npm", r"pip", r"conda", r"jupyter", r"docker", r"kubectl", r"java",
        r"go", r"python", r"ruby", r"perl", r"php", r"rustc", r"cargo",
    ]
    editor_words = ["code", "vscode", "atom", "sublime", "vim", "nvim", "emacs", "pycharm", "intellij"]
    server_words = ["nginx", "apache", "httpd", "node", "flask", "django", "gunicorn"]
    database_words = ["mysql", "postgres", "mongodb", "redis", "sqlite"]

    def _mentions(words, lowered_name: str, lowered_cmd: str) -> bool:
        # Plain substring test against both the process name and its command line.
        return any(word in lowered_name or word in lowered_cmd for word in words)

    with TaskTracker("Collecting Development Processes"):
        matched: List[Dict[str, Any]] = []
        editors: List[str] = []
        servers: List[str] = []
        databases: List[str] = []

        try:
            for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'username']):
                try:
                    details = proc.info
                    lowered_name = details['name'].lower() if details['name'] else ""
                    lowered_cmd = " ".join(details['cmdline']).lower() if details['cmdline'] else ""

                    is_dev_process = any(
                        re.search(pattern, lowered_name) or re.search(pattern, lowered_cmd)
                        for pattern in dev_patterns
                    )
                    if not is_dev_process:
                        continue

                    matched.append({
                        "pid": details['pid'],
                        "name": details['name'],
                        "cmdline": details['cmdline'],
                        "username": details['username'],
                    })

                    # A process lands in at most one category; editors win over
                    # servers, servers over databases.
                    if _mentions(editor_words, lowered_name, lowered_cmd):
                        editors.append(details['name'])
                    elif _mentions(server_words, lowered_name, lowered_cmd):
                        servers.append(details['name'])
                    elif _mentions(database_words, lowered_name, lowered_cmd):
                        databases.append(details['name'])
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    pass
        except Exception as e:
            logger.warning(f"Error collecting development processes: {str(e)}")

        return {
            "dev_processes": matched,
            "editors": list(set(editors)),
            "servers": list(set(servers)),
            "databases": list(set(databases)),
        }
# ----- Tool Endpoints -----
@mcp.tool()
async def get_system_info(ctx: Context, category: str) -> str:
    """
    Get detailed information about a specific category of system information.
    Args:
        category: Category of information to retrieve. Available categories:
            os, hardware, python, network, programs, compilers, containers, gpu, processes
    """
    # Lookup table from category name to the coroutine function that collects it.
    category_collectors = {
        "os": collect_os_info,
        "hardware": collect_hardware_info,
        "python": collect_python_versions,
        "network": collect_network_config,
        "programs": collect_installed_programs,
        "compilers": collect_compilers,
        "containers": collect_container_info,
        "gpu": collect_gpu_info,
        "processes": collect_dev_processes
    }

    collector = category_collectors.get(category)
    if collector is None:
        # Unknown names are answered with an error string, not an exception.
        return f"Error: Invalid category '{category}'. Available categories: {', '.join(category_collectors)}"

    # Collected fresh on every call; nothing is cached.
    with TaskTracker(f"Collecting {category} information"):
        payload = await collector()
    return json.dumps(payload, indent=2)
@mcp.tool()
async def analyze_cursor_environment(ctx: Context) -> str:
    """
    Summarize the parts of the development environment most relevant to Cursor IDE.

    Collects the OS, hardware and Python categories concurrently and condenses
    them into a short report.

    Returns:
        str: A Markdown document (not JSON) with the report date and an
        "Environment Summary" list containing:
            - Operating system name and release
            - Logical CPU core count and total RAM
            - Current Python version and implementation
        A category whose collection failed is listed with its error message.
    """
    # Only the categories that the summary below actually reports on.
    collectors = {
        "os": collect_os_info,
        "hardware": collect_hardware_info,
        "python": collect_python_versions,
    }

    with TaskTracker("Analyzing Cursor environment"):
        # Start every collector first so they run concurrently, then await each.
        tasks = [
            (category, asyncio.create_task(collector()))
            for category, collector in collectors.items()
        ]

        results = {}
        for category, task in tasks:
            try:
                with TaskTracker(f"Collecting {category} information"):
                    results[category] = await task
            except Exception as e:
                logger.error(f"Error collecting {category} information: {str(e)}")
                results[category] = {"error": str(e)}

        insights = []

        # OS insights
        os_info = results["os"]
        if "error" in os_info:
            insights.append(f"**Operating System**: unavailable ({os_info['error']})")
        else:
            os_name = os_info.get("system", "Unknown")
            insights.append(f"**Operating System**: {os_name} {os_info.get('release', '')}")

        # Hardware insights
        hw = results["hardware"]
        if "error" in hw:
            insights.append(f"**Hardware**: unavailable ({hw['error']})")
        else:
            insights.append(f"**CPU**: {hw.get('cpu_count_logical', 'Unknown')} cores")
            if "memory_total" in hw:
                ram_gb = hw["memory_total"] / (1024**3)
                insights.append(f"**RAM**: {ram_gb:.1f} GB")

        # Python insights
        py_info = results["python"]
        if "error" in py_info:
            insights.append(f"**Python**: unavailable ({py_info['error']})")
        else:
            insights.append(f"**Python**: {py_info.get('current_version', 'Unknown')} ({py_info.get('implementation', 'Unknown')})")

        # Format insights
        sections = [
            "# Cursor Development Environment Analysis\n\n",
            f"**Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
            "## Environment Summary\n\n",
        ]
        sections.extend(f"- {insight}\n" for insight in insights)
        return "".join(sections)
def _render_scan_markdown(results: Dict[str, Any], unknown_categories: Optional[List[str]] = None) -> str:
    """Render collected scan *results* as the Markdown system report.

    Only the categories present in *results* get a section. Names listed in
    *unknown_categories* are reported in a closing "Skipped Categories" section.
    """
    gib = 1024 * 1024 * 1024
    out: List[str] = [
        "# System Scan Report\n\n",
        f"**Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
    ]
    add = out.append

    # System summary
    if "os" in results:
        os_info = results["os"]
        add("## System Summary\n\n")
        add(f"**OS**: {os_info.get('system', 'Unknown')} {os_info.get('release', '')}\n")
        add(f"**Version**: {os_info.get('version', 'Unknown')}\n")
        add(f"**Architecture**: {os_info.get('machine', 'Unknown')}\n")
        add(f"**Hostname**: {os_info.get('node', 'Unknown')}\n\n")

    # Hardware summary
    if "hardware" in results:
        hw_info = results["hardware"]
        add("## Hardware\n\n")
        if "cpu_model" in hw_info:
            add(f"**CPU**: {hw_info.get('cpu_model', 'Unknown')}\n")
        add(f"**CPU Cores**: {hw_info.get('cpu_count_physical', 'Unknown')} physical, {hw_info.get('cpu_count_logical', 'Unknown')} logical\n")
        if "memory_total" in hw_info:
            memory_gb = hw_info.get("memory_total", 0) / gib
            add(f"**Memory**: {memory_gb:.2f} GB total\n\n")

        if hw_info.get("disk_partitions"):
            add("### Storage\n\n")
            add("| Mount Point | Type | Size |\n")
            add("|------------|------|------|\n")
            disk_usage = hw_info.get("disk_usage") or {}
            for partition in hw_info["disk_partitions"]:
                mountpoint = partition.get("mountpoint", "Unknown")
                # Only partitions whose usage could be read get a row.
                if mountpoint in disk_usage:
                    size_gb = disk_usage[mountpoint].get("total", 0) / gib
                    add(f"| {mountpoint} | {partition.get('fstype', 'Unknown')} | {size_gb:.2f} GB |\n")
            add("\n")

    # Python information
    if "python" in results:
        py_info = results["python"]
        add("## Python Environment\n\n")
        add(f"**Current Python**: {py_info.get('current_version', 'Unknown')} ({py_info.get('implementation', 'Unknown')})\n")
        add(f"**Executable**: {py_info.get('current_executable', 'Unknown')}\n\n")
        if py_info.get("installed_versions"):
            add("### Installed Python Versions\n\n")
            add("| Version | Implementation | Path |\n")
            add("|---------|---------------|------|\n")
            for version in py_info["installed_versions"]:
                add(f"| {version.get('version', 'Unknown')} | {version.get('implementation', 'Unknown')} | {version.get('path', 'Unknown')} |\n")
            add("\n")

    # Network information
    if "network" in results:
        net_info = results["network"]
        add("## Network Configuration\n\n")
        add(f"**Hostname**: {net_info.get('hostname', 'Unknown')}\n\n")
        if net_info.get("active_interfaces"):
            add("### Network Interfaces\n\n")
            add("| Interface | IP Address | Netmask |\n")
            add("|-----------|------------|--------|\n")
            for interface in net_info["active_interfaces"]:
                add(f"| {interface.get('interface', 'Unknown')} | {interface.get('address', 'Unknown')} | {interface.get('netmask', 'Unknown')} |\n")
            add("\n")
        if net_info.get("dns_servers"):
            add("### DNS Servers\n\n")
            for dns in net_info["dns_servers"]:
                add(f"- {dns}\n")
            add("\n")

    # Compiler/interpreter information (first line of each version banner)
    if "compilers" in results:
        comp_info = results["compilers"]
        add("## Development Environment\n\n")
        for heading, key in (("### Compilers\n\n", "compilers"), ("### Interpreters\n\n", "interpreters")):
            tools = comp_info.get(key)
            if not tools:
                continue
            add(heading)
            for name, version in tools.items():
                banner = version.splitlines()
                add(f"- **{name}**: {banner[0] if banner else version}\n")
            add("\n")

    # Container information
    if "containers" in results:
        container_info = results["containers"]
        add("## Containerization\n\n")
        docker = container_info.get("docker", {})
        if docker.get("installed", False):
            # "or" rather than a .get() default: the key exists with value None
            # when the version command failed.
            add(f"**Docker**: {docker.get('version') or 'Installed'}\n")
            if docker.get("containers"):
                add(f"- Running containers: {len(docker['containers'])}\n")
            if docker.get("images"):
                add(f"- Available images: {len(docker['images'])}\n")
            add("\n")
        kubernetes = container_info.get("kubernetes", {})
        if kubernetes.get("installed", False):
            add(f"**Kubernetes**: {kubernetes.get('version') or 'Installed'}\n")
            add(f"- Current context: {kubernetes.get('context') or 'Unknown'}\n\n")

    # GPU information
    if "gpu" in results:
        gpu_info = results["gpu"]
        add("## GPU Information\n\n")
        if gpu_info.get("gpus"):
            add("### GPUs\n\n")
            for i, gpu in enumerate(gpu_info["gpus"]):
                add(f"**GPU {i+1}**: {gpu.get('name', 'Unknown')}\n")
                if gpu.get("driver_version"):
                    add(f"- Driver version: {gpu['driver_version']}\n")
            add("\n")
        cuda = gpu_info.get("cuda", {})
        if cuda.get("installed", False):
            add(f"**CUDA**: {cuda.get('version') or 'Installed'}\n\n")

    # Running development processes
    if "processes" in results:
        proc_info = results["processes"]
        add("## Development Processes\n\n")
        if proc_info.get("editors"):
            add("**Active Editors**: " + ", ".join(proc_info["editors"]) + "\n")
        if proc_info.get("servers"):
            add("**Active Servers**: " + ", ".join(proc_info["servers"]) + "\n")
        if proc_info.get("databases"):
            add("**Active Databases**: " + ", ".join(proc_info["databases"]) + "\n")

    if unknown_categories:
        add("\n## Skipped Categories\n\n")
        for name in unknown_categories:
            add(f"- Unknown category: {name}\n")

    return "".join(out)


@mcp.tool()
async def scan_system(ctx: Context, categories: Union[List[str], str, None] = None, output_format: Optional[str] = "markdown") -> str:
    """
    Scan the system and collect information about the specified categories.
    Args:
        categories: Categories to scan. Can be:
            - A single category as a string (e.g., "hardware")
            - A list of categories (e.g., ["os", "hardware"])
            - None to scan all categories
            Available categories: os, hardware, python, network, programs, compilers, containers, gpu, processes
            Unknown category names are not scanned; they are reported as errors in the output.
        output_format: Format for the output. Supported values: "json" (recommended) and "markdown".
            The default is "markdown"; pass "json" explicitly to get structured output.
    Examples:
        To scan OS only: "os" or ["os"]
        To scan multiple categories: ["os", "hardware", "python"]
        To scan all categories: leave empty or use null
    """
    # Define all available categories
    all_categories = {
        "os": collect_os_info,
        "hardware": collect_hardware_info,
        "python": collect_python_versions,
        "network": collect_network_config,
        "programs": collect_installed_programs,
        "compilers": collect_compilers,
        "containers": collect_container_info,
        "gpu": collect_gpu_info,
        "processes": collect_dev_processes
    }

    # Handle string input for categories by converting to list
    if isinstance(categories, str):
        categories = [categories]
    # If no categories specified, scan all
    if not categories:
        categories = list(all_categories)

    # Drop duplicates (keeping first-seen order) so no collector runs twice,
    # and separate the names that no collector exists for.
    requested = list(dict.fromkeys(categories))
    known = [name for name in requested if name in all_categories]
    unknown = [name for name in requested if name not in all_categories]
    for name in unknown:
        logger.warning(f"Ignoring unknown scan category: {name}")

    progress.reset(len(known))

    results = {}
    # Start every collector first so they run concurrently.
    with TaskTracker("Starting System Scan"):
        tasks = [(name, asyncio.create_task(all_categories[name]())) for name in known]

    for category, task in tasks:
        try:
            with TaskTracker(f"Collecting {category} information"):
                results[category] = await task
        except Exception as e:
            logger.error(f"Error collecting {category} information: {str(e)}")
            results[category] = {"error": str(e)}

    if output_format and output_format.lower() == "json":
        for name in unknown:
            results[name] = {"error": f"Unknown category '{name}'"}
        return json.dumps(results, indent=2)

    return _render_scan_markdown(results, unknown)
# ----- Per-category tool endpoints -----
# These tool functions replace the resources.
@mcp.tool()
async def get_os_info(ctx: Context) -> str:
    """
    Get detailed information about the operating system.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_os_info()`` (OS, version, architecture, etc.)
        serialized as JSON with a two-space indent.
    """
    os_details = await collect_os_info()
    serialized = json.dumps(os_details, indent=2)
    return serialized
@mcp.tool()
async def get_hardware_info(ctx: Context) -> str:
    """
    Get detailed information about the system hardware.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_hardware_info()`` (CPU, memory, disk
        information) serialized as JSON with a two-space indent.
    """
    hardware_details = await collect_hardware_info()
    serialized = json.dumps(hardware_details, indent=2)
    return serialized
@mcp.tool()
async def get_python_info(ctx: Context) -> str:
    """
    Get detailed information about Python installations.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_python_versions()`` (current Python version
        and detected Python environments) serialized as JSON with a
        two-space indent.
    """
    python_details = await collect_python_versions()
    serialized = json.dumps(python_details, indent=2)
    return serialized
@mcp.tool()
async def get_network_config(ctx: Context) -> str:
    """
    Get detailed information about network configuration.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_network_config()`` (network interfaces,
        connections, and DNS configuration) serialized as JSON with a
        two-space indent.
    """
    # Relies on the module-level collect_network_config() coroutine.
    network_details = await collect_network_config()
    serialized = json.dumps(network_details, indent=2)
    return serialized
@mcp.tool()
async def get_installed_programs(ctx: Context) -> str:
    """
    Get detailed information about installed programs and package managers.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_installed_programs()`` (detected apps and
        package managers on the system) serialized as JSON with a two-space
        indent.
    """
    # Relies on the module-level collect_installed_programs() coroutine.
    program_details = await collect_installed_programs()
    serialized = json.dumps(program_details, indent=2)
    return serialized
@mcp.tool()
async def get_dev_tools_info(ctx: Context) -> str:
    """
    Get detailed information about development tools.

    Currently only compiler information is gathered; it is reported under
    the ``"compilers"`` key.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        A JSON object (two-space indent) whose ``"compilers"`` entry holds
        the output of ``collect_compilers()``, or ``{"error": <message>}``
        if that collector raised.
    """
    tool_report = {}
    with TaskTracker("Collecting development tools information"):
        try:
            compiler_details = await collect_compilers()
        except Exception as exc:
            # Best effort: a collector failure is logged and reported inside
            # the payload instead of failing the whole tool call.
            logger.error(f"Error collecting compiler information: {str(exc)}")
            compiler_details = {"error": str(exc)}
        tool_report["compilers"] = compiler_details
    return json.dumps(tool_report, indent=2)
@mcp.tool()
async def get_container_info(ctx: Context) -> str:
    """
    Get detailed information about containerization tools.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_container_info()`` (Docker, Kubernetes, and
        other container-related information) serialized as JSON with a
        two-space indent.
    """
    # Relies on the module-level collect_container_info() coroutine.
    container_details = await collect_container_info()
    serialized = json.dumps(container_details, indent=2)
    return serialized
@mcp.tool()
async def get_gpu_info(ctx: Context) -> str:
    """
    Get detailed information about GPUs and CUDA.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_gpu_info()`` (installed graphics cards and
        CUDA capabilities) serialized as JSON with a two-space indent.
    """
    # Relies on the module-level collect_gpu_info() coroutine.
    gpu_details = await collect_gpu_info()
    serialized = json.dumps(gpu_details, indent=2)
    return serialized
@mcp.tool()
async def get_running_processes(ctx: Context) -> str:
    """
    Get information about currently running development processes.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        The output of ``collect_dev_processes()`` (editors, servers, and
        other dev-related processes) serialized as JSON with a two-space
        indent.
    """
    # Relies on the module-level collect_dev_processes() coroutine.
    process_details = await collect_dev_processes()
    serialized = json.dumps(process_details, indent=2)
    return serialized
@mcp.tool()
async def generate_system_report(
    ctx: Context,
    output_path: Optional[str] = None,
    format: str = "markdown",
) -> str:
    """
    Generate a comprehensive system report with all available information.

    Args:
        ctx: MCP request context, forwarded to ``scan_system``.
        output_path: Optional path to save the report ("~" is expanded).
            If not provided, the report itself is returned.
        format: Format for the output. Supported values: "json" (use this
            for now) and "markdown" (do not use for now).

    Returns:
        The report text when no ``output_path`` is given; otherwise a
        confirmation message with the written path and file size in bytes.
        If writing fails, an error message followed by the report text so
        the caller still receives the data.
    """
    # Every category the scanner is asked to include in the report.
    categories = [
        "os", "hardware", "python", "network", "programs", "compilers",
        "gpu", "containers", "processes",
    ]

    # Reuse the existing scan_system tool to build the report text.
    report = await scan_system(ctx, categories, format)

    # No destination requested: hand the report straight back.
    if not output_path:
        return report

    try:
        target = os.path.expanduser(output_path)
        # Explicit UTF-8 so non-ASCII report content is written identically
        # on every platform instead of depending on the locale encoding.
        with open(target, "w", encoding="utf-8") as f:
            f.write(report)
        # Ask the filesystem for the size: len(report) counts characters,
        # not bytes, and ignores newline translation.
        written_bytes = os.path.getsize(target)
    except Exception as e:
        # Best effort at the tool boundary: report the failure but still
        # return the report so the work is not lost.
        return f"Error writing report to file: {str(e)}\n\n{report}"

    return f"System report saved to {target} ({written_bytes} bytes)"
# Script entry point: serve MCP requests over the stdio transport.
if __name__ == "__main__":
    mcp.run(transport="stdio")