Skip to main content
Glama

MCP-PST-Server

mcp_server.py (12.3 kB)
#!/usr/bin/env python3
# MCP client (Windows PST). Wraps common penetration-testing tools as MCP
# tools and forwards each call to the PST API Server.

import sys
import os
import argparse
import logging
from typing import Dict, Any, Optional

import requests
from mcp.server.fastmcp import FastMCP

# Logging configuration.
# Logs go to stderr rather than stdout: main() calls mcp.run() without a
# transport argument, and on the stdio transport stdout carries the JSON-RPC
# message stream, so any log line written there would corrupt the protocol.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stderr)
    ]
)
logger = logging.getLogger(__name__)

# Default configuration.
DEFAULT_PST_SERVER = "http://localhost:5100"
DEFAULT_REQUEST_TIMEOUT = 300  # seconds


class PSTToolsClient:
    """HTTP client for the Windows PST API Server."""

    def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT):
        """Store the base URL (trailing slashes removed) and default timeout.

        Args:
            server_url: Base URL of the PST API server.
            timeout: Default per-request timeout in seconds.
        """
        self.server_url = server_url.rstrip("/")
        self.timeout = timeout
        logger.info(f"Initialized PST Tools Client connecting to {server_url}")

    def _request(self, send, endpoint: str, timeout: Optional[int] = None,
                 **kwargs: Any) -> Dict[str, Any]:
        """Send one HTTP request and convert any failure into an error dict.

        Args:
            send: The ``requests`` callable to use (``requests.get`` or
                ``requests.post``).
            endpoint: Path relative to the server base URL.
            timeout: Per-request timeout in seconds; a falsy value falls back
                to the client default.
            **kwargs: Extra keyword arguments passed through to ``send``.

        Returns:
            The decoded JSON body on success, otherwise
            ``{"error": <message>, "success": False}``. This method does not
            raise for request or decoding failures.
        """
        url = f"{self.server_url}/{endpoint}"
        try:
            response = send(url, timeout=timeout or self.timeout, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            message = f"Request failed: {str(e)}"
        except Exception as e:
            # Boundary catch-all: tool calls must return a result dict
            # instead of propagating an exception to the MCP layer.
            message = f"Unexpected error: {str(e)}"
        logger.error(message)
        return {"error": message, "success": False}

    def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None,
                 timeout: Optional[int] = None) -> Dict[str, Any]:
        """GET ``endpoint`` with optional query parameters.

        Args:
            endpoint: Path relative to the server base URL.
            params: Query parameters; ``None`` is sent as an empty mapping.
            timeout: Optional per-request timeout override in seconds.

        Returns:
            The decoded JSON response, or an error dict (see ``_request``).
        """
        if params is None:
            params = {}
        return self._request(requests.get, endpoint, timeout, params=params)

    def safe_post(self, endpoint: str, json_data: Optional[Dict[str, Any]] = None,
                  timeout: Optional[int] = None) -> Dict[str, Any]:
        """POST ``json_data`` as a JSON body to ``endpoint``.

        Args:
            endpoint: Path relative to the server base URL.
            json_data: JSON payload; ``None`` is sent as an empty object.
            timeout: Optional per-request timeout override in seconds.

        Returns:
            The decoded JSON response, or an error dict (see ``_request``).
        """
        if json_data is None:
            json_data = {}
        return self._request(requests.post, endpoint, timeout, json=json_data)

    def execute_command(self, command: str) -> Dict[str, Any]:
        """POST ``command`` to ``api/command``.

        The command string is forwarded verbatim; no validation or escaping
        is performed on this side.
        """
        return self.safe_post("api/command", {"command": command})

    def check_health(self) -> Dict[str, Any]:
        """GET the server's ``health`` endpoint."""
        return self.safe_get("health")


