We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/dadepo/whois-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import logging
from typing import Annotated, Any
import aiohttp
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession
from pydantic import Field
from whois_mcp.cache import TTLCache
from whois_mcp.config import HTTP_TIMEOUT_SECONDS, RIPE_REST_BASE, USER_AGENT
# Public API of this module: only the registration hook is exported.
__all__ = ["register"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared cache for AS-SET expansion results, constructed with max_items=1000
# and ttl_seconds=300.0 (5 minutes).
# NOTE(review): eviction/expiry semantics are defined by
# whois_mcp.cache.TTLCache, which is not shown here - confirm there.
_as_set_cache: TTLCache[str, Any] = TTLCache(max_items=1000, ttl_seconds=300.0)
# Tool metadata constants: the name and description handed to mcp.tool() in
# register().
TOOL_NAME = "ripe_expand_as_set"
TOOL_DESCRIPTION = (
"Efficiently expand AS-SET objects from the RIPE NCC database into concrete ASNs with configurable depth. "
"This tool is specifically for the RIPE RIR (Europe/Middle East/Central Asia region). "
"Use this instead of whois_query when you need ASNs from a RIPE AS-SET. "
"CRITICAL: For 'top-level', 'direct', or 'immediate' members, use max_depth=1. "
"For complete expansion, use max_depth=10+. Large AS-SETs like 'AS-RETN' have hundreds "
"of nested AS-SETs - choose depth carefully to balance completeness vs speed. "
"Automatically handles recursive expansion, deduplication, and cycle detection. "
"Perfect for network analysis, route filtering, and policy generation for RIPE-managed AS-SETs."
)
# Field description attached to the `setname` parameter of
# _expand_as_set_request.
SETNAME_DESCRIPTION = (
"AS-SET name to recursively expand into concrete ASN numbers from RIPE database. "
"Examples: 'AS-CLOUDFLARE', 'AS-GOOGLE', 'AS-RETN'. "
"The tool will automatically resolve all nested AS-SETs and return a complete "
"list of individual ASNs contained within the hierarchy from RIPE NCC records."
)
# Field description attached to the `max_depth` parameter of
# _expand_as_set_request (declared there with ge=1, le=20 and a default of 10).
MAX_DEPTH_DESCRIPTION = (
"Maximum recursion depth for AS-SET expansion (1-20 levels, default: 10). "
"IMPORTANT: Use depth=1 for 'top-level' or 'direct' members only. "
"Use depth=2-3 for shallow analysis, depth=10 for complete expansion. "
"Higher values provide more complete results but take much longer to process. "
"For questions about 'immediate' or 'direct' members, always use depth=1."
)
async def _get_json(url: str) -> dict[str, Any]:
    """Download *url* and return its decoded JSON body.

    A 404 response is interpreted as "AS-SET not found" and produces an empty
    ``{"objects": {"object": []}}`` structure instead of an exception. Any
    other HTTP error status, and any other aiohttp client error, is logged and
    re-raised to the caller.
    """
    request_headers = {"User-Agent": USER_AGENT}
    request_timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT_SECONDS)
    try:
        async with aiohttp.ClientSession(
            headers=request_headers, timeout=request_timeout
        ) as session:
            async with session.get(url) as response:
                # Turn 4xx/5xx statuses into ClientResponseError before decoding.
                response.raise_for_status()
                logger.debug(f"Successfully fetched JSON from {url}")
                payload = await response.json()
                return payload
    except aiohttp.ClientResponseError as exc:
        if exc.status != 404:
            logger.error(f"HTTP error fetching JSON from {url}: {str(exc)}")
            raise
        # A missing AS-SET is an expected outcome, not a failure.
        logger.debug(f"AS-SET not found (404): {url}")
        return {"objects": {"object": []}}
    except aiohttp.ClientError as exc:
        logger.error(f"Failed to fetch JSON from {url}: {str(exc)}")
        raise
def _attrs(obj: dict[str, Any], name: str) -> list[str]:
    """Return the stripped, non-empty values of every attribute called *name*.

    *obj* is read as ``{"attributes": {"attribute": [{"name": ..., "value": ...}]}}``;
    a missing ``attributes``/``attribute`` key is treated as "no attributes".
    Values are returned in their original order.
    """
    values: list[str] = []
    for entry in obj.get("attributes", {}).get("attribute", []):
        # Only look at the value once the name matches.
        if entry.get("name") != name:
            continue
        text = entry.get("value", "").strip()
        if text:
            values.append(text)
    return values
