#!/usr/bin/env python3
# MCP 客户端(Windows PST)。将常见渗透工具封装为 MCP 工具,转发到 PST API Server。
import sys
import os
import argparse
import logging
import tempfile
import shutil
import time
import uuid
import traceback
import json
from typing import Dict, Any, Optional, List, Callable
from functools import wraps
import requests
import uvicorn
from mcp.server.fastmcp import FastMCP
# 日志配置
def setup_logging(debug: bool = False, log_file: Optional[str] = None) -> logging.Logger:
    """Configure root logging and return this module's logger.

    Args:
        debug: When True, log at DEBUG level; otherwise INFO.
        log_file: Optional path; when given, log records are also written
            to this file.

    Returns:
        The logger for this module (``__name__``).
    """
    level = logging.DEBUG if debug else logging.INFO
    # One format string shared by every handler, so console and file output
    # cannot drift apart (the original duplicated it in basicConfig()).
    log_format = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
    formatter = logging.Formatter(log_format)
    handlers: List[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_file:
        handlers.append(logging.FileHandler(log_file))
    # Attach the formatter explicitly instead of relying on basicConfig's
    # implicit assignment to formatter-less handlers.
    for handler in handlers:
        handler.setFormatter(formatter)
    # force=True replaces any handlers installed by an earlier basicConfig call.
    logging.basicConfig(level=level, handlers=handlers, force=True)
    return logging.getLogger(__name__)
# Module-level logger; reconfigured when setup_logging() runs basicConfig(force=True).
logger = logging.getLogger(__name__)
# Decorator: log each call's outcome and wall-clock duration.
def monitor_performance(func: Callable) -> Callable:
    """Wrap *func* so every invocation logs success/failure and elapsed time.

    Exceptions are logged (with traceback at DEBUG level) and re-raised
    unchanged, so callers see the original failure.
    """
    @wraps(func)
    def timed_wrapper(*args, **kwargs):
        began = time.time()
        name = func.__name__
        try:
            logger.debug(f"开始执行函数: {name}")
            outcome = func(*args, **kwargs)
        except Exception as exc:
            spent = time.time() - began
            logger.error(f"函数 {name} 执行失败,耗时: {spent:.2f}秒,错误: {str(exc)}")
            logger.debug(f"函数 {name} 异常详情:\n{traceback.format_exc()}")
            raise
        else:
            spent = time.time() - began
            logger.info(f"函数 {name} 执行成功,耗时: {spent:.2f}秒")
            return outcome
    return timed_wrapper
# Decorator factory: convert exceptions from the wrapped callable into a fallback value.
def handle_errors(error_message: str = "操作失败", return_on_error: Any = None, log_error: bool = True) -> Callable:
    """Return a decorator that catches every exception raised by the wrapped
    function, optionally logs it, and returns *return_on_error* instead.

    Args:
        error_message: Prefix used in the error log line.
        return_on_error: Value returned to the caller on failure.
        log_error: When False, failures are swallowed silently.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def guarded(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                if log_error:
                    logger.error(f"{error_message}: {str(exc)}")
                    logger.debug(f"异常详情:\n{traceback.format_exc()}")
                return return_on_error
        return guarded
    return decorator
# Retry decorator factory for transient failures.
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0, exceptions: tuple = (Exception,)) -> Callable:
    """Return a decorator that retries the wrapped function on failure.

    Args:
        max_attempts: Total number of calls allowed (clamped to >= 1; the
            original implementation silently returned None without calling
            the function at all when this was < 1 — fixed).
        delay: Seconds to wait before the first retry.
        backoff: Multiplier applied to the wait after each retry.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Raises:
        The last caught exception, once all attempts are exhausted.
    """
    def decorator(func: Callable) -> Callable:
        log = logging.getLogger(__name__)
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Guarantee at least one real attempt (bug fix for max_attempts < 1).
            attempts = max(1, max_attempts)
            current_delay = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt >= attempts:
                        log.error(f"函数 {func.__name__} 在 {max_attempts} 次尝试后仍然失败: {str(e)}")
                        raise
                    log.warning(f"函数 {func.__name__} 第 {attempt} 次尝试失败: {str(e)},{current_delay}秒后重试")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
# Default configuration for the PST API server connection.
DEFAULT_PST_SERVER = "http://localhost:5100"  # base URL of the local PST API server
DEFAULT_REQUEST_TIMEOUT = 300 # seconds; per-request HTTP timeout
class TempFileManager:
    """Track temporary files and directories created for MCP tools.

    Files are registered under a caller-visible ID (a fresh UUID by default)
    so other code can look them up or delete them later; directories are
    tracked as a plain list of paths.

    NOTE(review): the ``handle_errors`` decorators on these methods catch the
    exceptions the bodies re-raise and convert them into the decorator's
    fallback value (``None``/``False``/``0``/``{}``), so callers never see a
    raised exception — confirm this swallow-and-return design is intended.
    """
    def __init__(self) -> None:
        # {file_id: {"path": path, "created_at": timestamp, "purpose": purpose}}
        self.temp_files: Dict[str, Dict[str, Any]] = {}
        # Paths of directories created via create_temp_dir().
        self.temp_dirs: List[str] = []
        # NOTE(review): this flag is read by the create_* methods but never set
        # anywhere in this class, so the "busy" branches below are effectively
        # dead code; it is a plain bool, not a real lock, and provides no
        # thread safety.
        self._lock: bool = False
        logger.debug("初始化临时文件管理器")
    @monitor_performance
    @handle_errors("创建临时文件失败", None)
    def create_temp_file(self, content: str = "", suffix: str = ".txt", prefix: str = "mcp_") -> str:
        """Create a temp file (optionally pre-filled with *content*) and return its path.

        The file is created with ``delete=False`` so it survives the close;
        cleanup happens via the delete/cleanup methods. Note the registration
        ID is only logged here, not returned.
        """
        if self._lock:
            logger.warning("临时文件管理器正忙,请稍后再试")
            raise Exception("临时文件管理器正忙")
        try:
            temp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix=suffix, prefix=prefix)
            if content:
                temp_file.write(content)
            temp_file.close()
            file_id = str(uuid.uuid4())
            self.temp_files[file_id] = {
                "path": temp_file.name,
                "created_at": time.time(),
                "purpose": "general"
            }
            logger.info(f"创建临时文件成功: {temp_file.name} (ID: {file_id})")
            return temp_file.name
        except Exception as e:
            logger.error(f"创建临时文件失败: {str(e)}")
            # Re-raised here, but @handle_errors converts this into a None return.
            raise
    @monitor_performance
    @handle_errors("创建指定ID的临时文件失败", None)
    def create_temp_file_with_id(self, file_id: str, content: str = "", suffix: str = ".txt", prefix: str = "mcp_") -> str:
        """Create a temp file registered under the caller-supplied *file_id*.

        An existing registration with the same ID is deleted first (its file
        is removed from disk), then replaced by the new one.
        """
        if self._lock:
            logger.warning("临时文件管理器正忙,请稍后再试")
            raise Exception("临时文件管理器正忙")
        if file_id in self.temp_files:
            logger.warning(f"文件ID {file_id} 已存在,将覆盖原有文件")
            self.delete_temp_file(file_id)
        try:
            temp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix=suffix, prefix=prefix)
            if content:
                temp_file.write(content)
            temp_file.close()
            self.temp_files[file_id] = {
                "path": temp_file.name,
                "created_at": time.time(),
                "purpose": "general"
            }
            logger.info(f"创建带ID的临时文件成功: {file_id} -> {temp_file.name}")
            return temp_file.name
        except Exception as e:
            logger.error(f"创建带ID的临时文件失败: {str(e)}")
            # Re-raised here, but @handle_errors converts this into a None return.
            raise
    @handle_errors("获取临时文件路径失败", None)
    def get_temp_file_path(self, file_id: str) -> Optional[str]:
        """Return the on-disk path registered for *file_id*, or None if unknown."""
        if file_id in self.temp_files:
            return self.temp_files[file_id]["path"]
        logger.warning(f"未找到ID为 {file_id} 的临时文件")
        return None
    @monitor_performance
    @handle_errors("删除临时文件失败", False)
    def delete_temp_file(self, file_id: str) -> bool:
        """Remove the file registered under *file_id* from disk and from tracking.

        Returns True when the registration was removed (even if the file was
        already gone from disk); False when the ID is unknown or unlink fails.
        """
        if file_id not in self.temp_files:
            logger.warning(f"尝试删除不存在的临时文件ID: {file_id}")
            return False
        file_path = self.temp_files[file_id]["path"]
        try:
            if os.path.exists(file_path):
                os.unlink(file_path)
                logger.info(f"删除临时文件成功: {file_path} (ID: {file_id})")
            else:
                logger.warning(f"尝试删除不存在的临时文件: {file_path}")
            # Drop the registration even when the file had already disappeared.
            del self.temp_files[file_id]
            return True
        except Exception as e:
            logger.error(f"删除临时文件失败 {file_path}: {str(e)}")
            return False
    @monitor_performance
    @handle_errors("根据路径删除临时文件失败", False)
    def delete_temp_file_by_path(self, file_path: str) -> bool:
        """Delete a temp file identified by its path.

        Tracked files are removed via delete_temp_file(); untracked paths are
        unlinked directly as a best effort. Returns False when nothing was
        deleted.
        """
        for file_id, file_info in list(self.temp_files.items()):
            if file_info["path"] == file_path:
                return self.delete_temp_file(file_id)
        # Not in the tracking table — attempt a direct unlink.
        try:
            if os.path.exists(file_path):
                os.unlink(file_path)
                logger.info(f"删除未跟踪的临时文件成功: {file_path}")
                return True
            else:
                logger.warning(f"尝试删除不存在的未跟踪临时文件: {file_path}")
        except Exception as e:
            logger.error(f"删除未跟踪的临时文件失败 {file_path}: {str(e)}")
        return False
    @monitor_performance
    @handle_errors("创建临时目录失败", "")
    def create_temp_dir(self, prefix: str = "mcp_") -> str:
        """Create a temporary directory, track it, and return its path."""
        try:
            temp_dir = tempfile.mkdtemp(prefix=prefix)
            self.temp_dirs.append(temp_dir)
            logger.info(f"创建临时目录成功: {temp_dir}")
            return temp_dir
        except Exception as e:
            logger.error(f"创建临时目录失败: {str(e)}")
            # Re-raised here, but @handle_errors converts this into "" return.
            raise
    @monitor_performance
    @handle_errors("清理临时文件失败", 0)
    def cleanup_temp_files(self, max_age_hours: float = 24.0) -> int:
        """Delete tracked files/dirs older than *max_age_hours*; return the count.

        File age is measured from the tracked creation timestamp; directory
        age from the directory's mtime on disk.
        """
        current_time = time.time()
        max_age_seconds = max_age_hours * 3600
        deleted_count = 0
        logger.info(f"开始清理超过 {max_age_hours} 小时的临时文件")
        # Expired tracked files.
        for file_id, file_info in list(self.temp_files.items()):
            age_seconds = current_time - file_info["created_at"]
            if age_seconds > max_age_seconds:
                if self.delete_temp_file(file_id):
                    deleted_count += 1
        # Expired tracked directories (iterate a copy; the list is mutated).
        for temp_dir in self.temp_dirs[:]:
            try:
                dir_stat = os.stat(temp_dir)
                age_seconds = current_time - dir_stat.st_mtime
                if age_seconds > max_age_seconds:
                    shutil.rmtree(temp_dir)
                    self.temp_dirs.remove(temp_dir)
                    logger.info(f"删除临时目录成功: {temp_dir}")
                    deleted_count += 1
            except Exception as e:
                logger.error(f"删除临时目录失败 {temp_dir}: {str(e)}")
        logger.info(f"清理完成,共删除 {deleted_count} 个超过 {max_age_hours} 小时的临时项目")
        return deleted_count
    @monitor_performance
    @handle_errors("清理所有临时文件失败", 0)
    def cleanup_all_temp_files(self) -> int:
        """Delete every tracked temp file and directory; return the count removed."""
        deleted_count = 0
        logger.info("开始清理所有临时文件和目录")
        # All tracked files.
        for file_id in list(self.temp_files.keys()):
            if self.delete_temp_file(file_id):
                deleted_count += 1
        # All tracked directories (iterate a copy; the list is mutated).
        for temp_dir in self.temp_dirs[:]:
            try:
                shutil.rmtree(temp_dir)
                self.temp_dirs.remove(temp_dir)
                logger.info(f"删除临时目录成功: {temp_dir}")
                deleted_count += 1
            except Exception as e:
                logger.error(f"删除临时目录失败 {temp_dir}: {str(e)}")
        logger.info(f"清理完成,共删除 {deleted_count} 个临时项目")
        return deleted_count
    @handle_errors("列出临时文件失败", {})
    def list_temp_files(self) -> Dict[str, Dict[str, Any]]:
        """Return {file_id: info} for every tracked file, with existence/size/age."""
        result = {}
        try:
            for file_id, file_info in self.temp_files.items():
                file_exists = os.path.exists(file_info["path"])
                file_size = 0
                if file_exists:
                    try:
                        file_size = os.path.getsize(file_info["path"])
                    except Exception:
                        # Size is best-effort; report 0 if the file vanished mid-scan.
                        pass
                result[file_id] = {
                    "path": file_info["path"],
                    "created_at": file_info["created_at"],
                    "age_hours": (time.time() - file_info["created_at"]) / 3600,
                    "purpose": file_info["purpose"],
                    "exists": file_exists,
                    "size_bytes": file_size
                }
        except Exception as e:
            logger.error(f"列出临时文件失败: {str(e)}")
            # Re-raised here, but @handle_errors converts this into {} return.
            raise
        return result
    @handle_errors("获取临时文件统计信息失败", {})
    def get_temp_file_stats(self) -> Dict[str, Any]:
        """Return aggregate counts and on-disk size for tracked files/dirs."""
        total_files = len(self.temp_files)
        total_dirs = len(self.temp_dirs)
        total_size = 0
        existing_files = 0
        try:
            for file_info in self.temp_files.values():
                try:
                    if os.path.exists(file_info["path"]):
                        total_size += os.path.getsize(file_info["path"])
                        existing_files += 1
                except Exception:
                    # Best-effort: skip files that disappear during the scan.
                    pass
        except Exception as e:
            logger.error(f"计算临时文件大小失败: {str(e)}")
        return {
            "total_files": total_files,
            "existing_files": existing_files,
            "total_directories": total_dirs,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2)
        }
# Global TempFileManager singleton; created only after every decorator and the
# class itself have been defined.
global_temp_file_manager = None
def init_temp_file_manager():
    """Return the global TempFileManager, creating it on first use."""
    global global_temp_file_manager
    if global_temp_file_manager is not None:
        return global_temp_file_manager
    global_temp_file_manager = TempFileManager()
    logger.info("临时文件管理器初始化成功")
    return global_temp_file_manager
# Eagerly initialize at import time so the manager is always ready.
global_temp_file_manager = init_temp_file_manager()
class PSTToolsClient:
    """HTTP client for the Windows PST API Server.

    Every request helper returns a dict: the server's JSON payload on
    success, or ``{"error": ..., "success": False}`` on failure — these
    methods never raise.

    Bug fix: the original decorated ``safe_get``/``safe_post`` with
    ``@retry(... exceptions=(ConnectionError, Timeout))``, but the bodies
    caught ``requests.exceptions.RequestException`` (the base class of both)
    and returned an error dict, so the retry decorator could never observe a
    retryable exception and was dead code. Retrying is now performed inline
    in ``_request`` while preserving the never-raise contract.
    """
    # Inline retry policy for transient network failures (matches the values
    # the original dead @retry decorator was configured with).
    _MAX_ATTEMPTS = 3
    _INITIAL_DELAY = 1.0
    _BACKOFF = 2.0

    def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT):
        """Create a session-backed client.

        Args:
            server_url: Base URL of the PST API server (trailing slash stripped).
            timeout: Default per-request timeout in seconds.
        """
        self.server_url = server_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "MCP-PST-Client/1.0"
        })
        logger.info(f"初始化PST工具客户端,连接到: {server_url}")

    def _request(self, method: str, endpoint: str,
                 params: Optional[Dict[str, Any]] = None,
                 json_data: Optional[Dict[str, Any]] = None,
                 timeout: Optional[int] = None) -> Dict[str, Any]:
        """Send one HTTP request, retrying transient network failures.

        ConnectionError/Timeout are retried up to _MAX_ATTEMPTS times with
        exponential backoff; HTTP error statuses, malformed JSON and any
        unexpected exception are reported immediately as an error dict.
        """
        url = f"{self.server_url}/{endpoint}"
        retryable = (requests.exceptions.ConnectionError, requests.exceptions.Timeout)
        wait = self._INITIAL_DELAY
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            try:
                response = self.session.request(
                    method, url, params=params, json=json_data,
                    timeout=timeout or self.timeout
                )
                response.raise_for_status()
            except retryable as e:
                if attempt >= self._MAX_ATTEMPTS:
                    logger.error(f"{method}请求失败: {str(e)}")
                    return {"error": f"{method}请求失败: {str(e)}", "success": False}
                logger.warning(f"{method}请求第 {attempt} 次尝试失败: {str(e)},{wait}秒后重试")
                time.sleep(wait)
                wait *= self._BACKOFF
                continue
            except requests.exceptions.RequestException as e:
                # Non-transient HTTP failure (e.g. 4xx/5xx raised above).
                logger.error(f"{method}请求失败: {str(e)}")
                return {"error": f"{method}请求失败: {str(e)}", "success": False}
            except Exception as e:
                logger.error(f"{method}请求发生意外错误: {str(e)}")
                return {"error": f"{method}请求发生意外错误: {str(e)}", "success": False}
            try:
                result = response.json()
                logger.debug(f"{method}请求成功,响应状态: {response.status_code}")
                return result
            except json.JSONDecodeError as e:
                logger.error(f"解析JSON响应失败: {str(e)}")
                return {
                    "error": f"解析JSON响应失败: {str(e)}",
                    "success": False,
                    "response_text": response.text[:500]  # first 500 chars only
                }
        # Unreachable: every loop iteration either returns or continues.
        return {"error": f"{method}请求失败", "success": False}

    @monitor_performance
    def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET *endpoint*; transient network failures are retried."""
        if params is None:
            params = {}
        logger.debug(f"发送GET请求到: {self.server_url}/{endpoint},参数: {params}")
        return self._request("GET", endpoint, params=params)

    @monitor_performance
    def safe_post(self, endpoint: str, json_data: Optional[Dict[str, Any]] = None, timeout: Optional[int] = None) -> Dict[str, Any]:
        """POST *json_data* to *endpoint*; transient network failures are retried."""
        if json_data is None:
            json_data = {}
        logger.debug(f"发送POST请求到: {self.server_url}/{endpoint},数据: {json_data}")
        return self._request("POST", endpoint, json_data=json_data, timeout=timeout)

    @monitor_performance
    @handle_errors("执行命令失败", {"error": "命令执行失败", "success": False})
    def execute_command(self, command: str) -> Dict[str, Any]:
        """Run a raw command on the server via POST api/command."""
        logger.info(f"执行命令: {command}")
        return self.safe_post("api/command", {"command": command})

    @monitor_performance
    @handle_errors("健康检查失败", {"error": "健康检查失败", "success": False})
    def check_health(self) -> Dict[str, Any]:
        """GET /health to verify the server is reachable."""
        logger.debug("执行健康检查")
        return self.safe_get("health")

    @monitor_performance
    def close(self):
        """Close the underlying requests session; errors are logged, not raised."""
        try:
            self.session.close()
            logger.info("PST工具客户端会话已关闭")
        except Exception as e:
            logger.error(f"关闭客户端会话失败: {str(e)}")
def setup_mcp_server(pst_client: PSTToolsClient) -> FastMCP:
mcp = FastMCP("pst-mcp")
# 基础网络与目录枚举
@mcp.tool()
def nmap_scan(
target: str,
scan_type: str = "-sV",
ports: str = "",
timing: str = "T4",
ping_scan: str = "-Pn",
os_detection: str = "",
service_detection: str = "",
script_scan: str = "",
script_args: str = "",
output_format: str = "",
output_file: str = "",
min_rate: str = "",
max_rate: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Nmap是一款强大的网络扫描和安全审计工具,用于发现网络中的主机、服务和潜在的安全漏洞。
"""
data = {
"target": target,
"scan_type": scan_type,
"ports": ports,
"timing": timing,
"ping_scan": ping_scan,
"os_detection": os_detection,
"service_detection": service_detection,
"script_scan": script_scan,
"script_args": script_args,
"output_format": output_format,
"output_file": output_file,
"min_rate": min_rate,
"max_rate": max_rate,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/nmap", data)
@mcp.tool()
def rustscan_scan(
target: str = "",
addresses: str = "",
ports: str = "",
port_range: str = "",
batch_size: str = "",
timeout: str = "",
tries: str = "",
scan_order: str = "",
udp: bool = False,
greppable: bool = False,
accessible: bool = False,
no_banner: bool = False,
scripts: str = "default",
nmap_args: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""RustScan高速端口扫描器,快速发现开放端口并自动调用Nmap进行深度分析"""
data = {
"target": target,
"addresses": addresses,
"ports": ports,
"port_range": port_range,
"batch_size": batch_size,
"timeout": timeout,
"tries": tries,
"scan_order": scan_order,
"udp": str(udp) if udp else "",
"greppable": str(greppable) if greppable else "",
"accessible": str(accessible) if accessible else "",
"no_banner": str(no_banner) if no_banner else "",
"scripts": scripts,
"nmap_args": nmap_args,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/rustscan", data)
@mcp.tool()
def httpx_probe(
target: str = "",
list_file: str = "",
status_code: bool = True,
title: bool = False,
tech_detect: bool = False,
silent: bool = False,
output_file: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
HTTPX是一款快速且多功能的HTTP探测器,用于发现和分析Web服务器。
"""
post_data = {
"target": target,
"list_file": list_file,
"status_code": "1" if status_code else "",
"title": "1" if title else "",
"tech_detect": "1" if tech_detect else "",
"silent": "1" if silent else "",
"output_file": output_file,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/httpx", post_data)
@mcp.tool()
def ffuf_scan(
url: str,
wordlist: str,
method: str = "GET",
data: str = "",
headers: str = "",
extensions: str = "",
match_status: str = "200,204,301,302,307,401,403,405,500",
filter_status: str = "",
filter_size: str = "",
threads: str = "40",
recursion: bool = False,
output_file: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
快速的Web模糊测试工具,用于发现隐藏的目录、文件和参数。
"""
post_data = {
"url": url,
"wordlist": wordlist,
"method": method,
"data": data,
"headers": headers,
"extensions": extensions,
"match_status": match_status,
"filter_status": filter_status,
"filter_size": filter_size,
"threads": threads,
"recursion": "1" if recursion else "",
"output_file": output_file,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/ffuf", post_data)
@mcp.tool()
def feroxbuster_scan(
url: str = "",
extensions: str = "",
methods: str = "",
headers: str = "",
cookies: str = "",
status_codes: str = "",
threads: str = "",
no_recursion: bool = False,
depth: str = "",
wordlist: str = "",
rate_limit: str = "",
time_limit: str = "",
silent: bool = False,
json: bool = False,
output: str = "",
burp: bool = False,
additional_args: str = "",
seclists_path: str = "D:\\Global\\apps\\SecLists\\current"
) -> Dict[str, Any]:
"""
快速、灵活的目录和文件扫描工具,专为Web应用安全测试设计。
"""
post_data = {
"url": url,
"extensions": extensions,
"methods": methods,
"headers": headers,
"cookies": cookies,
"status_codes": status_codes,
"threads": threads,
"no_recursion": "1" if no_recursion else "",
"depth": depth,
"wordlist": wordlist,
"rate_limit": rate_limit,
"time_limit": time_limit,
"silent": "1" if silent else "",
"json": "1" if json else "",
"output": output,
"burp": "1" if burp else "",
"additional_args": additional_args,
"seclists_path": seclists_path
}
return pst_client.safe_post("api/tools/feroxbuster", post_data)
# 内网综合扫描与爆破
@mcp.tool()
def fscan_network(
target: str,
ports: str = "",
username: str = "",
password: str = "",
scan_mode: str = "All",
threads: str = "60",
timeout: str = "3",
url: str = "",
proxy: str = "",
output_file: str = "result.txt",
additional_args: str = ""
) -> Dict[str, Any]:
"""
Fscan是一款内网综合扫描工具,集成了端口扫描、服务识别、漏洞检测和密码爆破等功能。
"""
post_data = {
"target": target,
"ports": ports,
"username": username,
"password": password,
"scan_mode": scan_mode,
"threads": threads,
"timeout": timeout,
"url": url,
"proxy": proxy,
"output_file": output_file,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/fscan", post_data)
@mcp.tool()
def hydra_attack(
target: str,
service: str,
username: str = "",
username_file: str = "",
password: str = "",
password_file: str = "",
port: str = "",
tasks: str = "",
wait_time: str = "",
timeout: str = "",
login_attempts: str = "",
retry_time: str = "",
exit_on_success: bool = False,
skip_default_passwords: bool = False,
skip_empty_passwords: bool = False,
skip_login: bool = False,
use_ssl: bool = False,
additional_args: str = ""
) -> Dict[str, Any]:
"""
THC Hydra是一款强大的密码爆破工具,支持多种协议和服务。
"""
post_data = {
"target": target,
"service": service,
"username": username,
"username_file": username_file,
"password": password,
"password_file": password_file,
"port": port,
"tasks": tasks,
"wait_time": wait_time,
"timeout": timeout,
"login_attempts": login_attempts,
"retry_time": retry_time,
"exit_on_success": "1" if exit_on_success else "",
"skip_default_passwords": "1" if skip_default_passwords else "",
"skip_empty_passwords": "1" if skip_empty_passwords else "",
"skip_login": "1" if skip_login else "",
"use_ssl": "1" if use_ssl else "",
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/hydra", post_data)
# ProjectDiscovery 套件
@mcp.tool()
def subfinder_enum(
domain: str = "",
list_file: str = "",
sources: str = "",
exclude_sources: str = "",
recursive: bool = False,
all_sources: bool = False,
match: str = "",
filter: str = "",
rate_limit: str = "",
silent: bool = True,
json_output: bool = False,
verbose: bool = False,
timeout: str = "",
max_time: str = "",
active: bool = False,
config: str = "",
provider_config: str = "",
resolvers: str = "",
proxy: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""Subfinder被动子域名枚举工具,快速发现目标域名的所有子域名"""
data = {
"domain": domain,
"list_file": list_file,
"sources": sources,
"exclude_sources": exclude_sources,
"recursive": "1" if recursive else "",
"all": "1" if all_sources else "",
"match": match,
"filter": filter,
"rate_limit": rate_limit,
"silent": "1" if silent else "",
"json_output": "1" if json_output else "",
"verbose": "1" if verbose else "",
"timeout": timeout,
"max_time": max_time,
"active": "1" if active else "",
"config": config,
"provider_config": provider_config,
"resolvers": resolvers,
"proxy": proxy,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/subfinder", data)
@mcp.tool()
def dnsx_resolve(
domain: str = "",
list_file: str = "",
wordlist: str = "",
query_type: str = "a",
response: bool = False,
response_only: bool = False,
json_output: bool = False,
silent: bool = True,
threads: str = "100",
rate_limit: str = "",
resolvers: str = "",
retry: str = "2",
timeout: str = "3",
cdn: bool = False,
asn: bool = False,
wildcard_threshold: str = "5",
additional_args: str = ""
) -> Dict[str, Any]:
"""DNS解析工具,支持多种DNS记录类型查询和子域名验证"""
data = {
"domain": domain,
"list_file": list_file,
"wordlist": wordlist,
"query_type": query_type,
"response": "1" if response else "",
"response_only": "1" if response_only else "",
"json_output": "1" if json_output else "",
"silent": "1" if silent else "",
"threads": threads,
"rate_limit": rate_limit,
"resolvers": resolvers,
"retry": retry,
"timeout": timeout,
"cdn": "1" if cdn else "",
"asn": "1" if asn else "",
"wildcard_threshold": wildcard_threshold,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/dnsx", data)
@mcp.tool()
def naabu_scan(host: str = "", list_file: str = "", ports: str = "", additional_args: str = "-silent") -> Dict[str, Any]:
data = {"host": host, "list_file": list_file, "ports": ports, "additional_args": additional_args}
return pst_client.safe_post("api/tools/naabu", data)
@mcp.tool()
def nuclei_scan(target: str = "", list_file: str = "", template: str = "", tags: str = "", severity: str = "", additional_args: str = "-silent") -> Dict[str, Any]:
data = {"target": target, "list_file": list_file, "template": template, "tags": tags, "severity": severity, "additional_args": additional_args}
return pst_client.safe_post("api/tools/nuclei", data)
@mcp.tool()
def katana_crawl(url: str = "", list_file: str = "", depth: str = "3", additional_args: str = "-silent") -> Dict[str, Any]:
data = {"url": url, "list_file": list_file, "depth": depth, "additional_args": additional_args}
return pst_client.safe_post("api/tools/katana", data)
# 其他常用
@mcp.tool()
def afrog_scan(target: str = "", list_file: str = "", pocs: str = "", additional_args: str = "") -> Dict[str, Any]:
data = {"target": target, "list_file": list_file, "pocs": pocs, "additional_args": additional_args}
return pst_client.safe_post("api/tools/afrog", data)
@mcp.tool()
def ehole_fingerprint(target: str = "", list_file: str = "", fingerprints: str = "", output: str = "", additional_args: str = "") -> Dict[str, Any]:
data = {"target": target, "list_file": list_file, "fingerprints": fingerprints, "output": output, "additional_args": additional_args}
return pst_client.safe_post("api/tools/ehole", data)
@mcp.tool()
def hackbrowserdata_dump(browser: str = "", output_dir: str = "./hbdata_output", additional_args: str = "") -> Dict[str, Any]:
data = {"browser": browser, "output_dir": output_dir, "additional_args": additional_args}
return pst_client.safe_post("api/tools/hackbrowserdata", data)
@mcp.tool()
def sqlmap_scan(
url: str,
data: str = "",
cookie: str = "",
user_agent: str = "",
referer: str = "",
headers: str = "",
proxy: str = "",
level: str = "1",
risk: str = "1",
threads: str = "1",
dbms: str = "",
technique: str = "",
os: str = "",
batch: bool = False,
fresh_queries: bool = False,
tamper: str = "",
random_agent: bool = False,
output_file: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
自动化的SQL注入和数据库接管工具,能够检测和利用SQL注入漏洞。
"""
post_data = {
"url": url,
"data": data,
"cookie": cookie,
"user_agent": user_agent,
"referer": referer,
"headers": headers,
"proxy": proxy,
"level": level,
"risk": risk,
"threads": threads,
"dbms": dbms,
"technique": technique,
"os": os,
"batch": "1" if batch else "",
"fresh_queries": "1" if fresh_queries else "",
"tamper": tamper,
"random_agent": "1" if random_agent else "",
"output_file": output_file,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/sqlmap", post_data)
# 企业信息收集工具
@mcp.tool()
def ens_scan(
keyword: str = "",
company_id: str = "",
input_file: str = "",
is_pid: bool = False,
scan_type: str = "aqc",
field: str = "",
invest: str = "",
deep: int = 0,
hold: bool = False,
supplier: bool = False,
branch: bool = False,
is_branch: bool = False,
branch_filter: str = "",
out_dir: str = "outs",
out_type: str = "xlsx",
json: bool = False,
no_merge: bool = False,
is_show: bool = True,
delay: int = 0,
proxy: str = "",
timeout: int = 1,
debug: bool = False,
additional_args: str = ""
) -> Dict[str, Any]:
"""
ENScan是一款企业信息收集工具,专门用于获取企业工商信息、股权结构、投资关系等数据。
"""
post_data = {
"keyword": keyword,
"company_id": company_id,
"input_file": input_file,
"is_pid": "1" if is_pid else "",
"scan_type": scan_type,
"field": field,
"invest": invest,
"deep": str(deep),
"hold": "1" if hold else "",
"supplier": "1" if supplier else "",
"branch": "1" if branch else "",
"is_branch": "1" if is_branch else "",
"branch_filter": branch_filter,
"out_dir": out_dir,
"out_type": out_type,
"json": "1" if json else "",
"no_merge": "1" if no_merge else "",
"is_show": "1" if is_show else "",
"delay": str(delay),
"proxy": proxy,
"timeout": str(timeout),
"debug": "1" if debug else "",
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/enscan", post_data)
@mcp.tool()
def metasploit_console(msf_cmd: str = "", rc_file: str = "", additional_args: str = "") -> Dict[str, Any]:
data = {"msf_cmd": msf_cmd, "rc_file": rc_file, "additional_args": additional_args}
return pst_client.safe_post("api/tools/metasploit", data, timeout=900)
@mcp.tool()
def john_crack(hash_file: str, wordlist: str = "", format: str = "", mask: str = "", rules: bool = False, additional_args: str = "") -> Dict[str, Any]:
data = {"hash_file": hash_file, "wordlist": wordlist, "format": format, "mask": mask, "rules": ("1" if rules else ""), "additional_args": additional_args}
return pst_client.safe_post("api/tools/john", data)
@mcp.tool()
def searchsploit_exploit(
term: str = "",
title: bool = False,
path: bool = False,
url: bool = False,
port: bool = False,
author: bool = False,
platform: str = "",
type: str = "",
date: str = "",
nmap: str = "",
vulns: str = "",
cve: str = "",
cwe: str = "",
cbb: str = "",
msb: str = "",
osvdb: str = "",
bid: str = "",
edb: str = "",
github: str = "",
exploitdb: str = "",
packetstorm: str = "",
zdi: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
SearchSploit是Exploit-DB的命令行搜索工具。
"""
post_data = {
"term": term,
"title": "1" if title else "",
"path": "1" if path else "",
"url": "1" if url else "",
"port": "1" if port else "",
"author": "1" if author else "",
"platform": platform,
"type": type,
"date": date,
"nmap": nmap,
"vulns": vulns,
"cve": cve,
"cwe": cwe,
"cbb": cbb,
"msb": msb,
"osvdb": osvdb,
"bid": bid,
"edb": edb,
"github": github,
"exploitdb": exploitdb,
"packetstorm": packetstorm,
"zdi": zdi,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/searchsploit", post_data)
@mcp.tool()
def nikto_scan(target: str, port: str = "", ssl: bool = False, additional_args: str = "") -> Dict[str, Any]:
data = {"target": target, "port": port, "ssl": ("1" if ssl else ""), "additional_args": additional_args}
return pst_client.safe_post("api/tools/nikto", data)
@mcp.tool()
def seclists_wordlist(
action: str = "list",
category: str = "",
list_type: str = "",
output_file: str = "",
seclists_path: str = "D:\\Global\\apps\\SecLists\\current",
copy_to_workdir: bool = False,
workdir_path: str = "",
return_content: bool = False,
max_lines: int = 1000,
additional_args: str = ""
) -> Dict[str, Any]:
"""
安全测试领域最全面的字典文件集合,为其他爆破和扫描工具提供各种类型的字典文件。
"""
post_data = {
"action": action,
"category": category,
"list_type": list_type,
"output_file": output_file,
"seclists_path": seclists_path,
"copy_to_workdir": copy_to_workdir,
"workdir_path": workdir_path,
"return_content": return_content,
"max_lines": max_lines,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/seclists", post_data)
@mcp.tool()
def gobuster_scan(
mode: str = "dir",
url: str = "",
domain: str = "",
wordlist: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
data = {"mode": mode, "url": url, "domain": domain, "wordlist": wordlist, "additional_args": additional_args}
return pst_client.safe_post("api/tools/gobuster", data)
@mcp.tool()
def masscan_scan(target: str, ports: str = "", rate: str = "", iface: str = "", additional_args: str = "") -> Dict[str, Any]:
data = {"target": target, "ports": ports, "rate": rate, "iface": iface, "additional_args": additional_args}
return pst_client.safe_post("api/tools/masscan", data)
@mcp.tool()
def netcat_run(mode: str = "client", host: str = "", port: str = "", listen_port: str = "", binary: str = "", additional_args: str = "") -> Dict[str, Any]:
data = {"mode": mode, "host": host, "port": port, "listen_port": listen_port, "binary": binary, "additional_args": additional_args}
return pst_client.safe_post("api/tools/netcat", data)
@mcp.tool()
def bbot_scan(target: str = "", preset: str = "", modules: str = "", flags: str = "", output_modules: str = "", output_dir: str = "", whitelist: str = "", blacklist: str = "", additional_args: str = "") -> Dict[str, Any]:
data = {
"target": target,
"preset": preset,
"modules": modules,
"flags": flags,
"output_modules": output_modules,
"output_dir": output_dir,
"whitelist": whitelist,
"blacklist": blacklist,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/bbot", data)
@mcp.tool()
def amass_scan(
target: str = "",
mode: str = "enum",
passive: bool = False,
active: bool = True,
brute: bool = False,
min_for_recursive: int = 1,
max_dns_queries: int = 10000,
timeout: int = 30,
include_unresolvable: bool = False,
output_file: str = "",
additional_args: str = ""
) -> Dict[str, Any]:
"""
OWASP Amass是一款强大的开源网络攻击面映射和资产发现工具。
"""
post_data = {
"target": target,
"mode": mode,
"passive": "1" if passive else "",
"active": "1" if active else "",
"brute": "1" if brute else "",
"min_for_recursive": str(min_for_recursive),
"max_dns_queries": str(max_dns_queries),
"timeout": str(timeout),
"include_unresolvable": "1" if include_unresolvable else "",
"output_file": output_file,
"additional_args": additional_args
}
return pst_client.safe_post("api/tools/amass", post_data)
@mcp.prompt()
def seclists_wordlist_guide() -> str:
"""
# SecLists字典文件使用指南
## 核心用法
```python
# 查看可用字典分类
seclists_wordlist(action="list")
# 获取密码字典
seclists_wordlist(action="list", category="passwords", list_type="common")
# 获取Web内容发现字典并复制到工作目录
seclists_wordlist(action="list", category="discovery", list_type="web-content", copy_to_workdir=True)
# 使用自定义SecLists路径
seclists_wordlist(action="list", seclists_path="C:\\CustomPath\\SecLists")
```
## 核心参数
- action: 操作类型,默认"list"
- category: 字典分类(passwords/usernames/discovery/fuzzing)
- list_type: 字典类型(common/default/top10k等)
- seclists_path: SecLists安装路径,默认"D:\\Global\\apps\\SecLists\\current"
- copy_to_workdir: 是否复制到工作目录,默认False
- workdir_path: 工作目录路径,默认当前工作目录
- return_content: 是否返回内存内容,默认False
- max_lines: 内存传递时的最大行数,默认1000
## 字典分类
- Passwords: common(常见密码)、rockyou(泄露密码)、top10k/top100k(按频率排序)
- Usernames: common(常见用户名)、names(基于姓名)
- Discovery: web-content(目录文件)、dns(子域名)、api(端点发现)
- Fuzzing: common(通用载荷)、web(Web应用)、xss(XSS测试)、sqli(SQL注入)
## 路径配置
系统默认使用"D:\\Global\\apps\\SecLists\\current"作为SecLists安装路径。如果安装在其他位置,通过seclists_path参数指定。系统会自动验证路径是否存在。
## 注意事项
- SecLists主要为其他工具提供字典文件,本身不执行扫描
- 字典文件通常较大,建议根据需要选择特定类型
- 使用字典文件进行爆破时请确保已获得合法授权
- 如果默认路径不存在,请使用seclists_path参数指定正确的安装路径
- 使用copy_to_workdir=True可以解决与其他工具的文件路径同步问题
- 复制的文件会保留原始文件名,如果目标目录已存在同名文件,将被覆盖
- 使用return_content=True时,注意内存使用情况,特别是对于大型字典文件
"""
@mcp.tool()
def githacker_git_leak(
url: str,
output: str = "githacker_output",
threads: int = 10,
additional_args: Optional[str] = None
) -> str:
"""
使用GitHacker工具从泄露的.git仓库中恢复源代码
"""
return client.execute_tool(
"githacker",
url=url,
output=output,
threads=threads,
additional_args=additional_args
)
@mcp.tool()
def gowitness_screenshot(
    scan_type: str = "single",
    url: str = "",
    file: str = "",
    cidr: str = "",
    nmap_file: str = "",
    nessus_file: str = "",
    threads: int = 10,
    timeout: int = 30,
    delay: int = 0,
    screenshot_path: str = "gowitness",
    screenshot_format: str = "png",
    chrome_path: str = "",
    chrome_proxy: str = "",
    chrome_user_agent: str = "",
    write_db: bool = False,
    write_csv: bool = False,
    write_jsonl: bool = False,
    screenshot_fullpage: bool = False,
    save_content: bool = False,
    additional_args: Optional[str] = None
) -> str:
    """
    使用GoWitness工具对Web界面进行截图,用于信息收集和泄露检测
    """
    # Bundle every keyword option and delegate to the generic executor in
    # one call rather than repeating each argument inline.
    options = {
        "scan_type": scan_type,
        "url": url,
        "file": file,
        "cidr": cidr,
        "nmap_file": nmap_file,
        "nessus_file": nessus_file,
        "threads": threads,
        "timeout": timeout,
        "delay": delay,
        "screenshot_path": screenshot_path,
        "screenshot_format": screenshot_format,
        "chrome_path": chrome_path,
        "chrome_proxy": chrome_proxy,
        "chrome_user_agent": chrome_user_agent,
        "write_db": write_db,
        "write_csv": write_csv,
        "write_jsonl": write_jsonl,
        "screenshot_fullpage": screenshot_fullpage,
        "save_content": save_content,
        "additional_args": additional_args,
    }
    return client.execute_tool("gowitness", **options)
@mcp.tool()
def wordlist_integration_workflow(
    target_url: str,
    scan_type: str = "directory",
    wordlist_category: str = "discovery",
    wordlist_type: str = "web-content",
    use_memory: bool = False,
    max_lines: int = 1000,
    workdir_path: str = "",
    scan_options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    工具链集成流程:SecLists生成→路径验证→工具扫描→结果分析
    """
    # Normalise the None default (annotation corrected to Optional[...];
    # the original declared a non-optional Dict with a None default).
    if scan_options is None:
        scan_options = {}
    # Step 1: obtain the wordlist, either as in-memory content persisted to
    # a managed temp file, or as a copy in the working directory.
    wordlist_result = {}
    if use_memory:
        # Memory-transfer path: fetch up to max_lines of wordlist content.
        wordlist_result = seclists_wordlist(
            action="list",
            category=wordlist_category,
            list_type=wordlist_type,
            return_content=True,
            max_lines=max_lines
        )
        # Abort early when the content could not be retrieved.
        if not wordlist_result.get("success") or not wordlist_result.get("wordlist_content"):
            return {
                "success": False,
                "error": "Failed to retrieve wordlist content",
                "wordlist_result": wordlist_result
            }
        # Persist the content via the shared temp-file manager so the
        # downstream scanner can read it from disk.
        file_id = f"wordlist_{scan_type}_{int(time.time())}"
        wordlist_path = global_temp_file_manager.create_temp_file_with_id(
            file_id=file_id,
            content=wordlist_result["wordlist_content"],
            suffix=".txt",
            prefix=f"wordlist_{scan_type}_"
        )
        # Record the handle so step 4 can delete the temp file again.
        wordlist_result["temp_file_id"] = file_id
        wordlist_result["temp_file_path"] = wordlist_path
    else:
        # File-copy path: copy the wordlist into the working directory.
        wordlist_result = seclists_wordlist(
            action="list",
            category=wordlist_category,
            list_type=wordlist_type,
            copy_to_workdir=True,
            workdir_path=workdir_path if workdir_path else os.getcwd()
        )
        # Abort early when no file was copied.
        if not wordlist_result.get("success") or not wordlist_result.get("copied_files"):
            return {
                "success": False,
                "error": "Failed to copy wordlist file",
                "wordlist_result": wordlist_result
            }
        wordlist_path = wordlist_result["copied_files"][0]
    # Step 2: verify that the wordlist file actually exists on disk.
    if not os.path.exists(wordlist_path):
        return {
            "success": False,
            "error": f"Wordlist file not found at path: {wordlist_path}",
            "wordlist_result": wordlist_result
        }
    # Step 3: run the scan matching the requested scan type.
    scan_result = {}
    if scan_type == "directory":
        # Directory / content discovery scan.
        scan_result = ffuf_scan(
            url=target_url,
            wordlist=wordlist_path,
            **scan_options
        )
    elif scan_type == "subdomain":
        # Subdomain enumeration. Note the wordlist is NOT passed here; the
        # scheme and any path are stripped to obtain a bare domain.
        scan_result = subfinder_enum(
            domain=target_url.replace("http://", "").replace("https://", "").split("/")[0],
            **scan_options
        )
    elif scan_type == "vhost":
        # Virtual-host discovery: fuzz the Host header.
        scan_result = ffuf_scan(
            url=target_url,
            wordlist=wordlist_path,
            headers="Host: FUZZ",
            **scan_options
        )
    else:
        # Unknown scan types fall back to a directory scan.
        scan_result = ffuf_scan(
            url=target_url,
            wordlist=wordlist_path,
            **scan_options
        )
    # Step 4: remove the temp file created by the memory-transfer path.
    if use_memory and wordlist_result.get("temp_file_id"):
        try:
            global_temp_file_manager.delete_temp_file(wordlist_result["temp_file_id"])
            temp_cleanup_result = {"success": True, "message": "Temporary file cleaned up successfully"}
        except Exception as e:
            temp_cleanup_result = {"success": False, "error": str(e)}
    else:
        temp_cleanup_result = None
    # Step 5: summarise the scan output.
    analysis = {
        "scan_type": scan_type,
        "wordlist_info": {
            "category": wordlist_category,
            "type": wordlist_type,
            "size": wordlist_result.get("size", "unknown"),
            "lines": wordlist_result.get("lines", "unknown")
        },
        "scan_success": scan_result.get("success", False),
        "findings_count": 0,
        "high_priority_findings": []
    }
    # Count output lines and surface those containing interesting HTTP
    # status codes. NOTE(review): this counts every line (blanks included)
    # and matches the status substrings anywhere in the line — confirm that
    # heuristic against the actual scanner output format.
    if scan_result.get("success") and scan_result.get("output"):
        output_lines = scan_result["output"].split("\n")
        analysis["findings_count"] = len(output_lines)
        for line in output_lines:
            if any(status in line for status in ["200", "301", "302", "403"]):
                analysis["high_priority_findings"].append(line)
    # Aggregate every stage into a single result payload.
    return {
        "success": True,
        "workflow": {
            "wordlist_generation": wordlist_result,
            "scan_execution": scan_result,
            "analysis": analysis,
            "temp_file_cleanup": temp_cleanup_result
        },
        "summary": {
            "target": target_url,
            "scan_type": scan_type,
            "wordlist_used": f"{wordlist_category}/{wordlist_type}",
            "transfer_method": "memory" if use_memory else "file_copy",
            "total_findings": analysis["findings_count"],
            "high_priority_findings": len(analysis["high_priority_findings"])
        }
    }
@mcp.prompt()
def wordlist_integration_guide() -> str:
    """
    # 工具链集成流程指南
    wordlist_integration_workflow 提供自动化工作流程,集成SecLists字典生成和扫描工具。
    ## 核心参数
    - target_url (必需): 目标URL或域名
    - scan_type: 扫描类型 ("directory"/"subdomain"/"vhost")
    - wordlist_category: 字典分类 (passwords/usernames/discovery/fuzzing等)
    - wordlist_type: 字典类型 (common/default/rockyou等)
    - use_memory: 是否使用内存传递方式 (默认: False)
    - max_lines: 内存传递时的最大行数限制 (默认: 1000)
    - scan_options: 扫描工具的额外选项 (字典格式)
    ## 工作流程
    1. 字典生成: 根据分类和类型获取SecLists字典,支持内存传递和文件复制
    2. 扫描执行: 根据扫描类型选择工具,自动配置参数和字典路径
    3. 结果分析: 统计发现数量,提取高优先级发现,生成分析报告
    ## 注意事项
    - 确保目标URL有效且可访问
    - 根据扫描目标选择合适的字典类型
    - 大型字典建议使用文件复制方式
    - 确保有足够的权限执行扫描操作
    """
    # Bug fix: the body previously contained only the docstring, so the
    # function implicitly returned None despite the declared ``-> str``
    # return type, leaving the MCP prompt without any content. Return the
    # guide text itself; inspect.getdoc() strips the uniform indentation.
    import inspect
    return inspect.getdoc(wordlist_integration_guide) or ""
@mcp.tool()
def auto_wordlist_management(action: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    自动化的词汇表生成和清理流程
    """
    # Bug fix: ``datetime`` is never imported at module level, so the
    # timestamp line below raised NameError on every call; the broad
    # ``except`` swallowed it and the tool always reported failure.
    # A function-scope import keeps the fix self-contained.
    import datetime

    # Built-in wordlists keyed by the type names accepted in
    # config["wordlist_types"]. A single table replaces the three
    # near-identical generate branches of the original implementation.
    builtin_wordlists: Dict[str, List[str]] = {
        "common": [
            "admin", "password", "123456", "root", "test",
            "guest", "user", "login", "welcome", "qwerty"
        ],
        "admin": [
            "admin", "administrator", "root", "superuser",
            "sa", "manager", "supervisor", "operator"
        ],
        "api": [
            "/api", "/api/v1", "/api/v2", "/rest", "/rest/api",
            "/graphql", "/swagger", "/docs", "/admin", "/login"
        ],
    }
    try:
        # Defaults, overridable by the caller-supplied config mapping.
        default_config = {
            "max_age_hours": 24.0,
            "wordlist_types": ["common", "admin", "api"],
            "cleanup_after_generate": True
        }
        final_config = {**default_config, **config} if config else default_config
        result: Dict[str, Any] = {
            "success": True,
            "action": action,
            "config": final_config,
            "timestamp": datetime.datetime.now().isoformat(),
            "operations": []
        }
        # "cleanup" / "full": purge temp files older than max_age_hours.
        if action in ["cleanup", "full"]:
            cleanup_result = temp_file_cleanup(max_age_hours=final_config["max_age_hours"])
            result["operations"].append({
                "operation": "cleanup",
                "status": "success" if cleanup_result["success"] else "failed",
                "details": cleanup_result
            })
        # "generate" / "full": materialise each configured built-in wordlist
        # as a managed temp file.
        if action in ["generate", "full"]:
            for wordlist_type in final_config["wordlist_types"]:
                try:
                    entries = builtin_wordlists.get(wordlist_type)
                    if entries is None:
                        # Unknown types were silently skipped by the original
                        # if/elif chain as well.
                        continue
                    wordlist_result = temp_file_create(
                        file_id=f"auto_{wordlist_type}_{int(time.time())}",
                        content="\n".join(entries),
                        suffix=".txt"
                    )
                    result["operations"].append({
                        "operation": f"generate_{wordlist_type}",
                        "status": "success",
                        "file_id": wordlist_result["file_id"],
                        "file_path": wordlist_result["file_path"],
                        "size": wordlist_result["size"]
                    })
                except Exception as e:
                    # Record the per-type failure but keep generating the rest.
                    result["operations"].append({
                        "operation": f"generate_{wordlist_type}",
                        "status": "failed",
                        "error": str(e)
                    })
        # Optional post-generation cleanup ("generate" only, as originally).
        if action == "generate" and final_config["cleanup_after_generate"]:
            cleanup_result = temp_file_cleanup(max_age_hours=final_config["max_age_hours"])
            result["operations"].append({
                "operation": "post_generate_cleanup",
                "status": "success" if cleanup_result["success"] else "failed",
                "details": cleanup_result
            })
        # Summarise how many operations succeeded / failed.
        success_count = sum(1 for op in result["operations"] if op["status"] == "success")
        result["summary"] = {
            "total_operations": len(result["operations"]),
            "successful_operations": success_count,
            "failed_operations": len(result["operations"]) - success_count
        }
        return result
    except Exception as e:
        # Top-level guard: any unexpected error becomes a failure payload.
        return {
            "success": False,
            "error": f"Auto wordlist management failed: {str(e)}",
            "action": action,
            "config": config
        }
# System tools - health check and generic command execution
@mcp.tool()
def server_health() -> Dict[str, Any]:
    """
    [系统工具] 检查PST API服务器健康状态。
    """
    # Delegate straight to the shared PST client.
    health_info = pst_client.check_health()
    return health_info
@mcp.tool()
def execute_command(command: str) -> Dict[str, Any]:
    # [System tool] Forward an arbitrary command string to the PST API server
    # for execution and return its response dict unchanged.
    # NOTE(review): unlike every sibling tool this one has no docstring, so
    # the MCP tool is published without a description — consider adding one.
    # WARNING: this executes arbitrary commands remotely; expose it only to
    # trusted clients.
    return pst_client.execute_command(command)
@mcp.tool()
def pw_list_tools(branch: str = "main") -> Dict[str, Any]:
    """
    [系统工具] 列出PST工具库中的所有可用工具包括工具描述和分类信息
    """
    # Query the catalog endpoint for the requested branch.
    query_params = {"branch": branch}
    return pst_client.safe_get("api/catalog/pentest_windows", query_params)
@mcp.tool()
def pst_installed_tools() -> Dict[str, Any]:
    """
    [系统工具] 列出已安装的PST工具,包括工具版本和状态信息,用于检查系统环境配置和工具可用性
    """
    # Parameterless pass-through to the installed-tools catalog endpoint.
    endpoint = "api/catalog/installed"
    return pst_client.safe_get(endpoint)
# System tools - temporary file management
@mcp.tool()
def manage_temp_files(
    action: str,
    file_id: str = "",
    file_path: str = "",
    content: str = "",
    suffix: str = ".txt",
    prefix: str = "mcp_",
    max_age_hours: float = 24.0
) -> Dict[str, Any]:
    """
    [系统工具] 临时文件和目录管理工具,提供临时文件和目录的创建、查询、删除、列表和清理功能
    """
    # Dispatch on ``action``; every branch returns {"success": bool, ...}.
    # All real work is delegated to global_temp_file_manager (defined
    # elsewhere in this file); any exception becomes a failure payload below.
    try:
        if action == "create":
            # Create a temp file — with the caller's id when one is supplied.
            if file_id:
                file_path = global_temp_file_manager.create_temp_file_with_id(
                    file_id=file_id,
                    content=content,
                    suffix=suffix,
                    prefix=prefix
                )
                return {
                    "success": True,
                    "file_id": file_id,
                    "file_path": file_path,
                    "message": f"创建临时文件成功,ID: {file_id}"
                }
            else:
                file_path = global_temp_file_manager.create_temp_file(
                    content=content,
                    suffix=suffix,
                    prefix=prefix
                )
                # Recover the auto-generated id by matching the returned path
                # against the manager's registry.
                for fid, finfo in global_temp_file_manager.temp_files.items():
                    if finfo["path"] == file_path:
                        return {
                            "success": True,
                            "file_id": fid,
                            "file_path": file_path,
                            "message": f"创建临时文件成功,自动生成ID: {fid}"
                        }
                # Path not found in the registry — report failure.
                return {
                    "success": False,
                    "error": "无法获取文件ID"
                }
        elif action == "create_dir":
            # Create a temporary directory (``prefix`` is reused as its name prefix).
            dir_path = global_temp_file_manager.create_temp_dir(prefix)
            return {
                "success": True,
                "dir_path": dir_path,
                "message": f"创建临时目录成功: {dir_path}"
            }
        elif action == "get_path":
            # Look up a managed file's on-disk path by id.
            if not file_id:
                return {
                    "success": False,
                    "error": "获取文件路径需要提供file_id参数"
                }
            file_path = global_temp_file_manager.get_temp_file_path(file_id)
            if file_path:
                return {
                    "success": True,
                    "file_id": file_id,
                    "file_path": file_path,
                    "exists": os.path.exists(file_path)
                }
            else:
                return {
                    "success": False,
                    "error": f"未找到ID为 {file_id} 的文件"
                }
        elif action == "delete":
            # Delete by id when given, otherwise by path; id takes precedence.
            if file_id:
                success = global_temp_file_manager.delete_temp_file(file_id)
                if success:
                    return {
                        "success": True,
                        "message": f"删除临时文件成功,ID: {file_id}"
                    }
                else:
                    return {
                        "success": False,
                        "error": f"删除临时文件失败,ID: {file_id} 或文件不存在"
                    }
            elif file_path:
                success = global_temp_file_manager.delete_temp_file_by_path(file_path)
                if success:
                    return {
                        "success": True,
                        "message": f"删除临时文件成功,路径: {file_path}"
                    }
                else:
                    return {
                        "success": False,
                        "error": f"删除临时文件失败,路径: {file_path} 或文件不存在"
                    }
            else:
                return {
                    "success": False,
                    "error": "删除文件需要提供file_id或file_path参数"
                }
        elif action == "list":
            # Enumerate all managed files plus aggregate statistics.
            temp_files = global_temp_file_manager.list_temp_files()
            stats = global_temp_file_manager.get_temp_file_stats()
            return {
                "success": True,
                "files": temp_files,
                "stats": stats,
                "count": len(temp_files)
            }
        elif action == "cleanup":
            # Remove managed items older than max_age_hours.
            deleted_count = global_temp_file_manager.cleanup_temp_files(max_age_hours)
            return {
                "success": True,
                "deleted_count": deleted_count,
                "max_age_hours": max_age_hours,
                "message": f"清理了 {deleted_count} 个超过 {max_age_hours} 小时的临时项目"
            }
        elif action == "cleanup_all":
            # Remove every managed item regardless of age.
            deleted_count = global_temp_file_manager.cleanup_all_temp_files()
            return {
                "success": True,
                "deleted_count": deleted_count,
                "message": f"清理了所有 {deleted_count} 个临时项目"
            }
        else:
            # Unsupported action keyword.
            return {
                "success": False,
                "error": f"不支持的操作类型: {action}。支持的操作: create, create_dir, get_path, delete, list, cleanup, cleanup_all"
            }
    except Exception as e:
        # Convert any unexpected error into a structured failure result.
        return {
            "success": False,
            "error": f"临时文件管理操作失败: {str(e)}"
        }
return mcp
def parse_args():
    """Build the CLI argument parser and parse ``sys.argv``.

    Returns the populated argparse.Namespace. DEFAULT_PST_SERVER and
    DEFAULT_REQUEST_TIMEOUT are module-level constants defined elsewhere
    in this file.
    """
    parser = argparse.ArgumentParser(description="运行Windows PST MCP客户端")
    parser.add_argument(
        "--server",
        type=str,
        default=DEFAULT_PST_SERVER,
        help=f"PST API服务器URL (默认: {DEFAULT_PST_SERVER})",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=DEFAULT_REQUEST_TIMEOUT,
        help=f"请求超时时间(秒) (默认: {DEFAULT_REQUEST_TIMEOUT})",
    )
    parser.add_argument("--debug", action="store_true", help="启用调试日志")
    parser.add_argument("--log-file", type=str, help="日志文件路径")
    # Network/transport options for the MCP server itself.
    parser.add_argument("--host", type=str, default="127.0.0.1", help="MCP服务器主机 (默认: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8000, help="MCP服务器端口 (默认: 8000)")
    # NOTE(review): --path is accepted but never consumed by main() — confirm
    # whether the mount path should be wired into the HTTP/SSE apps.
    parser.add_argument("--path", type=str, default="/mcp", help="MCP服务器访问路径 (默认: /mcp)")
    parser.add_argument(
        "--transport",
        type=str,
        default="studio",
        choices=["studio", "sse", "http"],
        help="MCP服务器启动模式 (默认: studio)",
    )
    return parser.parse_args()
@monitor_performance
def main():
    """Entry point: parse CLI args, connect to the PST API server, register
    the MCP tools, and run the MCP server on the selected transport.

    Returns a process exit code: 0 on success or interrupt, 1 on failure.
    """
    try:
        # Parse command-line arguments.
        args = parse_args()
        # Configure logging (rebinds the module-level ``logger`` name locally).
        logger = setup_logging(args.debug, args.log_file)
        logger.info("启动PST MCP服务器")
        # Initialise the PST API client (PSTToolsClient is defined elsewhere
        # in this file).
        logger.info(f"初始化PST工具客户端,服务器地址: {args.server}")
        pst_client = PSTToolsClient(args.server, args.timeout)
        # Health-check the backing API server. Failure is deliberately
        # non-fatal: the MCP server still starts, tools just fail at call time.
        logger.info("检查PST API服务器健康状态")
        health = pst_client.check_health()
        if "error" in health:
            logger.warning(f"无法连接到PST API服务器 {args.server}: {health['error']}")
            logger.warning("MCP服务器将启动,但工具执行可能会失败")
        else:
            logger.info(f"成功连接到PST API服务器 {args.server}")
            logger.info(f"服务器健康状态: {health.get('status')}")
            # Warn about any essential tools the server reports as missing.
            if not health.get("all_essential_tools_available", False):
                missing = [t for t, ok in health.get("tools_status", {}).items() if not ok]
                if missing:
                    logger.warning(f"缺少工具: {', '.join(missing)}")
        # Register all MCP tools/prompts against this client.
        logger.info("设置MCP服务器")
        mcp = setup_mcp_server(pst_client)
        logger.info(f"以{args.transport}模式启动PST MCP服务器")
        # Map the CLI transport names onto FastMCP transport identifiers
        # ("studio" is this project's CLI spelling for stdio).
        transport_map = {
            "studio": "stdio",
            "sse": "sse",
            "http": "streamable-http"
        }
        transport_mode = transport_map.get(args.transport)
        if not transport_mode:
            logger.error(f"无效的启动模式: {args.transport}")
            return 1
        try:
            if transport_mode == 'sse':
                # SSE transport: serve the FastMCP SSE ASGI app via uvicorn.
                logger.info(f"启动SSE服务器,监听 {args.host}:{args.port}")
                app = mcp.sse_app()
                uvicorn.run(app, host=args.host, port=args.port)
            elif transport_mode == 'streamable-http':
                # Streamable-HTTP transport, also served via uvicorn.
                logger.info(f"启动HTTP服务器,监听 {args.host}:{args.port}")
                app = mcp.streamable_http_app()
                uvicorn.run(app, host=args.host, port=args.port)
            else:
                # Default: stdio transport handled by FastMCP itself.
                logger.info("启动stdio模式服务器")
                mcp.run(transport=transport_mode)
        except KeyboardInterrupt:
            # Ctrl-C is a normal shutdown path.
            logger.info("收到中断信号,正在关闭服务器")
            return 0
        except Exception as e:
            logger.error(f"服务器启动失败: {str(e)}")
            logger.debug(f"服务器启动异常详情:\n{traceback.format_exc()}")
            return 1
        finally:
            # Best-effort resource cleanup regardless of how the server exits.
            try:
                pst_client.close()
                global_temp_file_manager.cleanup_all_temp_files()
                logger.info("资源清理完成")
            except Exception as e:
                logger.error(f"资源清理失败: {str(e)}")
        # NOTE(review): in the flattened source it is ambiguous whether this
        # ``return 0`` sat inside the ``finally:`` block; a return inside
        # ``finally`` would swallow the handlers' exit codes, so it is read
        # here as the normal-exit path after the try/finally. Confirm against
        # the original indentation.
        return 0
    except Exception as e:
        logger.error(f"主函数执行失败: {str(e)}")
        logger.debug(f"主函数异常详情:\n{traceback.format_exc()}")
        return 1
if __name__ == "__main__":
    # Run the CLI entry point and propagate its integer exit status.
    sys.exit(main())