Skip to main content
Glama
discovery.py (15.6 kB)
"""
Censys MCP tools module

This module defines Claude-compatible tools that use the Censys Search API
to perform recon on domains and IPs through natural language interactions.
"""

from datetime import datetime, timedelta, timezone
from collections import defaultdict
from mcp_censys.client.censys import CensysClient
from mcp.server.fastmcp import FastMCP
import mcp.types as types
import sys
import logging

mcp = FastMCP("Censys MCP Server")
censys = CensysClient()

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def is_domain_match(hostname: str, domain: str) -> bool:
    """
    Check if a hostname matches or is a subdomain of the given domain.

    Both values are compared case-insensitively with any trailing dot removed,
    so "WWW.Example.com." matches "example.com". The subdomain test requires a
    label boundary (a leading "."), so "badexample.com" does NOT match
    "example.com".

    Args:
        hostname: The hostname to check
        domain: The domain to check against

    Returns:
        bool: True if hostname matches domain or is a subdomain
    """
    hostname = hostname.rstrip(".").lower()
    domain = domain.rstrip(".").lower()
    return hostname == domain or hostname.endswith(f".{domain}")


def _domain_query(domain: str) -> str:
    """Build the host-search query matching a domain in forward or reverse DNS."""
    return f"(dns.names: {domain} OR dns.reverse_dns.names: {domain})"


def _record_sightings(fqdns, names, domain: str, source: str, seen_at) -> None:
    """Tag every name under `domain` with `source` and the given timestamp.

    The timestamp is overwritten on each sighting, so "last_seen" reflects the
    last record processed for that name rather than a computed maximum.
    """
    for name in names:
        if is_domain_match(name, domain):
            entry = fqdns[name]
            entry["sources"].add(source)
            entry["last_seen"] = seen_at


@mcp.prompt(
    name="Lookup Domain",
    description="Create an insightful analysis of basic domain infrastructure.",
)
def lookup_domain_prompt(domain: str) -> str:
    """
    Create a prompt template for basic domain lookup with intelligent analysis.

    Args:
        domain: The domain to lookup

    Returns:
        str: A prompt template for Claude to summarize domain information
    """
    return f"""
# {domain} - Infrastructure Analysis

Perform a lookup_domain on `{domain}` ONLY - do not run any additional tools.

Create an intelligent analysis that identifies patterns and organizes the data into meaningful sections.

## Suggested Sections
- **Infrastructure Overview**: A brief summary of the domain's hosting approach
- **IP Distribution**: Analyze the IP addresses and their organization
- **DNS Architecture**: Identify naming patterns and DNS structure
- **Service Configuration**: Examine the services and ports in use
- **Network Presence**: Analyze the ASNs and hosting providers

## Technical Data Requirements
- Display up to 5 IPv4 addresses in a code block, with total count in parentheses
- Only report the count of IPv6 addresses (do not list them)
- Include up to 5 forward DNS entries and 5 reverse DNS entries with total counts
- List all observed open ports
- Include all ASN information

## Guidelines
1. Run ONLY the lookup_domain tool - no additional lookups
2. Group similar information and identify patterns
3. Use code blocks for technical data that might need to be copied
4. DO NOT make claims about infrastructure quality or suggest improvements
5. Organize information based on logical relationships in the data
6. Keep your analysis factual while highlighting interesting patterns
"""


@mcp.prompt(
    name="Lookup Domain (Detailed)",
    description="Create an intelligent analysis of domain infrastructure.",
)
def lookup_domain_detailed_prompt(domain: str) -> str:
    """
    Create a prompt template for detailed domain lookup with intelligent analysis.

    Args:
        domain: The domain to lookup in detail

    Returns:
        str: A prompt template for Claude to summarize detailed domain information
    """
    return f"""
# {domain} - Infrastructure Analysis

Perform a lookup_domain_detailed on `{domain}` and create a comprehensive analysis
Do not run any additional tools

Begin with the sample limitation note from the response data, then analyze the infrastructure by grouping similar information into meaningful sections.

## Required Sections
- **Infrastructure Overview**: A brief summary of the overall hosting approach
- **IP Addresses**: List all IPs in an easy-to-copy code block
- **DNS Information**: Include reverse DNS entries in a code block, identify any naming patterns
- **Geographic Distribution**: Analyze where servers are located
- **Technical Configuration**: Group common technical elements (ASNs, services, OS)
- **Content Delivery Architecture**: Identify the hosting/CDN strategy

## Technical Data Format
Present IPs and DNS entries in code blocks for easy copying:

```
IP Addresses:
ip_address_1
ip_address_2
```

```
Reverse DNS:
dns_name_1
dns_name_2
```

## Important Guidelines
1. Group similar data rather than listing each host separately
2. Identify patterns and commonalities across all hosts
3. ALWAYS include a dedicated section for IPs and DNS entries in code blocks
4. DO NOT make claims about infrastructure "quality" or "maintenance schedules"
5. Present timestamps as observation timestamps only, not maintenance indicators
6. Include significant details but avoid creating an overwhelming report
"""


