"""微步在线威胁分析域名高级查询模块"""
import logging
from typing import Any, Dict, List
import mcp.types as types
from threatbook_mcp.response_handler import ThreatBookResponseHandler
# Module-level logger for the domain_advanced tool.
logger = logging.getLogger(name="threatbook-mcp.domain_advanced")
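class DomainAdvancedTool:
    """MCP tool wrapping the ThreatBook "domain advanced" lookup.

    The tool asks the injected client for a domain's historical IP
    resolutions and historical WHOIS records and renders them as plain text.

    Security note: every value rendered by ``format_result`` comes from a
    remote API response, and WHOIS fields in particular are written by
    whoever registered the domain. The rendered text is returned to the MCP
    caller, so each remote value is flattened to one bounded, printable line
    before it is interpolated. That stops a record from forging additional
    output lines; it does not make the content trustworthy, and the consumer
    must still treat it as data rather than as instructions.
    """

    # Upper bound on any single remote-supplied value in the rendered text.
    _MAX_FIELD_LEN = 200

    # WHOIS keys rendered as single "label: value" lines, in display order.
    _WHOIS_FIELDS = (
        ("registrar_name", "注册商"),
        ("registrant_name", "注册者"),
        ("registrant_company", "注册机构"),
        ("registrant_email", "注册邮箱"),
        ("cdate", "注册时间"),
        ("edate", "过期时间"),
    )

    def __init__(self, client):
        """Store the API client; it must offer an awaitable
        ``get_domain_advanced(domain, exclude, lang)``."""
        self.client = client

    def get_tool_definition(self) -> types.Tool:
        """Return the MCP tool descriptor (name, description, input schema)."""
        # NOTE(review): execute() does not re-check `lang` against the enum
        # declared here, nor the type of `domain`; it relies on the MCP host
        # and the upstream API to reject other values. Confirm that holds.
        return types.Tool(
            name="domain_advanced",
            description="域名高级查询:获取域名的历史IP和历史Whois信息,用于深度溯源分析",
            inputSchema={
                "type": "object",
                "properties": {
                    "domain": {
                        "type": "string",
                        "description": "要查询的域名"
                    },
                    "exclude": {
                        "type": "string",
                        "description": "可排除的字段,多个用逗号分隔:history_ips,history_whoises"
                    },
                    "lang": {
                        "type": "string",
                        "description": "返回结果语言,zh为中文,en为英文",
                        "enum": ["zh", "en"],
                        "default": "zh"
                    }
                },
                "required": ["domain"]
            }
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Run the lookup and return the rendered report as text content.

        Args:
            arguments: Tool arguments; ``domain`` is required, ``exclude``
                defaults to ``""`` and ``lang`` to ``"zh"``.

        Returns:
            A single-element list holding either the report or an error
            message. Exceptions are not propagated to the caller.
        """
        try:
            domain = arguments.get("domain")
            if not domain:
                return [types.TextContent(
                    type="text",
                    text="错误:缺少必需的参数 'domain'"
                )]
            exclude = arguments.get("exclude", "")
            lang = arguments.get("lang", "zh")

            # Query the ThreatBook API through the injected client.
            result = await self.client.get_domain_advanced(domain, exclude, lang)

            return [types.TextContent(
                type="text",
                text=self.format_result(result)
            )]
        except Exception as e:
            # logger.exception keeps the traceback, which the plain error
            # call dropped.
            logger.exception("域名高级查询失败: %s", e)
            # NOTE(review): the exception text is echoed to the caller.
            # Confirm the client never places the API key or the full request
            # URL into its exception messages.
            return [types.TextContent(
                type="text",
                text=f"查询失败: {str(e)}"
            )]

    @staticmethod
    def _clean(value: Any, limit: int = _MAX_FIELD_LEN) -> str:
        """Flatten a remote-supplied value to one printable, bounded line.

        Non-printable characters (newlines, tabs, control and format
        characters) become spaces, whitespace runs collapse to one space, and
        anything past ``limit`` characters is cut and marked with an ellipsis.
        """
        text = value if isinstance(value, str) else str(value)
        text = "".join(ch if ch.isprintable() else " " for ch in text)
        text = " ".join(text.split())
        if len(text) > limit:
            text = text[:limit] + "…"
        return text

    def _describe_ip(self, ip_info: Dict[str, Any]) -> str:
        """Render one resolved IP with its optional location and carrier."""
        clean = self._clean
        ip_desc = f" • {clean(ip_info.get('ip', ''))}"

        location = ip_info.get("location", {})
        if location and isinstance(location, dict):
            country = location.get("country", "")
            location_str = clean(country) if country else ""
            for key in ("province", "city"):
                part = location.get(key, "")
                if part:
                    location_str += f" {clean(part)}"
            if location_str:
                ip_desc += f" ({location_str})"

        carrier = ip_info.get("carrier", "")
        if carrier:
            ip_desc += f" - {clean(carrier)}"
        return ip_desc

    def _format_history_ips(self, history_ips: List[Any]) -> List[str]:
        """Render up to 10 dated records with up to 5 IPs each."""
        lines = ["📅 历史解析IP记录:"]
        for record in history_ips[:10]:
            if not isinstance(record, dict):
                # Malformed entry in the remote response; skip it rather
                # than fail the whole report.
                continue
            date = self._clean(record.get("date", ""))
            lines.append(f" 📅 {date}:")
            ips = record.get("ips", [])
            if isinstance(ips, list):
                for ip_info in ips[:5]:
                    if isinstance(ip_info, dict):
                        lines.append(self._describe_ip(ip_info))
                if len(ips) > 5:
                    lines.append(f" • ... 还有 {len(ips) - 5} 个IP")
            lines.append("")
        if len(history_ips) > 10:
            lines.append(f" 📈 还有 {len(history_ips) - 10} 个历史记录")
            lines.append("")
        return lines

    def _format_name_servers(self, name_server: Any) -> List[str]:
        """Render the '|'-separated name-server field, showing at most 3."""
        servers = str(name_server).split("|")
        if len(servers) <= 1:
            return [f" • 域名服务器: {self._clean(name_server)}"]

        lines = [" • 域名服务器:"]
        for server in servers[:3]:
            cleaned = self._clean(server)
            if cleaned:
                lines.append(f" - {cleaned}")
        if len(servers) > 3:
            lines.append(f" - ... 还有 {len(servers) - 3} 个服务器")
        return lines

    def _format_history_whoises(self, history_whoises: List[Any]) -> List[str]:
        """Render up to 5 dated WHOIS snapshots."""
        lines = ["📋 历史Whois记录:"]
        for record in history_whoises[:5]:
            if not isinstance(record, dict):
                continue
            date = self._clean(record.get("date", ""))
            lines.append(f" 📅 {date}:")
            whois = record.get("whois", {})
            if whois and isinstance(whois, dict):
                # Registrant-controlled values: always pass through _clean.
                for key, label in self._WHOIS_FIELDS:
                    value = whois.get(key, "")
                    if value:
                        lines.append(f" • {label}: {self._clean(value)}")
                name_server = whois.get("name_server", "")
                if name_server:
                    lines.extend(self._format_name_servers(name_server))
            lines.append("")
        if len(history_whoises) > 5:
            lines.append(f" 📈 还有 {len(history_whoises) - 5} 个历史Whois记录")
            lines.append("")
        return lines

    def format_result(self, result: Dict[str, Any]) -> str:
        """Render the API response as a plain-text report.

        Args:
            result: Parsed response passed to
                ``ThreatBookResponseHandler.check_response``; the keys read
                here are ``domain``, ``history_ips``, ``history_whoises``
                and ``permalink``.

        Returns:
            The report, or the handler's error message when the response
            check fails, or a "not found" line when ``domain`` is empty.
        """
        is_success, error_msg = ThreatBookResponseHandler.check_response(result)
        if not is_success:
            return ThreatBookResponseHandler.format_error_message(result)

        domain = result.get("domain", "")
        if not domain:
            return "❌ 未找到域名信息"

        output = [
            "🔍 域名高级查询结果",
            "",
            f"🌐 域名: {self._clean(domain)}",
            "",
        ]

        # A non-empty message alongside success is treated as partial
        # success. The original branch was a bare `pass`, so the warning its
        # comment promised was never shown.
        # NOTE(review): assumes check_response returns an empty message on
        # full success; confirm against the handler.
        if error_msg:
            output.extend([f"⚠️ {self._clean(error_msg)}", ""])

        # Anything other than a list is treated as "no records" instead of
        # letting a slice or iteration error abort the report.
        history_ips = result.get("history_ips", [])
        if not isinstance(history_ips, list):
            history_ips = []
        if history_ips:
            output.extend(self._format_history_ips(history_ips))

        history_whoises = result.get("history_whoises", [])
        if not isinstance(history_whoises, list):
            history_whoises = []
        if history_whoises:
            output.extend(self._format_history_whoises(history_whoises))

        permalink = result.get("permalink", "")
        if permalink:
            # Larger cap so a legitimate report URL is not cut short.
            output.append(f"📋 详细报告: {self._clean(permalink, 1024)}")

        if not history_ips and not history_whoises:
            output.append("ℹ️ 未找到相关历史信息")

        return "\n".join(output)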