graphql_introspect
Query a GraphQL endpoint to retrieve its schema and security observations, with configurable timeout, TLS verification, and SSRF protection.
Instructions
Run a GraphQL introspection query against url and summarize the schema.
Single HTTP POST. Read-only. Will not mutate state on the server.
By default, requests resolving to private / loopback / link-local /
cloud-metadata addresses are blocked (SSRF protection). Set
allow_private=True to override — useful when explicitly auditing
internal infrastructure.
Redirects are disabled (an HTTP 3xx from the target raises
HTTP-error: redirects disabled). This prevents a public endpoint from
redirecting the request to a private address after the pre-flight check.
Residual risk: DNS rebinding. The pre-flight resolution and the actual HTTP request happen in separate syscalls and the OS may resolve the hostname twice. A hostile DNS that returns a public IP for the check and a private IP for the request can defeat the guard. For high-stakes environments, run this tool inside a network namespace / egress proxy that enforces address restrictions independently.
Args:
url: Full GraphQL endpoint URL (e.g. https://api.example.com/graphql).
timeout: Network timeout in seconds (clamped to [1, 60]).
insecure: Skip TLS verification (for self-signed certs in test envs).
allow_private: Permit requests to private / internal addresses.
Default False.
Returns:
IntrospectReport summarizing the schema and security observations.
If the URL resolves to a private address and allow_private is
False, returns {"error": "blocked-private-address", ...}.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| url | Yes | ||
| timeout | No | ||
| insecure | No | ||
| allow_private | No |
Implementation Reference
- The main tool handler function `graphql_introspect` — sends a GraphQL introspection query and summarizes the schema with security observations.
def graphql_introspect( url: str, timeout: float = 10.0, insecure: bool = False, allow_private: bool = False, ) -> dict: """Run a GraphQL introspection query against `url` and summarize the schema. Single HTTP POST. Read-only. Will not mutate state on the server. By default, requests resolving to private / loopback / link-local / cloud-metadata addresses are blocked (SSRF protection). Set `allow_private=True` to override — useful when explicitly auditing internal infrastructure. Redirects are disabled (an HTTP 3xx from the target raises `HTTP-error: redirects disabled`). This prevents a public endpoint from redirecting the request to a private address after the pre-flight check. Residual risk: DNS rebinding. The pre-flight resolution and the actual HTTP request happen in separate syscalls and the OS may resolve the hostname twice. A hostile DNS that returns a public IP for the check and a private IP for the request can defeat the guard. For high-stakes environments, run this tool inside a network namespace / egress proxy that enforces address restrictions independently. Args: url: Full GraphQL endpoint URL (e.g. `https://api.example.com/graphql`). timeout: Network timeout in seconds (clamped to [1, 60]). insecure: Skip TLS verification (for self-signed certs in test envs). allow_private: Permit requests to private / internal addresses. Default False. Returns: IntrospectReport summarizing the schema and security observations. If the URL resolves to a private address and `allow_private` is False, returns `{"error": "blocked-private-address", ...}`. """ if not isinstance(url, str) or not url.startswith(("http://", "https://")): return {"error": "url must be an http(s) URL"} timeout = max(1.0, min(float(timeout), 60.0)) parsed = urllib.parse.urlparse(url) host = parsed.hostname or "" if not host: return {"error": "url has no hostname"} is_private, resolved = _is_private_address(host) if is_private and not allow_private: return { "error": "blocked-private-address", "host": host, "resolved_ip": resolved, "hint": "Pass allow_private=True to audit internal infrastructure intentionally.", } status, data, err = _post(url, {"query": INTROSPECTION_QUERY}, timeout, insecure) report = IntrospectReport(url=url, introspection_enabled=False, status=status, error=err) if not data or "data" not in data or not data.get("data", {}).get("__schema"): if data and data.get("errors"): report.error = json.dumps(data["errors"])[:300] return report.model_dump() schema = data["data"]["__schema"] report.introspection_enabled = True report.query_type = (schema.get("queryType") or {}).get("name") report.mutation_type = (schema.get("mutationType") or {}).get("name") report.subscription_type = (schema.get("subscriptionType") or {}).get("name") by_name = {t["name"]: t for t in (schema.get("types") or []) if t.get("name")} report.type_count = sum(1 for n in by_name if not n.startswith("__")) def _fields(type_name: str | None) -> list[str]: if not type_name or type_name not in by_name: return [] return [f["name"] for f in (by_name[type_name].get("fields") or [])] queries = _fields(report.query_type) mutations = _fields(report.mutation_type) subs = _fields(report.subscription_type) report.query_count = len(queries) report.mutation_count = len(mutations) report.subscription_count = len(subs) report.sample_queries = queries[:25] report.sample_mutations = mutations[:25] report.observations.append(GraphQLObservation( category="introspection-exposed", severity="medium", message="introspection is enabled on this endpoint — consider disabling in production", )) if report.mutation_count > 0: report.observations.append(GraphQLObservation( category="mutations-exposed", severity="info", message=f"{report.mutation_count} mutation(s) exposed; review auth on each", )) sensitive_hits: list[str] = [] for name in queries + mutations: low = name.lower() if any(h in low for h in SENSITIVE_FIELD_HINTS): sensitive_hits.append(name) if sensitive_hits: report.observations.append(GraphQLObservation( category="sensitive-named-fields", severity="medium", message=f"operations with sensitive-looking names: {', '.join(sensitive_hits[:8])}", )) return report.model_dump() - Pydantic models: GraphQLObservation (category/severity/message) and IntrospectReport (full report schema) used for input/output validation.
class GraphQLObservation(BaseModel): category: str severity: Severity message: str class IntrospectReport(BaseModel): url: str introspection_enabled: bool status: int | None = None error: str | None = None query_type: str | None = None mutation_type: str | None = None subscription_type: str | None = None query_count: int = 0 mutation_count: int = 0 subscription_count: int = 0 type_count: int = 0 sample_queries: list[str] = Field(default_factory=list) sample_mutations: list[str] = Field(default_factory=list) observations: list[GraphQLObservation] = Field(default_factory=list) - src/mcp_security_toolkit/server.py:32-32 (registration)Registration of graphql_introspect as an MCP tool via `mcp.tool()(graphql_introspect.graphql_introspect)`
mcp.tool()(graphql_introspect.graphql_introspect) - Helper class `_NoRedirectHandler` — blocks HTTP redirects to prevent SSRF (redirect to private IP after pre-flight check).
class _NoRedirectHandler(urllib.request.HTTPRedirectHandler): """Block all redirects. Following Location: from an attacker-controlled server can land on private IPs (SSRF) or attacker-controlled second-stage URLs that the pre-flight SSRF check didn't see. For an introspection POST, redirects are not legitimate behavior — fail fast. """ def http_error_301(self, req, fp, code, msg, headers): raise urllib.error.HTTPError( req.full_url, code, "redirects disabled (SSRF protection — redirect from initial URL not allowed)", headers, fp, ) http_error_302 = http_error_301 http_error_303 = http_error_301 http_error_307 = http_error_301 http_error_308 = http_error_301 - Helper function `safe_url` — mirrors the SSRF protection logic used in graphql_introspect, exposed as a standalone URL validation utility.
def safe_url( url: str, *, allow_private: bool = False, allowed_schemes: frozenset[str] = DEFAULT_ALLOWED_SCHEMES, ) -> str: """Return `url` if it is safe to fetch; otherwise raise. Checks performed: 1. URL must parse, have a non-empty hostname. 2. Scheme must be in `allowed_schemes` (default: http, https). 3. Hostname must resolve. Refusing to resolve is treated as unsafe to avoid leaking lookups to attacker-controlled DNS through downstream HTTP libraries. 4. None of the resolved A/AAAA records may be private, loopback, link-local, multicast, reserved, or a known cloud-metadata IP — unless `allow_private=True` (opt-in for explicit internal audits). Residual: this is a *pre-flight* check. A hostile DNS server could return a public IP here and a private IP at the time the actual request is made (DNS rebinding). For high-stakes use, run inside a network namespace / egress proxy that enforces address restrictions independently — same caveat as `graphql_introspect`. Args: url: URL string from agent / user / config. allow_private: Permit hosts on private / internal networks. Default False. allowed_schemes: Schemes to accept. Default `{"http", "https"}`. Returns: The original `url` string, unchanged. Callers can pass it to their HTTP client knowing it has been validated. Raises: UnsafeURL: malformed URL, disallowed scheme, or hostname missing. BlockedAddress: resolved to private / loopback / metadata address. Example: >>> from mcp_security_toolkit.helpers import safe_url >>> @mcp.tool() ... def fetch_metadata(url: str) -> str: ... url = safe_url(url) ... return httpx.get(url, timeout=5).text """ if not isinstance(url, str) or not url.strip(): raise UnsafeURL("url must be a non-empty string") parsed = urllib.parse.urlparse(url) if parsed.scheme not in allowed_schemes: raise UnsafeURL( f"scheme {parsed.scheme!r} not in allowed set {sorted(allowed_schemes)}" ) host = parsed.hostname if not host: raise UnsafeURL("url has no hostname") if allow_private: return url ips = _resolve_host(host) if not ips: raise BlockedAddress(f"host {host!r} did not resolve") for ip in ips: if _is_blocked(ip): raise BlockedAddress( f"host {host!r} resolves to {ip!s} (private / loopback / metadata) " f"— pass allow_private=True to permit" ) return url