"""
Windows MCP Server Example.
"""
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
import utils
# Create an MCP server
mcp = FastMCP("Windows MCP Server", "1.0.0")
class DriveInfo(BaseModel):
"""Model for drive information."""
name: str = Field(description="Drive name")
used_spaceGB: float = Field(description="Used space in GB")
free_spaceGB: float = Field(description="Free space in GB")
class ProcessInfo(BaseModel):
"""Model for process information returned by top-processes tools."""
pid: int = Field(description="Process ID")
name: str = Field(description="Process name")
cpu_percent: float | None = Field(default=None, description="CPU usage percentage sampled over a short interval")
memoryMB: float | None = Field(default=None, description="Resident memory usage in MB")
@mcp.tool(
name="Windows-system-info",
description="Get Windows system information including OS version, release, architecture, and hostname",
)
def get_windows_name_version() -> dict[str, str]:
"""Get Windows system information including OS version, release, architecture, and hostname."""
import platform
import socket
name = platform.node()
version = platform.version()
release = platform.release()
system = platform.system()
architecture = platform.architecture()[0]
hostname = socket.gethostname()
return {
"name": name,
"system": system,
"release": release,
"version": version,
"architecture": architecture,
"hostname": hostname
}
@mcp.tool(
name="Windows-last-boot-time",
description="Get the last boot time of the Windows system",
)
def get_windows_last_boot_time() -> str:
"""Get the last boot time of the Windows system."""
import subprocess
try:
# Use PowerShell to get the last boot time
result = subprocess.run(
["powershell", "-Command", "(Get-CimInstance -Class Win32_OperatingSystem).LastBootUpTime"],
capture_output=True,
text=True,
check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
return f"Error retrieving last boot time: {e}"
@mcp.tool(
name="Windows-uptime",
description="Get the uptime of the Windows system",
)
def get_windows_uptime() -> str:
"""Get the uptime of the Windows system."""
import psutil
import time
try:
uptime_seconds = time.time() - psutil.boot_time()
return f"Uptime: {uptime_seconds:.0f} seconds"
except Exception as e:
return f"Error retrieving uptime: {e}"
@mcp.tool(
name="Windows-drives",
description="Get a list of all drives in the Windows system",
)
def get_drives() -> list[str]:
"""Get a list of all drives in the Windows system."""
try:
return utils._enumerate_drives()
except Exception:
return []
@mcp.tool(
name="Windows-drive-status",
description="Get status information about the used and free space in a Windows drive",
)
def get_drive_status(drive: str) -> DriveInfo:
"""Get status information about the used and free space in a Windows drive."""
import os
try:
# Normalize drive letter and construct root path
drive_letter = drive.strip().upper().rstrip(':\\/')
if len(drive_letter) != 1 or not drive_letter.isalpha():
return DriveInfo(name=drive, used_spaceGB=0.0, free_spaceGB=0.0)
root = f"{drive_letter}:\\"
if not os.path.exists(root):
return DriveInfo(name=drive_letter, used_spaceGB=0.0, free_spaceGB=0.0)
used_gb, free_gb = utils._disk_usage_gb(root)
return DriveInfo(name=drive_letter, used_spaceGB=used_gb, free_spaceGB=free_gb)
except Exception:
return DriveInfo(name=drive, used_spaceGB=0.0, free_spaceGB=0.0)
@mcp.tool(
name="Windows-drives-status-simple",
description="Get status for specific drives using a comma-separated drive letters as string (e.g., 'C,D,F')",
)
def get_drives_status_simple(drives_string: str) -> list[DriveInfo]:
"""Get status for specific drives using a comma-separated drive letters as string (e.g., 'C,D,F')"""
# Parse the comma-separated string into a list
drives = [drive.strip().rstrip(':') for drive in drives_string.split(',')]
return [get_drive_status(drive) for drive in drives if drive]
# @mcp.tool(
# name="Windows-drives-status",
# description="Get status information about the used and free space for multiple Windows drives as a list of drive letters",
# )
# def get_drives_status(drives: list[str]) -> list[DriveInfo]:
# """Get status information about the used and free space for multiple Windows drives as a list of drive letters"""
# return [get_drive_status(drive.rstrip(":").strip()) for drive in drives]
@mcp.tool(
name="Windows-memory-info",
description="Get memory information of the Windows system."
)
def get_memory_info() -> dict[str, str]:
"""Get memory information of the Windows system."""
import psutil
memory = psutil.virtual_memory()
total_memory = memory.total / (1024 ** 3)
available_memory = memory.available / (1024 ** 3)
used_memory = memory.used / (1024 ** 3)
return {
"total_memory": f"{total_memory:.2f} GB",
"available_memory": f"{available_memory:.2f} GB",
"used_memory": f"{used_memory:.2f} GB"
}
@mcp.tool(
name="Windows-network-info",
description="Get network information of the Windows system."
)
def get_network_info() -> dict[str, str]:
"""Get network information of the Windows system."""
import psutil
import socket
net_info = psutil.net_if_addrs()
result = {}
for interface, addresses in net_info.items():
for address in addresses:
if address.family == socket.AF_INET:
result[interface] = address.address
return result if result else {"error": "No network interfaces found."}
@mcp.tool(
name="Windows-cpu-info",
description="Get CPU information of the Windows system."
)
def get_cpu_info() -> str:
"""Get CPU information of the Windows system."""
import psutil
import platform
cpu_count = psutil.cpu_count(logical=True)
cpu_freq = psutil.cpu_freq()
cpu_model = platform.processor()
if cpu_freq is not None:
current, min_f, max_f = cpu_freq.current, cpu_freq.min, cpu_freq.max
else:
current, min_f, max_f = 0.0, 0.0, 0.0
return f"Model: {cpu_model}, CPU Count: {cpu_count}, Frequency: {current} MHz, Min: {min_f} MHz, Max: {max_f} MHz"
@mcp.tool(
name="Windows-gpu-info",
description="Get GPU information of the Windows system."
)
def get_gpu_info() -> str:
"""Get GPU information of the Windows system."""
import json
try:
output = utils._run_powershell(
"Get-CimInstance -Class Win32_VideoController | Select-Object Name, DriverVersion | ConvertTo-Json -Depth 3"
).strip()
if not output:
return "No GPU information found."
data = json.loads(output)
if isinstance(data, dict):
data = [data]
gpu_info = []
for item in data:
name = item.get("Name")
driver_version = item.get("DriverVersion")
if name and driver_version:
gpu_info.append(f"GPU: {name}, Driver Version: {driver_version}")
return "\n".join(gpu_info) if gpu_info else "No GPU information found."
except Exception as e:
return f"Error retrieving GPU info: {e}"
@mcp.tool(
name="Windows-top-processes-by-memory",
description="Get the top X processes by memory usage."
)
def get_top_processes_by_memory(amount: int = 5) -> list[ProcessInfo]:
"""Get the top X processes by memory usage. Returns structured results."""
import psutil
procs: list[ProcessInfo] = []
for p in psutil.process_iter(['pid', 'name', 'memory_info']):
try:
mem_info = p.info.get('memory_info')
rss = mem_info.rss if mem_info else p.memory_info().rss
mem_mb = rss / (1024 ** 2)
name = p.info.get('name') or p.name()
procs.append(ProcessInfo(pid=p.pid, name=name, memoryMB=mem_mb))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
procs.sort(key=lambda x: x.memoryMB or 0.0, reverse=True)
return procs[:amount]
@mcp.tool(
name="Windows-top-processes-by-cpu",
description="Get the top X processes by CPU usage."
)
def get_top_processes_by_cpu(amount: int = 5) -> list[ProcessInfo]:
"""Get the top X processes by CPU usage. Uses sampling for accurate percentages and returns structured results."""
import psutil
import time
# Prime CPU percentage counters
tracked = []
for p in psutil.process_iter(['pid', 'name']):
try:
p.cpu_percent(None)
tracked.append(p)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
# Short sampling interval
time.sleep(0.3)
results: list[ProcessInfo] = []
for p in tracked:
try:
cpu = p.cpu_percent(None)
rss = p.memory_info().rss
mem_mb = rss / (1024 ** 2)
name = p.info.get('name') if hasattr(p, 'info') and p.info.get('name') else p.name()
results.append(ProcessInfo(pid=p.pid, name=name, cpu_percent=cpu, memoryMB=mem_mb))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
results.sort(key=lambda x: x.cpu_percent or 0.0, reverse=True)
return results[:amount]