Skip to main content
Glama
server.py16.8 kB
"""MCP 服务器:异步任务模式的安全扫描服务 工作流程: 1. start_scan(url, token) -> 返回 task_id(立即返回) 2. get_scan_status(task_id) -> 返回进度和状态 3. get_scan_result(task_id) -> 返回最终结果 """ import sys import json from fastmcp import FastMCP from .task_manager import task_manager, TaskStatus from .agent import SecurityAgent mcp = FastMCP("Security-Scanner") def _clean_token(token: str) -> str: """清理和格式化 Token""" if not token: return "" clean_token = token.strip() if "Authorization=" in clean_token: clean_token = clean_token.split("Authorization=")[1].strip() if not clean_token.lower().startswith(("bearer ", "basic ")): clean_token = f"Bearer {clean_token}" return clean_token def _format_vulnerability_detail(vuln: dict) -> dict: """格式化漏洞详情,生成详细的 POC 报告""" attack_type = vuln.get("attack_type", "") tool_name = vuln.get("tool_name", "") payload = vuln.get("payload", "") severity = vuln.get("severity", "MEDIUM") detected_patterns = vuln.get("detected_patterns", []) response_preview = vuln.get("response_preview", "") response_length = vuln.get("response_length", 0) details = vuln.get("details", {}) # 基础信息 result = { "severity": severity, "tool": tool_name, "type": attack_type, "type_cn": _get_attack_type_cn(attack_type), } # POC 详情 poc = { "tool_name": tool_name, "payload": payload, "description": "", "evidence": detected_patterns, # 默认证据 } # 根据攻击类型生成详细描述和证据 if attack_type == "mcp_excessive_data_exposure": excessive = details.get("excessive_data", {}) record_count = excessive.get("record_count", 0) poc["description"] = f"使用空查询或通配符查询,系统返回了 {record_count:,} 条记录" poc["risk_description"] = "普通用户不应能够获取如此大量的数据,存在数据泄露风险" poc["recommendation"] = "1. 实施分页限制,单次查询最多返回100条\n2. 添加权限校验,限制用户可查询的数据范围\n3. 对敏感查询添加审计日志" poc["record_count"] = record_count poc["response_size"] = f"{response_length // 1024}KB" if response_length > 1024 else f"{response_length}B" elif attack_type == "mcp_sensitive_business_probe": sensitive = details.get("sensitive_business", {}) categories = sensitive.get("categories", []) evidence = sensitive.get("evidence", []) category_cn = { "hr_data": "HR人事数据", "finance_data": "财务数据", "pii_data": "个人隐私数据(PII)", "org_data": "组织架构数据" } categories_cn = [category_cn.get(c, c) for c in categories] poc["description"] = f"查询返回了敏感业务数据" poc["sensitive_categories"] = categories_cn poc["evidence"] = evidence[:5] poc["risk_description"] = "敏感数据未经脱敏直接返回,可能违反数据保护法规" poc["recommendation"] = "1. 对敏感字段进行脱敏处理\n2. 实施基于角色的访问控制(RBAC)\n3. 记录敏感数据访问日志" elif attack_type == "mcp_idor": idor = details.get("idor", {}) poc["description"] = f"使用测试ID '{payload}' 成功获取到其他用户的数据" poc["evidence"] = idor.get("evidence", detected_patterns) poc["risk_description"] = "存在越权访问漏洞,攻击者可遍历ID获取任意用户数据" poc["recommendation"] = "1. 验证当前用户是否有权访问请求的资源\n2. 使用不可预测的资源标识符(如UUID)\n3. 实施访问控制列表(ACL)" elif attack_type in ["hardcoded_credential", "static_analysis"]: poc["description"] = f"工具 '{tool_name}' 定义中发现硬编码的认证凭据" poc["evidence"] = detected_patterns poc["param_name"] = payload poc["risk_description"] = "硬编码凭据可能被攻击者利用,获取未授权访问" poc["recommendation"] = "1. 移除硬编码凭据\n2. 使用环境变量或密钥管理服务\n3. 定期轮换凭据" else: poc["description"] = f"使用载荷 '{payload[:50] if payload else 'N/A'}' 进行测试" poc["evidence"] = detected_patterns poc["risk_description"] = "检测到潜在安全风险" poc["recommendation"] = "请根据具体情况进行安全加固" # 所有漏洞都添加响应证据(用于验证漏洞真实性) if response_preview: result["response_evidence"] = response_preview[:1000] # 最多显示1000字符 elif detected_patterns: # 如果没有响应预览,用检测到的模式作为证据 result["response_evidence"] = "\n".join(str(p) for p in detected_patterns[:5]) if response_length > 0: result["response_length"] = response_length result["poc"] = poc return result def _get_attack_type_cn(attack_type: str) -> str: """获取攻击类型的中文名称""" type_map = { "mcp_excessive_data_exposure": "过度数据暴露", "mcp_sensitive_business_probe": "敏感业务数据泄露", "mcp_idor": "越权访问(IDOR)", "mcp_command_injection": "命令注入", "mcp_sql_injection": "SQL注入", "mcp_ssrf": "服务端请求伪造(SSRF)", "mcp_path_traversal": "路径遍历", "hardcoded_credential": "硬编码凭据", "description_injection": "工具描述注入", } return type_map.get(attack_type, attack_type) def _summarize_response(response: str, max_length: int = 200) -> str: """总结响应内容""" if not response: return "无响应内容" if len(response) <= max_length: return response # 尝试提取关键信息 import re # 提取 JSON 中的关键字段 summary_parts = [] # 提取记录数 count_match = re.search(r'"(?:total|count|totalElements)"\s*:\s*(\d+)', response, re.IGNORECASE) if count_match: summary_parts.append(f"记录数: {int(count_match.group(1)):,}") # 提取字段名 field_match = re.findall(r'"([a-zA-Z_\u4e00-\u9fa5]+)":', response[:500]) if field_match: unique_fields = list(dict.fromkeys(field_match))[:10] summary_parts.append(f"包含字段: {', '.join(unique_fields)}") if summary_parts: return " | ".join(summary_parts) + f" | 响应长度: {len(response)} 字符" return response[:max_length] + f"... (共 {len(response)} 字符)" @mcp.tool() async def start_scan(url: str, token: str = "", llm_provider: str = "qwen", mode: str = "fast") -> dict: """ 启动 MCP 安全扫描任务(异步模式)。 该工具会立即返回任务ID,扫描在后台执行。 使用 get_scan_status 查询进度,使用 get_scan_result 获取结果。 Args: url: 目标 MCP SSE 服务器 URL token: 认证 Token(Bearer 格式) llm_provider: LLM 提供商 (qwen/azure) mode: 扫描模式 - "fast"(快速并发扫描) 或 "full"(完整LLM引导扫描) Returns: task_id: 任务ID,用于查询状态和结果 """ token = _clean_token(token) print(f"[MCP] 创建扫描任务: {url} (模式: {mode})", file=sys.stderr) # 创建任务 task = task_manager.create_task(url, token) # 根据模式选择扫描函数 if mode == "fast": async def run_scan(task): agent = SecurityAgent(llm_provider=llm_provider) return await agent.fast_scan(task) else: async def run_scan(task): agent = SecurityAgent(llm_provider=llm_provider) return await agent.scan(task) # 启动后台任务 await task_manager.start_task(task, run_scan) mode_desc = "快速并发扫描" if mode == "fast" else "完整LLM引导扫描" return { "status": "started", "task_id": task.task_id, "mode": mode, "message": f"扫描任务已启动({mode_desc}),请使用 get_scan_status('{task.task_id}') 查询进度" } @mcp.tool() async def get_scan_status(task_id: str) -> dict: """ 查询扫描任务的状态和进度。 Args: task_id: 任务ID(由 start_scan 返回) Returns: status: 任务状态 (pending/running/completed/failed) progress: 进度百分比 (0-100) current_step: 当前执行步骤 logs: 最近的执行日志 next_action: 建议的下一步操作 """ task = task_manager.get_task(task_id) if not task: return { "status": "error", "error": f"任务不存在: {task_id}" } # 根据状态给出明确的下一步指示 if task.status == TaskStatus.COMPLETED: return { "status": "completed", "progress": 100, "message": "✅ 扫描已完成!请立即调用 get_scan_result 获取完整报告。", "next_action": f"get_scan_result(task_id='{task_id}')", "logs": task.logs[-5:] } if task.status == TaskStatus.FAILED: return { "status": "failed", "error": task.error, "message": "❌ 扫描失败", "logs": task.logs[-10:] } if task.status == TaskStatus.PENDING: return { "status": "pending", "progress": 0, "message": "⏳ 任务等待中...", "next_action": "请稍后再次查询状态" } # running - 返回详细的阶段性数据 result = { "status": "running", "progress": task.progress, "current_phase": task.current_phase, "current_step": task.current_step, "message": f"🔄 扫描进行中 ({task.progress}%) - {task.current_phase}", "next_action": "请等待几秒后再次查询状态", "logs": task.logs[-3:] } # 如果侦察阶段已完成,返回侦察数据 if task.recon_data: result["recon_summary"] = { "tools_count": task.recon_data["tools_count"], "injectable_count": task.recon_data["injectable_count"], "tools": [t["name"] for t in task.recon_data["tools"]], "attack_surface": [ f"{s['tool_name']} ({len(s['injectable_params'])} 个可注入参数: {', '.join(s['injectable_params'][:3])})" for s in task.recon_data["attack_surface"][:5] # 最多显示5个 ] } # 返回攻击进度 if task.attack_progress["completed_attacks"] > 0: result["attack_progress"] = { "completed_attacks": task.attack_progress["completed_attacks"], "vulnerabilities_found": task.attack_progress["vulnerabilities_found"] } return result @mcp.tool() async def get_scan_result(task_id: str) -> dict: """ 获取扫描任务的最终结果。 只有当任务状态为 completed 或 failed 时才有结果。 Args: task_id: 任务ID(由 start_scan 返回) Returns: 完整的扫描报告,包括发现的漏洞、风险等级等 """ task = task_manager.get_task(task_id) if not task: return { "status": "error", "error": f"任务不存在: {task_id}" } if task.status == TaskStatus.PENDING: return { "status": "pending", "message": "任务尚未开始" } if task.status == TaskStatus.RUNNING: return { "status": "running", "progress": task.progress, "current_step": task.current_step, "message": "任务正在执行中,请稍后再查询" } if task.status == TaskStatus.FAILED: return { "status": "failed", "error": task.error, "logs": task.logs } # completed - 返回格式化的报告 result = task.result or {} # 构建易读的报告 report = { "status": "completed", "message": "✅ 扫描完成!以下是完整的安全报告:", "target": result.get("target", task.url), "risk_level": result.get("risk_level", "UNKNOWN"), "summary": result.get("summary", {}), } # 添加漏洞详情 - 按严重程度分组并排序 vulnerabilities = result.get("vulnerabilities", []) if vulnerabilities: # 分组统计 by_severity = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []} by_type = {} # 按漏洞类型统计 affected_tools = set() for v in vulnerabilities: sev = v.get("severity", "MEDIUM") attack_type = v.get("attack_type", "unknown") tool_name = v.get("tool_name", "") if sev in by_severity: by_severity[sev].append(_format_vulnerability_detail(v)) # 按类型统计 if attack_type not in by_type: by_type[attack_type] = [] by_type[attack_type].append(tool_name) affected_tools.add(tool_name) report["vulnerability_count"] = len(vulnerabilities) report["severity_breakdown"] = result.get("severity_breakdown", {}) report["vulnerabilities_by_severity"] = {} # 按严重程度输出详情 for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]: if by_severity[sev]: report["vulnerabilities_by_severity"][sev] = { "count": len(by_severity[sev]), "details": by_severity[sev] } # 添加修复优先级建议 report["remediation_priority"] = [] priority_order = 1 for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]: if by_severity[sev]: for vuln in by_severity[sev]: report["remediation_priority"].append({ "priority": priority_order, "severity": sev, "tool": vuln.get("tool", ""), "type": vuln.get("type_cn", vuln.get("type", "")), "action": vuln.get("poc", {}).get("recommendation", "请进行安全加固")[:100] }) priority_order += 1 # 添加影响范围摘要 report["impact_summary"] = { "affected_tools": list(affected_tools), "affected_tools_count": len(affected_tools), "vulnerability_types": list(by_type.keys()), } else: report["vulnerability_count"] = 0 report["message"] = "✅ 扫描完成,未发现安全漏洞" # 添加工具列表 tools = result.get("tools", []) if tools: report["discovered_tools"] = [t.get("name", "") for t in tools] return report @mcp.tool() async def list_scan_tasks() -> dict: """ 列出所有扫描任务。 Returns: 所有任务的列表,包括状态和基本信息 """ tasks = task_manager.list_tasks() return { "status": "success", "count": len(tasks), "tasks": tasks } @mcp.tool() async def clear_scan_tasks(task_id: str = "") -> dict: """ 清除扫描任务。 Args: task_id: 要清除的任务ID,如果为空则清除所有任务 Returns: 清除结果 """ if task_id: success = task_manager.clear_task(task_id) return { "status": "success" if success else "error", "message": f"任务 {task_id} 已清除" if success else f"任务 {task_id} 不存在" } else: task_manager.clear_all_tasks() return { "status": "success", "message": "所有任务已清除" } def run_server(host: str = "0.0.0.0", port: int = 8000): """启动 MCP 服务器""" print(f"启动 Security Scanner MCP: http://{host}:{port}/sse") print("可用工具:") print(" - start_scan(url, token) -> 启动扫描,返回 task_id") print(" - get_scan_status(task_id) -> 查询进度") print(" - get_scan_result(task_id) -> 获取结果") print(" - list_scan_tasks() -> 列出所有任务") mcp.run(transport="sse", host=host, port=port)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anntsmart/MCP-Security-Scanner'

If you have feedback or need assistance with the MCP directory API, please join our Discord server