Skip to main content
Glama
anhhung04

DNSDumpster MCP Server

by anhhung04

search_subdomains

Discover subdomains for a target domain to identify potential attack surfaces and infrastructure details during security reconnaissance.

Instructions

Search for subdomains of a given domain.

Args: domain: The parent domain name to query (e.g., example.com) ctx: Request context page: Page number for pagination (Plus accounts only)

Returns: Formatted string containing subdomains found

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
domainYes
pageNo

Implementation Reference

  • Primary implementation of the search_subdomains tool. Registered with @mcp.tool() decorator. Fetches DNS records using DNSDumpsterClient and extracts unique subdomains from A and CNAME records, with pagination support.
    @mcp.tool()
    async def search_subdomains(domain: str, ctx: Context, page: int = 1) -> str:
        """Search for subdomains of a given domain.
    
        Args:
            domain: The parent domain name to query (e.g., example.com)
            ctx: Request context
            page: Page number for pagination (Plus accounts only)
    
        Returns:
            Formatted string containing subdomains found
        """
        if not domain:
            return "Error: Domain is required"
    
        # Validate domain
        if not is_valid_domain(domain):
            return "Error: Invalid domain name format"
    
        try:
            api_key = os.environ.get("DNSDUMPSTER_API_KEY")
            if not api_key:
                return "Error: API key not configured. Set DNSDUMPSTER_API_KEY environment variable."
    
            client = DNSDumpsterClient(api_key)
    
            try:
                ctx.info(f"Searching subdomains for {domain} (page {page})")
                result = await client.get_dns_records(domain, page=page)
    
                # Extract subdomains from A records
                subdomains = set()
    
                if "a" in result:
                    for record in result["a"]:
                        host = record.get("host", "").lower()
                        if (
                            host
                            and host.endswith(domain.lower())
                            and host != domain.lower()
                        ):
                            subdomains.add(host)
    
                # Extract subdomains from CNAME records
                if "cname" in result:
                    for record in result["cname"]:
                        host = record.get("host", "").lower()
                        if (
                            host
                            and host.endswith(domain.lower())
                            and host != domain.lower()
                        ):
                            subdomains.add(host)
    
                        target = record.get("target", "").lower()
                        if (
                            target
                            and target.endswith(domain.lower())
                            and target != domain.lower()
                        ):
                            subdomains.add(target)
    
                if not subdomains:
                    return f"No subdomains found for {domain} on page {page}"
    
                output_lines = [f"Subdomains for {domain} (page {page}):"]
    
                for subdomain in sorted(subdomains):
                    output_lines.append(f"\n{subdomain}")
    
                # Add pagination hint
                total_records = result.get("total_a_recs", 0)
                if total_records > 50 and len(subdomains) >= 50:  # Free tier limit
                    output_lines.append(
                        f"\n\nShowing {len(subdomains)} subdomains. There may be more results available."
                    )
                    output_lines.append(
                        f"To see more results, use page parameter (e.g., page=2)"
                    )
    
                return "\n".join(output_lines)
            finally:
                await client.close()
    
        except Exception as e:
            return f"Error: {str(e)}"
  • Helper function used by search_subdomains to validate the input domain format using regex.
    def is_valid_domain(domain: str) -> bool:
        """Validate a domain name.
    
        Args:
            domain: Domain name to validate
    
        Returns:
            True if the domain is valid, False otherwise
        """
        pattern = r"^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$"
        return bool(re.match(pattern, domain))
  • Core helper class that handles API communication with DNSDumpster, including caching, rate limiting, retries, and the get_dns_records method called by the tool.
    class DNSDumpsterClient:
        """Client for the DNSDumpster API."""
    
        def __init__(self, api_key: str):
            """Initialize the DNSDumpster API client.
    
            Args:
                api_key: DNSDumpster API key
            """
            self.api_key = api_key
            self.api_base_url = "https://api.dnsdumpster.com/domain"
            self.rate_limiter = APIRateLimiter()
            self.cache = DNSCache()
            self.client = httpx.AsyncClient(timeout=30.0, headers={"X-API-Key": api_key})
    
        async def get_dns_records(self, domain: str, page: Optional[int] = None) -> DNSData:
            """Query the DNSDumpster API for a domain's DNS records.
    
            Args:
                domain: Domain name to query
                page: Page number for pagination (Plus accounts only)
    
            Returns:
                Dictionary containing DNS records
            """
            # Check cache first
            cache_key = f"{domain}:{page or 1}"
            cached_data = await self.cache.get(cache_key)
            if cached_data:
                return cached_data
    
            # Wait for rate limiting
            await self.rate_limiter.wait_for_rate_limit()
    
            # Build URL with query parameters
            url = f"{self.api_base_url}/{domain}"
            params = {}
    
            if page is not None:
                params["page"] = str(page)
    
            # Retry logic for network errors
            max_retries = 3
            retry_delay = 2.0
    
            for attempt in range(max_retries):
                try:
                    response = await self.client.get(url, params=params)
    
                    if response.status_code == 429:
                        # Handle rate limiting
                        retry_after = int(response.headers.get("Retry-After", "5"))
                        await asyncio.sleep(retry_after)
                        continue
    
                    response.raise_for_status()
                    data = response.json()
    
                    # Cache the response
                    await self.cache.set(cache_key, data)
    
                    return data
    
                except httpx.HTTPError as e:
                    if attempt == max_retries - 1:
                        raise Exception(f"Failed to query DNSDumpster API: {str(e)}")
    
                    # Exponential backoff
                    await asyncio.sleep(retry_delay * (2**attempt))
    
        async def close(self):
            """Close the HTTP client."""
            await self.client.aclose()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anhhung04/mcp-dnsdumpster'

If you have feedback or need assistance with the MCP directory API, please join our Discord server