def setup_mcp_server(pst_client: PSTToolsClient) -> FastMCP:
    """Create the ``pst-mcp`` FastMCP server and register all tools on it.

    Every tool packs its arguments into a dict and forwards it through
    ``pst_client``; the return value is whatever the client returns (the
    server's JSON response, or an error dict).

    Args:
        pst_client: Client used to reach the PST API server.

    Returns:
        The configured FastMCP instance (not yet running).
    """
    mcp = FastMCP("pst-mcp")

    # Basic network and directory enumeration
    @mcp.tool()
    def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "",
                  additional_args: str = "-T4 -Pn") -> Dict[str, Any]:
        """Submit an nmap request to the PST API server (POST api/tools/nmap)."""
        data = {"target": target, "scan_type": scan_type, "ports": ports,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/nmap", data)

    @mcp.tool()
    def httpx_probe(target: str = "", list_file: str = "",
                    additional_args: str = "-status -title -tech-detect") -> Dict[str, Any]:
        """Submit an httpx request to the PST API server (POST api/tools/httpx)."""
        data = {"target": target, "list_file": list_file,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/httpx", data)

    @mcp.tool()
    def ffuf_scan(url: str, wordlist: str, additional_args: str = "") -> Dict[str, Any]:
        """Submit an ffuf request to the PST API server (POST api/tools/ffuf)."""
        data = {"url": url, "wordlist": wordlist, "additional_args": additional_args}
        return pst_client.safe_post("api/tools/ffuf", data)

    @mcp.tool()
    def feroxbuster_scan(url: str, wordlist: str = "",
                         additional_args: str = "") -> Dict[str, Any]:
        """Submit a feroxbuster request to the PST API server (POST api/tools/feroxbuster)."""
        data = {"url": url, "wordlist": wordlist, "additional_args": additional_args}
        return pst_client.safe_post("api/tools/feroxbuster", data)

    # Internal-network scanning and brute forcing
    @mcp.tool()
    def fscan_network(target: str, additional_args: str = "") -> Dict[str, Any]:
        """Submit an fscan request to the PST API server (POST api/tools/fscan)."""
        data = {"target": target, "additional_args": additional_args}
        return pst_client.safe_post("api/tools/fscan", data)

    @mcp.tool()
    def hydra_attack(target: str, service: str, username: str = "",
                     username_file: str = "", password: str = "",
                     password_file: str = "",
                     additional_args: str = "") -> Dict[str, Any]:
        """Submit a hydra request to the PST API server (POST api/tools/hydra)."""
        data = {
            "target": target,
            "service": service,
            "username": username,
            "username_file": username_file,
            "password": password,
            "password_file": password_file,
            "additional_args": additional_args
        }
        return pst_client.safe_post("api/tools/hydra", data)

    # ProjectDiscovery suite
    @mcp.tool()
    def subfinder_enum(domain: str = "", list_file: str = "",
                       additional_args: str = "-all -silent") -> Dict[str, Any]:
        """Submit a subfinder request to the PST API server (POST api/tools/subfinder)."""
        data = {"domain": domain, "list_file": list_file,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/subfinder", data)

    @mcp.tool()
    def dnsx_resolve(domain: str = "", list_file: str = "",
                     additional_args: str = "-a -resp -silent") -> Dict[str, Any]:
        """Submit a dnsx request to the PST API server (POST api/tools/dnsx)."""
        data = {"domain": domain, "list_file": list_file,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/dnsx", data)

    @mcp.tool()
    def naabu_scan(host: str = "", list_file: str = "", ports: str = "",
                   additional_args: str = "-silent") -> Dict[str, Any]:
        """Submit a naabu request to the PST API server (POST api/tools/naabu)."""
        data = {"host": host, "list_file": list_file, "ports": ports,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/naabu", data)

    @mcp.tool()
    def nuclei_scan(target: str = "", list_file: str = "", template: str = "",
                    tags: str = "", severity: str = "",
                    additional_args: str = "-silent") -> Dict[str, Any]:
        """Submit a nuclei request to the PST API server (POST api/tools/nuclei)."""
        data = {"target": target, "list_file": list_file, "template": template,
                "tags": tags, "severity": severity,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/nuclei", data)

    @mcp.tool()
    def katana_crawl(url: str = "", list_file: str = "", depth: str = "3",
                     additional_args: str = "-silent") -> Dict[str, Any]:
        """Submit a katana request to the PST API server (POST api/tools/katana)."""
        data = {"url": url, "list_file": list_file, "depth": depth,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/katana", data)

    # Other commonly used tools
    @mcp.tool()
    def afrog_scan(target: str = "", list_file: str = "", pocs: str = "",
                   additional_args: str = "") -> Dict[str, Any]:
        """Submit an afrog request to the PST API server (POST api/tools/afrog)."""
        data = {"target": target, "list_file": list_file, "pocs": pocs,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/afrog", data)

    @mcp.tool()
    def ehole_fingerprint(target: str = "", list_file: str = "",
                          fingerprints: str = "", output: str = "",
                          additional_args: str = "") -> Dict[str, Any]:
        """Submit an ehole request to the PST API server (POST api/tools/ehole)."""
        data = {"target": target, "list_file": list_file,
                "fingerprints": fingerprints, "output": output,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/ehole", data)

    @mcp.tool()
    def hackbrowserdata_dump(browser: str = "", output_dir: str = "./hbdata_output",
                             additional_args: str = "") -> Dict[str, Any]:
        """Submit a hackbrowserdata request to the PST API server (POST api/tools/hackbrowserdata)."""
        data = {"browser": browser, "output_dir": output_dir,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/hackbrowserdata", data)

    @mcp.tool()
    def sqlmap_scan(url: str, data: str = "",
                    additional_args: str = "--batch") -> Dict[str, Any]:
        """Submit a sqlmap request to the PST API server (POST api/tools/sqlmap)."""
        # Named post_data because the `data` parameter is part of the payload.
        post_data = {"url": url, "data": data, "additional_args": additional_args}
        return pst_client.safe_post("api/tools/sqlmap", post_data)

    @mcp.tool()
    def metasploit_console(msf_cmd: str = "", rc_file: str = "",
                           additional_args: str = "") -> Dict[str, Any]:
        """Submit a metasploit request to the PST API server (POST api/tools/metasploit).

        Uses a 900-second request timeout instead of the client default.
        """
        data = {"msf_cmd": msf_cmd, "rc_file": rc_file,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/metasploit", data, timeout=900)

    @mcp.tool()
    def john_crack(hash_file: str, wordlist: str = "", format: str = "",
                   mask: str = "", rules: bool = False,
                   additional_args: str = "") -> Dict[str, Any]:
        """Submit a john request to the PST API server (POST api/tools/john)."""
        # The boolean flag is sent as "1" (set) or "" (unset).
        data = {"hash_file": hash_file, "wordlist": wordlist, "format": format,
                "mask": mask, "rules": ("1" if rules else ""),
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/john", data)

    @mcp.tool()
    def nikto_scan(target: str, port: str = "", ssl: bool = False,
                   additional_args: str = "") -> Dict[str, Any]:
        """Submit a nikto request to the PST API server (POST api/tools/nikto)."""
        # The boolean flag is sent as "1" (set) or "" (unset).
        data = {"target": target, "port": port, "ssl": ("1" if ssl else ""),
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/nikto", data)

    @mcp.tool()
    def gobuster_scan(mode: str = "dir", url: str = "", domain: str = "",
                      wordlist: str = "",
                      additional_args: str = "") -> Dict[str, Any]:
        """Submit a gobuster request to the PST API server (POST api/tools/gobuster)."""
        data = {"mode": mode, "url": url, "domain": domain, "wordlist": wordlist,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/gobuster", data)

    @mcp.tool()
    def masscan_scan(target: str, ports: str = "", rate: str = "",
                     iface: str = "",
                     additional_args: str = "") -> Dict[str, Any]:
        """Submit a masscan request to the PST API server (POST api/tools/masscan)."""
        data = {"target": target, "ports": ports, "rate": rate, "iface": iface,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/masscan", data)

    @mcp.tool()
    def netcat_run(mode: str = "client", host: str = "", port: str = "",
                   listen_port: str = "", binary: str = "",
                   additional_args: str = "") -> Dict[str, Any]:
        """Submit a netcat request to the PST API server (POST api/tools/netcat)."""
        data = {"mode": mode, "host": host, "port": port,
                "listen_port": listen_port, "binary": binary,
                "additional_args": additional_args}
        return pst_client.safe_post("api/tools/netcat", data)

    @mcp.tool()
    def bbot_scan(target: str = "", preset: str = "", modules: str = "",
                  flags: str = "", output_modules: str = "",
                  output_dir: str = "", whitelist: str = "",
                  blacklist: str = "",
                  additional_args: str = "") -> Dict[str, Any]:
        """Submit a bbot request to the PST API server (POST api/tools/bbot)."""
        data = {
            "target": target,
            "preset": preset,
            "modules": modules,
            "flags": flags,
            "output_modules": output_modules,
            "output_dir": output_dir,
            "whitelist": whitelist,
            "blacklist": blacklist,
            "additional_args": additional_args
        }
        return pst_client.safe_post("api/tools/bbot", data)

    # Health check and generic command execution
    @mcp.tool()
    def server_health() -> Dict[str, Any]:
        """Return the PST API server's response from its health endpoint."""
        return pst_client.check_health()

    @mcp.tool()
    def execute_command(command: str) -> Dict[str, Any]:
        """Forward a command string verbatim to the PST API server (POST api/command)."""
        return pst_client.execute_command(command)

    @mcp.tool()
    def pw_list_tools(branch: str = "main") -> Dict[str, Any]:
        """Query api/catalog/pentest_windows on the PST API server for the given branch."""
        return pst_client.safe_get("api/catalog/pentest_windows", {"branch": branch})

    @mcp.tool()
    def pst_installed_tools() -> Dict[str, Any]:
        """Query api/catalog/installed on the PST API server."""
        return pst_client.safe_get("api/catalog/installed")

    return mcp


def parse_args():
    """Parse the command-line options: --server, --timeout and --debug."""
    parser = argparse.ArgumentParser(description="Run the Windows PST MCP Client")
    parser.add_argument("--server", type=str, default=DEFAULT_PST_SERVER,
                        help=f"PST API server URL (default: {DEFAULT_PST_SERVER})")
    parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT,
                        help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    return parser.parse_args()


def main():
    """Probe the PST API server, report its status, then run the MCP server.

    A failed health check is logged as a warning only; the MCP server is
    started regardless.
    """
    args = parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)

    pst_client = PSTToolsClient(args.server, args.timeout)

    health = pst_client.check_health()
    if "error" in health:
        logger.warning(f"Unable to connect to PST API server at {args.server}: {health['error']}")
        logger.warning("MCP server will start, but tool execution may fail")
    else:
        logger.info(f"Connected to PST API server at {args.server}")
        logger.info(f"Server health status: {health.get('status')}")
        if not health.get("all_essential_tools_available", False):
            # List only the tools whose status entry is falsy.
            missing = [t for t, ok in health.get("tools_status", {}).items() if not ok]
            if missing:
                logger.warning(f"Missing tools: {', '.join(missing)}")

    mcp = setup_mcp_server(pst_client)
    logger.info("Starting PST MCP server")
    mcp.run()


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/arch3rPro/MCP-PST-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.