Skip to main content
Glama
us-all

openmetadata-mcp-server

by us-all

lineage-impact

Analyzes downstream impact of changing a data entity by walking lineage, counting unique consumers, and surfacing affected owners. Answers who or what breaks if you modify an asset.

Instructions

Aggregated downstream impact analysis: walks lineage (default 3 levels down, 1 up), counts unique consumers, breaks down by entity type, surfaces highest-fan-out top consumers, and (optionally) resolves the union of owners affected. Answers 'who/what breaks if I change X?' in one call instead of recursive get-lineage walks. Renders an Apps SDK card on ChatGPT clients.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
entityYesEntity type
fqnYesEntity fully qualified name (e.g. 'mysql.default.warehouse.orders')
downstreamDepthNoLineage depth to walk downstream (default 3, max 5)
upstreamDepthNoLineage depth to walk upstream (default 1)
includeOwnersNoResolve owners (users/teams) of each downstream entity for change-management notifications
topConsumersLimitNoNumber of highest-degree downstream consumers to surface in the summary

Implementation Reference

  • Main handler function `lineageImpact` — fetches lineage from OpenMetadata API, performs BFS downstream traversal, classifies entities by type, computes top consumers (highest fan-out), and optionally resolves owners via search query. Returns structured impact assessment.
    export async function lineageImpact(params: z.infer<typeof lineageImpactSchema>) {
      const { entity, fqn, downstreamDepth, upstreamDepth, includeOwners, topConsumersLimit } = params;
    
      const caveats: string[] = [];
    
      const lineagePath = `/lineage/${entity}/name/${encodeURIComponent(fqn)}`;
      const lineage = await omClient
        .get<LineageResponse>(lineagePath, { upstreamDepth, downstreamDepth })
        .catch((err: unknown) => {
          caveats.push(`lineage failed: ${err instanceof Error ? err.message : String(err)}`);
          return {} as LineageResponse;
        });
    
      const nodes: LineageNode[] = Array.isArray(lineage.nodes) ? lineage.nodes : [];
      const downstreamEdges: LineageEdge[] = Array.isArray(lineage.downstreamEdges) ? lineage.downstreamEdges : [];
      const upstreamEdges: LineageEdge[] = Array.isArray(lineage.upstreamEdges) ? lineage.upstreamEdges : [];
    
      const nodeById = new Map<string, LineageNode>();
      for (const n of nodes) if (n.id) nodeById.set(n.id, n);
    
      // Walk downstream BFS from the root entity, classifying every reachable node.
      const rootId = lineage.entity?.id ?? null;
      const downstream = new Set<string>();
      const downstreamByType: Record<string, number> = {};
      const fanOut: Record<string, number> = {};
    
      if (rootId) {
        const queue: string[] = [rootId];
        const seen = new Set<string>([rootId]);
        while (queue.length > 0) {
          const cur = queue.shift()!;
          for (const e of downstreamEdges) {
            if (e.fromEntity?.id !== cur || !e.toEntity?.id) continue;
            const next = e.toEntity.id;
            fanOut[cur] = (fanOut[cur] ?? 0) + 1;
            if (seen.has(next)) continue;
            seen.add(next);
            downstream.add(next);
            const node = nodeById.get(next);
            const t = node?.type ?? "unknown";
            downstreamByType[t] = (downstreamByType[t] ?? 0) + 1;
            queue.push(next);
          }
        }
      }
    
      const upstream = new Set<string>();
      if (rootId) {
        for (const e of upstreamEdges) {
          if (e.toEntity?.id !== rootId || !e.fromEntity?.id) continue;
          upstream.add(e.fromEntity.id);
        }
      }
    
      // "Top consumers" = highest fan-out leaf nodes (most siblings rely on them).
      const topConsumers = [...downstream]
        .map((id) => ({ id, node: nodeById.get(id), score: fanOut[id] ?? 0 }))
        .sort((a, b) => b.score - a.score)
        .slice(0, topConsumersLimit)
        .map((c) => ({
          id: c.id,
          name: c.node?.displayName ?? c.node?.name ?? null,
          fqn: c.node?.fullyQualifiedName ?? null,
          type: c.node?.type ?? null,
          downstreamFanOut: c.score,
        }));
    
      // Owner resolution via /search/query (entity index): one call returns owners
      // for every FQN we collected. Skipped when includeOwners=false.
      let owners: Array<{ name: string; displayName?: string; type?: string }> = [];
      if (includeOwners && downstream.size > 0) {
        const fqns = [...downstream]
          .map((id) => nodeById.get(id)?.fullyQualifiedName)
          .filter((s): s is string => typeof s === "string" && s.length > 0);
    
        if (fqns.length > 0) {
          const query = fqns.map((f) => `fullyQualifiedName:"${f.replace(/"/g, "")}"`).join(" OR ");
          const { search } = await aggregate(
            {
              search: () =>
                omClient.get<{ hits?: { hits?: Array<{ _source?: OwnerHit }> } }>(
                  "/search/query",
                  { q: query, index: "all", size: Math.min(fqns.length, 200), include_source_fields: "owners,fullyQualifiedName" },
                ),
            },
            caveats,
          );
          const hits = search?.hits?.hits ?? [];
          const seen = new Map<string, { name: string; displayName?: string; type?: string }>();
          for (const h of hits) {
            for (const o of h._source?.owners ?? []) {
              if (!o.name) continue;
              if (!seen.has(o.name)) seen.set(o.name, { name: o.name, displayName: o.displayName, type: o.type });
            }
          }
          owners = [...seen.values()];
        }
      }
    
      return {
        entity: {
          id: lineage.entity?.id ?? null,
          fqn: lineage.entity?.fullyQualifiedName ?? fqn,
          type: lineage.entity?.type ?? entity,
          name: lineage.entity?.displayName ?? lineage.entity?.name ?? null,
        },
        impact: {
          upstreamCount: upstream.size,
          downstreamCount: downstream.size,
          downstreamByType,
          ownersAffected: owners.length,
          depthWalked: { upstream: upstreamDepth, downstream: downstreamDepth },
        },
        topConsumers,
        owners,
        caveats,
      };
    }
  • Zod schema `lineageImpactSchema` defining input params: entity type enum, fqn, downstreamDepth (1-5, default 3), upstreamDepth (0-3, default 1), includeOwners (default true), topConsumersLimit (1-20, default 5).
    export const lineageImpactSchema = z.object({
      entity: entityTypeEnum,
      fqn: z.string().describe("Entity fully qualified name (e.g. 'mysql.default.warehouse.orders')"),
      downstreamDepth: z.coerce.number().int().min(1).max(5).optional().default(3)
        .describe("Lineage depth to walk downstream (default 3, max 5)"),
      upstreamDepth: z.coerce.number().int().min(0).max(3).optional().default(1)
        .describe("Lineage depth to walk upstream (default 1)"),
      includeOwners: z.boolean().optional().default(true)
        .describe("Resolve owners (users/teams) of each downstream entity for change-management notifications"),
      topConsumersLimit: z.coerce.number().int().min(1).max(20).optional().default(5)
        .describe("Number of highest-degree downstream consumers to surface in the summary"),
    });
  • src/index.ts:464-484 (registration)
    Registration wrapper and `tool()` call. Defines LINEAGE_IMPACT_CARD_URI, wraps handler with `lineageImpactWithCard` (parses result JSON and attaches card metadata), then registers as MCP tool 'lineage-impact' with description and schema.
    const LINEAGE_IMPACT_CARD_URI = "ui://widget/lineage-impact.html";
    const wrappedLineageImpact = wrapToolHandler(lineageImpact);
    async function lineageImpactWithCard(args: Parameters<typeof wrappedLineageImpact>[0]) {
      const result = await wrappedLineageImpact(args);
      if (result.isError) return result;
      try {
        const structured = JSON.parse(result.content[0].text);
        return {
          ...result,
          structuredContent: structured,
          _meta: {
            "openai/outputTemplate": LINEAGE_IMPACT_CARD_URI,
            "ui.resourceUri": LINEAGE_IMPACT_CARD_URI,
          },
        };
      } catch { return result; }
    }
    
    tool("lineage-impact",
      "Aggregated downstream impact analysis: walks lineage (default 3 levels down, 1 up), counts unique consumers, breaks down by entity type, surfaces highest-fan-out top consumers, and (optionally) resolves the union of owners affected. Answers 'who/what breaks if I change X?' in one call instead of recursive get-lineage walks. Renders an Apps SDK card on ChatGPT clients.",
      lineageImpactSchema.shape, lineageImpactWithCard);
  • Resource registration of 'lineage-impact-card' serving the HTML UI template (lineage-impact.html) for Apps SDK clients (ChatGPT).
    server.registerResource(
      "lineage-impact-card",
      "ui://widget/lineage-impact.html",
      {
        title: "Lineage Impact card",
        description: "Apps SDK UI template rendered with lineage-impact tool output",
        mimeType: "text/html+skybridge",
        _meta: {
          "openai/outputTemplate": "ui://widget/lineage-impact.html",
          "ui.resourceUri": "ui://widget/lineage-impact.html",
        },
      },
      async (uri) => ({
        contents: [{
          uri: uri.toString(),
          mimeType: "text/html+skybridge",
          text: LINEAGE_IMPACT_HTML,
  • Prompt template 'lineage-impact-analysis' — instructs the LLM to walk lineage manually step-by-step (alternative to the one-call lineage-impact tool).
    ({ entityFqn, direction, depth }) => {
      const dir = direction ?? "downstream";
      const d = depth ?? "3";
      const upstreamDepth = dir === "downstream" ? "0" : d;
      const downstreamDepth = dir === "upstream" ? "0" : d;
      return {
        messages: [
          {
            role: "user",
            content: {
              type: "text",
              text: [
                `Perform a lineage impact analysis for ${entityFqn} (direction='${dir}', depth=${d}).`,
                "",
                "Steps:",
                `1. Identify the entity type from the FQN prefix and call the matching by-name tool to fetch the entity (e.g. \`get-table-by-name\` for tables, \`get-topic-by-name\` for topics, \`get-dashboard-by-name\` for dashboards, \`get-pipeline-by-name\` for pipelines, \`get-ml-model-by-name\` for ML models, \`get-container-by-name\` for containers). Pass fields='owners,tags,domain' so the source entity's owners are included.`,
                `2. Call \`get-lineage-by-name\` with entity=<resolved type> (one of 'table' | 'topic' | 'dashboard' | 'pipeline' | 'mlmodel' | 'container'), fqn=${JSON.stringify(entityFqn)}, upstreamDepth=${upstreamDepth}, downstreamDepth=${downstreamDepth}.`,
                "3. Walk the response edges (`upstreamEdges` / `downstreamEdges`) and collect the unique affected entity FQNs and their entity types from `nodes`.",
                "4. For each affected entity, call the corresponding `get-<type>-by-name` tool with fields='owners,tags' to retrieve current owners. De-duplicate owners across entities.",
                "5. Produce an impact report with:",
                "   - Blast radius: count of affected entities grouped by entity type and depth level.",
                "   - Entities at risk: a table of FQN, type, depth, owners (name + email).",
                "   - Owners to notify: de-duplicated list of users/teams with the entities they own in this blast radius.",
                "   - Recommended next steps (e.g. announce in #data-platform, open a change ticket, schedule downtime).",
              ].join("\n"),
            },
          },
        ],
Behavior4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description effectively discloses key behaviors: default depths (3 down, 1 up), counting consumers, entity type breakdown, top consumers, optional owner resolution, and rendering an Apps SDK card. It does not mention performance implications or confirm read-only nature, but covers most relevant traits.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is concise (three sentences) with front-loaded purpose. Every sentence contributes essential information: what it does, its key capabilities, and a client-specific rendering detail. No redundancy or filler.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Despite lacking an output schema, the description thoroughly covers return value semantics (counts, breakdowns, top consumers, owners). For a tool with 6 parameters and medium complexity, it provides sufficient context for correct invocation, including example question and option for owner resolution.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema coverage is 100% with descriptions for each parameter. The description adds value by explaining how parameters like downstreamDepth, includeOwners, and topConsumersLimit serve the impact analysis goal, and clarifies defaults (e.g., default 3 down, 1 up). This goes beyond schema information.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description explicitly states the tool performs aggregated downstream impact analysis, answering 'who/what breaks if I change X?' in a single call, and distinguishes it from recursive get-lineage walks. It uses specific verbs (walks, counts, breaks, resolves) and resource (lineage), making purpose clear and distinct from siblings.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines4/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description clearly indicates when to use this tool (for impact analysis instead of multiple get-lineage calls) and mentions optional owner resolution. However, it does not explicitly state when not to use it or list alternatives beyond get-lineage, which slightly limits completeness.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/us-all/openmetadata-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server