We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/dbt-labs/dbt-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import asyncio
import textwrap
from enum import StrEnum
from typing import Any, ClassVar, Literal, TypedDict
import httpx
from pydantic import BaseModel, ConfigDict, Field
from dbt_mcp.config.config_providers import ConfigProvider, DiscoveryConfig
from dbt_mcp.discovery.graphql import load_query
from dbt_mcp.errors import InvalidParameterError, ToolCallError
from dbt_mcp.errors.common import NotFoundError
from dbt_mcp.gql.errors import raise_gql_error
from dbt_mcp.tools.parameters import LineageResourceType
DEFAULT_PAGE_SIZE = 100
DEFAULT_MAX_NODE_QUERY_LIMIT = 10000
# dbt-labs first-party packages (from dbt-labs/dbt-adapters monorepo)
# These are the only packages maintained directly by dbt Labs
# See: https://github.com/dbt-labs/dbt-adapters
DBT_BUILTIN_PACKAGES = frozenset(
{
"dbt", # dbt-core
"dbt_postgres", # dbt-postgres
"dbt_redshift", # dbt-redshift
"dbt_snowflake", # dbt-snowflake
"dbt_bigquery", # dbt-bigquery
"dbt_spark", # dbt-spark
"dbt_athena", # dbt-athena
}
)
class GraphQLQueries:
GET_MODELS = textwrap.dedent("""
query GetModels(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter,
$after: String,
$first: Int,
$sort: AppliedModelSort
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, after: $after, first: $first, sort: $sort) {
pageInfo {
endCursor
}
edges {
node {
name
uniqueId
description
}
}
}
}
}
}
""")
GET_MODEL_HEALTH = textwrap.dedent("""
query GetModelDetails(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
edges {
node {
name
uniqueId
executionInfo {
lastRunGeneratedAt
lastRunStatus
executeCompletedAt
executeStartedAt
}
tests {
name
description
columnName
testType
executionInfo {
lastRunGeneratedAt
lastRunStatus
executeCompletedAt
executeStartedAt
}
}
ancestors(types: [Model, Source, Seed, Snapshot]) {
... on ModelAppliedStateNestedNode {
name
uniqueId
resourceType
materializedType
modelexecutionInfo: executionInfo {
lastRunStatus
executeCompletedAt
}
}
... on SnapshotAppliedStateNestedNode {
name
uniqueId
resourceType
snapshotExecutionInfo: executionInfo {
lastRunStatus
executeCompletedAt
}
}
... on SeedAppliedStateNestedNode {
name
uniqueId
resourceType
seedExecutionInfo: executionInfo {
lastRunStatus
executeCompletedAt
}
}
... on SourceAppliedStateNestedNode {
sourceName
name
resourceType
freshness {
maxLoadedAt
maxLoadedAtTimeAgoInS
freshnessStatus
}
}
}
}
}
}
}
}
}
""")
COMMON_FIELDS_PARENTS_CHILDREN = textwrap.dedent("""
{
... on ExposureAppliedStateNestedNode {
resourceType
name
description
}
... on ExternalModelNode {
resourceType
description
name
}
... on MacroDefinitionNestedNode {
resourceType
name
description
}
... on MetricDefinitionNestedNode {
resourceType
name
description
}
... on ModelAppliedStateNestedNode {
resourceType
name
description
}
... on SavedQueryDefinitionNestedNode {
resourceType
name
description
}
... on SeedAppliedStateNestedNode {
resourceType
name
description
}
... on SemanticModelDefinitionNestedNode {
resourceType
name
description
}
... on SnapshotAppliedStateNestedNode {
resourceType
name
description
}
... on SourceAppliedStateNestedNode {
resourceType
sourceName
uniqueId
name
description
}
... on TestAppliedStateNestedNode {
resourceType
name
description
}
""")
GET_MODEL_PARENTS = (
textwrap.dedent("""
query GetModelParents(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
pageInfo {
endCursor
}
edges {
node {
parents
""")
+ COMMON_FIELDS_PARENTS_CHILDREN
+ textwrap.dedent("""
}
}
}
}
}
}
}
""")
)
GET_MODEL_CHILDREN = (
textwrap.dedent("""
query GetModelChildren(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
pageInfo {
endCursor
}
edges {
node {
children
""")
+ COMMON_FIELDS_PARENTS_CHILDREN
+ textwrap.dedent("""
}
}
}
}
}
}
}
""")
)
GET_SOURCES = textwrap.dedent("""
query GetSources(
$environmentId: BigInt!,
$sourcesFilter: SourceAppliedFilter,
$after: String,
$first: Int
) {
environment(id: $environmentId) {
applied {
sources(filter: $sourcesFilter, after: $after, first: $first) {
pageInfo {
hasNextPage
endCursor
}
edges {
node {
name
uniqueId
identifier
description
sourceName
resourceType
database
schema
freshness {
maxLoadedAt
maxLoadedAtTimeAgoInS
freshnessStatus
}
}
}
}
}
}
}
""")
GET_EXPOSURES = textwrap.dedent("""
query Exposures($environmentId: BigInt!, $first: Int, $after: String) {
environment(id: $environmentId) {
definition {
exposures(first: $first, after: $after) {
totalCount
pageInfo {
hasNextPage
endCursor
}
edges {
node {
name
uniqueId
url
description
}
}
}
}
}
}
""")
GET_MACROS = textwrap.dedent("""
query GetMacros(
$environmentId: BigInt!,
$filter: AppliedResourcesFilter!,
$after: String,
$first: Int
) {
environment(id: $environmentId) {
applied {
resources(filter: $filter, after: $after, first: $first) {
pageInfo {
hasNextPage
endCursor
}
edges {
node {
... on MacroDefinitionNode {
name
uniqueId
description
packageName
}
}
}
}
}
}
}
""")
# Lineage query
GET_FULL_LINEAGE = load_query("get_full_lineage.gql")
class MetadataAPIClient:
def __init__(self, config_provider: ConfigProvider[DiscoveryConfig]):
self.config_provider = config_provider
async def execute_query(self, query: str, variables: dict) -> dict:
config = await self.config_provider.get_config()
url = config.url
headers = config.headers_provider.get_headers()
async with httpx.AsyncClient() as client:
response = await client.post(
url=url,
json={"query": query, "variables": variables},
headers=headers,
)
response.raise_for_status()
return response.json()
class PageInfo(BaseModel):
model_config = ConfigDict(extra="ignore", populate_by_name=True)
end_cursor: str | None = Field(default=None, alias="endCursor")
has_next_page: bool | None = Field(default=None, alias="hasNextPage")
class PaginatedResourceFetcher:
def __init__(
self,
api_client: MetadataAPIClient,
*,
edges_path: tuple[str, ...],
page_info_path: tuple[str, ...],
page_size: int = DEFAULT_PAGE_SIZE,
max_node_query_limit: int = DEFAULT_MAX_NODE_QUERY_LIMIT,
):
self.api_client = api_client
self._edges_path = edges_path
self._page_info_path = page_info_path
self._page_size = page_size
self._max_node_query_limit = max_node_query_limit
async def get_environment_id(self) -> int:
config = await self.api_client.config_provider.get_config()
return config.environment_id
def _extract_path(self, payload: dict, path: tuple[str, ...]) -> Any:
current = payload
for key in path:
current = current[key]
return current
def _parse_edges(self, result: dict) -> list[dict]:
raise_gql_error(result)
edges = self._extract_path(result, self._edges_path)
parsed_edges: list[dict] = []
if not edges:
return parsed_edges
for edge in edges:
if not isinstance(edge, dict) or "node" not in edge:
continue
node = edge["node"]
if not isinstance(node, dict):
continue
parsed_edges.append(node)
return parsed_edges
def _should_continue(
self,
page_info: PageInfo,
previous_cursor: str | None,
) -> bool:
next_cursor = page_info.end_cursor
has_next = page_info.has_next_page
next_cursor_valid = bool(next_cursor) and next_cursor != previous_cursor
if isinstance(has_next, bool):
return has_next and next_cursor_valid
return next_cursor_valid
async def fetch_paginated(
self,
query: str,
variables: dict[str, Any],
) -> list[dict]:
environment_id = await self.get_environment_id()
collected: list[dict] = []
current_cursor: str | None = None
while True:
if len(collected) >= self._max_node_query_limit:
break
remaining_capacity = self._max_node_query_limit - len(collected)
request_variables = variables.copy()
request_variables["environmentId"] = environment_id
request_variables["first"] = min(self._page_size, remaining_capacity)
if current_cursor is not None:
request_variables["after"] = current_cursor
result = await self.api_client.execute_query(query, request_variables)
page_edges = self._parse_edges(result)
collected.extend(page_edges)
page_info_data = self._extract_path(result, self._page_info_path)
page_info = PageInfo(**page_info_data)
previous_cursor = current_cursor
current_cursor = page_info.end_cursor
if not self._should_continue(page_info, previous_cursor):
break
return collected
class ModelFilter(TypedDict, total=False):
modelingLayer: Literal["marts"] | None
class SourceFilter(TypedDict, total=False):
sourceNames: list[str]
uniqueIds: list[str] | None
identifier: str
class MacroFilter(TypedDict, total=False):
types: list[str]
uniqueIds: list[str] | None
class ModelsFetcher:
def __init__(
self,
api_client: MetadataAPIClient,
paginator: PaginatedResourceFetcher,
):
self.api_client = api_client
self._paginator = paginator
def _get_model_filters(
self, model_name: str | None = None, unique_id: str | None = None
) -> dict[str, list[str] | str]:
if unique_id:
return {"uniqueIds": [unique_id]}
elif model_name:
return {"identifier": model_name}
else:
raise InvalidParameterError(
"Either model_name or unique_id must be provided"
)
async def fetch_models(self, model_filter: ModelFilter | None = None) -> list[dict]:
return await self._paginator.fetch_paginated(
GraphQLQueries.GET_MODELS,
variables={
"modelsFilter": model_filter or {},
"sort": {"field": "queryUsageCount", "direction": "desc"},
},
)
async def fetch_model_parents(
self, model_name: str | None = None, unique_id: str | None = None
) -> list[dict]:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self._paginator.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_PARENTS, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return []
return edges[0]["node"]["parents"]
async def fetch_model_children(
self, model_name: str | None = None, unique_id: str | None = None
) -> list[dict]:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self._paginator.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_CHILDREN, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return []
return edges[0]["node"]["children"]
async def fetch_model_health(
self, model_name: str | None = None, unique_id: str | None = None
) -> list[dict]:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self._paginator.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_HEALTH, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return []
return edges[0]["node"]
class ExposuresFetcher:
def __init__(
self,
api_client: MetadataAPIClient,
paginator: PaginatedResourceFetcher,
):
self.api_client = api_client
self._paginator = paginator
async def fetch_exposures(self) -> list[dict]:
return await self._paginator.fetch_paginated(
GraphQLQueries.GET_EXPOSURES,
variables={},
)
class SourcesFetcher:
def __init__(
self,
api_client: MetadataAPIClient,
paginator: PaginatedResourceFetcher,
):
self.api_client = api_client
self._paginator = paginator
async def get_environment_id(self) -> int:
return await self._paginator.get_environment_id()
async def fetch_sources(
self,
source_names: list[str] | None = None,
unique_ids: list[str] | None = None,
) -> list[dict]:
source_filter: SourceFilter = {}
if source_names is not None:
source_filter["sourceNames"] = source_names
if unique_ids is not None:
source_filter["uniqueIds"] = unique_ids
return await self._paginator.fetch_paginated(
GraphQLQueries.GET_SOURCES,
variables={"sourcesFilter": source_filter},
)
class MacrosFetcher:
def __init__(
self,
api_client: MetadataAPIClient,
paginator: PaginatedResourceFetcher,
):
self.api_client = api_client
self._paginator = paginator
async def fetch_macros(
self,
package_names: list[str] | None = None,
return_package_names_only: bool = False,
include_default_dbt_packages: bool = False,
) -> list[dict] | list[str]:
"""Fetch all macros with optional filtering.
Args:
package_names: Optional list of package names to filter by.
return_package_names_only: If True, returns only unique package names
instead of full macro details. Useful for discovering available
packages before drilling down.
include_default_dbt_packages: If True, includes the default dbt macros that
are maintained by dbt Labs.
Returns:
List of macros with name, uniqueId, description, and packageName,
or list of unique package names if return_package_names_only is True.
"""
macro_filter: MacroFilter = {"types": ["Macro"]}
macros = await self._paginator.fetch_paginated(
GraphQLQueries.GET_MACROS,
variables={"filter": macro_filter},
)
# Filter out dbt-labs first-party macros unless include_default_dbt_packages is True
if not include_default_dbt_packages:
macros = [
m
for m in macros
if not self._is_dbt_builtin_package(m.get("packageName", ""))
]
# Filter by package names if specified
if package_names is not None:
package_names_lower = [p.lower() for p in package_names]
macros = [
m
for m in macros
if m.get("packageName", "").lower() in package_names_lower
]
# Return only unique package names if requested
if return_package_names_only:
unique_packages = sorted(
{name for m in macros if (name := m.get("packageName"))}
)
return unique_packages
return macros
def _is_dbt_builtin_package(self, package_name: str) -> bool:
"""Check if a package is dbt core or a dbt-labs first-party adapter."""
if not package_name:
return False
return package_name.lower() in DBT_BUILTIN_PACKAGES
class AppliedResourceType(StrEnum):
MODEL = "model"
SOURCE = "source"
EXPOSURE = "exposure"
TEST = "test"
SEED = "seed"
SNAPSHOT = "snapshot"
MACRO = "macro"
SEMANTIC_MODEL = "semantic_model"
class ResourceDetailsFetcher:
GET_PACKAGES_QUERY = load_query("get_packages.gql")
GET_MODELS_DETAILS_QUERY = load_query("get_model_details.gql")
GET_SOURCES_QUERY = load_query("get_source_details.gql")
GET_EXPOSURES_QUERY = load_query("get_exposure_details.gql")
GET_TESTS_QUERY = load_query("get_test_details.gql")
GET_SEEDS_QUERY = load_query("get_seed_details.gql")
GET_SNAPSHOTS_QUERY = load_query("get_snapshot_details.gql")
GET_MACROS_QUERY = load_query("get_macro_details.gql")
GET_SEMANTIC_MODELS_QUERY = load_query("get_semantic_model_details.gql")
GQL_QUERIES: ClassVar[dict[AppliedResourceType, str]] = {
AppliedResourceType.MODEL: GET_MODELS_DETAILS_QUERY,
AppliedResourceType.SOURCE: GET_SOURCES_QUERY,
AppliedResourceType.EXPOSURE: GET_EXPOSURES_QUERY,
AppliedResourceType.TEST: GET_TESTS_QUERY,
AppliedResourceType.SEED: GET_SEEDS_QUERY,
AppliedResourceType.SNAPSHOT: GET_SNAPSHOTS_QUERY,
AppliedResourceType.MACRO: GET_MACROS_QUERY,
AppliedResourceType.SEMANTIC_MODEL: GET_SEMANTIC_MODELS_QUERY,
}
RESOURCE_TYPE_TO_GQL_TYPE: ClassVar[dict[AppliedResourceType, str]] = {
AppliedResourceType.MODEL: "Model",
AppliedResourceType.SOURCE: "Source",
AppliedResourceType.EXPOSURE: "Exposure",
AppliedResourceType.TEST: "Test",
AppliedResourceType.SEED: "Seed",
AppliedResourceType.SNAPSHOT: "Snapshot",
AppliedResourceType.MACRO: "Macro",
AppliedResourceType.SEMANTIC_MODEL: "SemanticModel",
}
def __init__(
self,
api_client: MetadataAPIClient,
):
self.api_client = api_client
async def fetch_details(
self,
resource_type: AppliedResourceType,
name: str | None = None,
unique_id: str | None = None,
) -> list[dict]:
normalized_name = name.strip().lower() if name else None
normalized_unique_id = unique_id.strip().lower() if unique_id else None
environment_id = (
await self.api_client.config_provider.get_config()
).environment_id
if not normalized_name and not normalized_unique_id:
raise InvalidParameterError("Either name or unique_id must be provided")
if (
normalized_name
and normalized_unique_id
and normalized_name != normalized_unique_id.split(".")[-1]
):
raise InvalidParameterError(
f"Name and unique_id do not match. The unique_id does not end with {normalized_name}."
)
if not normalized_unique_id:
assert normalized_name is not None, "Name must be provided"
packages_result = await asyncio.gather(
self.api_client.execute_query(
self.GET_PACKAGES_QUERY,
variables={"resource": "macro", "environmentId": environment_id},
),
self.api_client.execute_query(
self.GET_PACKAGES_QUERY,
variables={"resource": "model", "environmentId": environment_id},
),
)
raise_gql_error(packages_result[0])
raise_gql_error(packages_result[1])
macro_packages = packages_result[0]["data"]["environment"]["applied"][
"packages"
]
model_packages = packages_result[1]["data"]["environment"]["applied"][
"packages"
]
if not macro_packages and not model_packages:
raise InvalidParameterError("No packages found for project")
unique_ids = [
f"{resource_type.value.lower()}.{package_name}.{normalized_name}"
for package_name in macro_packages + model_packages
]
else:
unique_ids = [normalized_unique_id]
query = self.GQL_QUERIES[resource_type]
variables = {
"environmentId": environment_id,
"filter": {
"uniqueIds": unique_ids,
"types": [self.RESOURCE_TYPE_TO_GQL_TYPE[resource_type]],
},
"first": len(unique_ids),
}
get_details_result = await self.api_client.execute_query(query, variables)
raise_gql_error(get_details_result)
edges = get_details_result["data"]["environment"]["applied"]["resources"][
"edges"
]
if not edges:
return []
return [e["node"] for e in edges]
class LineageFetcher:
"""Fetcher for lineage data. Returns nodes connected to the target."""
def __init__(self, api_client: MetadataAPIClient):
self.api_client = api_client
async def fetch_lineage(
self,
unique_id: str,
depth: int,
types: list[LineageResourceType] | None = None,
) -> list[dict]:
"""Fetch lineage graph filtered to nodes connected to unique_id.
Args:
unique_id: The dbt unique ID of the resource to get lineage for.
depth: how many levels to traverse (0 = infinite, 1 = immediate neighbors only, higher = deeper)
types: List of resource types to include. If None, includes all types.
Returns:
List of nodes connected to unique_id (upstream + downstream).
"""
if depth < 0:
raise ToolCallError("Depth must be greater than or equal to 0")
config = await self.api_client.config_provider.get_config()
type_filter = [
t.value for t in (types if types is not None else LineageResourceType)
]
variables = {
"environmentId": config.environment_id,
"types": type_filter,
# uniqueId removed - not used by GraphQL
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_FULL_LINEAGE, variables
)
raise_gql_error(result)
all_nodes = (
result.get("data", {})
.get("environment", {})
.get("applied", {})
.get("lineage", [])
)
# Filter to connected nodes only
return self._filter_connected_nodes(all_nodes, unique_id, depth)
def _filter_connected_nodes(
self, nodes: list[dict], target_id: str, depth: int
) -> list[dict]:
"""Return only nodes connected to target_id (upstream and downstream).
Uses BFS to find all nodes reachable from target in both directions.
Args:
nodes: List of all nodes in the lineage graph.
target_id: The unique ID of the target node.
depth: how many levels to traverse (0 = infinite, 1 = immediate neighbors only, higher = deeper)
"""
node_map = {
n["uniqueId"]: n
for n in nodes
if (resource_type := n.get("resourceType"))
and isinstance(resource_type, str)
# Filtering out macros because they have large
# dependency graphs that aren't always useful.
and resource_type.strip().lower() != "macro"
}
if target_id not in node_map:
return []
# BFS to find all connected nodes
connected = {target_id}
queue = [(target_id, 0)]
while queue:
current_id, current_depth = queue.pop(0)
node = node_map.get(current_id)
if not node:
continue
# Stop traversing beyond the depth limit, depth=0 means infinite
if depth > 0 and current_depth >= depth:
continue
# Traverse upstream (parents)
for parent_id in node.get("parentIds", []):
if parent_id not in connected and parent_id in node_map:
connected.add(parent_id)
queue.append((parent_id, current_depth + 1))
# Traverse downstream (children)
for candidate in nodes:
candidate_id = candidate.get("uniqueId")
if not candidate_id or candidate_id not in node_map:
continue
if (
current_id in candidate.get("parentIds", [])
and candidate_id not in connected
):
connected.add(candidate_id)
queue.append((candidate_id, current_depth + 1))
# Return in original order
return [node_map[uid] for uid in connected]
class ModelPerformanceFetcher:
"""Fetches model execution performance data from Discovery API."""
GET_MODEL_PERFORMANCE_QUERY = load_query("get_model_performance.gql")
def __init__(
self,
api_client: MetadataAPIClient,
resource_details_fetcher: ResourceDetailsFetcher,
):
self.api_client = api_client
self._resource_details_fetcher = resource_details_fetcher
async def fetch_performance(
self,
name: str | None = None,
unique_id: str | None = None,
num_runs: int = 1,
include_tests: bool = False,
) -> list[dict]:
"""Fetch model performance data.
Args:
name: Model name (resolved to unique_id if provided)
unique_id: Fully-qualified unique ID (preferred)
num_runs: Number of historical runs to retrieve
include_tests: If True, include test execution history for each run
Returns:
List of performance records sorted by executeStartedAt descending
Raises:
InvalidParameterError: If neither name nor unique_id provided
ToolCallError: If model not found or API error
"""
if not name and not unique_id:
raise InvalidParameterError("Either 'name' or 'unique_id' must be provided")
# Resolve name to unique_id if needed
resolved_unique_id = unique_id
if not resolved_unique_id:
# Re-use resource_details_fetcher from ResourceDetailsFetcher to resolve name
details = await self._resource_details_fetcher.fetch_details(
resource_type=AppliedResourceType.MODEL,
name=name,
)
if not details:
raise NotFoundError(f"Model not found: {name}")
# Model name can map to multiple unique_ids - require disambiguation
# For example, if multiple dbt packages define a model with the same name
if len(details) > 1:
matches = ", ".join(
sorted(d.get("uniqueId", "") for d in details if d.get("uniqueId"))
)
raise NotFoundError(
f"Multiple models found for name '{name}'. "
"Please provide the unique_id instead. "
f"Matches: {matches}"
)
resolved_unique_id = details[0]["uniqueId"]
config = await self.api_client.config_provider.get_config()
variables = {
"environmentId": config.environment_id,
"uniqueId": resolved_unique_id,
"lastRunCount": num_runs,
}
result = await self.api_client.execute_query(
self.GET_MODEL_PERFORMANCE_QUERY,
variables,
)
raise_gql_error(result)
runs = (
result.get("data", {})
.get("environment", {})
.get("applied", {})
.get("modelHistoricalRuns", [])
)
if not include_tests and runs:
for run in runs:
run.pop("tests", None)
return runs or []