@mcp.tool(
    description="Summarize a domain's infrastructure: IPs, reverse DNS names, open ports, and ASNs."
)
async def lookup_domain(domain: str) -> dict:
    """
    Lookup a domain's infrastructure information from Censys.

    Retrieves IP addresses, DNS names, ASNs, and open ports associated with a
    domain by searching Censys for matches in DNS records.

    Args:
        domain: The domain to lookup

    Returns:
        dict: A dictionary with the keys "domain", "ips", "dns_names",
            "reverse_dns", "asns" and "ports"; every collection is a sorted,
            de-duplicated list.
    """
    logger.info(f"Looking up domain infrastructure: {domain}")

    query = _domain_query(domain)
    fields = [
        "ip",
        "dns.names",
        "dns.reverse_dns.names",
        "autonomous_system.name",
        "services.port",
    ]

    # Sets de-duplicate values that appear on more than one host record
    ips, dns_names, reverse_dns, asns, ports = set(), set(), set(), set(), set()

    # Iterating the search result yields one page (an iterable of records) at a time
    logger.info(f"Searching Censys for hosts related to {domain}")
    search = censys.search_hosts(query, fields, per_page=100)

    record_count = 0
    for page in search:
        for r in page:
            record_count += 1
            if ip := r.get("ip"):
                ips.add(ip)
            dns = r.get("dns", {})
            dns_names.update(dns.get("names", []))
            reverse_dns.update(dns.get("reverse_dns", {}).get("names", []))
            if asn := r.get("autonomous_system", {}).get("name"):
                asns.add(asn)
            # Skip services that carry no port number
            ports.update(s.get("port") for s in r.get("services", []) if s.get("port"))

    logger.info(f"Found {record_count} records for {domain}")
    logger.info(
        f"Collected {len(ips)} IPs, {len(dns_names)} DNS names, {len(reverse_dns)} reverse DNS names"
    )

    return {
        "domain": domain,
        "ips": sorted(ips),
        "dns_names": sorted(dns_names),
        "reverse_dns": sorted(reverse_dns),
        "asns": sorted(asns),
        "ports": sorted(ports),
    }


@mcp.tool(
    description="Return full host records for a domain (services, ASN, geo, TLS). Shows a limited sample of matching records."
)
async def lookup_domain_detailed(domain: str) -> dict:
    """
    Lookup detailed host records for a domain from Censys.

    Returns full host records including services, ASN, geo, and TLS information.
    To avoid overwhelming the user, only returns a limited sample of records.

    Args:
        domain: The domain to lookup

    Returns:
        dict: A dictionary with the keys "domain", "record_count" (total
            matches reported by Censys), "sample_limit", "note" (human-readable
            description of the sampling) and "records" (the sampled hits).
    """
    logger.info(f"Looking up detailed domain information: {domain}")

    query = _domain_query(domain)
    per_page = 3  # Limit to just 3 records

    # Use raw_search to get metadata including total count
    logger.info(f"Performing raw search for {domain} with sample limit of {per_page}")
    raw_response = censys.hosts.raw_search(query=query, per_page=per_page)

    # Extract total count and results
    result = raw_response.get("result", {})
    total_records = result.get("total", 0)
    results = result.get("hits", [])

    logger.info(f"Found {total_records} total records for {domain}")

    # Base the note on the number of hits actually returned, not on the
    # requested page size, so the "additional records" figure is always right.
    shown = len(results)
    if total_records > shown:
        note = (
            f"Showing {shown} of {total_records} total records. "
            f"There are {total_records - shown} additional records not displayed."
        )
    else:
        note = f"Showing all {total_records} record(s)."

    return {
        "domain": domain,
        "record_count": total_records,
        "sample_limit": per_page,
        "note": note,
        "records": results,
    }


@mcp.tool(
    description="Get full metadata for an IP: DNS, ASN, ports, TLS, and location."
)
async def lookup_ip(ip: str) -> dict:
    """
    Lookup detailed metadata for an IP address from Censys.

    Retrieves full IP metadata including DNS, ASN, geographical location,
    open ports, services, and TLS certificate information.

    Args:
        ip: The IP address to lookup

    Returns:
        dict: A dictionary with the keys "ip" and "records" (the records from
            the first result page; empty when nothing matched).
    """
    logger.info(f"Looking up IP metadata: {ip}")

    query = f"ip: {ip}"

    # Use specific leaf fields instead of parent fields
    fields = [
        "ip",
        "autonomous_system.name",
        "autonomous_system.asn",
        "location.country",
        "location.continent",
        "location.coordinates.latitude",  # Separate leaf field for latitude
        "location.coordinates.longitude",  # Separate leaf field for longitude
        "dns.names",
        "dns.reverse_dns.names",
        "services.port",
        "services.service_name",
        "services.transport_protocol",
        "services.tls.certificates.leaf_data.names",
        "last_updated_at",
    ]

    logger.info(f"Searching Censys for IP: {ip}")
    search = censys.search_hosts(query, fields, per_page=1)

    # A specific IP needs only the first page; do not fetch any further ones
    results = list(next(iter(search), []))

    logger.info(f"Found {len(results)} results for IP: {ip}")

    return {"ip": ip, "records": results}