# Outcome codes for expanding one AS-SET, ordered best -> worst so that max()
# can combine the outcomes of several nested expansions.
_EXPANSION_CLEAN = 0  # nothing beneath the set was skipped or failed
_EXPANSION_SKIPPED = 1  # a nested set was skipped because it was already visited
_EXPANSION_FAILED = 2  # a nested set could not be fetched or expanded


def _is_as_set_name(member: str) -> bool:
    """Return True if *member* names an AS-SET, flat or hierarchical.

    RPSL set names may be hierarchical (colon-separated components, e.g.
    ``AS64500:AS-CUSTOMERS``); such a name refers to a set when at least one
    component starts with ``AS-``.
    """
    return any(part.startswith("AS-") for part in member.upper().split(":"))


async def _expand_as_set_recursive(
    setname: str,
    seen: set[str],
    out_asns: set[int],
    found_sets: set[str],
    depth: int = 0,
    max_depth: int = 10,
) -> None:
    """Recursively expand an AS-SET into concrete ASNs (cycle-safe, depth-limited).

    Results are cached per (AS-SET, remaining depth budget), and only when
    they are trustworthy: an expansion in which a nested set failed to load is
    never cached, and an expansion in which a nested set was skipped as
    "already visited" is cached only for the set the traversal started from.

    Args:
        setname: The AS-SET name to expand
        seen: Set of already-visited AS-SETs (for cycle detection); updated in place
        out_asns: Output set where discovered ASNs are accumulated
        found_sets: Set of AS-SETs that were successfully found in the database;
            updated in place
        depth: Current recursion depth
        max_depth: Maximum recursion depth

    Raises:
        Exception: Whatever fetching or parsing *setname* itself raises.
            Failures of nested AS-SETs are logged and skipped instead.
    """
    await _expand_as_set_node(setname, seen, out_asns, found_sets, depth, max_depth)


async def _expand_as_set_node(
    setname: str,
    seen: set[str],
    out_asns: set[int],
    found_sets: set[str],
    depth: int,
    max_depth: int,
) -> int:
    """Expand one AS-SET and report how trustworthy the result is.

    Returns one of the ``_EXPANSION_*`` codes: the worst outcome observed for
    this set or anything nested beneath it.
    """
    if depth >= max_depth:
        logger.warning(
            f"Maximum recursion depth ({max_depth}) reached for AS-SET: {setname}"
        )
        # Truncation by depth is deterministic for a given budget, and the
        # budget is part of the cache key, so it does not taint the parent.
        return _EXPANSION_CLEAN

    # A traversal that starts here (nothing visited yet) always yields the
    # same answer for the same budget, even if it skips repeated nested sets.
    is_root = not seen

    # Key by the remaining depth budget so that a shallow expansion is never
    # served for a deeper request (or the other way round).
    cache_key = f"as_set:{setname}:{max_depth - depth}"
    cached_result = _as_set_cache.get(cache_key)
    # "is not None": a cached empty set is a valid hit.
    if cached_result is not None:
        logger.debug(f"Cache hit for AS-SET: {setname}")
        seen.add(setname)
        out_asns.update(cached_result)
        found_sets.add(setname)  # Only sets that exist are ever cached
        return _EXPANSION_CLEAN

    if setname in seen:
        logger.debug(f"Cycle detected for AS-SET: {setname}")
        return _EXPANSION_SKIPPED
    seen.add(setname)

    try:
        url = f"{RIPE_REST_BASE}/ripe/as-set/{setname}.json"
        data = await _get_json(url)
        objects = data.get("objects", {}).get("object", [])
        if not objects:
            logger.warning(f"No objects found for AS-SET: {setname}")
            return _EXPANSION_CLEAN

        # Mark this AS-SET as found (it exists in the database)
        found_sets.add(setname)

        asns: set[int] = set()
        outcome = _EXPANSION_CLEAN
        for member in _attrs(objects[0], "members"):
            if member.upper().startswith("AS") and member[2:].isdigit():
                asns.add(int(member[2:]))
            elif _is_as_set_name(member):
                try:
                    nested_outcome = await _expand_as_set_node(
                        member, seen, asns, found_sets, depth + 1, max_depth
                    )
                except Exception as e:
                    logger.warning(f"Failed to expand nested AS-SET {member}: {str(e)}")
                    # Continue with other members even if one fails
                    nested_outcome = _EXPANSION_FAILED
                outcome = max(outcome, nested_outcome)

        # Never cache a result that is missing ASNs because of a failure, and
        # cache a result with "already visited" skips only for the traversal
        # root: for a nested set those ASNs live in some other set's result.
        if outcome == _EXPANSION_CLEAN or (is_root and outcome == _EXPANSION_SKIPPED):
            _as_set_cache.set(cache_key, asns)
        out_asns.update(asns)
        logger.info(f"Successfully expanded AS-SET {setname} with {len(asns)} ASNs")
        return outcome
    except Exception as e:
        logger.error(f"Failed to expand AS-SET {setname}: {str(e)}")
        raise
