Skip to main content
Glama

MCP Port Scanner

DEVELOPMENT_GUIDE.md14.5 kB
# MCP智能端口扫描器 - 开发文档 ## 目录 1. [项目概述](#项目概述) 2. [技术架构](#技术架构) 3. [核心组件](#核心组件) 4. [数据模型](#数据模型) 5. [接口协议](#接口协议) 6. [扩展开发](#扩展开发) 7. [性能优化](#性能优化) 8. [部署指南](#部署指南) ## 项目概述 MCP智能端口扫描器是一个基于MCP(Model Context Protocol)协议的分层递进式端口扫描服务。该项目实现了智能扫描策略,能够根据扫描结果动态调整扫描深度,提供高效准确的网络服务发现能力。 ### 核心特性 - **智能扫描模式**:根据端口数量动态决定扫描策略 - **分层递进架构**:端口扫描 → HTTP检测 → Web深度探测 - **多接口支持**:MCP协议(stdio)、HTTP/SSE、Cursor优化接口 - **并发优化**:高性能异步扫描,支持批量和网段扫描 - **实时反馈**:SSE推送进度,阶段性结果展示 ### 技术栈 - **语言**:Python 3.12+ - **异步框架**:asyncio - **Web框架**:FastAPI - **HTTP客户端**:httpx - **端口扫描**:RustScan(外部工具) - **协议支持**:MCP SDK、SSE(Server-Sent Events) ## 技术架构 ### 整体架构图 ``` ┌─────────────────────────────────────────────────────────┐ │ 接口层 (Interfaces) │ ├─────────────┬──────────────┬──────────────┬─────────────┤ │ CLI接口 │ MCP Server │ HTTP/SSE │ Cursor SSE │ │ (cli.py) │ (mcp_server) │ (http_sse) │ (cursor) │ ├─────────────┴──────────────┴──────────────┴─────────────┤ │ 适配器层 (Adapters) │ ├─────────────┬──────────────┬──────────────┬─────────────┤ │ CLI适配器 │ MCP本地适配器 │ MCP远程适配器│ SSE适配器 │ ├─────────────┴──────────────┴──────────────┴─────────────┤ │ 服务层 (Service Layer) │ │ ScanService │ ├──────────────────────────────────────────────────────────┤ │ 业务层 (Business Layer) │ ├─────────────┬──────────────┬──────────────┬─────────────┤ │ PortScanner │ HTTPDetector │ WebProber │ Models │ │ (第一层) │ (第二层) │ (第三层) │ (数据模型) │ └─────────────┴──────────────┴──────────────┴─────────────┘ ``` ### 分层扫描流程 ``` 1. 端口扫描层(PortScanner) ├─ RustScan快速端口发现 ├─ Banner信息获取 └─ 智能决策:少端口→全端口扫描 2. HTTP检测层(HTTPDetector) ├─ 基于Banner的智能识别 ├─ HTTP/HTTPS服务验证 └─ 服务器信息提取 3. Web探测层(WebProber) ├─ 管理界面发现 ├─ 技术栈特定路径扫描 └─ 深度目录探测 ``` ## 核心组件 ### 1. ScanService(统一服务层) `src/mcp_port_scanner/service.py` 核心服务类,提供统一的扫描接口: ```python class ScanService: """统一的端口扫描服务""" # 同步接口 def scan_sync(ip: str, ports: Optional[List[int]] = None) -> ScanResult def batch_scan_sync(targets: List[str]) -> List[ScanResult] # 异步接口 async def scan_async(ip: str, ports: Optional[List[int]] = None) -> ScanResult async def batch_scan_async(targets: List[str]) -> List[ScanResult] # 流式接口 async def scan_async_with_progress(ip: str, progress_callback: Callable) -> ScanResult # 任务管理 def get_scan_status(scan_id: str) -> ScanStatus def list_active_scans() -> List[str] ``` **智能扫描逻辑**: - 预设端口扫描(1-1000 + 常用端口) - 发现端口数 < 阈值(默认3)→ 执行全端口扫描 - 发现端口数 ≥ 阈值 → 跳过全端口,直接Web检测 ### 2. PortScanner(第一层扫描) `src/mcp_port_scanner/scanner.py` 基于RustScan的高速端口扫描器: ```python class PortScanner: """端口扫描器 - 第一层扫描功能""" async def scan_target(target: ScanTarget) -> List[PortInfo] async def _rustscan_ports(target: ScanTarget) -> List[int] async def _grab_banners(ip: str, ports: List[int]) -> List[PortInfo] ``` **关键特性**: - RustScan集成:极速端口发现 - 智能Banner获取:TCP连接获取服务信息 - 并发优化:异步Banner抓取 ### 3. HTTPDetector(第二层检测) `src/mcp_port_scanner/http_detector.py` 智能HTTP服务识别器: ```python class HTTPDetector: """HTTP服务检测器 - 第二层检测功能""" async def detect_http_services(ip: str, port_infos: List[PortInfo]) -> List[HTTPInfo] def _identify_http_candidates(port_infos: List[PortInfo]) -> List[PortInfo] async def _verify_http_services(ip: str, candidates: List[PortInfo]) -> List[HTTPInfo] ``` **检测规则引擎**: - Banner模式匹配 - 端口提示权重 - 置信度评分系统 - HTTP/HTTPS自动识别 ### 4. WebProber(第三层探测) `src/mcp_port_scanner/web_prober.py` Web应用深度探测器: ```python class WebProber: """Web深度探测器 - 第三层探测功能""" async def probe_web_services(http_services: List[HTTPInfo]) -> List[DirectoryInfo] def _select_applicable_rules(http_service: HTTPInfo) -> List[AdminDirectoryRule] async def _scan_directories(base_url: str, paths: List[str]) -> List[DirectoryInfo] ``` **探测规则库**: - 通用管理路径 - 技术栈特定路径(Tomcat、WebLogic、Jenkins等) - 智能规则匹配 - 并发目录扫描 ## 数据模型 ### 核心模型 ```python # 扫描目标 class ScanTarget(BaseModel): ip: str ports: Optional[List[int]] = None # 扫描配置 class ScanConfig(BaseModel): # 智能扫描配置 smart_scan_enabled: bool = True smart_scan_threshold: int = 3 # 预设端口 preset_ports: List[int] = [...] web_ports: List[int] = [80, 443, 8080, ...] # RustScan配置 rustscan_timeout: int = 500 rustscan_batch_size: int = 65535 # HTTP配置 http_timeout: float = 10.0 http_user_agent: str = "..." # 扫描结果 class ScanResult(BaseModel): target: ScanTarget scan_id: str status: ScanStatus # 分层结果 open_ports: List[PortInfo] http_services: List[HTTPInfo] admin_directories: List[DirectoryInfo] ``` ### 信息模型 ```python # 端口信息 class PortInfo(BaseModel): port: int protocol: str = "tcp" state: str = "open" service: Optional[str] = None version: Optional[str] = None banner: Optional[str] = None # HTTP服务信息 class HTTPInfo(BaseModel): url: str status_code: int headers: Dict[str, str] title: Optional[str] = None server: Optional[str] = None is_https: bool = False # 目录信息 class DirectoryInfo(BaseModel): path: str status_code: int title: Optional[str] = None is_admin: bool = False ``` ## 接口协议 ### 1. MCP工具接口 标准MCP协议工具定义: ```python # scan_target - 扫描单个目标 { "name": "scan_target", "parameters": { "ip": "192.168.1.1", "ports": [80, 443], # 可选 "scan_layers": ["port_scan", "http_detection", "web_probe"], "config": {...} # 可选配置 } } # batch_scan - 批量扫描 { "name": "batch_scan", "parameters": { "targets": ["192.168.1.1", "192.168.1.2"], "scan_layers": [...], "max_concurrent": 5 } } # get_scan_status - 查询状态 { "name": "get_scan_status", "parameters": { "scan_id": "uuid-xxx" } } ``` ### 2. HTTP/SSE接口 RESTful API + Server-Sent Events: ```http # 启动扫描 POST /scan { "target": "192.168.1.1", "ports": [80, 443], "scan_layers": ["port_scan", "http_detection", "web_probe"] } # SSE实时进度 GET /scan/{scan_id}/stream Content-Type: text/event-stream data: {"type": "progress", "data": {...}} data: {"type": "layer_complete", "data": {...}} data: {"type": "complete", "data": {...}} # 查询结果 GET /scan/{scan_id}/result ``` ### 3. Cursor优化接口 针对Cursor编辑器优化的SSE接口: ```http # Cursor扫描 POST /cursor/scan { "ip": "192.168.1.1", "real_time": true, "config": {...} } # Cursor SSE流(0.5秒更新频率) GET /cursor/scan/{scan_id}/stream data: {"type": "start", "timestamp": "...", "data": {...}} data: {"type": "progress", "timestamp": "...", "data": {...}} data: {"type": "complete", "timestamp": "...", "data": {...}} ``` ## 扩展开发 ### 1. 添加新的扫描层 创建新的扫描器类: ```python # src/mcp_port_scanner/custom_scanner.py from .models import BaseModel class CustomScanner: """自定义扫描器""" def __init__(self, config: Optional[ScanConfig] = None): self.config = config or ScanConfig() async def scan_custom(self, target: str) -> List[CustomInfo]: """执行自定义扫描""" # 实现扫描逻辑 pass ``` 在服务层注册: ```python # service.py self.custom_scanner = CustomScanner(self.config) # 在扫描流程中调用 if "custom_scan" in layers: custom_results = await self.custom_scanner.scan_custom(target) ``` ### 2. 添加新的检测规则 扩展检测规则: ```python # 在 http_detector.py 中添加 HTTPDetectionRule( name="Custom Service", banner_patterns=[r"CustomServer/\d+"], port_hints=[9999], confidence_boost=0.3, priority=1 ) # 在 web_prober.py 中添加 AdminDirectoryRule( technology="CustomApp", paths=["/custom-admin", "/custom-panel"], indicators=["custom-header"], priority=1 ) ``` ### 3. 自定义适配器 实现新的接口适配器: ```python # adapters/custom_adapter.py from . import BaseAdapter class CustomAdapter(BaseAdapter): async def handle_request(self, request_data: Dict[str, Any]) -> Any: # 处理请求 pass def format_response(self, result: ScanResult) -> Any: # 格式化响应 pass ``` ## 性能优化 ### 1. 扫描性能优化 **RustScan参数调优**: ```python rustscan_timeout=200 # 极速超时 rustscan_batch_size=65535 # 最大并发 rustscan_ulimit=8192 # 文件描述符限制 ``` **并发控制**: - 端口扫描:RustScan内部并发 - Banner获取:asyncio并发(信号量控制) - HTTP检测:httpx异步客户端池 - 目录扫描:线程池 + 信号量 ### 2. 智能扫描策略 **阈值优化**: - 默认阈值:3个端口 - 小于阈值:深度扫描(全端口) - 大于阈值:快速模式(跳过全端口) **端口优先级**: - 预设端口:1-1000 + 常用服务端口 - Web端口:优先HTTP服务检测 - 高价值端口:管理界面、数据库等 ### 3. 资源管理 **内存优化**: - 流式处理大量结果 - 结果缓存限制 - Banner长度限制 **连接管理**: - HTTP连接池复用 - 超时控制 - 错误重试机制 ## 部署指南 ### 1. Docker部署 ```dockerfile FROM python:3.12-slim # 安装RustScan RUN apt-get update && \ apt-get install -y wget && \ wget https://github.com/RustScan/RustScan/releases/download/2.0.1/rustscan_2.0.1_amd64.deb && \ dpkg -i rustscan_2.0.1_amd64.deb # 安装Python依赖 COPY requirements.txt . RUN pip install -r requirements.txt # 复制代码 COPY . /app WORKDIR /app # 启动服务 CMD ["python", "-m", "mcp_port_scanner.mcp_server"] ``` ### 2. 系统要求 **最低配置**: - CPU:2核 - 内存:2GB - 存储:1GB - 网络:100Mbps **推荐配置**: - CPU:4核+ - 内存:4GB+ - 存储:10GB(日志存储) - 网络:1Gbps ### 3. 配置文件 `config/default.toml`: ```toml [scan] smart_scan_enabled = true smart_scan_threshold = 3 [rustscan] timeout = 500 batch_size = 65535 [http] timeout = 10.0 max_redirects = 3 [logging] enabled = true level = "INFO" ``` ## 开发最佳实践 ### 1. 代码规范 - 使用类型注解 - 遵循PEP 8规范 - 完整的文档字符串 - 异步函数命名:`_async`后缀 ### 2. 错误处理 ```python try: result = await scanner.scan_target(target) except asyncio.TimeoutError: logger.error(f"扫描超时: {target}") except Exception as e: logger.exception(f"扫描失败: {target}") # 优雅降级处理 ``` ### 3. 日志规范 ```python logger.info(f"开始扫描: {target}") # 关键操作 logger.debug(f"端口详情: {port_info}") # 调试信息 logger.warning(f"连接失败: {url}") # 警告 logger.error(f"扫描失败: {error}") # 错误 ``` ### 4. 测试建议 - 单元测试:核心业务逻辑 - 集成测试:完整扫描流程 - 性能测试:并发和延迟 - 安全测试:输入验证 ## 常见问题 ### Q1: 如何调整扫描速度? 调整RustScan参数和并发数: ```python config = ScanConfig( rustscan_timeout=1000, # 增加超时 rustscan_batch_size=10000, # 减少并发 max_concurrent_targets=3 # 减少目标并发 ) ``` ### Q2: 如何添加自定义端口? 修改配置中的预设端口: ```python config = ScanConfig( preset_ports=[22, 80, 443, 3306, 6379, 自定义端口...] ) ``` ### Q3: 如何禁用某个扫描层? 在scan_layers参数中排除: ```python # 只进行端口扫描 layers = ["port_scan"] # 跳过Web探测 layers = ["port_scan", "http_detection"] ``` --- 本文档持续更新中,最新版本请查看项目仓库。

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/relaxcloud-cn/mcp-port-scanner'

If you have feedback or need assistance with the MCP directory API, please join our Discord server