@mcp.tool(description="Find recently seen FQDNs tied to a domain in DNS and certs.")
async def new_fqdns(
    domain: str,
    days: int = 1,
) -> dict:
    """
    Find recently seen FQDNs (fully qualified domain names) tied to a domain.

    Searches both DNS records and certificates in Censys to find domain names
    that have been seen within the specified time period.

    Args:
        domain: The base domain to search for
        days: Number of days back to search (default: 1)

    Returns:
        dict: A dictionary with the keys "domain", "days", "new_fqdns" (sorted
            names), "count" and "details" (per-name "sources" and "last_seen").
    """
    logger.info(
        f"Searching for recently seen FQDNs for domain: {domain} (last {days} days)"
    )

    # `datetime` is the class imported from the datetime module, so the UTC
    # tzinfo has to come from the separately imported `timezone`.
    since = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
    fqdns = defaultdict(lambda: {"sources": set(), "last_seen": None})

    # Search for DNS records
    dns_query = f"{_domain_query(domain)} AND last_updated_at: [{since} TO *]"
    dns_fields = ["dns.names", "dns.reverse_dns.names", "last_updated_at"]

    logger.info(f"Searching DNS records since {since}")
    dns_search = censys.search_hosts(query=dns_query, fields=dns_fields, per_page=100)

    # Process each page of DNS results
    dns_record_count = 0
    for page in dns_search:
        for r in page:
            dns_record_count += 1
            last_seen = r.get("last_updated_at")
            dns_data = r.get("dns", {})
            _record_sightings(
                fqdns, dns_data.get("names", []), domain, "hosts-dns", last_seen
            )
            _record_sightings(
                fqdns,
                dns_data.get("reverse_dns", {}).get("names", []),
                domain,
                "hosts-reverse",
                last_seen,
            )

    logger.info(
        f"Found {dns_record_count} DNS records with {len(fqdns)} matching FQDNs"
    )

    # Search for certificates
    cert_query = f"names: {domain} AND added_at: [{since} TO *]"
    cert_fields = ["names", "added_at"]

    logger.info(f"Searching certificates since {since}")
    cert_results = censys.certs.search(cert_query, fields=cert_fields, per_page=100)

    # The certificate search may yield either pages (lists) or single records;
    # normalise both shapes to a list of records.
    cert_count = 0
    for result in cert_results:
        items = result if isinstance(result, list) else [result]
        for r in items:
            cert_count += 1
            _record_sightings(
                fqdns, r.get("names", []), domain, "certs", r.get("added_at")
            )

    logger.info(f"Found {cert_count} certificates with matching domains")
    logger.info(f"Total unique FQDNs found: {len(fqdns)}")

    return {
        "domain": domain,
        "days": days,
        "new_fqdns": sorted(fqdns.keys()),
        "count": len(fqdns),
        "details": {
            name: {
                # Sets are not JSON-serialisable; emit a sorted list instead
                "sources": sorted(data["sources"]),
                "last_seen": data["last_seen"],
            }
            for name, data in fqdns.items()
        },
    }


@mcp.tool(description="List exposed ports and service names for a given IP address.")
async def host_services(ip: str) -> dict:
    """
    List exposed ports and service names for a given IP address.

    Searches Censys for services running on the specified IP address and returns
    their port numbers, service names, and when they were last seen.

    Args:
        ip: The IP address to lookup

    Returns:
        dict: A dictionary with the keys "ip" and "services", where each
            service has "port", "service" and "last_seen" (the update time of
            the host record it came from).
    """
    logger.info(f"Looking up services for IP: {ip}")

    query = f"ip: {ip}"
    fields = ["services.port", "services.service_name", "last_updated_at"]

    logger.info(f"Searching Censys for services on IP: {ip}")
    search = censys.search_hosts(query, fields, per_page=100)

    services = []
    record_count = 0

    for page in search:
        for r in page:
            record_count += 1
            services.extend(
                {
                    "port": s.get("port"),
                    "service": s.get("service_name"),
                    "last_seen": r.get("last_updated_at"),
                }
                for s in r.get("services", [])
            )

    logger.info(
        f"Found {len(services)} services across {record_count} records for IP: {ip}"
    )

    return {"ip": ip, "services": services}


__all__ = ["mcp"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nickpending/mcp-censys'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.