BigQuery Validator

by caron14
test_integration.py • 6.84 kB
"""Integration tests for mcp-bigquery.""" import os import pytest # Skip all tests in this file if no credentials pytestmark = pytest.mark.skipif( not ( os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") is not None or os.path.exists( os.path.expanduser("~/.config/gcloud/application_default_credentials.json") ) or os.environ.get("GOOGLE_CLOUD_PROJECT") is not None ), reason="BigQuery credentials not available", ) class TestRealBigQueryIntegration: """Test with actual BigQuery API calls.""" @pytest.mark.asyncio async def test_public_dataset_query(self): """Test querying a public dataset.""" from mcp_bigquery.server import dry_run_sql # Use a well-known public dataset sql = "SELECT * FROM `bigquery-public-data.samples.shakespeare` LIMIT 10" result = await dry_run_sql(sql) assert "error" not in result assert "totalBytesProcessed" in result assert result["totalBytesProcessed"] > 0 assert "usdEstimate" in result assert "referencedTables" in result assert len(result["referencedTables"]) == 1 assert result["referencedTables"][0]["project"] == "bigquery-public-data" assert result["referencedTables"][0]["dataset"] == "samples" assert result["referencedTables"][0]["table"] == "shakespeare" assert "schemaPreview" in result assert len(result["schemaPreview"]) > 0 @pytest.mark.asyncio async def test_aggregation_query(self): """Test aggregation query dry-run.""" from mcp_bigquery.server import dry_run_sql sql = """ SELECT corpus, COUNT(*) as word_count, SUM(word_count) as total_occurrences FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus """ result = await dry_run_sql(sql) assert "error" not in result assert "schemaPreview" in result # Check schema has expected columns schema_names = [field["name"] for field in result["schemaPreview"]] assert "corpus" in schema_names assert "word_count" in schema_names assert "total_occurrences" in schema_names @pytest.mark.asyncio async def test_parameterized_query(self): """Test parameterized query validation.""" from mcp_bigquery.server import validate_sql # Since we only support STRING parameters, we need to cast numeric comparisons sql = """ SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` WHERE corpus = @corpus_name AND word_count > CAST(@min_count AS INT64) """ params = {"corpus_name": "hamlet", "min_count": "10"} result = await validate_sql(sql, params) assert result["isValid"] is True @pytest.mark.asyncio async def test_invalid_table_reference(self): """Test error handling for non-existent table.""" from mcp_bigquery.server import dry_run_sql sql = "SELECT * FROM `nonexistent-project.nonexistent-dataset." 
"nonexistent-table`" result = await dry_run_sql(sql) assert "error" in result assert result["error"]["code"] == "INVALID_SQL" assert "message" in result["error"] @pytest.mark.asyncio async def test_syntax_error_with_location(self): """Test that syntax errors include location information.""" from mcp_bigquery.server import validate_sql sql = "SELECT FROM `bigquery-public-data.samples.shakespeare`" result = await validate_sql(sql) assert result["isValid"] is False assert "error" in result assert "message" in result["error"] # BigQuery often returns location info for syntax errors if "location" in result["error"]: assert "line" in result["error"]["location"] assert "column" in result["error"]["location"] @pytest.mark.asyncio async def test_cost_estimation_with_custom_price(self): """Test cost estimation with custom price per TiB.""" from mcp_bigquery.server import dry_run_sql sql = "SELECT * FROM `bigquery-public-data.samples.shakespeare`" # Test with different prices prices = [1.0, 5.0, 10.0] results = [] for price in prices: result = await dry_run_sql(sql, price_per_tib=price) results.append(result) assert "usdEstimate" in result # Verify that estimates scale linearly with price bytes_processed = results[0]["totalBytesProcessed"] for i, price in enumerate(prices): expected = (bytes_processed / (2**40)) * price assert abs(results[i]["usdEstimate"] - expected) < 0.000001 @pytest.mark.asyncio async def test_cte_query(self): """Test Common Table Expression (CTE) query.""" from mcp_bigquery.server import dry_run_sql, validate_sql sql = """ WITH word_stats AS ( SELECT word, SUM(word_count) as total_count FROM `bigquery-public-data.samples.shakespeare` GROUP BY word ) SELECT* FROM word_stats WHERE total_count > 100 ORDER BY total_count DESC """ # First validate validate_result = await validate_sql(sql) assert validate_result["isValid"] is True # Then dry-run dry_run_result = await dry_run_sql(sql) assert "error" not in dry_run_result assert "totalBytesProcessed" in dry_run_result @pytest.mark.asyncio async def test_environment_variable_precedence(self): """Test that price parameter takes precedence over env variable.""" import os from mcp_bigquery.server import dry_run_sql # Save original value original = os.environ.get("SAFE_PRICE_PER_TIB") try: # Set environment variable os.environ["SAFE_PRICE_PER_TIB"] = "100.0" sql = "SELECT 1" # Test with explicit price (should override env) result = await dry_run_sql(sql, price_per_tib=1.0) assert "usdEstimate" in result # SELECT 1 typically has 0 bytes, so estimate should be 0 assert result["usdEstimate"] == 0.0 # Test without explicit price (should use env) result = await dry_run_sql(sql) assert "usdEstimate" in result assert result["usdEstimate"] == 0.0 # Still 0 because 0 bytes finally: # Restore original value if original is not None: os.environ["SAFE_PRICE_PER_TIB"] = original else: os.environ.pop("SAFE_PRICE_PER_TIB", None)
