Skip to main content
Glama

dbt-mcp

Official
by dbt-labs
test_client.py15.6 kB
from unittest.mock import Mock, patch import pytest import requests from dbt_mcp.config.config_providers import AdminApiConfig from dbt_mcp.dbt_admin.client import ( AdminAPIError, DbtAdminAPIClient, ) class MockHeadersProvider: """Mock headers provider for testing.""" def __init__(self, headers: dict[str, str]): self._headers = headers def get_headers(self) -> dict[str, str]: return self._headers class MockAdminApiConfigProvider: """Mock config provider for testing.""" def __init__(self, config: AdminApiConfig): self.config = config async def get_config(self) -> AdminApiConfig: return self.config @pytest.fixture def admin_config(): return AdminApiConfig( account_id=12345, headers_provider=MockHeadersProvider({"Authorization": "Bearer test_token"}), url="https://cloud.getdbt.com", ) @pytest.fixture def admin_config_with_prefix(): return AdminApiConfig( account_id=12345, headers_provider=MockHeadersProvider({"Authorization": "Bearer test_token"}), url="https://eu1.cloud.getdbt.com", ) @pytest.fixture def client(admin_config): config_provider = MockAdminApiConfigProvider(admin_config) return DbtAdminAPIClient(config_provider) @pytest.fixture def client_with_prefix(admin_config_with_prefix): config_provider = MockAdminApiConfigProvider(admin_config_with_prefix) return DbtAdminAPIClient(config_provider) async def test_client_initialization(client): config = await client.get_config() assert config.account_id == 12345 assert config.headers_provider.get_headers() == { "Authorization": "Bearer test_token" } assert config.url == "https://cloud.getdbt.com" headers = await client.get_headers() assert headers["Authorization"] == "Bearer test_token" assert headers["Content-Type"] == "application/json" assert headers["Accept"] == "application/json" @patch("requests.request") async def test_make_request_success(mock_request, client): mock_response = Mock() mock_response.json.return_value = {"data": "test"} mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client._make_request("GET", "/test/endpoint") assert result == {"data": "test"} headers = await client.get_headers() mock_request.assert_called_once_with( "GET", "https://cloud.getdbt.com/test/endpoint", headers=headers ) @patch("requests.request") async def test_make_request_failure(mock_request, client): mock_response = Mock() mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError( "404 Not Found" ) mock_request.return_value = mock_response with pytest.raises(AdminAPIError): await client._make_request("GET", "/test/endpoint") @patch("requests.request") async def test_list_jobs(mock_request, client): mock_response = Mock() mock_response.json.return_value = { "data": [ { "id": 1, "name": "test_job", "description": "Test description", "dbt_version": "1.7.0", "job_type": "deploy", "triggers": {"github_webhook": True}, "most_recent_run": { "id": 100, "status_humanized": "Success", "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T00:05:00Z", }, "most_recent_completed_run": { "id": 99, "status_humanized": "Success", "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T00:04:00Z", }, "schedule": {"cron": "0 9 * * *"}, "next_run": "2024-01-02T09:00:00Z", } ] } mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.list_jobs(12345, project_id=1, limit=10) assert len(result) == 1 assert result[0]["id"] == 1 assert result[0]["name"] == "test_job" assert result[0]["most_recent_run_id"] == 100 assert result[0]["schedule"] == "0 9 * * *" headers = await client.get_headers() mock_request.assert_called_once_with( "GET", "https://cloud.getdbt.com/api/v2/accounts/12345/jobs/?include_related=['most_recent_run','most_recent_completed_run']", headers=headers, params={"project_id": 1, "limit": 10}, ) @patch("requests.request") async def test_list_jobs_with_null_values(mock_request, client): mock_response = Mock() mock_response.json.return_value = { "data": [ { "id": 1, "name": "test_job", "description": None, "dbt_version": "1.7.0", "job_type": "deploy", "triggers": {}, "most_recent_run": None, "most_recent_completed_run": None, "schedule": None, "next_run": None, } ] } mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.list_jobs(12345) assert len(result) == 1 assert result[0]["most_recent_run_id"] is None assert result[0]["schedule"] is None @patch("requests.request") async def test_get_job_details(mock_request, client): mock_response = Mock() mock_response.json.return_value = {"data": {"id": 1, "name": "test_job"}} mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.get_job_details(12345, 1) assert result == {"id": 1, "name": "test_job"} headers = await client.get_headers() mock_request.assert_called_once_with( "GET", "https://cloud.getdbt.com/api/v2/accounts/12345/jobs/1/?include_related=['most_recent_run','most_recent_completed_run']", headers=headers, ) @patch("requests.request") async def test_trigger_job_run(mock_request, client): mock_response = Mock() mock_response.json.return_value = {"data": {"id": 200, "status": "queued"}} mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.trigger_job_run( 12345, 1, "Manual trigger", git_branch="main", schema_override="test_schema" ) assert result == {"id": 200, "status": "queued"} headers = await client.get_headers() mock_request.assert_called_once_with( "POST", "https://cloud.getdbt.com/api/v2/accounts/12345/jobs/1/run/", headers=headers, json={ "cause": "Manual trigger", "git_branch": "main", "schema_override": "test_schema", }, ) @patch("requests.request") async def test_list_jobs_runs(mock_request, client): mock_response = Mock() mock_response.json.return_value = { "data": [ { "id": 100, "status": 10, "status_humanized": "Success", "job": {"name": "test_job", "execute_step": ["dbt run"]}, "started_at": "2024-01-01T00:00:00Z", "finished_at": "2024-01-01T00:05:00Z", # Fields that should be removed "account_id": 12345, "environment_id": 1, "blocked_by": None, "used_repo_cache": True, "audit": {}, "created_at_humanized": "1 hour ago", "duration_humanized": "5 minutes", "finished_at_humanized": "1 hour ago", "queued_duration_humanized": "10 seconds", "run_duration_humanized": "4 minutes 50 seconds", "artifacts_saved": True, "artifact_s3_path": "s3://bucket/path", "has_docs_generated": True, "has_sources_generated": False, "notifications_sent": True, "executed_by_thread_id": "thread123", "updated_at": "2024-01-01T00:05:00Z", "dequeued_at": "2024-01-01T00:00:30Z", "last_checked_at": "2024-01-01T00:04:00Z", "last_heartbeat_at": "2024-01-01T00:04:30Z", "trigger": {}, "run_steps": [], "deprecation": {}, "environment": {}, } ] } mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.list_jobs_runs(12345, job_definition_id=1, status="success") assert len(result) == 1 run = result[0] assert run["id"] == 100 assert run["job_name"] == "test_job" assert run["job_steps"] == ["dbt run"] # Verify removed fields are not present removed_fields = [ "job", "account_id", "environment_id", "blocked_by", "used_repo_cache", "audit", "created_at_humanized", "duration_humanized", "finished_at_humanized", "queued_duration_humanized", "run_duration_humanized", "artifacts_saved", "artifact_s3_path", "has_docs_generated", "has_sources_generated", "notifications_sent", "executed_by_thread_id", "updated_at", "dequeued_at", "last_checked_at", "last_heartbeat_at", "trigger", "run_steps", "deprecation", "environment", ] for field in removed_fields: assert field not in run headers = await client.get_headers() mock_request.assert_called_once_with( "GET", "https://cloud.getdbt.com/api/v2/accounts/12345/runs/?include_related=['job']", headers=headers, params={"job_definition_id": 1, "status": "success"}, ) @patch("requests.request") async def test_get_job_run_details(mock_request, client): mock_response = Mock() mock_response.json.return_value = { "data": { "id": 100, "status": 10, "run_steps": [ { "id": 1, "name": "dbt run", "logs": "log data", } ], } } mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.get_job_run_details(12345, 100) assert result["id"] == 100 # Verify truncated_debug_logs and logs are removed assert "truncated_debug_logs" not in result["run_steps"][0] assert "logs" not in result["run_steps"][0] headers = await client.get_headers() mock_request.assert_called_once_with( "GET", "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/?include_related=['run_steps']", headers=headers, ) @patch("requests.request") async def test_cancel_job_run(mock_request, client): mock_response = Mock() mock_response.json.return_value = {"data": {"id": 100, "status": "cancelled"}} mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.cancel_job_run(12345, 100) assert result == {"id": 100, "status": "cancelled"} headers = await client.get_headers() mock_request.assert_called_once_with( "POST", "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/cancel/", headers=headers, ) @patch("requests.request") async def test_retry_job_run(mock_request, client): mock_response = Mock() mock_response.json.return_value = {"data": {"id": 101, "status": "queued"}} mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.retry_job_run(12345, 100) assert result == {"id": 101, "status": "queued"} headers = await client.get_headers() mock_request.assert_called_once_with( "POST", "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/retry/", headers=headers, ) @patch("requests.request") async def test_list_job_run_artifacts(mock_request, client): mock_response = Mock() mock_response.json.return_value = { "data": [ "manifest.json", "catalog.json", "compiled/my_project/models/model.sql", "run/my_project/models/model.sql", "sources.json", ] } mock_response.raise_for_status.return_value = None mock_request.return_value = mock_response result = await client.list_job_run_artifacts(12345, 100) # Should filter out compiled/ and run/ artifacts expected = ["manifest.json", "catalog.json", "sources.json"] assert result == expected headers = await client.get_headers() mock_request.assert_called_once_with( "GET", "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/", headers=headers, ) @patch("requests.get") async def test_get_job_run_artifact_json(mock_get, client): mock_response = Mock() mock_response.json.return_value = {"nodes": {"model.test": {}}} mock_response.headers = {"content-type": "application/json"} mock_response.raise_for_status.return_value = None mock_get.return_value = mock_response result = await client.get_job_run_artifact(12345, 100, "manifest.json", step=1) # The client returns response.text, but the mock returns the mock_response.text which is a Mock object # In a real scenario with JSON content type, the API would return JSON as text assert result is not None mock_get.assert_called_once_with( "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/manifest.json", headers={"Authorization": "Bearer test_token", "Accept": "*/*"}, params={"step": 1}, ) @patch("requests.get") async def test_get_job_run_artifact_text(mock_get, client): mock_response = Mock() mock_response.text = "LOG DATA" mock_response.headers = {"content-type": "text/plain"} mock_response.raise_for_status.return_value = None mock_get.return_value = mock_response result = await client.get_job_run_artifact(12345, 100, "logs/dbt.log") assert result == "LOG DATA" mock_get.assert_called_once_with( "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/logs/dbt.log", headers={"Authorization": "Bearer test_token", "Accept": "*/*"}, params={}, ) @patch("requests.get") async def test_get_job_run_artifact_no_step_param(mock_get, client): mock_response = Mock() mock_response.text = "artifact content" mock_response.headers = {"content-type": "text/plain"} mock_response.raise_for_status.return_value = None mock_get.return_value = mock_response await client.get_job_run_artifact(12345, 100, "manifest.json") mock_get.assert_called_once_with( "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/manifest.json", headers={"Authorization": "Bearer test_token", "Accept": "*/*"}, params={}, ) @patch("requests.get") async def test_get_job_run_artifact_request_exception(mock_get, client): mock_get.side_effect = requests.exceptions.HTTPError("404 Not Found") with pytest.raises(requests.exceptions.HTTPError): await client.get_job_run_artifact(12345, 100, "nonexistent.json")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dbt-labs/dbt-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server