domain_fetch_domain_history
Retrieve historical SSL certificate records for any domain using Certificate Transparency logs. Provides past certificate data for security and compliance analysis.
Instructions
Use this to get historical SSL certificate records for a domain. Provide the domain name. Returns past certificates from Certificate Transparency logs.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| domain | Yes | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- datanexus/tools/t07.py:401-487 (handler)The actual tool handler 'fetch_domain_history' (tool name 'domain_fetch_domain_history' when mounted under the 'domain' namespace). Fetches historical SSL certificate records for a domain from crt.sh Certificate Transparency logs. Uses a wildcard query to capture all subdomain certs, falls back to exact domain. Returns certificate events with logging dates, validity periods, issuer, and names.
# ══════════════════════════════════════════════════════════════════════════════
# DATA TOOL 4 — fetch_domain_history
# ══════════════════════════════════════════════════════════════════════════════


@mcp.tool()
@with_timeout
@verify_entitlement("T07")
async def fetch_domain_history(domain: str) -> dict:
    """Use this to get historical SSL certificate records for a domain. Provide the domain name. Returns past certificates from Certificate Transparency logs."""
    # NOTE(review): the docstring above is intentionally left verbatim — it is
    # identical to the tool's published "Instructions" text, so it is presumably
    # surfaced to MCP clients as the tool description. Confirm before rewording.
    #
    # Flow: normalise the domain -> serve from cache if present -> bail out if
    # the crt.sh circuit breaker is open -> query crt.sh (wildcard first, exact
    # domain as fallback) -> build, cache and return the response payload.
    # Failures are reported as error_response(...) dicts rather than raised.

    domain_clean = domain.strip().lower()
    # Accept a pasted URL: without this, "https://example.com" would be cut at
    # the first "/" and collapse to "https:".
    if "://" in domain_clean:
        domain_clean = domain_clean.split("://", 1)[1]
    domain_clean = domain_clean.split("/")[0]
    # Remove only a literal "www." prefix. str.lstrip("www.") treats its
    # argument as a character set and would also eat the start of names such as
    # "web.example.com" or "wikipedia.org".
    if domain_clean.startswith("www."):
        domain_clean = domain_clean[len("www."):]

    params = {"domain": domain_clean, "query_type": "history"}

    async with AuditContext("T07", params, "1.0") as ctx:
        _incr_calls("T07")
        phash = make_params_hash(params)

        cached = get_cached("T07", phash)
        if cached:
            ctx.set_cache_hit(True)
            return {
                **cached,
                **standard_response_fields(
                    ctx.query_hash,
                    cached.get("data_as_of", ""),
                    True,
                ),
                "cache_hit": True,
            }

        if is_tripped("crt_sh"):
            return error_response(
                ErrorCode.CIRCUIT_OPEN,
                "crt.sh Certificate Transparency temporarily unavailable. Try again later.",
                ctx.query_hash,
                300,
                False,
            )

        try:
            # Broader wildcard query to capture all subdomains' historical certs
            history = await _fetch_crt_sh(f"%.{domain_clean}", limit=50)
            if not history:
                # Fallback to exact domain
                history = await _fetch_crt_sh(domain_clean, limit=50)
        except httpx.TimeoutException:
            record_failure_sync("crt_sh")
            return error_response(
                ErrorCode.UPSTREAM_TIMEOUT,
                "crt.sh timed out. Try again shortly.",
                ctx.query_hash,
                30,
                False,
            )
        except Exception:
            # Top-level boundary: log the traceback, report a generic error.
            record_failure_sync("crt_sh")
            log.exception("t07.fetch_domain_history error domain=%s", domain_clean)
            return error_response(
                ErrorCode.INTERNAL_ERROR,
                "An internal error occurred. Please try again.",
                ctx.query_hash,
                0,
                False,
            )

        data_as_of = datetime.now(timezone.utc).isoformat()
        raw_bytes = json.dumps(history).encode()
        payload_hash = compute_payload_hash(raw_bytes)
        markdown = _build_history_markdown(history, domain_clean)
        _validate_canary(markdown)

        payload = {
            "tool_id": "T07",
            # "%" is the crt.sh wildcard; inside a URL it must be written as
            # "%25", otherwise "%.e" is an invalid percent-escape.
            "source_url": f"https://crt.sh/?q=%25.{domain_clean}&output=json",
            "fetch_timestamp": data_as_of,
            "cache_hit": False,
            "staleness_notice": None,
            "sha256_hash": payload_hash,
            "data": {
                "domain": domain_clean,
                "certificate_events": history,
                "count": len(history),
            },
            "markdown_output": markdown,
            "disclaimer": DISCLAIMER,
            "data_as_of": data_as_of,
            "ingest_healthy": True,
        }
        set_cached("T07", phash, payload, T07_TTL)
        ctx.set_cache_hit(False)
        record_success_sync("crt_sh")
        log.info("t07.fetch_domain_history ok domain=%s events=%d", domain_clean, len(history))
        return {**payload, **standard_response_fields(ctx.query_hash, data_as_of, True)}

