Skip to main content
Glama

kill-process-mcp

kill_process_mcp.py9.06 kB
""" Author: Michal Szymanski <misiektoja-github@rm-rf.ninja> v1.1 Cross-platform MCP server exposing tools to list and kill OS processes: https://github.com/misiektoja/kill-process-mcp MCP tools: - process_list(): List running processes with optional filtering, sorting and limit - process_kill(): Kill the selected process """ from __future__ import annotations import asyncio from typing import Any, Dict, List, Literal import sys import os if sys.platform == "win32": SYSTEM_USERS = {"SYSTEM", "NT AUTHORITY\\SYSTEM", "LocalService", "NetworkService"} else: SYSTEM_USERS = {"root"} import ctypes import ctypes.util # Cross-platform stub, overridden on macOS def _phys_footprint(pid: int) -> int: return 0 # macOS-only helper for Activity Monitor-style memory if sys.platform == "darwin": _RUSAGE_INFO_V2 = 2 class _RUsageInfoV2(ctypes.Structure): _fields_ = [ ("ri_uuid", ctypes.c_uint8 * 16), ("ri_user_time", ctypes.c_uint64), ("ri_system_time", ctypes.c_uint64), ("ri_pkg_idle_wkups", ctypes.c_uint64), ("ri_interrupt_wkups", ctypes.c_uint64), ("ri_pageins", ctypes.c_uint64), ("ri_wired_size", ctypes.c_uint64), ("ri_resident_size", ctypes.c_uint64), ("ri_phys_footprint", ctypes.c_uint64), ("ri_proc_start_abstime", ctypes.c_uint64), ("ri_proc_exit_abstime", ctypes.c_uint64), ("ri_child_user_time", ctypes.c_uint64), ("ri_child_system_time", ctypes.c_uint64), ("ri_child_pkg_idle_wkups", ctypes.c_uint64), ("ri_child_interrupt_wkups", ctypes.c_uint64), ("ri_child_pageins", ctypes.c_uint64), ("ri_child_elapsed_abstime", ctypes.c_uint64), ("ri_diskio_bytesread", ctypes.c_uint64), ("ri_diskio_byteswritten", ctypes.c_uint64), ] _libproc = ctypes.CDLL(ctypes.util.find_library("proc"), use_errno=True) _proc_pid_rusage = _libproc.proc_pid_rusage _proc_pid_rusage.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.POINTER(_RUsageInfoV2)] _proc_pid_rusage.restype = ctypes.c_int def _phys_footprint(pid: int) -> int: info = _RUsageInfoV2() try: ret = _proc_pid_rusage(pid, _RUSAGE_INFO_V2, ctypes.byref(info)) if ret == 0: return int(info.ri_phys_footprint) except Exception: pass return 0 import psutil from mcp.server.fastmcp import Context, FastMCP mcp = FastMCP("kill_process_mcp", "MCP server exposing tools to list and kill OS processes", auth_server_provider=None, auth=None) # Prepares per-process CPU counters and returns initial sample def _snapshot_cpu() -> Dict[int, float]: data: Dict[int, float] = {} for proc in psutil.process_iter(attrs=["pid"]): try: data[proc.pid] = proc.cpu_percent(None) except psutil.Error: continue return data # Retrieves a safe list of process handles def _collect_processes() -> List[psutil.Process]: procs: List[psutil.Process] = [] for proc in psutil.process_iter(attrs=["pid", "username"]): try: procs.append(proc) except psutil.Error: continue return procs # Converts a psutil.Process to a JSON-serialisable dict def _serialize(proc: psutil.Process) -> Dict[str, Any]: try: try: mem = _phys_footprint(proc.pid) except Exception: mem = 0 if not mem: try: mi = proc.memory_full_info() mem = getattr(mi, "uss", mi.rss) except Exception: try: mem = proc.memory_info().rss except Exception: mem = 0 if sys.platform == "win32": try: exe_name = os.path.basename(proc.exe()) name = exe_name if exe_name else proc.name() except (psutil.Error, OSError, FileNotFoundError): name = proc.name() else: try: name = proc.name() except psutil.Error: name = "<unknown>" cpu = proc.cpu_percent(None) return {"pid": proc.pid, "name": name, "username": proc.username(), "status": proc.status(), "cpu_percent": cpu, "rss": mem} except psutil.Error: return {"pid": proc.pid, "name": "<terminated>", "username": "<unknown>", "status": "<terminated>", "cpu_percent": 0.0, "rss": 0} # Coerces int-like strings to int, else raises ValueError def _to_int(val: Any, field: str) -> int: if isinstance(val, int): return val if isinstance(val, str): try: return int(val.strip()) except ValueError: raise ValueError(f"{field} must be an integer or null, got: '{val}' (type: {type(val)})") raise ValueError(f"{field} must be an integer or null, got: {val} (type: {type(val)})") # Coerces float-like strings to float, else raises ValueError def _to_float(val: Any, field: str) -> float: if isinstance(val, (int, float)): return float(val) if isinstance(val, str): try: return float(val.strip()) except ValueError: raise ValueError(f"{field} must be a number, got: '{val}' (type: {type(val)})") raise ValueError(f"{field} must be a number, got: {val} (type: {type(val)})") @mcp.tool() async def process_list(sort_by: Literal["cpu", "memory"] = "cpu", duration: int | str = 2, limit: int | str | None = None, name_filter: str | None = None, user_filter: str | None = None, status_filter: Literal["running", "sleeping", "stopped", "zombie"] | None = None, min_cpu: float | str | None = None, min_memory: int | str | None = None, include_system: bool = False, sort_asc: bool = False, ctx: Context | None = None,) -> List[Dict[str, Any]]: """List running processes sorted by CPU or memory with optional name, user, status, CPU/memory thresholds, system-process filtering, sort order and limit.""" if ctx: await ctx.info( f"process_list called sort_by={sort_by} duration={duration} (type={type(duration)}) " f"limit={limit} (type={type(limit)}) name_filter={name_filter} user_filter={user_filter} " f"status_filter={status_filter} min_cpu={min_cpu} (type={type(min_cpu)}) " f"min_memory={min_memory} (type={type(min_memory)}) include_system={include_system} " f"sort_asc={sort_asc}" ) if sort_by not in {"cpu", "memory"}: raise ValueError(f"sort_by must be 'cpu' or 'memory', got: {sort_by} (type: {type(sort_by)})") if limit is not None: limit = _to_int(limit, "limit") if limit < 0: raise ValueError(f"limit must be non-negative, got: {limit}") duration = _to_int(duration, "duration") if duration < 0: raise ValueError(f"duration must be non-negative, got: {duration}") if min_cpu is not None: min_cpu = _to_float(min_cpu, "min_cpu") if min_cpu < 0: raise ValueError(f"min_cpu must be non-negative, got: {min_cpu}") if min_memory is not None: min_memory = _to_int(min_memory, "min_memory") if min_memory < 0: raise ValueError(f"min_memory must be non-negative, got: {min_memory}") _snapshot_cpu() await asyncio.sleep(max(0.5, duration if sort_by == "cpu" else duration)) procs = _collect_processes() serialised = [_serialize(p) for p in procs] if not include_system: serialised = [p for p in serialised if p["username"] not in SYSTEM_USERS] if name_filter is not None: serialised = [p for p in serialised if name_filter.lower() in p["name"].lower()] if user_filter is not None: serialised = [p for p in serialised if user_filter.lower() in p["username"].lower()] if status_filter is not None: serialised = [p for p in serialised if p["status"] == status_filter] if min_cpu is not None: serialised = [p for p in serialised if p["cpu_percent"] >= min_cpu] if min_memory is not None: serialised = [p for p in serialised if p["rss"] >= min_memory] key = "cpu_percent" if sort_by == "cpu" else "rss" result = sorted(serialised, key=lambda p: p[key], reverse=not sort_asc) if limit is not None: result = result[:limit] return result @mcp.tool() async def process_kill(pid: int, ctx: Context | None = None) -> str: """Kill the process identified by the given PID""" if ctx: await ctx.info(f"process_kill called pid={pid}") if pid == os.getpid(): return "Refusing to kill MCP server process" try: proc = psutil.Process(pid) proc.kill() proc.wait(timeout=5) return f"Process {pid} terminated" except (psutil.NoSuchProcess, psutil.AccessDenied) as err: return f"Failed to kill {pid}: {err}" def main() -> None: mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/misiektoja/kill-process-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server