import json
import sys
import time
import typing as t
from datetime import datetime, timezone

import psutil
from mcp.server.fastmcp import FastMCP

from .config import DEFAULT_PROCESSES, Settings
# Module-level MCP server object. Tools in this module attach themselves to it
# through the ``@mcp.tool`` decorator, and ``main()`` starts it with ``mcp.run()``.
# NOTE(review): how FastMCP treats the ``description`` keyword depends on the
# installed ``mcp`` SDK version -- confirm it is accepted and surfaced to clients.
mcp = FastMCP(
    name="Build Unblocker",
    description="Terminate hung Windows build executables so Cursor can unblock itself."
)
def is_process_idle(proc: psutil.Process, idle_seconds: int) -> bool:
    """Return True when *proc* is old enough and is using almost no CPU.

    A process is reported as idle only when both conditions hold:

    * it was created at least ``idle_seconds`` ago, and
    * its CPU utilisation, sampled over a one-second window, is below 1%.

    Note that ``idle_seconds`` is compared against the process *age* (time
    since creation), not against the time since the process last did work.

    Args:
        proc: The process to inspect.
        idle_seconds: Minimum age, in seconds, before a process may be
            considered idle.

    Returns:
        True if the process satisfies both conditions. False otherwise,
        including when the process has exited, is a zombie, or cannot be
        inspected -- in those cases it is deliberately *not* reported as idle
        so that callers never act on a process they could not examine.
    """
    try:
        # Cheap check first: a process younger than the threshold can never
        # qualify, so skip the blocking CPU sample for it entirely.
        process_age = time.time() - proc.create_time()
        if process_age < idle_seconds:
            return False

        # cpu_percent(interval=1.0) blocks for one second while it measures,
        # so it is only worth paying for once the age check has passed.
        return proc.cpu_percent(interval=1.0) < 1.0
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        # Process may have exited or access was denied; treat as not idle
        # for safety.
        return False
@mcp.tool(name="unblock_build", description="Kill build executables idle > idle_seconds.")
def unblock_build(idle_seconds: int = 90,
                  process_names: t.List[str] | None = None,
                  dry_run: bool = False) -> dict:
    """Kill monitored build processes that ``is_process_idle`` reports as idle.

    Every running process whose name exactly matches an entry in the
    monitored list is examined. Those judged idle are killed, or merely
    logged when ``dry_run`` is set.

    Args:
        idle_seconds: Threshold, in seconds, passed to ``is_process_idle``.
        process_names: Executable names to monitor. ``None`` or an empty list
            falls back to ``DEFAULT_PROCESSES``.
        dry_run: If True, log what would be killed without killing anything.

    Returns:
        A dictionary with the keys:

        * ``examined`` -- number of processes whose name matched.
        * ``killed`` -- number of processes successfully killed.
        * ``killed_processes`` -- list of ``{"name", "pid"}`` dicts, one per
          killed process (always empty in dry-run mode).
        * ``timestamp`` -- current UTC time in ISO 8601 format with a ``Z``
          suffix.
        * ``dry_run``, ``idle_seconds_threshold``, ``process_names_monitored``
          -- the effective settings, echoed back.
    """
    settings = Settings(
        idle_seconds=idle_seconds,
        process_names=process_names or DEFAULT_PROCESSES,
        dry_run=dry_run,
    )
    examined_count = 0
    killed_processes = []

    for proc in psutil.process_iter(['pid', 'name', 'create_time']):
        name = proc.info['name']
        pid = proc.info['pid']

        if name not in settings.process_names:
            continue
        examined_count += 1

        if not is_process_idle(proc, settings.idle_seconds):
            continue

        # Diagnostics go to stderr rather than stdout.
        # NOTE(review): presumably because stdout carries the MCP stdio
        # transport -- confirm before changing.
        if settings.dry_run:
            print(f"Dry run: Would kill process {name} (PID: {pid}) - idle for > {settings.idle_seconds}s", file=sys.stderr)
            continue

        print(f"Killing process {name} (PID: {pid}) - idle for > {settings.idle_seconds}s", file=sys.stderr)
        try:
            proc.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Best effort: the process may have exited on its own, or we may
            # lack the rights to kill it. Report and keep going.
            print(f"Failed to kill process {name} (PID: {pid})", file=sys.stderr)
        else:
            killed_processes.append({"name": name, "pid": pid})

    # datetime.utcnow() is deprecated since Python 3.12; build an aware UTC
    # timestamp instead and keep the original "...Z" output format.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    return {
        "examined": examined_count,
        "killed": len(killed_processes),
        "killed_processes": killed_processes,
        "timestamp": timestamp,
        "dry_run": settings.dry_run,
        "idle_seconds_threshold": settings.idle_seconds,
        "process_names_monitored": settings.process_names,
    }
def main() -> None:
    """Entry point: start the server by calling ``run()`` on the module-level ``mcp``.

    Takes no arguments and does nothing besides delegating to ``mcp.run()``.
    """
    # All serving behaviour lives in FastMCP.run(); nothing is parsed here.
    # NOTE(review): transport selection and any command-line handling are
    # presumably FastMCP's responsibility -- confirm against the installed
    # mcp SDK version.
    mcp.run()
# Start the server only when this file is the entry module, not when imported.
# NOTE(review): this module uses a relative import (``.config``), so it
# presumably has to be launched as part of its package (``python -m ...`` or an
# installed entry point) rather than as a bare script -- confirm packaging.
if __name__ == "__main__":
    main()