# - datanexus/tools/t07.py:408-408 (schema)Input schema: function signature takes a single 'domain: str' parameter. No explicit Pydantic schema class — uses FastMCP type inference from the function signature.
async def fetch_domain_history(domain: str) -> dict: - datanexus/tools/t07.py:593-635 (helper)Helper '_fetch_crt_sh' — the upstream fetch function that queries crt.sh JSON API, used by both fetch_ssl_certificate_chain and fetch_domain_history.
async def _fetch_crt_sh(query: str, limit: int = 10) -> list:
    """Query the crt.sh JSON endpoint and return normalised certificate records.

    Args:
        query: Value sent as the ``q`` parameter (an exact name or a ``%``
            wildcard pattern such as ``%.example.com``).
        limit: Stop after this many unique records have been collected.

    Returns:
        A list of dicts with the keys ``id``, ``logged_at``, ``not_before``,
        ``not_after``, ``common_name``, ``issuer_name`` and ``name_value``.
        Date fields are cut to their first 10 characters. An empty list is
        returned for HTTP 404, a non-JSON body, or a JSON body that is not a
        list.

    Raises:
        httpx.HTTPStatusError: For non-404 error statuses (``raise_for_status``).
        httpx.TimeoutException: When the request exceeds the configured timeout.
    """
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(20.0, connect=5.0),
        headers=_HEADERS,
        follow_redirects=True,
    ) as client:
        resp = await client.get(
            CRT_SH_URL,
            params={"q": query, "output": "json"},
        )
        if resp.status_code == 404:
            return []
        resp.raise_for_status()
        try:
            data = resp.json()
        except ValueError:
            # Body was not valid JSON (JSONDecodeError / UnicodeDecodeError are
            # both ValueError subclasses); treat it as "no results".
            return []

    if not isinstance(data, list):
        return []

    # Deduplicate by certificate id (falling back to serial number) and limit.
    seen_ids: set = set()
    certs: list = []
    for entry in data:
        if not isinstance(entry, dict):
            # Skip malformed rows instead of failing on .get().
            continue

        cert_id = entry.get("id") or entry.get("serial_number") or ""
        if cert_id:
            if cert_id in seen_ids:
                continue
            seen_ids.add(cert_id)
        # Rows without any identifier are kept as-is: using "" as a shared key
        # would silently drop every such row after the first.

        # NOTE(review): crt.sh's JSON rows appear to carry the log time under
        # "entry_timestamp" rather than "logged_at" — confirm against a live
        # response. "logged_at" is still preferred when present.
        logged_at = entry.get("logged_at") or entry.get("entry_timestamp") or ""

        certs.append({
            "id": entry.get("id", ""),
            "logged_at": str(logged_at)[:10],
            "not_before": (entry.get("not_before") or "")[:10],
            "not_after": (entry.get("not_after") or "")[:10],
            "common_name": entry.get("common_name") or "",
            "issuer_name": entry.get("issuer_name") or "",
            "name_value": entry.get("name_value") or "",
        })
        if len(certs) >= limit:
            break
    return certs

# - datanexus/tools/t07.py:764-790 (helper)Helper '_build_history_markdown' — builds the markdown table output for fetch_domain_history results.
def _history_cell(value, width=None) -> str:
    """Render *value* as a single-line markdown table cell, cut to *width* chars."""
    text = "" if value is None else str(value)
    # A raw newline would end the table row early, so multi-line values are
    # joined onto one line.
    text = ", ".join(part.strip() for part in text.splitlines() if part.strip())
    if width is not None:
        text = text[:width]
    # Escape after truncating so the width cap applies to the visible text.
    return text.replace("|", "\\|")


def _build_history_markdown(history: list, domain: str) -> str:
    """Build the markdown report for a domain's certificate history.

    Args:
        history: Certificate records (dicts) read via the keys ``logged_at``,
            ``not_before``, ``not_after``, ``issuer_name``, ``common_name``
            and ``name_value``.
        domain: Domain name shown in the heading.

    Returns:
        Markdown text: a heading, a count line, a table capped at 25 rows (or a
        "no records" sentence when *history* is empty), and the disclaimer.
    """
    max_rows = 25  # cap display at 25 rows

    lines = [
        f"## SSL Certificate History — {domain}",
        f"Found **{len(history)}** certificate event(s) in CT logs.\n",
    ]

    if not history:
        lines.append("No historical certificate records found.")
    else:
        lines.append("| Logged | Valid From | Valid To | Issuer | Name/SAN |")
        lines.append("|--------|------------|----------|--------|----------|")
        for c in history[:max_rows]:
            issuer_short = _history_cell(c.get("issuer_name"), 35)
            name = _history_cell(c.get("common_name") or c.get("name_value"), 40)
            lines.append(
                f"| {_history_cell(c.get('logged_at'))} "
                f"| {_history_cell(c.get('not_before'))} "
                f"| {_history_cell(c.get('not_after'))} "
                f"| {issuer_short} "
                f"| {name} |"
            )
        if len(history) > max_rows:
            lines.append(
                f"\n*…and {len(history) - max_rows} more records (showing most recent 25).*"
            )

    lines.append("")
    lines.append(f"*{DISCLAIMER}*")
    return "\n".join(lines)

# - datanexus/tools/domain.py:11-19 (registration)Registration: fetch_domain_history is imported from t07.py and registered via domain.tool()(fetch_domain_history) on a FastMCP sub-server named 'domain'. This is mounted in main.py under the 'domain' namespace, so the full MCP tool name becomes 'domain_fetch_domain_history'.
fetch_domain_history, ) domain = FastMCP("DataNexus Domain") domain.tool()(fetch_domain_rdap) domain.tool()(fetch_ssl_certificate_chain) domain.tool()(fetch_dns_records) domain.tool()(fetch_domain_history)