async def _expand_as_set_request(
    setname: Annotated[str, Field(description=SETNAME_DESCRIPTION)],
    max_depth: Annotated[
        int,
        Field(description=MAX_DEPTH_DESCRIPTION, ge=1, le=20),
    ] = 10,
    *,
    ctx: Context[ServerSession, None],
) -> dict[str, Any]:
    """
    Expand an AS-SET into all concrete ASNs and return the result in a structured format.

    This tool recursively resolves AS-SET hierarchies with configurable depth limits,
    automatically handling cycles and deduplication. Much more efficient than multiple
    whois_query calls when you need complete AS-SET membership information.

    Args:
        setname: The AS-SET name to expand (e.g., 'AS-RETN')
        max_depth: Maximum recursion depth (1-20, default: 10). Use lower values (3-5)
            for quick analysis, higher values (15-20) for comprehensive expansion.
            Most AS-SETs are fully resolved within 10 levels.
        ctx: MCP request context, used to send progress and error messages
            to the client.

    Returns:
        On success:
            {"ok": true, "data": {"as_set": "...", "asns": [1234, 5678, ...],
             "count": N, "status": "expanded" | "empty" | "not-found"}}
        On failure:
            {"ok": false, "error": "expansion_error", "detail": "..."}

    Performance Notes:
        - Depth 1: FAST - Only direct/immediate members (use for "top-level" questions)
        - Depth 2-3: Quick overview with one level of nesting
        - Depth 6-10: Balanced, handles most real-world AS-SETs completely
        - Depth 11-20: Comprehensive, may be very slow for large AS-SETs like AS-RETN
    """
    # Log the incoming request
    await ctx.info(
        f"Starting AS-SET expansion for '{setname}' with max_depth={max_depth}"
    )

    # There is deliberately no cache shortcut here: _expand_as_set_recursive
    # consults the shared cache itself, and sending every request through it
    # gives cached and freshly fetched answers the same response shape
    # (always including "status").
    try:
        seen: set[str] = set()
        asns: set[int] = set()
        found_sets: set[str] = set()  # Track which AS-SETs were actually found
        await _expand_as_set_recursive(setname, seen, asns, found_sets, 0, max_depth)

        # Determine the status based on whether the AS-SET was found and has members
        if setname not in found_sets:
            status = "not-found"
            logger.info(f"AS-SET '{setname}' not found in database")
            summary = f"AS-SET expansion completed: not-found ('{setname}' does not exist in database)"
        elif not asns:
            status = "empty"
            logger.info(f"AS-SET '{setname}' exists but contains no members")
            summary = f"AS-SET expansion completed: empty ('{setname}' exists but has no members)"
        else:
            status = "expanded"
            summary = f"AS-SET expansion completed: expanded (found {len(asns)} ASNs from '{setname}' at depth {max_depth})"
        await ctx.info(summary)

        # Only an "expanded" result carries ASNs; the other two are always empty.
        members = sorted(asns) if status == "expanded" else []
        result: dict[str, Any] = {
            "ok": True,
            "data": {
                "as_set": setname,
                "asns": members,
                "count": len(members),
                "status": status,
            },
        }
        logger.info(f"AS-SET expansion for '{setname}' completed with {len(asns)} ASNs")
        return result
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        error_msg = f"AS-SET expansion for '{setname}' failed: {str(e)}"
        logger.error(error_msg)
        # error_msg already says what failed; do not prefix it a second time.
        await ctx.error(error_msg)
        return {"ok": False, "error": "expansion_error", "detail": str(e)}
def register(mcp: FastMCP) -> None:
    """Attach the AS-SET expansion tool to *mcp* under TOOL_NAME / TOOL_DESCRIPTION."""
    as_tool = mcp.tool(name=TOOL_NAME, description=TOOL_DESCRIPTION)
    as_tool(_expand_as_set_request)