lineage-impact
Analyzes downstream impact of changing a data entity by walking lineage, counting unique consumers, and surfacing affected owners. Answers who or what breaks if you modify an asset.
Instructions
Aggregated downstream impact analysis: walks lineage (default 3 levels down, 1 up), counts unique consumers, breaks down by entity type, surfaces highest-fan-out top consumers, and (optionally) resolves the union of owners affected. Answers 'who/what breaks if I change X?' in one call instead of recursive get-lineage walks. Renders an Apps SDK card on ChatGPT clients.
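The walk described above can be pictured as a breadth-first search over an edge list. The following is a minimal sketch of that idea, not the tool's source: the `Edge` shape, the `summarizeDownstream` name, and the sample asset names are all hypothetical, and depth limiting is assumed to have been applied already by whatever produced the edges.

```typescript
// Hypothetical edge shape for illustration; the real API response differs.
interface Edge { from: string; to: string }

interface ImpactSummary {
  downstreamCount: number;          // unique nodes reachable from the root
  byType: Record<string, number>;   // reachable nodes grouped by entity type
  fanOut: Record<string, number>;   // outgoing downstream edges per visited node
}

function summarizeDownstream(
  rootId: string,
  edges: Edge[],
  typeById: Map<string, string>,
): ImpactSummary {
  const seen = new Set<string>([rootId]);
  const queue: string[] = [rootId];
  const byType: Record<string, number> = {};
  const fanOut: Record<string, number> = {};
  let downstreamCount = 0;

  while (queue.length > 0) {
    const cur = queue.shift() as string;
    for (const e of edges) {
      if (e.from !== cur) continue;
      // Every outgoing edge counts toward fan-out, even if the target was already seen.
      fanOut[cur] = (fanOut[cur] ?? 0) + 1;
      if (seen.has(e.to)) continue;
      seen.add(e.to);
      downstreamCount += 1;
      const t = typeById.get(e.to) ?? "unknown";
      byType[t] = (byType[t] ?? 0) + 1;
      queue.push(e.to);
    }
  }
  return { downstreamCount, byType, fanOut };
}

// Made-up sample graph: one table feeding a table and a pipeline, both feeding a dashboard.
const sampleEdges: Edge[] = [
  { from: "orders", to: "orders_daily" },
  { from: "orders", to: "revenue_pipeline" },
  { from: "orders_daily", to: "sales_dashboard" },
  { from: "revenue_pipeline", to: "sales_dashboard" },
];
const sampleTypes = new Map<string, string>([
  ["orders_daily", "table"],
  ["revenue_pipeline", "pipeline"],
  ["sales_dashboard", "dashboard"],
]);
const summary = summarizeDownstream("orders", sampleEdges, sampleTypes);
```

In this sample the dashboard is reached along two paths but counted once, which is what "unique consumers" means in practice.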
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| entity | Yes | Entity type | |
| fqn | Yes | Entity fully qualified name (e.g. 'mysql.default.warehouse.orders') | |
| downstreamDepth | No | Lineage depth to walk downstream (1-5) | 3 |
| upstreamDepth | No | Lineage depth to walk upstream (0-3) | 1 |
| includeOwners | No | Resolve owners (users/teams) of each downstream entity for change-management notifications | true |
| topConsumersLimit | No | Number of highest-degree downstream consumers to surface in the summary (1-20) | 5 |
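To illustrate how the optional parameters resolve, the sketch below applies the bounds and defaults stated in the tool's schema (downstreamDepth 1-5, default 3; upstreamDepth 0-3, default 1; includeOwners default true; topConsumersLimit 1-20, default 5) in plain TypeScript. The server itself validates with Zod; `resolveArgs` and `boundedInt` are hypothetical helper names, and `"table"` is assumed to be an accepted entity type.

```typescript
interface LineageImpactArgs {
  entity: string;
  fqn: string;
  downstreamDepth?: number;
  upstreamDepth?: number;
  includeOwners?: boolean;
  topConsumersLimit?: number;
}

type ResolvedArgs = Required<LineageImpactArgs>;

// Returns the fallback when the value is omitted; rejects out-of-range or
// non-integer values instead of clamping them.
function boundedInt(
  name: string,
  value: number | undefined,
  min: number,
  max: number,
  fallback: number,
): number {
  if (value === undefined) return fallback;
  if (!Number.isInteger(value) || value < min || value > max) {
    throw new RangeError(`${name} must be an integer between ${min} and ${max}`);
  }
  return value;
}

function resolveArgs(args: LineageImpactArgs): ResolvedArgs {
  return {
    entity: args.entity,
    fqn: args.fqn,
    downstreamDepth: boundedInt("downstreamDepth", args.downstreamDepth, 1, 5, 3),
    upstreamDepth: boundedInt("upstreamDepth", args.upstreamDepth, 0, 3, 1),
    includeOwners: args.includeOwners ?? true,
    topConsumersLimit: boundedInt("topConsumersLimit", args.topConsumersLimit, 1, 20, 5),
  };
}

// Only the two required fields are supplied; everything else takes its default.
const resolved = resolveArgs({ entity: "table", fqn: "mysql.default.warehouse.orders" });
```

A call that supplies only `entity` and `fqn` therefore walks 3 levels down and 1 up, resolves owners, and surfaces 5 top consumers.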
Implementation Reference
- src/tools/lineage-impact.ts:64-181 (handler) Main handler function `lineageImpact`: fetches lineage from OpenMetadata API, performs BFS downstream traversal, classifies entities by type, computes top consumers (highest fan-out), and optionally resolves owners via search query. Returns structured impact assessment.

    export async function lineageImpact(params: z.infer<typeof lineageImpactSchema>) {
      const { entity, fqn, downstreamDepth, upstreamDepth, includeOwners, topConsumersLimit } = params;
      const caveats: string[] = [];

      const lineagePath = `/lineage/${entity}/name/${encodeURIComponent(fqn)}`;
      const lineage = await omClient
        .get<LineageResponse>(lineagePath, { upstreamDepth, downstreamDepth })
        .catch((err: unknown) => {
          caveats.push(`lineage failed: ${err instanceof Error ? err.message : String(err)}`);
          return {} as LineageResponse;
        });

      const nodes: LineageNode[] = Array.isArray(lineage.nodes) ? lineage.nodes : [];
      const downstreamEdges: LineageEdge[] = Array.isArray(lineage.downstreamEdges) ? lineage.downstreamEdges : [];
      const upstreamEdges: LineageEdge[] = Array.isArray(lineage.upstreamEdges) ? lineage.upstreamEdges : [];

      const nodeById = new Map<string, LineageNode>();
      for (const n of nodes) if (n.id) nodeById.set(n.id, n);

      // Walk downstream BFS from the root entity, classifying every reachable node.
      const rootId = lineage.entity?.id ?? null;
      const downstream = new Set<string>();
      const downstreamByType: Record<string, number> = {};
      const fanOut: Record<string, number> = {};

      if (rootId) {
        const queue: string[] = [rootId];
        const seen = new Set<string>([rootId]);
        while (queue.length > 0) {
          const cur = queue.shift()!;
          for (const e of downstreamEdges) {
            if (e.fromEntity?.id !== cur || !e.toEntity?.id) continue;
            const next = e.toEntity.id;
            fanOut[cur] = (fanOut[cur] ?? 0) + 1;
            if (seen.has(next)) continue;
            seen.add(next);
            downstream.add(next);
            const node = nodeById.get(next);
            const t = node?.type ?? "unknown";
            downstreamByType[t] = (downstreamByType[t] ?? 0) + 1;
            queue.push(next);
          }
        }
      }

      const upstream = new Set<string>();
      if (rootId) {
        for (const e of upstreamEdges) {
          if (e.toEntity?.id !== rootId || !e.fromEntity?.id) continue;
          upstream.add(e.fromEntity.id);
        }
      }

      // "Top consumers" = highest fan-out leaf nodes (most siblings rely on them).
      const topConsumers = [...downstream]
        .map((id) => ({ id, node: nodeById.get(id), score: fanOut[id] ?? 0 }))
        .sort((a, b) => b.score - a.score)
        .slice(0, topConsumersLimit)
        .map((c) => ({
          id: c.id,
          name: c.node?.displayName ?? c.node?.name ?? null,
          fqn: c.node?.fullyQualifiedName ?? null,
          type: c.node?.type ?? null,
          downstreamFanOut: c.score,
        }));

      // Owner resolution via /search/query (entity index): one call returns owners
      // for every FQN we collected. Skipped when includeOwners=false.
      let owners: Array<{ name: string; displayName?: string; type?: string }> = [];
      if (includeOwners && downstream.size > 0) {
        const fqns = [...downstream]
          .map((id) => nodeById.get(id)?.fullyQualifiedName)
          .filter((s): s is string => typeof s === "string" && s.length > 0);
        if (fqns.length > 0) {
          const query = fqns.map((f) => `fullyQualifiedName:"${f.replace(/"/g, "")}"`).join(" OR ");
          const { search } = await aggregate(
            {
              search: () =>
                omClient.get<{ hits?: { hits?: Array<{ _source?: OwnerHit }> } }>(
                  "/search/query",
                  {
                    q: query,
                    index: "all",
                    size: Math.min(fqns.length, 200),
                    include_source_fields: "owners,fullyQualifiedName",
                  },
                ),
            },
            caveats,
          );
          const hits = search?.hits?.hits ?? [];
          const seen = new Map<string, { name: string; displayName?: string; type?: string }>();
          for (const h of hits) {
            for (const o of h._source?.owners ?? []) {
              if (!o.name) continue;
              if (!seen.has(o.name)) seen.set(o.name, { name: o.name, displayName: o.displayName, type: o.type });
            }
          }
          owners = [...seen.values()];
        }
      }

      return {
        entity: {
          id: lineage.entity?.id ?? null,
          fqn: lineage.entity?.fullyQualifiedName ?? fqn,
          type: lineage.entity?.type ?? entity,
          name: lineage.entity?.displayName ?? lineage.entity?.name ?? null,
        },
        impact: {
          upstreamCount: upstream.size,
          downstreamCount: downstream.size,
          downstreamByType,
          ownersAffected: owners.length,
          depthWalked: { upstream: upstreamDepth, downstream: downstreamDepth },
        },
        topConsumers,
        owners,
        caveats,
      };
    }

- src/tools/lineage-impact.ts:26-37 (schema) Zod schema `lineageImpactSchema` defining input params: entity type enum, fqn, downstreamDepth (1-5, default 3), upstreamDepth (0-3, default 1), includeOwners (default true), topConsumersLimit (1-20, default 5).

    export const lineageImpactSchema = z.object({
      entity: entityTypeEnum,
      fqn: z.string().describe("Entity fully qualified name (e.g. 'mysql.default.warehouse.orders')"),
      downstreamDepth: z.coerce.number().int().min(1).max(5).optional().default(3)
        .describe("Lineage depth to walk downstream (default 3, max 5)"),
      upstreamDepth: z.coerce.number().int().min(0).max(3).optional().default(1)
        .describe("Lineage depth to walk upstream (default 1)"),
      includeOwners: z.boolean().optional().default(true)
        .describe("Resolve owners (users/teams) of each downstream entity for change-management notifications"),
      topConsumersLimit: z.coerce.number().int().min(1).max(20).optional().default(5)
        .describe("Number of highest-degree downstream consumers to surface in the summary"),
    });

- src/index.ts:464-484 (registration) Registration wrapper and `tool()` call. Defines LINEAGE_IMPACT_CARD_URI, wraps handler with `lineageImpactWithCard` (parses result JSON and attaches card metadata), then registers as MCP tool 'lineage-impact' with description and schema.

    const LINEAGE_IMPACT_CARD_URI = "ui://widget/lineage-impact.html";
    const wrappedLineageImpact = wrapToolHandler(lineageImpact);
    async function lineageImpactWithCard(args: Parameters<typeof wrappedLineageImpact>[0]) {
      const result = await wrappedLineageImpact(args);
      if (result.isError) return result;
      try {
        const structured = JSON.parse(result.content[0].text);
        return {
          ...result,
          structuredContent: structured,
          _meta: {
            "openai/outputTemplate": LINEAGE_IMPACT_CARD_URI,
            "ui.resourceUri": LINEAGE_IMPACT_CARD_URI,
          },
        };
      } catch {
        return result;
      }
    }
    tool(
      "lineage-impact",
      "Aggregated downstream impact analysis: walks lineage (default 3 levels down, 1 up), counts unique consumers, breaks down by entity type, surfaces highest-fan-out top consumers, and (optionally) resolves the union of owners affected. Answers 'who/what breaks if I change X?' in one call instead of recursive get-lineage walks. Renders an Apps SDK card on ChatGPT clients.",
      lineageImpactSchema.shape,
      lineageImpactWithCard,
    );

- src/resources.ts:224-240 (registration) Resource registration of 'lineage-impact-card' serving the HTML UI template (lineage-impact.html) for Apps SDK clients (ChatGPT).

    server.registerResource(
      "lineage-impact-card",
      "ui://widget/lineage-impact.html",
      {
        title: "Lineage Impact card",
        description: "Apps SDK UI template rendered with lineage-impact tool output",
        mimeType: "text/html+skybridge",
        _meta: {
          "openai/outputTemplate": "ui://widget/lineage-impact.html",
          "ui.resourceUri": "ui://widget/lineage-impact.html",
        },
      },
      async (uri) => ({
        contents: [{
          uri: uri.toString(),
          mimeType: "text/html+skybridge",
          text: LINEAGE_IMPACT_HTML,

- src/prompts.ts:28-55 (helper) Prompt template 'lineage-impact-analysis': instructs the LLM to walk lineage manually step-by-step (alternative to the one-call lineage-impact tool).

    ({ entityFqn, direction, depth }) => {
      const dir = direction ?? "downstream";
      const d = depth ?? "3";
      const upstreamDepth = dir === "downstream" ? "0" : d;
      const downstreamDepth = dir === "upstream" ? "0" : d;
      return {
        messages: [
          {
            role: "user",
            content: {
              type: "text",
              text: [
                `Perform a lineage impact analysis for ${entityFqn} (direction='${dir}', depth=${d}).`,
                "",
                "Steps:",
                `1. Identify the entity type from the FQN prefix and call the matching by-name tool to fetch the entity (e.g. \`get-table-by-name\` for tables, \`get-topic-by-name\` for topics, \`get-dashboard-by-name\` for dashboards, \`get-pipeline-by-name\` for pipelines, \`get-ml-model-by-name\` for ML models, \`get-container-by-name\` for containers). Pass fields='owners,tags,domain' so the source entity's owners are included.`,
                `2. Call \`get-lineage-by-name\` with entity=<resolved type> (one of 'table' | 'topic' | 'dashboard' | 'pipeline' | 'mlmodel' | 'container'), fqn=${JSON.stringify(entityFqn)}, upstreamDepth=${upstreamDepth}, downstreamDepth=${downstreamDepth}.`,
                "3. Walk the response edges (`upstreamEdges` / `downstreamEdges`) and collect the unique affected entity FQNs and their entity types from `nodes`.",
                "4. For each affected entity, call the corresponding `get-<type>-by-name` tool with fields='owners,tags' to retrieve current owners. De-duplicate owners across entities.",
                "5. Produce an impact report with:",
                " - Blast radius: count of affected entities grouped by entity type and depth level.",
                " - Entities at risk: a table of FQN, type, depth, owners (name + email).",
                " - Owners to notify: de-duplicated list of users/teams with the entities they own in this blast radius.",
                " - Recommended next steps (e.g. announce in #data-platform, open a change ticket, schedule downtime).",
              ].join("\n"),
            },
          },
        ],
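
Steps 4 and 5 of the prompt ask for a de-duplicated owner list together with the entities each owner holds in the blast radius. A minimal sketch of that grouping follows; the `AffectedEntity` shape, the `groupByOwner` name, and the sample owners and FQNs are hypothetical and not part of the server's code.

```typescript
interface Owner { name: string; type?: string }
interface AffectedEntity { fqn: string; owners: Owner[] }

// Inverts entity -> owners into owner name -> owned entity FQNs,
// keeping each owner and each FQN only once.
function groupByOwner(entities: AffectedEntity[]): Map<string, string[]> {
  const owned = new Map<string, string[]>();
  for (const entity of entities) {
    for (const owner of entity.owners) {
      const list = owned.get(owner.name) ?? [];
      if (list.indexOf(entity.fqn) === -1) list.push(entity.fqn);
      owned.set(owner.name, list);
    }
  }
  return owned;
}

// Made-up sample data for illustration.
const ownersToNotify = groupByOwner([
  {
    fqn: "mysql.default.warehouse.orders_daily",
    owners: [{ name: "data-platform", type: "team" }],
  },
  {
    fqn: "mysql.default.warehouse.revenue",
    owners: [{ name: "data-platform", type: "team" }, { name: "alice", type: "user" }],
  },
]);
```

Keying on the owner name mirrors how the handler de-duplicates owners, so a team that owns several affected entities appears once with all of them listed.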