dbt-mcp

Official
by dbt-labs
session.py (796 B)
import contextlib
import os
from collections.abc import AsyncGenerator

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


@contextlib.asynccontextmanager
async def session_context() -> AsyncGenerator[ClientSession, None]:
    async with (
        streamablehttp_client(
            url=f"https://{os.environ.get('DBT_HOST')}/api/ai/v1/mcp/",
            headers={
                "Authorization": f"token {os.environ.get('DBT_TOKEN')}",
                "x-dbt-prod-environment-id": os.environ.get("DBT_PROD_ENV_ID", ""),
            },
        ) as (
            read_stream,
            write_stream,
            _,
        ),
        ClientSession(read_stream, write_stream) as session,
    ):
        await session.initialize()
        yield session
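
Because session.py reads DBT_HOST and DBT_TOKEN with os.environ.get, an unset variable is silently formatted as the string "None" in the URL or the Authorization header. The following is a minimal sketch of a fail-fast check, not part of the dbt-mcp source: the helper name build_connection_settings and its error message are hypothetical, and it assumes only the three environment variables shown above.

```python
import os
from collections.abc import Mapping


def build_connection_settings(
    env: Mapping[str, str] = os.environ,
) -> tuple[str, dict[str, str]]:
    """Hypothetical helper: return (url, headers), or raise if a required variable is unset."""
    # DBT_HOST and DBT_TOKEN have no default in session.py, so treat them as required.
    missing = [name for name in ("DBT_HOST", "DBT_TOKEN") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    url = f"https://{env['DBT_HOST']}/api/ai/v1/mcp/"
    headers = {
        "Authorization": f"token {env['DBT_TOKEN']}",
        # Same default as session.py: an empty string when the variable is unset.
        "x-dbt-prod-environment-id": env.get("DBT_PROD_ENV_ID", ""),
    }
    return url, headers
```

Under these assumptions, the returned url and headers could be passed to streamablehttp_client in place of the inline f-strings, so a misconfigured environment fails before any network request is made.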

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dbt-labs/dbt-mcp'
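
The same GET request can be sketched in Python with only the standard library. The function names server_url and fetch_server are hypothetical, and the sketch assumes the endpoint returns a JSON body; the response schema is not described here, so the result is returned unparsed beyond JSON decoding.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory API URL for one server, e.g. dbt-labs/dbt-mcp."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0):
    """Fetch the server record; assumes (unverified) that the body is JSON."""
    with urllib.request.urlopen(server_url(owner, name), timeout=timeout) as response:
        return json.load(response)
```

Calling fetch_server("dbt-labs", "dbt-mcp") issues the same request as the curl command above.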

If you have feedback or need assistance with the MCP directory API, please join our Discord server.