Skip to main content
Glama
scanner.py6.64 kB
""" Nmap 扫描器模块 """ import asyncio import subprocess import xml.etree.ElementTree as ET from typing import Optional import shlex from config import config from models import ScanResult, HostInfo, PortInfo, TaskType class NmapScanner: """Nmap 扫描器""" def __init__(self, nmap_path: str = None): self.nmap_path = nmap_path or config.nmap_path def build_quick_scan_command(self, target: str) -> list[str]: """构建快速扫描命令""" return [ self.nmap_path, "-F", # 快速扫描(常用端口) "-T4", # 较快的扫描速度 "-oX", "-", # XML 输出到 stdout target ] def build_full_scan_command(self, target: str) -> list[str]: """构建全量扫描命令""" return [ self.nmap_path, "-p", "1-65535", # 所有端口 "-T4", # 较快的扫描速度 "-sV", # 服务版本检测 "-oX", "-", # XML 输出到 stdout target ] def build_custom_scan_command(self, command: str) -> list[str]: """构建自定义扫描命令""" # 解析用户提供的命令 parts = shlex.split(command) # 如果用户没有指定 nmap,添加它 if parts and parts[0] != "nmap" and parts[0] != self.nmap_path: parts.insert(0, self.nmap_path) elif parts and parts[0] == "nmap": parts[0] = self.nmap_path return parts async def run_scan( self, command: list[str], task_type: TaskType, ) -> ScanResult: """执行扫描""" try: # 使用 asyncio 运行子进程 process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode('utf-8', errors='replace') raise RuntimeError(f"Nmap 执行失败: {error_msg}") output = stdout.decode('utf-8', errors='replace') # 根据任务类型决定是否解析 XML if task_type in (TaskType.QUICK, TaskType.FULL): # 尝试解析 XML 输出 return self._parse_xml_output(output, command) else: # 自定义命令,返回原始输出 return self._create_raw_result(output, command) except Exception as e: raise RuntimeError(f"扫描执行错误: {str(e)}") def _parse_xml_output(self, xml_output: str, command: list[str]) -> ScanResult: """解析 Nmap XML 输出""" try: root = ET.fromstring(xml_output) except ET.ParseError: # XML 解析失败,返回原始输出 return self._create_raw_result(xml_output, command) # 获取扫描信息 runstats = root.find('runstats') scan_time = "unknown" if runstats is not None: finished = runstats.find('finished') if finished is not None: elapsed = finished.get('elapsed', 'unknown') scan_time = f"{elapsed}s" # 获取目标 args = root.get('args', '') target = args.split()[-1] if args else 'unknown' # 解析主机信息 hosts = [] for host_elem in root.findall('host'): host_info = self._parse_host(host_elem) if host_info: hosts.append(host_info) return ScanResult( target=target, scan_time=scan_time, hosts=hosts, raw_output=xml_output, command=' '.join(command), ) def _parse_host(self, host_elem: ET.Element) -> Optional[HostInfo]: """解析单个主机信息""" # 获取状态 status_elem = host_elem.find('status') status = status_elem.get('state', 'unknown') if status_elem is not None else 'unknown' # 获取地址 address = "unknown" for addr_elem in host_elem.findall('address'): if addr_elem.get('addrtype') == 'ipv4': address = addr_elem.get('addr', 'unknown') break elif addr_elem.get('addrtype') == 'ipv6': address = addr_elem.get('addr', 'unknown') # 获取主机名 hostname = "" hostnames_elem = host_elem.find('hostnames') if hostnames_elem is not None: hostname_elem = hostnames_elem.find('hostname') if hostname_elem is not None: hostname = hostname_elem.get('name', '') # 解析端口信息 ports = [] ports_elem = host_elem.find('ports') if ports_elem is not None: for port_elem in ports_elem.findall('port'): port_info = self._parse_port(port_elem) if port_info: ports.append(port_info) return HostInfo( address=address, status=status, hostname=hostname, ports=ports, ) def _parse_port(self, port_elem: ET.Element) -> Optional[PortInfo]: """解析端口信息""" port_id = int(port_elem.get('portid', 0)) protocol = port_elem.get('protocol', 'tcp') # 获取状态 state_elem = port_elem.find('state') state = state_elem.get('state', 'unknown') if state_elem is not None else 'unknown' # 获取服务信息 service_elem = port_elem.find('service') service = "" version = "" if service_elem is not None: service = service_elem.get('name', '') product = service_elem.get('product', '') ver = service_elem.get('version', '') if product or ver: version = f"{product} {ver}".strip() return PortInfo( port=port_id, protocol=protocol, state=state, service=service, version=version, ) def _create_raw_result(self, output: str, command: list[str]) -> ScanResult: """创建原始输出结果""" # 尝试从命令中提取目标 target = command[-1] if command else "unknown" return ScanResult( target=target, scan_time="unknown", hosts=[], raw_output=output, command=' '.join(command), ) # 全局扫描器实例 scanner = NmapScanner()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/flagify-com/nmap-mcp-http'

If you have feedback or need assistance with the MCP directory API, please join our Discord server