Skip to main content
Glama

MCP Port Scanner

scanner.py27.2 kB
""" 第一层:RustScan基础端口扫描和Banner获取 """ import asyncio import socket import subprocess import json import re import os from typing import List, Optional, Dict, Any, Tuple from .logger_config import logger import time from .models import PortInfo, ScanTarget, ScanConfig, ServiceProtocol from .rustscan_manager import get_rustscan_manager class PortScanner: """端口扫描器 - 第一层扫描功能""" def __init__(self, config: Optional[ScanConfig] = None): self.config = config or ScanConfig() self.rustscan_manager = get_rustscan_manager() # 验证 RustScan 可用性 verified, version_info = self.rustscan_manager.verify_rustscan() if verified: logger.info(f"RustScan 初始化成功: {version_info}") else: logger.warning(f"RustScan 初始化失败: {version_info}") logger.info("将回退到 Python socket 扫描") logger.debug("PortScanner 初始化完成,配置: timeout={}ms, batch_size={}", self.config.rustscan_timeout, self.config.rustscan_batch_size) async def scan_target(self, target: ScanTarget, progress_callback: Optional[callable] = None) -> List[PortInfo]: """ 扫描目标的开放端口 Args: target: 扫描目标 progress_callback: 进度回调函数,参数为(stage, message) Returns: List[PortInfo]: 端口信息列表 """ try: # Step 1: 端口发现 if progress_callback: await progress_callback("端口发现", "正在扫描端口...") open_ports = await self._rustscan_ports(target) if not open_ports: logger.info(f"未发现开放端口: {target.ip}") return [] # Step 2: Banner抓取 if progress_callback: await progress_callback("Banner抓取", f"正在获取 {len(open_ports)} 个端口的服务信息...") port_infos = await self._grab_banners(target.ip, open_ports, progress_callback) logger.info(f"扫描完成: {target.ip},发现 {len(port_infos)} 个开放端口") return port_infos except Exception as e: logger.error(f"端口扫描失败: {target.ip} - {e}") return [] async def _rustscan_ports(self, target: ScanTarget) -> List[int]: """ 使用RustScan进行端口扫描 """ try: # 如果指定了端口范围,直接扫描 if target.ports: return await self._execute_rustscan_batch(target, target.ports) # 全端口扫描:分批处理避免参数过长 logger.info(f"🔍 开始分批全端口扫描: {target.ip}") all_open_ports = [] # 定义扫描批次 - 分成更小的范围 port_ranges = [ "1-1000", # 常用端口 "1001-5000", # 扩展端口1 "5001-10000", # 扩展端口2 "10001-20000", # 扩展端口3 "20001-30000", # 扩展端口4 "30001-40000", # 扩展端口5 "40001-50000", # 扩展端口6 "50001-60000", # 扩展端口7 "60001-65535" # 高位端口 ] for i, port_range in enumerate(port_ranges, 1): logger.info(f"📡 扫描端口范围 {i}/{len(port_ranges)}: {port_range}") try: batch_ports = await self._execute_rustscan_range(target, port_range) all_open_ports.extend(batch_ports) # 如果这批找到了端口,记录一下 if batch_ports: logger.info(f"✅ 范围 {port_range} 发现 {len(batch_ports)} 个开放端口: {batch_ports}") except Exception as e: logger.warning(f"⚠️ 范围 {port_range} 扫描失败: {e}") continue logger.info(f"🎯 全端口扫描完成,总共发现 {len(all_open_ports)} 个开放端口") return sorted(list(set(all_open_ports))) # 去重并排序 except FileNotFoundError: logger.warning("RustScan二进制文件未找到,回退到Python socket扫描") logger.info(self.rustscan_manager.install_suggestions()) return await self._socket_scan_ports(target) except Exception as e: logger.error(f"RustScan扫描失败: {e}") return await self._socket_scan_ports(target) async def _execute_rustscan_range(self, target: ScanTarget, port_range: str) -> List[int]: """ 执行单个端口范围的RustScan扫描 """ try: cmd = self.rustscan_manager.get_command_args( target.ip, timeout=self.config.rustscan_timeout, batch_size=self.config.rustscan_batch_size, tries=self.config.rustscan_tries, ulimit=self.config.rustscan_ulimit, port_range=port_range ) except FileNotFoundError as e: logger.error(f"构建RustScan命令失败: {e}") raise # 执行命令 process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode().strip() if "Permission denied" in error_msg or "ulimit" in error_msg: logger.warning(f"RustScan权限或ulimit问题,范围 {port_range}: {error_msg}") else: logger.warning(f"RustScan范围 {port_range} 执行失败: {error_msg}") return [] # 解析结果 return self._parse_rustscan_greppable_output(stdout.decode()) async def _execute_rustscan_batch(self, target: ScanTarget, ports: List[int]) -> List[int]: """ 执行指定端口列表的RustScan扫描 """ # 如果端口数量太多,也需要分批 if len(ports) > 1000: logger.info(f"📋 端口数量 {len(ports)} 较多,分批扫描") all_results = [] batch_size = 1000 for i in range(0, len(ports), batch_size): batch_ports = ports[i:i + batch_size] logger.info(f"📡 扫描端口批次 {i//batch_size + 1}: {len(batch_ports)} 个端口") try: batch_results = await self._execute_rustscan_port_list(target, batch_ports) all_results.extend(batch_results) except Exception as e: logger.warning(f"⚠️ 端口批次扫描失败: {e}") continue return sorted(list(set(all_results))) else: return await self._execute_rustscan_port_list(target, ports) async def _execute_rustscan_port_list(self, target: ScanTarget, ports: List[int]) -> List[int]: """ 执行具体端口列表的RustScan扫描 """ try: cmd = self.rustscan_manager.get_command_args( target.ip, timeout=self.config.rustscan_timeout, batch_size=self.config.rustscan_batch_size, tries=self.config.rustscan_tries, ulimit=self.config.rustscan_ulimit, ports=ports ) except FileNotFoundError as e: logger.error(f"构建RustScan命令失败: {e}") raise logger.info(f"💨 RustScan极速配置: timeout={self.config.rustscan_timeout}ms, batch={self.config.rustscan_batch_size}") # 安全的命令调试输出 if len(ports) > 100: logger.debug(f"执行RustScan命令 (包含{len(ports)}个端口): rustscan -a {target.ip} ... -p [端口列表...]") else: logger.debug(f"执行RustScan命令: {' '.join(cmd)}") # 执行命令 process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode().strip() logger.error(f"RustScan执行失败: {error_msg}") return [] # 解析greppable输出 return self._parse_rustscan_greppable_output(stdout.decode()) def _parse_rustscan_greppable_output(self, output: str) -> List[int]: """ 解析RustScan greppable输出 Args: output: RustScan greppable输出文本 Returns: List[int]: 端口列表 """ ports = [] # greppable格式: ip -> [port1,port2,...] for line in output.strip().split('\n'): line = line.strip() if '->' in line and '[' in line and ']' in line: try: # 提取方括号内的端口列表 bracket_content = line.split('[')[1].split(']')[0] # 解析端口列表 port_strs = bracket_content.split(',') for port_str in port_strs: port_str = port_str.strip() if port_str: ports.append(int(port_str)) except (ValueError, IndexError): continue return sorted(list(set(ports))) # 去重并排序 async def _socket_scan_ports(self, target: ScanTarget) -> List[int]: """ 使用Python socket进行端口扫描(RustScan的备选方案) Args: target: 扫描目标 Returns: List[int]: 开放端口列表 """ try: # 确定要扫描的端口 if target.ports: ports_to_scan = target.ports else: # 使用配置中的预设端口列表 ports_to_scan = self._get_preset_ports() logger.debug(f"开始socket扫描,目标端口数: {len(ports_to_scan)}") # 并发扫描端口 semaphore = asyncio.Semaphore(50) # 限制并发数 tasks = [] for port in ports_to_scan: task = asyncio.create_task(self._check_port_socket(target.ip, port, semaphore)) tasks.append(task) results = await asyncio.gather(*tasks, return_exceptions=True) # 收集开放端口 open_ports = [] for i, result in enumerate(results): if isinstance(result, Exception): logger.debug(f"端口 {ports_to_scan[i]} 检查失败: {result}") elif result: open_ports.append(ports_to_scan[i]) logger.debug(f"Socket扫描完成,发现 {len(open_ports)} 个开放端口") return open_ports except Exception as e: logger.error(f"Socket扫描失败: {e}") return [] def _get_preset_ports(self) -> List[int]: """ 获取预设端口列表,合并RustScan端口范围和配置中的preset_ports Returns: List[int]: 合并后的端口列表 """ # 解析RustScan端口范围 rustscan_ports = [] try: port_range = self.config.rustscan_ports if '-' in port_range: start, end = map(int, port_range.split('-')) rustscan_ports = list(range(start, end + 1)) else: # 如果不是范围,可能是单个端口或逗号分隔的端口列表 rustscan_ports = [int(p.strip()) for p in port_range.split(',')] except (ValueError, AttributeError) as e: logger.warning(f"解析RustScan端口范围失败: {e},使用默认21-1000") rustscan_ports = list(range(21, 1001)) # 合并RustScan端口和预设端口 all_ports = set(rustscan_ports) all_ports.update(self.config.preset_ports) # 排序并返回 final_ports = sorted(list(all_ports)) logger.debug(f"预设端口合并: RustScan({len(rustscan_ports)}) + 预设({len(self.config.preset_ports)}) = 总计({len(final_ports)})") return final_ports async def _check_port_socket(self, ip: str, port: int, semaphore: asyncio.Semaphore) -> bool: """ 使用socket检查单个端口是否开放 Args: ip: 目标IP port: 端口号 semaphore: 并发控制信号量 Returns: bool: 端口是否开放 """ async with semaphore: try: # 创建socket连接测试 future = asyncio.open_connection(ip, port) reader, writer = await asyncio.wait_for(future, timeout=3.0) # 连接成功,端口开放 writer.close() await writer.wait_closed() return True except (asyncio.TimeoutError, ConnectionRefusedError, OSError): # 连接失败,端口关闭 return False except Exception as e: logger.debug(f"检查端口 {ip}:{port} 时发生异常: {e}") return False async def _grab_banners(self, ip: str, ports: List[int], progress_callback: Optional[callable] = None) -> List[PortInfo]: """ 收集端口Banner信息 Args: ip: 目标IP ports: 端口列表 progress_callback: 进度回调函数 Returns: List[PortInfo]: 端口信息列表 """ port_infos = [] # 并发获取Banner信息 semaphore = asyncio.Semaphore(20) # 限制并发数 tasks = [] for port in ports: task = asyncio.create_task(self._grab_single_banner(ip, port)) tasks.append(task) # 逐个等待并显示进度 completed = 0 for i, task in enumerate(asyncio.as_completed(tasks)): try: result = await task port_infos.append(result) completed += 1 if progress_callback: await progress_callback("Banner抓取", f"正在获取服务信息... ({completed}/{len(ports)}) - 端口 {result.port}") except Exception as e: logger.debug(f"获取端口 {ports[i]} Banner失败: {e}") # 创建基础端口信息 port_infos.append(PortInfo( port=ports[i], state="open", protocol=ServiceProtocol.TCP, service=self._identify_service_by_port(ports[i]) )) completed += 1 if progress_callback: await progress_callback("Banner抓取", f"正在获取服务信息... ({completed}/{len(ports)}) - 端口 {ports[i]} (failed)") return port_infos async def _grab_single_banner(self, ip: str, port: int) -> PortInfo: """ 获取单个端口的Banner信息 Args: ip: 目标IP port: 端口号 Returns: PortInfo: 端口信息 """ # 获取Banner banner = await self._get_banner(ip, port) # 识别服务 service_info = self._identify_service(port, banner) # 创建端口信息 port_info = PortInfo( port=port, state="open", protocol=ServiceProtocol.TCP, service=service_info.get("service", "unknown"), version=service_info.get("version"), banner=banner, confidence=service_info.get("confidence", 0.5) ) return port_info async def _get_banner(self, ip: str, port: int, timeout: float = 5.0) -> Optional[str]: """ 获取端口Banner信息 Args: ip: 目标IP port: 端口号 timeout: 超时时间 Returns: Optional[str]: Banner信息 """ try: # 创建连接 reader, writer = await asyncio.wait_for( asyncio.open_connection(ip, port), timeout=timeout ) # 尝试读取Banner try: # 等待服务器主动发送Banner banner_data = await asyncio.wait_for(reader.read(1024), timeout=2.0) if banner_data: banner = banner_data.decode('utf-8', errors='ignore').strip() if banner: return banner except asyncio.TimeoutError: pass # 如果没有主动Banner,尝试发送HTTP请求 if port in [80, 8080, 8000, 8001, 8008, 8081, 8082, 8888, 9000, 9090, 9999]: writer.write(b"GET / HTTP/1.1\r\nHost: " + ip.encode() + b"\r\n\r\n") await writer.drain() response_data = await asyncio.wait_for(reader.read(1024), timeout=3.0) if response_data: response = response_data.decode('utf-8', errors='ignore').strip() if response: return response # 关闭连接 writer.close() await writer.wait_closed() return None except Exception as e: logger.debug(f"获取Banner失败 {ip}:{port}: {e}") return None def _identify_service(self, port: int, banner: str) -> Dict[str, Any]: """ 基于端口号和Banner识别服务 Args: port: 端口号 banner: Banner信息 Returns: Dict[str, Any]: 服务信息 """ # 首先基于端口号识别 service_info = self._identify_by_port(port) # 然后基于Banner改进识别 if banner: banner_lower = banner.lower() # HTTP服务检测 if any(keyword in banner_lower for keyword in ["http/", "server:", "apache", "nginx", "iis"]): service_info["name"] = "http" # 提取服务器信息 if "server:" in banner_lower: server_match = re.search(r"server:\s*([^\r\n]+)", banner_lower) if server_match: service_info["version"] = server_match.group(1).strip() # SSH服务检测 elif "ssh-" in banner_lower: service_info["name"] = "ssh" ssh_match = re.search(r"ssh-[\d\.]+", banner_lower) if ssh_match: service_info["version"] = ssh_match.group(0) # FTP服务检测 elif any(keyword in banner_lower for keyword in ["ftp", "220 "]): service_info["name"] = "ftp" # SMTP服务检测 elif "220 " in banner and any(keyword in banner_lower for keyword in ["smtp", "mail"]): service_info["name"] = "smtp" # 恶意软件检测 elif "morte c2" in banner_lower: service_info["name"] = "morte-c2" service_info["threat"] = "C2服务器" elif "usoppgo" in banner_lower or "king of snipers" in banner_lower: service_info["name"] = "usoppgo-ftp" service_info["threat"] = "可疑FTP服务" elif "cobaltstrike" in banner_lower or "beacon" in banner_lower: service_info["name"] = "cobaltstrike" service_info["threat"] = "CobaltStrike" return service_info def _identify_service_by_port(self, port: int) -> str: """ 仅基于端口号识别服务 Args: port: 端口号 Returns: str: 服务名称 """ return self._identify_by_port(port).get("name", "unknown") def _identify_by_port(self, port: int) -> Dict[str, Any]: """ 基于端口号识别服务 Args: port: 端口号 Returns: Dict[str, Any]: 服务信息 """ port_service_map = { # 常规服务 21: {"name": "ftp", "description": "File Transfer Protocol"}, 22: {"name": "ssh", "description": "Secure Shell"}, 23: {"name": "telnet", "description": "Telnet"}, 25: {"name": "smtp", "description": "Simple Mail Transfer Protocol"}, 53: {"name": "dns", "description": "Domain Name System"}, 80: {"name": "http", "description": "HyperText Transfer Protocol"}, 110: {"name": "pop3", "description": "Post Office Protocol v3"}, 135: {"name": "msrpc", "description": "Microsoft RPC"}, 139: {"name": "netbios-ssn", "description": "NetBIOS Session Service"}, 143: {"name": "imap", "description": "Internet Message Access Protocol"}, 443: {"name": "https", "description": "HTTP Secure"}, 445: {"name": "smb", "description": "Server Message Block"}, 993: {"name": "imaps", "description": "IMAP Secure"}, 995: {"name": "pop3s", "description": "POP3 Secure"}, 1433: {"name": "mssql", "description": "Microsoft SQL Server"}, 3306: {"name": "mysql", "description": "MySQL Database"}, 3389: {"name": "rdp", "description": "Remote Desktop Protocol"}, 5432: {"name": "postgresql", "description": "PostgreSQL Database"}, 6379: {"name": "redis", "description": "Redis Database"}, 27017: {"name": "mongodb", "description": "MongoDB Database"}, # VPN端口 1194: {"name": "openvpn", "description": "OpenVPN", "category": "vpn"}, 1723: {"name": "pptp", "description": "PPTP VPN", "category": "vpn"}, 4500: {"name": "ipsec", "description": "IPSec VPN", "category": "vpn"}, 51820: {"name": "wireguard", "description": "WireGuard VPN", "category": "vpn"}, 500: {"name": "ike", "description": "IKE (IPSec)", "category": "vpn"}, # VNC端口 5800: {"name": "vnc-http", "description": "VNC HTTP", "category": "remote"}, 5900: {"name": "vnc", "description": "Virtual Network Computing", "category": "remote"}, 5901: {"name": "vnc", "description": "VNC Display 1", "category": "remote"}, 5902: {"name": "vnc", "description": "VNC Display 2", "category": "remote"}, 5903: {"name": "vnc", "description": "VNC Display 3", "category": "remote"}, 5904: {"name": "vnc", "description": "VNC Display 4", "category": "remote"}, 5905: {"name": "vnc", "description": "VNC Display 5", "category": "remote"}, # 远程管理工具 6568: {"name": "anydesk", "description": "AnyDesk Remote Desktop", "category": "remote"}, 5938: {"name": "teamviewer", "description": "TeamViewer", "category": "remote"}, 6129: {"name": "dameware", "description": "DameWare Remote Control", "category": "remote"}, 8200: {"name": "gotomypc", "description": "GoToMyPC", "category": "remote"}, # 恶意软件和后门端口 666: {"name": "malware", "description": "多种恶意软件", "category": "malware", "threat": "高"}, 1080: {"name": "socks-proxy", "description": "SOCKS代理/恶意软件", "category": "proxy", "threat": "中"}, 1234: {"name": "ultors-trojan", "description": "Ultors Trojan", "category": "malware", "threat": "高"}, 1243: {"name": "subseven", "description": "SubSeven Backdoor", "category": "malware", "threat": "高"}, 1337: {"name": "hacker-tools", "description": "Empire/CrackMapExec等黑客工具", "category": "malware", "threat": "高"}, 2222: {"name": "c2-channel", "description": "DoHC2/ExternalC2/Qakbot C2", "category": "malware", "threat": "高"}, 3000: {"name": "beef-panel", "description": "BeEF项目HTTP面板", "category": "malware", "threat": "中"}, 4444: {"name": "metasploit", "description": "Metasploit默认监听端口", "category": "malware", "threat": "高"}, 6666: {"name": "irc-botnet", "description": "IRC僵尸网络", "category": "malware", "threat": "高"}, 6667: {"name": "irc", "description": "IRC (可能是僵尸网络)", "category": "irc", "threat": "中"}, 8080: {"name": "http-proxy", "description": "HTTP代理/多种恶意软件", "category": "proxy", "threat": "中"}, 9050: {"name": "tor-socks", "description": "Tor SOCKS代理", "category": "proxy", "threat": "中"}, 12345: {"name": "netbus", "description": "NetBus Trojan", "category": "malware", "threat": "高"}, 31337: {"name": "elite-tools", "description": "SliverC2/Back Orifice", "category": "malware", "threat": "高"}, 50050: {"name": "cobaltstrike", "description": "CobaltStrike TeamServer", "category": "malware", "threat": "高"}, } return port_service_map.get(port, {"name": "unknown", "description": f"未知服务 (端口 {port})"}) async def test_scanner(): """测试扫描器功能""" config = ScanConfig() scanner = PortScanner(config) # 测试目标 target = ScanTarget(ip="127.0.0.1") try: result = await scanner.scan_target(target) print(f"扫描结果: {len(result)} 个开放端口") for port_info in result: print(f" 端口 {port_info.port}: {port_info.service}") if port_info.banner: print(f" Banner: {port_info.banner[:100]}...") except Exception as e: print(f"扫描失败: {e}") if __name__ == "__main__": asyncio.run(test_scanner())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/relaxcloud-cn/mcp-port-scanner'

If you have feedback or need assistance with the MCP directory API, please join our Discord server