We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/NAXG/ThreatMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""微步在线威胁分析子域名查询模块"""
import logging
from typing import Any, Dict, List
import mcp.types as types
from threatbook_mcp.response_handler import ThreatBookResponseHandler
logger = logging.getLogger("threatbook-mcp.subdomain")
class SubdomainTool:
"""子域名查询工具"""
def __init__(self, client):
self.client = client
def get_tool_definition(self) -> types.Tool:
"""获取工具定义"""
return types.Tool(
name="subdomain",
description="子域名查询:获取域名的所有子域名信息,用于资产发现和安全评估",
inputSchema={
"type": "object",
"properties": {
"domain": {
"type": "string",
"description": "要查询的域名"
},
"lang": {
"type": "string",
"description": "返回结果语言,zh为中文,en为英文",
"enum": ["zh", "en"],
"default": "zh"
}
},
"required": ["domain"]
}
)
async def execute(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
"""执行子域名查询"""
try:
domain = arguments.get("domain")
if not domain:
return [types.TextContent(
type="text",
text="错误:缺少必需的参数 'domain'"
)]
lang = arguments.get("lang", "zh")
# 调用微步在线威胁分析API
result = await self.client.get_subdomain(domain, lang)
# 格式化结果
formatted_result = self.format_result(result)
return [types.TextContent(
type="text",
text=formatted_result
)]
except Exception as e:
logger.error(f"子域名查询失败: {e}")
return [types.TextContent(
type="text",
text=f"查询失败: {str(e)}"
)]
def format_result(self, result: Dict[str, Any]) -> str:
"""格式化子域名查询结果"""
# 使用响应处理器检查状态
is_success, error_msg = ThreatBookResponseHandler.check_response(result)
if not is_success:
return ThreatBookResponseHandler.format_error_message(result)
# 处理部分成功的情况
if error_msg:
# 部分成功,显示警告但继续处理
pass
domain = result.get("domain", "")
if not domain:
return "❌ 未找到域名信息"
output = [
f"🔍 子域名查询结果",
f"",
f"🌐 主域名: {domain}",
f""
]
# 子域名信息
sub_domains = result.get("sub_domains", {})
if sub_domains:
total = sub_domains.get("total", "0")
data = sub_domains.get("data", [])
output.append(f"📊 子域名统计:")
output.append(f" • 总数量: {total}")
output.append(f" • 本次返回: {len(data) if data else 0}")
output.append("")
if data:
output.append("📋 子域名列表:")
# 按字母顺序排序
sorted_subdomains = sorted(data) if isinstance(data, list) else []
# 分类显示子域名
www_domains = []
mail_domains = []
api_domains = []
other_domains = []
for subdomain in sorted_subdomains:
if subdomain.startswith("www."):
www_domains.append(subdomain)
elif any(prefix in subdomain.lower() for prefix in ["mail", "mx", "smtp", "pop", "imap"]):
mail_domains.append(subdomain)
elif any(prefix in subdomain.lower() for prefix in ["api", "rest", "service", "endpoint"]):
api_domains.append(subdomain)
else:
other_domains.append(subdomain)
# 显示分类结果
if www_domains:
output.append(" 🌐 Web服务:")
for subdomain in www_domains[:10]: # 只显示前10个
output.append(f" • {subdomain}")
if len(www_domains) > 10:
output.append(f" • ... 还有 {len(www_domains) - 10} 个")
output.append("")
if mail_domains:
output.append(" 📧 邮件服务:")
for subdomain in mail_domains[:5]: # 只显示前5个
output.append(f" • {subdomain}")
if len(mail_domains) > 5:
output.append(f" • ... 还有 {len(mail_domains) - 5} 个")
output.append("")
if api_domains:
output.append(" 🔌 API服务:")
for subdomain in api_domains[:10]: # 只显示前10个
output.append(f" • {subdomain}")
if len(api_domains) > 10:
output.append(f" • ... 还有 {len(api_domains) - 10} 个")
output.append("")
if other_domains:
output.append(" 🔧 其他服务:")
# 根据总数量决定显示多少个
display_count = min(20, len(other_domains))
for subdomain in other_domains[:display_count]:
output.append(f" • {subdomain}")
if len(other_domains) > display_count:
output.append(f" • ... 还有 {len(other_domains) - display_count} 个")
output.append("")
# 安全建议
if len(sorted_subdomains) > 50:
output.append("⚠️ 安全提醒:")
output.append(" • 子域名数量较多,建议定期检查是否存在未使用的子域名")
output.append(" • 确保所有子域名都有适当的安全配置")
output.append(" • 考虑使用子域名监控工具")
output.append("")
else:
output.append("ℹ️ 未找到子域名信息")
# 详细报告链接
permalink = result.get("permalink", "")
if permalink:
output.append(f"📋 详细报告: {permalink}")
return "\n".join(output)