# portmaster_mcp.py • 30.4 kB
#!/usr/bin/env python3
"""
PortMaster MCP - Windows Process and Service Management with UAC Elevation
Handles any Windows service or process by PID, service name, or port
"""
import asyncio
import subprocess
import sys
import logging
import os
from typing import Dict, Any, List
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# A single identifier is shared by the logger and the MCP server instance.
SERVER_NAME = "portmaster-mcp"

# Logging: emit INFO and above through the root logger's default handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(SERVER_NAME)

# Server: the MCP server object that the tool handlers register against.
server = Server(SERVER_NAME)
def run_powershell(script: str, timeout: int = 30) -> str:
    """Run a PowerShell script and return its output as text.

    Args:
        script: PowerShell source passed to ``powershell -Command``.
        timeout: Seconds to wait before giving up (default 30).

    Returns:
        The captured stdout when PowerShell exits with code 0. Otherwise a
        string starting with ``"Error: "`` that carries stderr (or, when
        stderr is empty, stdout or the exit code). Launch failures and
        timeouts are reported the same way; this function never raises.
    """
    command = [
        "powershell",
        "-ExecutionPolicy", "Bypass",
        "-NoProfile",
        # Fail fast instead of waiting on a prompt nobody can answer.
        "-NonInteractive",
        "-Command", script,
    ]
    try:
        result = subprocess.run(
            command,
            # This server talks MCP over its own stdin; the child must not
            # inherit that pipe and consume protocol bytes from it.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            # Console output is not guaranteed to match the locale encoding;
            # substitute undecodable bytes instead of failing the whole call.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Error: PowerShell command timed out after {timeout} seconds"
    except Exception as e:
        # Boundary handler: callers expect an error string, not an exception.
        return f"Error: {e}"

    if result.returncode == 0:
        return result.stdout

    # Some failures write nothing to stderr; fall back so the message is
    # never just a bare "Error: ".
    if result.stderr.strip():
        details = result.stderr
    elif result.stdout.strip():
        details = result.stdout
    else:
        details = f"PowerShell exited with code {result.returncode}"
    return f"Error: {details}"
def run_elevated_operation(operation: str, target: str, force: bool = False) -> str:
    """Run an ``admin_helper.ps1`` operation in an elevated PowerShell session.

    The helper script located next to this file is launched through
    ``Start-Process -Verb RunAs``, which triggers a UAC prompt when the
    current process is not already elevated.

    Args:
        operation: Helper operation name; only letters, digits and ``-`` are
            accepted because the value is placed unquoted on a command line.
        target: Value for the helper's ``-Target`` parameter.
        force: When true, ``-Force`` is appended to the helper's arguments.

    Returns:
        A human-readable status string. Invalid arguments, launch failures
        and timeouts are reported as strings starting with
        ``"Error during elevated operation"``; this function never raises.
    """
    error_prefix = "Error during elevated operation"

    # Both values end up inside a command line that runs with administrator
    # rights, so anything able to break out of the quoting is refused.
    if not operation or not operation.replace("-", "").isalnum():
        return f"{error_prefix}: invalid operation name {operation!r}"
    target = str(target)
    if (
        '"' in target
        # A trailing backslash would escape the closing double quote.
        or target.endswith("\\")
        or any(ord(ch) < 32 for ch in target)
    ):
        return f"{error_prefix}: unsafe characters in target {target!r}"

    try:
        # Absolute path: the elevated shell does not share our working directory.
        script_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "admin_helper.ps1"
        )
        force_arg = " -Force" if force else ""
        helper_args = (
            f'-ExecutionPolicy Bypass -File "{script_path}" '
            f'-Operation {operation} -Target "{target}"{force_arg}'
        )
        # The argument string is embedded in a PowerShell single-quoted
        # literal. PowerShell accepts typographic quotes as delimiters too,
        # so every variant is doubled, not only the ASCII apostrophe.
        for quote in "'\u2018\u2019\u201a\u201b":
            helper_args = helper_args.replace(quote, quote * 2)

        # -PassThru lets the helper's own exit code be propagated;
        # -ErrorAction Stop makes a declined UAC prompt a hard failure.
        ps_command = (
            f"$p = Start-Process powershell -ArgumentList '{helper_args}' "
            "-Verb RunAs -Wait -PassThru -ErrorAction Stop; "
            "if ($p -and $null -ne $p.ExitCode) { exit $p.ExitCode }"
        )
        cmd = [
            "powershell",
            "-ExecutionPolicy", "Bypass",
            "-Command",
            ps_command,
        ]

        # Execute with UAC prompt
        result = subprocess.run(
            cmd,
            # Keep the child away from the MCP stdio pipe.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return f"{error_prefix}: timed out after 60 seconds waiting for the elevated session"
    except Exception as e:
        return f"Error during elevated operation: {str(e)}"

    if result.returncode == 0:
        return "Operation completed successfully (UAC elevation was used)"
    return f"Operation failed or was cancelled by user: {result.stderr}"
def check_admin_privileges() -> bool:
    """Report whether the current identity holds the Administrator role.

    The check is delegated to PowerShell via ``run_powershell``. Only an
    output of ``true`` (ignoring case and surrounding whitespace) counts as
    elevated; any other output, including an error string, yields False.
    """
    role_check = (
        "([Security.Principal.WindowsPrincipal]"
        "[Security.Principal.WindowsIdentity]::GetCurrent())"
        ".IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator)"
    )
    answer = run_powershell(role_check)
    normalized = answer.strip().lower()
    return normalized == "true"
@server.list_tools()
async def list_tools() -> List[Tool]:
    """Return the catalogue of tools this server exposes.

    Each entry pairs a tool name and description with a JSON Schema object
    describing its arguments. The schemas are assembled from small local
    factories so the repeated fragments are written only once.
    """

    def prop(kind: str, description: str, **extra: Any) -> Dict[str, Any]:
        # Key order is: type, any extras (items/default), then description.
        return {"type": kind, **extra, "description": description}

    def int_list(description: str) -> Dict[str, Any]:
        return prop("array", description, items={"type": "integer"})

    def force_flag() -> Dict[str, Any]:
        return prop(
            "boolean",
            "Skip confirmation prompt in elevated session",
            default=False,
        )

    def tool(name, description, properties, required=None) -> Tool:
        schema: Dict[str, Any] = {"type": "object", "properties": properties}
        # A tool without mandatory arguments carries no "required" key at all.
        if required is not None:
            schema["required"] = required
        return Tool(name=name, description=description, inputSchema=schema)

    return [
        tool(
            "find_processes_by_service",
            "Find processes for any Windows service by name (supports wildcards)",
            {
                "service_pattern": prop(
                    "string",
                    "Service name pattern (supports wildcards like 'SQL*' or '*Web*')",
                ),
            },
            ["service_pattern"],
        ),
        tool(
            "find_processes_by_port",
            "Find processes listening on specific port(s)",
            {"ports": int_list("List of port numbers to search for")},
            ["ports"],
        ),
        tool(
            "get_all_listening_ports",
            "Get all listening ports on the system",
            {},
        ),
        tool(
            "get_process_info",
            "Get detailed information about a process by PID",
            {"process_id": prop("integer", "Process ID to get information for")},
            ["process_id"],
        ),
        tool(
            "get_multiple_process_info",
            "Get detailed information about multiple processes by PID list",
            {"process_ids": int_list("List of process IDs to get information for")},
            ["process_ids"],
        ),
        tool(
            "get_service_info",
            "Get detailed information about a Windows service",
            {
                "service_name": prop(
                    "string",
                    "Service name (exact name or pattern with wildcards)",
                ),
            },
            ["service_name"],
        ),
        tool(
            "kill_process",
            "Terminate a specific process by PID (will prompt for UAC elevation)",
            {
                "process_id": prop("integer", "Process ID to terminate"),
                "force": force_flag(),
            },
            ["process_id"],
        ),
        tool(
            "kill_processes_by_port",
            "Kill all processes listening on specific port(s) (will prompt for UAC elevation)",
            {
                "ports": int_list(
                    "List of port numbers - will kill all processes using these ports"
                ),
                "force": force_flag(),
            },
            ["ports"],
        ),
        tool(
            "kill_multiple_processes",
            "Kill multiple processes by their PIDs (will prompt for UAC elevation)",
            {
                "process_ids": int_list("List of process IDs to terminate"),
                "force": force_flag(),
            },
            ["process_ids"],
        ),
        tool(
            "stop_service",
            "Stop any Windows service by name (will prompt for UAC elevation)",
            {
                "service_name": prop("string", "Service name to stop"),
                "force": force_flag(),
            },
            ["service_name"],
        ),
        tool(
            "start_service",
            "Start any Windows service by name (will prompt for UAC elevation)",
            {"service_name": prop("string", "Service name to start")},
            ["service_name"],
        ),
        tool(
            "restart_service",
            "Restart any Windows service by name (will prompt for UAC elevation)",
            {
                "service_name": prop("string", "Service name to restart"),
                "force": force_flag(),
            },
            ["service_name"],
        ),
    ]
# ---------------------------------------------------------------------------
# PowerShell script templates used by call_tool.
#
# None of these templates is formatted with user input. Every value coming
# from a tool call is assigned to a PowerShell variable in a short prologue
# (see _ps_literal / _ps_int_array) and the template only reads that variable,
# so argument text can never be parsed as PowerShell code.
# ---------------------------------------------------------------------------

# Expects: $pattern (string)
_SERVICE_SEARCH_SCRIPT = """
Write-Host "=== Service Search: $pattern ==="
$services = Get-Service | Where-Object { $_.Name -like $pattern -or $_.DisplayName -like ('*' + $pattern + '*') }
if ($services) {
    foreach ($service in $services) {
        Write-Host ""
        Write-Host "Service: $($service.Name) - $($service.DisplayName)"
        Write-Host "Status: $($service.Status)"
        Write-Host "Start Type: $($service.StartType)"
        if ($service.Status -eq "Running") {
            $wmiService = Get-WmiObject -Class Win32_Service -Filter "Name='$($service.Name)'"
            $processId = $wmiService.ProcessId
            if ($processId -gt 0) {
                $process = Get-Process -Id $processId -ErrorAction SilentlyContinue
                if ($process) {
                    Write-Host "Main Process: $($process.ProcessName) (PID: $processId)"
                    Write-Host "Memory Usage: $([math]::Round($process.WorkingSet64 / 1MB, 1)) MB"
                    # Get all related processes (process tree)
                    $allProcesses = @($processId)
                    function Get-AllChildren($parentPid) {
                        $children = Get-WmiObject Win32_Process -Filter "ParentProcessId=$parentPid" -ErrorAction SilentlyContinue
                        foreach ($child in $children) {
                            $script:allProcesses += $child.ProcessId
                            Get-AllChildren $child.ProcessId
                        }
                    }
                    Get-AllChildren $processId
                    if ($allProcesses.Count -gt 1) {
                        Write-Host "Process Tree:"
                        foreach ($processIdVar in $allProcesses) {
                            $proc = Get-Process -Id $processIdVar -ErrorAction SilentlyContinue
                            if ($proc) {
                                Write-Host " $($proc.ProcessName) (PID: $processIdVar)"
                                # Check for ports
                                $tcpPorts = Get-NetTCPConnection -OwningProcess $processIdVar -State Listen -ErrorAction SilentlyContinue
                                if ($tcpPorts) {
                                    Write-Host " TCP Ports:"
                                    $tcpPorts | ForEach-Object { Write-Host " $($_.LocalAddress):$($_.LocalPort)" }
                                }
                            }
                        }
                        Write-Host ""
                        Write-Host "=== Management Options ==="
                        Write-Host "All Process IDs: $($allProcesses -join ', ')"
                        Write-Host "Use 'kill_multiple_processes' with these PIDs to terminate the entire tree"
                        Write-Host "Use 'stop_service' with service name '$($service.Name)' to stop the service"
                    } else {
                        $tcpPorts = Get-NetTCPConnection -OwningProcess $processId -State Listen -ErrorAction SilentlyContinue
                        if ($tcpPorts) {
                            Write-Host "TCP Ports:"
                            $tcpPorts | ForEach-Object { Write-Host " $($_.LocalAddress):$($_.LocalPort)" }
                        }
                    }
                }
            }
        } else {
            Write-Host "Service is not running"
        }
    }
} else {
    Write-Host "No services found matching pattern: $pattern"
}
"""

# Expects: $ports (int array)
_PORT_SEARCH_SCRIPT = """
$portList = $ports -join ','
Write-Host "=== Port Search: $portList ==="
$foundProcesses = @()
foreach ($port in $ports) {
    $tcpResult = Get-NetTCPConnection -LocalPort $port -State Listen -ErrorAction SilentlyContinue
    if ($tcpResult) {
        Write-Host ""
        Write-Host "Port $port (TCP):"
        $tcpResult | ForEach-Object {
            $proc = Get-Process -Id $_.OwningProcess -ErrorAction SilentlyContinue
            if ($proc) {
                Write-Host " Process: $($proc.ProcessName) (PID: $($_.OwningProcess))"
                Write-Host " Address: $($_.LocalAddress):$($_.LocalPort)"
                $foundProcesses += $_.OwningProcess
                # Try to find parent service
                $currentPid = $_.OwningProcess
                for ($i = 0; $i -lt 5; $i++) {
                    $service = Get-WmiObject Win32_Service | Where-Object { $_.ProcessId -eq $currentPid }
                    if ($service) {
                        Write-Host " Parent Service: $($service.Name) - $($service.DisplayName)"
                        break
                    }
                    $parentProc = Get-WmiObject Win32_Process -Filter "ProcessId=$currentPid"
                    if ($parentProc -and $parentProc.ParentProcessId) {
                        $currentPid = $parentProc.ParentProcessId
                    } else {
                        break
                    }
                }
            }
        }
    }
}
if ($foundProcesses.Count -eq 0) {
    Write-Host "No processes found listening on ports: $portList"
} else {
    $uniqueProcesses = $foundProcesses | Sort-Object -Unique
    Write-Host ""
    Write-Host "=== Management Options ==="
    Write-Host "Found $($uniqueProcesses.Count) unique process(es) using the specified ports"
    Write-Host "Process IDs: $($uniqueProcesses -join ', ')"
    Write-Host "Use 'kill_processes_by_port' to terminate all processes using these ports"
    Write-Host "Use 'kill_multiple_processes' to terminate specific PIDs"
}
"""

_ALL_LISTENING_PORTS_SCRIPT = """
Write-Host "=== All Listening TCP Ports ==="
$tcpPorts = Get-NetTCPConnection -State Listen | Sort-Object LocalPort
foreach ($port in $tcpPorts) {
    $proc = Get-Process -Id $port.OwningProcess -ErrorAction SilentlyContinue
    if ($proc) {
        Write-Host "Port $($port.LocalPort): $($proc.ProcessName) (PID: $($port.OwningProcess))"
    }
}
"""

# Expects: $targetPid (int). $pid itself is a read-only PowerShell automatic
# variable, hence the different name.
_PROCESS_INFO_SCRIPT = """
$process = Get-Process -Id $targetPid -ErrorAction SilentlyContinue
if ($process) {
    Write-Host "=== Process Information for PID $targetPid ==="
    Write-Host "Process Name: $($process.ProcessName)"
    Write-Host "Memory Usage: $([math]::Round($process.WorkingSet64 / 1MB, 1)) MB"
    $wmiProcess = Get-WmiObject Win32_Process -Filter "ProcessId=$targetPid"
    if ($wmiProcess -and $wmiProcess.CommandLine) {
        Write-Host "Command Line: $($wmiProcess.CommandLine)"
    }
    # Check for listening ports
    $tcpPorts = Get-NetTCPConnection -OwningProcess $targetPid -State Listen -ErrorAction SilentlyContinue
    if ($tcpPorts) {
        Write-Host "TCP Listening Ports:"
        $tcpPorts | ForEach-Object { Write-Host " $($_.LocalAddress):$($_.LocalPort)" }
    }
    # Check for parent service
    $service = Get-WmiObject Win32_Service | Where-Object { $_.ProcessId -eq $targetPid }
    if ($service) {
        Write-Host "Direct Service: $($service.Name) - $($service.DisplayName)"
    }
    # Check for child processes
    $children = Get-WmiObject Win32_Process -Filter "ParentProcessId=$targetPid" -ErrorAction SilentlyContinue
    if ($children) {
        Write-Host "Child Processes:"
        foreach ($child in $children) {
            $childProc = Get-Process -Id $child.ProcessId -ErrorAction SilentlyContinue
            if ($childProc) {
                Write-Host " $($childProc.ProcessName) (PID: $($child.ProcessId))"
            }
        }
    }
} else {
    Write-Host "Process with PID $targetPid not found"
}
"""

# Expects: $targetPids (int array)
_MULTI_PROCESS_INFO_SCRIPT = """
Write-Host "=== Multiple Process Information ==="
Write-Host "Analyzing $($targetPids.Count) processes..."
Write-Host ""
$totalMemory = 0
$foundCount = 0
$processNames = @()
foreach ($targetPid in $targetPids) {
    Write-Host "=== Process Information for PID $targetPid ==="
    $process = Get-Process -Id $targetPid -ErrorAction SilentlyContinue
    if ($process) {
        Write-Host "Process Name: $($process.ProcessName)"
        Write-Host "Memory Usage: $([math]::Round($process.WorkingSet64 / 1MB, 1)) MB"
        # Format CPU Time and Start Time
        if ($process.TotalProcessorTime) {
            Write-Host "CPU Time: $($process.TotalProcessorTime)"
        } else {
            Write-Host "CPU Time: Not available"
        }
        if ($process.StartTime) {
            Write-Host "Start Time: $($process.StartTime)"
        } else {
            Write-Host "Start Time: Not available"
        }
        Write-Host "Priority: $($process.BasePriority)"
        Write-Host "Thread Count: $($process.Threads.Count)"
        # Get WMI info for command line and parent
        $wmiProcess = Get-WmiObject Win32_Process -Filter "ProcessId=$targetPid" -ErrorAction SilentlyContinue
        if ($wmiProcess) {
            if ($wmiProcess.CommandLine) {
                Write-Host "Command Line: $($wmiProcess.CommandLine)"
            } else {
                Write-Host "Command Line: Not available"
            }
            if ($wmiProcess.ExecutablePath) {
                Write-Host "Executable Path: $($wmiProcess.ExecutablePath)"
            } else {
                Write-Host "Executable Path: Not available"
            }
            # Parent process
            if ($wmiProcess.ParentProcessId) {
                $parentProc = Get-Process -Id $wmiProcess.ParentProcessId -ErrorAction SilentlyContinue
                if ($parentProc) {
                    Write-Host "Parent Process: $($parentProc.ProcessName) (PID: $($wmiProcess.ParentProcessId))"
                } else {
                    Write-Host "Parent Process: (PID: $($wmiProcess.ParentProcessId)) - Process no longer exists"
                }
            } else {
                Write-Host "Parent Process: Not available"
            }
        }
        # Check for listening ports
        $tcpPorts = Get-NetTCPConnection -OwningProcess $targetPid -State Listen -ErrorAction SilentlyContinue
        if ($tcpPorts) {
            Write-Host "TCP Listening Ports:"
            $tcpPorts | ForEach-Object { Write-Host " $($_.LocalAddress):$($_.LocalPort)" }
        }
        # Check for active connections
        $activePorts = Get-NetTCPConnection -OwningProcess $targetPid -ErrorAction SilentlyContinue | Where-Object { $_.State -ne "Listen" }
        if ($activePorts) {
            Write-Host "Active Connections:"
            $activePorts | Select-Object -First 5 | ForEach-Object {
                Write-Host " $($_.LocalAddress):$($_.LocalPort) -> $($_.RemoteAddress):$($_.RemotePort) [$($_.State)]"
            }
            if ($activePorts.Count -gt 5) {
                Write-Host " ... and $($activePorts.Count - 5) more connections"
            }
        }
        # Check for child processes
        $children = Get-WmiObject Win32_Process -Filter "ParentProcessId=$targetPid" -ErrorAction SilentlyContinue
        if ($children) {
            Write-Host "Child Processes:"
            foreach ($child in $children) {
                $childProc = Get-Process -Id $child.ProcessId -ErrorAction SilentlyContinue
                if ($childProc) {
                    Write-Host " $($childProc.ProcessName) (PID: $($child.ProcessId))"
                }
            }
        }
        # Check for direct service
        $service = Get-WmiObject Win32_Service | Where-Object { $_.ProcessId -eq $targetPid }
        if ($service) {
            Write-Host "Direct Service: $($service.Name) - $($service.DisplayName)"
        }
        # Feed the summary printed after the loop
        $totalMemory += $process.WorkingSet64
        $foundCount++
        $processNames += $process.ProcessName
        Write-Host ""
    } else {
        Write-Host "Process with PID $targetPid not found"
        Write-Host ""
    }
}
Write-Host "=== Summary ==="
Write-Host "Found: $foundCount of $($targetPids.Count) processes"
if ($foundCount -gt 0) {
    Write-Host "Total Memory: $([math]::Round($totalMemory / 1MB, 1)) MB"
    $grouped = $processNames | Group-Object
    Write-Host "Process Types:"
    foreach ($group in $grouped) {
        Write-Host " $($group.Name): $($group.Count) instances"
    }
}
"""

# Expects: $targetPids (int array). -ErrorAction Stop is required for the
# catch block to see Stop-Process failures, which are otherwise non-terminating.
_KILL_PIDS_SCRIPT = """
foreach ($targetPid in $targetPids) {
    try {
        $process = Get-Process -Id $targetPid -ErrorAction SilentlyContinue
        if ($process) {
            Write-Host "Terminating process: $($process.ProcessName) (PID: $targetPid)"
            Stop-Process -Id $targetPid -Force -ErrorAction Stop
            Write-Host "Process $targetPid terminated successfully"
        } else {
            Write-Host "Process with PID $targetPid not found"
        }
    } catch {
        Write-Host "Error terminating process ${targetPid}: $_"
    }
}
"""

# Expects: $ports (int array). Produces: $targetPids for _KILL_PIDS_SCRIPT.
_PORTS_TO_PIDS_SCRIPT = """
$targetPids = @()
foreach ($port in $ports) {
    $listeners = Get-NetTCPConnection -LocalPort $port -State Listen -ErrorAction SilentlyContinue
    if ($listeners) {
        $targetPids += @($listeners | ForEach-Object { $_.OwningProcess })
    } else {
        Write-Host "No process listening on port $port"
    }
}
# PIDs 0 and 4 are the Idle and System pseudo-processes; they cannot be killed
$targetPids = @($targetPids | Where-Object { $_ -gt 4 } | Sort-Object -Unique)
"""

# Expects: $serviceName (string), $action ('stop'|'start'|'restart'), $force (bool).
# The service is matched by equality rather than Get-Service -Name so that
# wildcard characters in the argument cannot select several services at once.
_SERVICE_ACTION_SCRIPT = """
$service = Get-Service | Where-Object { $_.Name -eq $serviceName -or $_.DisplayName -eq $serviceName } | Select-Object -First 1
if (-not $service) {
    Write-Host "Service '$serviceName' not found"
} else {
    try {
        Write-Host "Service: $($service.Name) - $($service.DisplayName) (requested action: $action)"
        switch ($action) {
            'stop'    { Stop-Service -InputObject $service -Force:$force -ErrorAction Stop }
            'start'   { Start-Service -InputObject $service -ErrorAction Stop }
            'restart' { Restart-Service -InputObject $service -Force:$force -ErrorAction Stop }
        }
        $service.Refresh()
        Write-Host "Service '$($service.Name)' status: $($service.Status)"
    } catch {
        Write-Host "Error during $action of service '$($service.Name)': $_"
    }
}
"""


def _text_result(message: str) -> List[TextContent]:
    """Wrap a string in the single-item TextContent list a tool call returns."""
    return [TextContent(type="text", text=message)]


def _ps_literal(value: Any) -> str:
    """Render value as a PowerShell single-quoted string literal."""
    text = str(value)
    # PowerShell treats typographic quotes as string delimiters as well, so
    # each variant is escaped by doubling, not only the ASCII apostrophe.
    for quote in "'\u2018\u2019\u201a\u201b":
        text = text.replace(quote, quote * 2)
    return f"'{text}'"


def _ps_int_array(variable: str, values: List[int]) -> str:
    """Return a prologue line assigning values to the PowerShell array $variable."""
    joined = ",".join(str(value) for value in values)
    return f"${variable} = @({joined})\n"


def _as_int_list(raw: Any, *, low: int, high: int = 2_147_483_647) -> List[int]:
    """Coerce a tool argument into a list of ints within [low, high].

    A scalar is treated as a one-element list and a missing/empty value as an
    empty list. Raises ValueError for anything that is not a whole number in
    range, so no unchecked text is ever spliced into a script.
    """
    if raw is None or raw == "" or raw == []:
        return []
    items = raw if isinstance(raw, (list, tuple)) else [raw]
    numbers: List[int] = []
    for item in items:
        # bool is an int subclass; True/False are never meant as PIDs or ports.
        if isinstance(item, bool):
            raise ValueError(f"{item!r} is not an integer")
        if isinstance(item, float) and not item.is_integer():
            raise ValueError(f"{item!r} is not an integer")
        try:
            number = int(item)
        except (TypeError, ValueError):
            raise ValueError(f"{item!r} is not an integer") from None
        if not low <= number <= high:
            raise ValueError(f"{number} is outside the range {low}-{high}")
        numbers.append(number)
    return numbers


def _single_pid(raw: Any) -> int | None:
    """Return raw as one positive process ID, or None when it is not one."""
    try:
        pids = _as_int_list(raw, low=1)
    except ValueError:
        return None
    return pids[0] if len(pids) == 1 else None


async def _run_script(script: str) -> List[TextContent]:
    """Run a read-only script in a worker thread so the event loop stays free."""
    return _text_result(await asyncio.to_thread(run_powershell, script))


async def _run_privileged(
    direct_script: str, operation: str, target: str, force: bool
) -> List[TextContent]:
    """Run direct_script when already elevated, else go through the UAC helper."""
    if await asyncio.to_thread(check_admin_privileges):
        output = await asyncio.to_thread(run_powershell, direct_script)
    else:
        output = await asyncio.to_thread(
            run_elevated_operation, operation, target, force
        )
    return _text_result(output)


@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any] | None) -> List[TextContent]:
    """Dispatch an MCP tool call and return its textual result.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: Tool arguments; ``None`` is treated as an empty dict.

    Returns:
        A one-element list holding the tool's output, or a message starting
        with ``"Error:"`` when an argument is missing or invalid. An unknown
        ``name`` yields ``"Unknown tool: <name>"``.

    Read-only tools run a PowerShell script directly. Tools that modify the
    system run directly when the server is already elevated and otherwise go
    through ``run_elevated_operation`` (UAC prompt).
    """
    args = arguments or {}
    force = bool(args.get("force", False))

    # ---- Read-only operations (no elevation needed) ----
    if name in ("find_processes_by_service", "get_service_info"):
        # get_service_info is the same search, fed from a different argument.
        if name == "find_processes_by_service":
            pattern = args.get("service_pattern", "")
            missing = "Error: No service pattern specified"
        else:
            pattern = args.get("service_name", "")
            missing = "Error: No service name specified"
        if not pattern:
            return _text_result(missing)
        prologue = f"$pattern = {_ps_literal(pattern)}\n"
        return await _run_script(prologue + _SERVICE_SEARCH_SCRIPT)

    if name == "find_processes_by_port":
        try:
            ports = _as_int_list(args.get("ports"), low=1, high=65535)
        except ValueError as exc:
            return _text_result(f"Error: Invalid port list: {exc}")
        if not ports:
            return _text_result("Error: No ports specified")
        return await _run_script(_ps_int_array("ports", ports) + _PORT_SEARCH_SCRIPT)

    if name == "get_all_listening_ports":
        return await _run_script(_ALL_LISTENING_PORTS_SCRIPT)

    if name == "get_process_info":
        process_id = _single_pid(args.get("process_id"))
        if process_id is None:
            return _text_result("Error: Invalid process ID")
        return await _run_script(f"$targetPid = {process_id}\n" + _PROCESS_INFO_SCRIPT)

    if name == "get_multiple_process_info":
        try:
            process_ids = _as_int_list(args.get("process_ids"), low=1)
        except ValueError as exc:
            return _text_result(f"Error: Invalid process ID list: {exc}")
        if not process_ids:
            return _text_result("Error: No process IDs provided")
        return await _run_script(
            _ps_int_array("targetPids", process_ids) + _MULTI_PROCESS_INFO_SCRIPT
        )

    # ---- Operations requiring elevation ----
    if name == "kill_process":
        process_id = _single_pid(args.get("process_id"))
        if process_id is None:
            return _text_result("Error: Invalid process ID")
        direct = _ps_int_array("targetPids", [process_id]) + _KILL_PIDS_SCRIPT
        return await _run_privileged(direct, "kill-process", str(process_id), force)

    if name == "kill_processes_by_port":
        try:
            ports = _as_int_list(args.get("ports"), low=1, high=65535)
        except ValueError as exc:
            return _text_result(f"Error: Invalid port list: {exc}")
        if not ports:
            return _text_result("Error: No ports specified")
        port_string = ",".join(map(str, ports))
        direct = (
            _ps_int_array("ports", ports) + _PORTS_TO_PIDS_SCRIPT + _KILL_PIDS_SCRIPT
        )
        return await _run_privileged(direct, "kill-by-port", port_string, force)

    if name == "kill_multiple_processes":
        try:
            process_ids = _as_int_list(args.get("process_ids"), low=1)
        except ValueError as exc:
            return _text_result(f"Error: Invalid process ID list: {exc}")
        if not process_ids:
            return _text_result("Error: No process IDs specified")
        pid_string = ",".join(map(str, process_ids))
        direct = _ps_int_array("targetPids", process_ids) + _KILL_PIDS_SCRIPT
        return await _run_privileged(direct, "kill-multiple", pid_string, force)

    if name in ("stop_service", "start_service", "restart_service"):
        service_name = args.get("service_name", "")
        if not service_name:
            return _text_result("Error: No service name specified")
        service_name = str(service_name)
        # "stop_service" -> "stop"; the helper operation is "<action>-service".
        action = name.replace("_service", "").replace("_", "-")
        direct = (
            f"$serviceName = {_ps_literal(service_name)}\n"
            f"$action = {_ps_literal(action)}\n"
            + "$force = "
            + ("$true" if force else "$false")
            + "\n"
            + _SERVICE_ACTION_SCRIPT
        )
        return await _run_privileged(direct, f"{action}-service", service_name, force)

    return _text_result(f"Unknown tool: {name}")
async def main():
    """Serve MCP requests over stdin/stdout until the transport closes."""
    logger.info("Starting Generic Windows Process and Service Management MCP Server with UAC Elevation")
    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = server.create_initialization_options()
        await server.run(incoming, outgoing, init_options)


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())