search_subdomains
Discover subdomains for a target domain to identify potential attack surfaces and infrastructure details during security reconnaissance.
Instructions
Search for subdomains of a given domain.
Args: domain — the parent domain name to query (e.g., example.com); ctx — request context; page — page number for pagination (Plus accounts only; default 1).
Returns: Formatted string containing subdomains found
Input Schema
Table (JSON Schema)
| Name | Required | Description | Default |
|---|---|---|---|
| domain | Yes | The parent domain name to query (e.g., example.com) | |
| page | No | Page number for pagination (Plus accounts only) | 1 |
Implementation Reference
# - server.py:561-646 (handler): Primary implementation of the
#   search_subdomains tool. Registered with the @mcp.tool() decorator.
#   Fetches DNS records using DNSDumpsterClient and extracts unique
#   subdomains from A and CNAME records, with pagination support.
@mcp.tool()
async def search_subdomains(domain: str, ctx: Context, page: int = 1) -> str:
    """Search for subdomains of a given domain.

    Args:
        domain: The parent domain name to query (e.g., example.com)
        ctx: Request context
        page: Page number for pagination (Plus accounts only)

    Returns:
        Formatted string containing subdomains found, or an
        "Error: ..." string on failure.
    """
    if not domain:
        return "Error: Domain is required"

    # Validate domain
    if not is_valid_domain(domain):
        return "Error: Invalid domain name format"

    base = domain.lower()
    suffix = "." + base

    def _is_subdomain(name: str) -> bool:
        # BUG FIX: require a "." label boundary. The old check was
        # name.endswith(base), which wrongly accepted look-alike hosts
        # such as "notexample.com" as subdomains of "example.com".
        return bool(name) and name != base and name.endswith(suffix)

    try:
        api_key = os.environ.get("DNSDUMPSTER_API_KEY")
        if not api_key:
            return "Error: API key not configured. Set DNSDUMPSTER_API_KEY environment variable."

        client = DNSDumpsterClient(api_key)
        try:
            ctx.info(f"Searching subdomains for {domain} (page {page})")
            result = await client.get_dns_records(domain, page=page)

            subdomains = set()

            # A records contribute their host names.
            for record in result.get("a", []):
                host = record.get("host", "").lower()
                if _is_subdomain(host):
                    subdomains.add(host)

            # CNAME records contribute both the alias and its target.
            for record in result.get("cname", []):
                for key in ("host", "target"):
                    value = record.get(key, "").lower()
                    if _is_subdomain(value):
                        subdomains.add(value)

            if not subdomains:
                return f"No subdomains found for {domain} on page {page}"

            output_lines = [f"Subdomains for {domain} (page {page}):"]
            for subdomain in sorted(subdomains):
                output_lines.append(f"\n{subdomain}")

            # Add pagination hint (free tier returns at most 50 A records).
            total_records = result.get("total_a_recs", 0)
            if total_records > 50 and len(subdomains) >= 50:
                output_lines.append(
                    f"\n\nShowing {len(subdomains)} subdomains. There may be more results available."
                )
                output_lines.append(
                    "To see more results, use page parameter (e.g., page=2)"
                )
            return "\n".join(output_lines)
        finally:
            # Always release the HTTP client, even on error.
            await client.close()
    except Exception as e:
        return f"Error: {str(e)}"
# - server.py:170-181 (helper): Helper function used by search_subdomains
#   to validate the input domain format using a regex.
def is_valid_domain(domain: str) -> bool:
    """Validate a domain name.

    Accepts one or more labels of 1-63 alphanumeric/hyphen characters
    (no leading or trailing hyphen), followed by an alphabetic TLD of
    at least two characters, with a total length of at most 253
    characters (RFC 1035 limit).

    Args:
        domain: Domain name to validate

    Returns:
        True if the domain is valid, False otherwise
    """
    # BUG FIX: bound total length; the old regex allowed arbitrarily
    # long names built from many valid labels.
    if not domain or len(domain) > 253:
        return False
    pattern = r"([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}"
    # BUG FIX: re.fullmatch instead of re.match with "$" — "$" matches
    # before a trailing newline, so "example.com\n" passed the old check.
    return re.fullmatch(pattern, domain) is not None
# - server.py:95-168 (helper): Core helper class that handles API
#   communication with DNSDumpster, including caching, rate limiting,
#   retries, and the get_dns_records method called by the tool.
class DNSDumpsterClient:
    """Client for the DNSDumpster API."""

    def __init__(self, api_key: str):
        """Initialize the DNSDumpster API client.

        Args:
            api_key: DNSDumpster API key
        """
        self.api_key = api_key
        self.api_base_url = "https://api.dnsdumpster.com/domain"
        self.rate_limiter = APIRateLimiter()
        self.cache = DNSCache()
        self.client = httpx.AsyncClient(timeout=30.0, headers={"X-API-Key": api_key})

    async def get_dns_records(self, domain: str, page: Optional[int] = None) -> DNSData:
        """Query the DNSDumpster API for a domain's DNS records.

        Args:
            domain: Domain name to query
            page: Page number for pagination (Plus accounts only)

        Returns:
            Dictionary containing DNS records

        Raises:
            Exception: If the API cannot be reached after all retries,
                or if the server keeps rate-limiting the request.
        """
        # Check cache first
        cache_key = f"{domain}:{page or 1}"
        cached_data = await self.cache.get(cache_key)
        if cached_data:
            return cached_data

        # Wait for rate limiting
        await self.rate_limiter.wait_for_rate_limit()

        # Build URL with query parameters
        url = f"{self.api_base_url}/{domain}"
        params = {}
        if page is not None:
            params["page"] = str(page)

        # Retry logic for network errors
        max_retries = 3
        retry_delay = 2.0

        for attempt in range(max_retries):
            try:
                response = await self.client.get(url, params=params)

                if response.status_code == 429:
                    # Handle rate limiting: honor the server's backoff
                    # hint, then spend an attempt retrying.
                    retry_after = int(response.headers.get("Retry-After", "5"))
                    await asyncio.sleep(retry_after)
                    continue

                response.raise_for_status()
                data = response.json()

                # Cache the response
                await self.cache.set(cache_key, data)
                return data
            except httpx.HTTPError as e:
                if attempt == max_retries - 1:
                    # Chain the original error for debuggability.
                    raise Exception(f"Failed to query DNSDumpster API: {str(e)}") from e
                # Exponential backoff
                await asyncio.sleep(retry_delay * (2**attempt))

        # BUG FIX: previously, if every attempt got a 429 the loop fell
        # through and the method implicitly returned None, which callers
        # would then crash on. Raise an explicit error instead.
        raise Exception(
            f"Failed to query DNSDumpster API: rate limited after {max_retries} attempts"
        )

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()