"""微步在线威胁分析漏洞情报模块"""
import logging
from typing import Any, Dict, List
import mcp.types as types
from threatbook_mcp.response_handler import ThreatBookResponseHandler
logger = logging.getLogger("threatbook-mcp.vulnerability")
class VulnerabilityTool:
"""漏洞情报工具"""
def __init__(self, client):
self.client = client
def get_tool_definition(self) -> types.Tool:
"""获取工具定义"""
return types.Tool(
name="vulnerability",
description="漏洞情报:获取公开漏洞的基础信息、风险评估、PoC、处置建议、补丁等信息",
inputSchema={
"type": "object",
"properties": {
"vuln_id": {
"type": "string",
"description": "漏洞编号,支持XVE、CVE、CNVD、CNNVD或NVDB编号,支持批量查询(逗号分隔,最多100个)"
},
"vendor": {
"type": "string",
"description": "漏洞影响厂商,精确查询"
},
"product": {
"type": "string",
"description": "漏洞影响产品,支持批量查询(逗号分隔,最多100个)"
},
"component_name": {
"type": "string",
"description": "漏洞影响组件"
},
"update_time": {
"type": "string",
"description": "按更新时间筛选:30d/7d/3d/1d",
"enum": ["30d", "7d", "3d", "1d"]
},
"is_highrisk": {
"type": "boolean",
"description": "是否只返回高风险漏洞"
},
"limit": {
"type": "integer",
"description": "每页数据量,默认10条,最大50条",
"minimum": 1,
"maximum": 50,
"default": 10
},
"cursor": {
"type": "string",
"description": "翻页标识,用于获取下一页数据"
}
},
"required": []
}
)
async def execute(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
"""执行漏洞情报查询"""
try:
# 调用微步在线威胁分析API
result = await self.client.get_vulnerability_info(arguments)
# 格式化结果
formatted_result = self.format_result(result)
return [types.TextContent(
type="text",
text=formatted_result
)]
except Exception as e:
logger.error(f"漏洞情报查询失败: {e}")
return [types.TextContent(
type="text",
text=f"查询失败: {str(e)}"
)]
def format_result(self, result: Dict[str, Any]) -> str:
"""格式化漏洞情报结果"""
# 使用响应处理器检查状态
is_success, error_msg = ThreatBookResponseHandler.check_response(result)
if not is_success:
return ThreatBookResponseHandler.format_error_message(result)
# 处理部分成功的情况
if error_msg:
# 部分成功,显示警告但继续处理
pass
total_records = result.get("total_records", 0)
cursor = result.get("cursor", "")
items = result.get("items", [])
output = [
f"🔍 漏洞情报查询结果",
f"",
f"📊 总计: {total_records} 个漏洞",
f"📄 当前页: {len(items)} 个漏洞",
]
if cursor:
output.append(f"📖 下一页标识: {cursor}")
if not items:
output.append("")
output.append("❌ 未找到匹配的漏洞信息")
return "\n".join(output)
output.append("")
output.append("=" * 50)
for i, item in enumerate(items, 1):
output.append("")
output.append(f"🔸 漏洞 {i}")
# 基本信息
basic_info = item.get("basic_info", {})
xve_id = basic_info.get("xve_id", "")
cve_id = basic_info.get("cve_id", "")
vuln_name = basic_info.get("vuln_name", "")
vuln_category = basic_info.get("vuln_category", "")
publish_time = basic_info.get("publish_time", "")
if xve_id:
output.append(f" 🆔 XVE编号: {xve_id}")
if cve_id:
output.append(f" 🆔 CVE编号: {cve_id}")
if vuln_name:
output.append(f" 📝 漏洞名称: {vuln_name}")
if vuln_category:
output.append(f" 📂 漏洞分类: {vuln_category}")
if publish_time:
output.append(f" 📅 发布时间: {publish_time}")
# 风险评估
evaluation = item.get("evaluation", {})
x_vpt = evaluation.get("x_vpt", {})
if x_vpt:
vpr = x_vpt.get("vpr", "")
risk_level = x_vpt.get("risk_level", "")
if vpr:
risk_icon = "🔴" if risk_level == "高风险" else "🟡" if risk_level == "中风险" else "🟢"
output.append(f" {risk_icon} 风险评分: {vpr}/10.0 ({risk_level})")
# 情报信息
intelligence = item.get("intelligence", {})
if intelligence:
has_poc = intelligence.get("has_poc_public", False)
has_kev = intelligence.get("has_kev", False)
is_highrisk = intelligence.get("is_highrisk", False)
indicators = []
if has_poc:
indicators.append("🔧 有公开PoC")
if has_kev:
indicators.append("🚨 在野利用")
if is_highrisk:
indicators.append("⚠️ 高风险")
if indicators:
output.append(f" 🏷️ 标签: {' | '.join(indicators)}")
# 影响范围
impact = item.get("impact", {})
if impact:
affected_vendors = impact.get("affected_vendors_products", [])
if affected_vendors:
vendors = [v.get("vendor", "") for v in affected_vendors[:3]] # 只显示前3个
if vendors:
vendor_text = ", ".join(filter(None, vendors))
if len(affected_vendors) > 3:
vendor_text += f" 等{len(affected_vendors)}个厂商"
output.append(f" 🏢 影响厂商: {vendor_text}")
# 详情链接
link = item.get("link", "")
if link:
output.append(f" 🔗 详情: {link}")
if i < len(items): # 不是最后一个
output.append("")
output.append("-" * 30)
return "\n".join(output)