#!/usr/bin/env python3
"""MCP Server for Cloud SQL and BigQuery access."""
import asyncio
import json
import logging
import os
import re
from typing import Any, Dict, List, Optional

import psycopg2
from google.cloud import bigquery
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from psycopg2.extras import RealDictCursor
# Configure module-level logging (INFO and above; basicConfig writes to
# stderr by default, keeping stdout free for the MCP stdio transport).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DatabaseMCPServer:
    """MCP server exposing read-only query tools for Cloud SQL and BigQuery.

    Cloud SQL (PostgreSQL) holds recent transactional data (customers,
    orders, vendors); BigQuery serves the public ``thelook_ecommerce``
    analytics dataset.  Both clients are created lazily on first use, and
    all blocking driver calls are pushed onto the default thread-pool
    executor so the asyncio event loop that drives the stdio transport
    stays responsive.
    """

    def __init__(self):
        self.server = Server("gcp-database-server")
        self.bq_client = None        # lazily created bigquery.Client
        self.cloudsql_config = None  # lazily built psycopg2 connect kwargs
        self._setup_handlers()

    def _setup_handlers(self):
        """Register the MCP ``list_tools`` and ``call_tool`` handlers."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="query_cloudsql",
                    description=(
                        "Execute a SQL query against the Cloud SQL PostgreSQL database. "
                        "This database contains customer, orders, and vendors tables with "
                        "recent transactional data. Use this for queries about specific "
                        "customer orders, vendor information, or recent sales transactions."
                    ),
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The SQL query to execute (SELECT statements only)"
                            }
                        },
                        "required": ["query"]
                    }
                ),
                Tool(
                    name="query_bigquery",
                    description=(
                        "Execute a SQL query against BigQuery's thelook_ecommerce public dataset. "
                        "This dataset contains comprehensive e-commerce analytics data including "
                        "products, users, events, inventory, and distribution centers. "
                        "Use this for broader analytical queries, historical trends, and "
                        "large-scale data analysis."
                    ),
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The BigQuery SQL query to execute"
                            }
                        },
                        "required": ["query"]
                    }
                ),
                Tool(
                    name="list_cloudsql_tables",
                    description="List all tables in the Cloud SQL database with their schemas.",
                    inputSchema={
                        "type": "object",
                        "properties": {}
                    }
                ),
                Tool(
                    name="list_bigquery_tables",
                    description="List all tables in the BigQuery thelook_ecommerce dataset with their schemas.",
                    inputSchema={
                        "type": "object",
                        "properties": {}
                    }
                ),
                Tool(
                    name="get_cloudsql_schema",
                    description="Get the detailed schema for a specific Cloud SQL table.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table_name": {
                                "type": "string",
                                "description": "Name of the table"
                            }
                        },
                        "required": ["table_name"]
                    }
                ),
                Tool(
                    name="get_bigquery_schema",
                    description="Get the detailed schema for a specific BigQuery table.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table_name": {
                                "type": "string",
                                "description": "Name of the table in thelook_ecommerce dataset"
                            }
                        },
                        "required": ["table_name"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            """Dispatch a tool call and serialize the result as JSON text.

            Errors are reported back to the client as text rather than
            crashing the server process.
            """
            try:
                if name == "query_cloudsql":
                    result = await self._query_cloudsql(arguments["query"])
                    # default=str renders non-JSON row values (dates, Decimals)
                    return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
                elif name == "query_bigquery":
                    result = await self._query_bigquery(arguments["query"])
                    return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
                elif name == "list_cloudsql_tables":
                    result = await self._list_cloudsql_tables()
                    return [TextContent(type="text", text=json.dumps(result, indent=2))]
                elif name == "list_bigquery_tables":
                    result = await self._list_bigquery_tables()
                    return [TextContent(type="text", text=json.dumps(result, indent=2))]
                elif name == "get_cloudsql_schema":
                    result = await self._get_cloudsql_schema(arguments["table_name"])
                    return [TextContent(type="text", text=json.dumps(result, indent=2))]
                elif name == "get_bigquery_schema":
                    result = await self._get_bigquery_schema(arguments["table_name"])
                    return [TextContent(type="text", text=json.dumps(result, indent=2))]
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                logger.error(f"Error executing tool {name}: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

    @staticmethod
    def _validate_select_query(query: str) -> str:
        """Validate that *query* is a single read-only SELECT statement.

        Returns the normalized query (whitespace and one trailing semicolon
        stripped).  Raises ValueError for non-SELECT statements and for
        stacked statements such as ``SELECT 1; DROP TABLE x`` — the
        original prefix check alone let those through.
        """
        query = query.strip().rstrip(';').strip()
        if not query.upper().startswith('SELECT'):
            raise ValueError("Only SELECT queries are allowed")
        if ';' in query:
            # Any semicolon left after trimming means multiple statements.
            raise ValueError("Only a single SQL statement is allowed")
        return query

    @staticmethod
    def _qualify_bigquery_query(query: str) -> str:
        """Qualify bare table names with the public thelook_ecommerce dataset.

        Queries that already reference the dataset pass through untouched.
        Only a bare identifier directly after FROM/JOIN is rewritten, so
        subqueries (``FROM (SELECT ...)``), dotted names, and functions
        like ``UNNEST(...)`` are not mangled — the previous blanket
        ``str.replace('FROM ', ...)`` broke those cases and ignored JOINs.
        """
        if 'thelook_ecommerce' in query:
            return query
        return re.sub(
            r"(?i)\b(FROM|JOIN)\s+(?=[A-Za-z_]\w*(?:\s|,|\)|$))",
            lambda m: m.group(0) + "`bigquery-public-data.thelook_ecommerce`.",
            query,
        )

    @staticmethod
    async def _run_blocking(func):
        """Run a blocking callable on the default executor.

        psycopg2 and google-cloud-bigquery are synchronous; a worker thread
        keeps the event loop (and the MCP stdio transport) responsive.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)

    def _get_cloudsql_connection(self):
        """Return a new Cloud SQL connection, building config from env once.

        NOTE(review): DB_HOST/CLOUDSQL_PASSWORD have no defaults — a missing
        env var surfaces as a psycopg2 connection error at call time.
        """
        if not self.cloudsql_config:
            self.cloudsql_config = {
                'host': os.getenv('DB_HOST'),
                'database': os.getenv('CLOUDSQL_DATABASE', 'salesdb'),
                'user': os.getenv('CLOUDSQL_USER', 'salesuser'),
                'password': os.getenv('CLOUDSQL_PASSWORD')
            }
        return psycopg2.connect(**self.cloudsql_config)

    def _get_bigquery_client(self):
        """Return the shared BigQuery client, creating it on first use."""
        if not self.bq_client:
            self.bq_client = bigquery.Client(project=os.getenv('GCP_PROJECT_ID'))
        return self.bq_client

    async def _query_cloudsql(self, query: str) -> Dict[str, Any]:
        """Execute a read-only query against Cloud SQL.

        Raises ValueError if *query* is not a single SELECT statement.
        """
        query = self._validate_select_query(query)

        def _execute() -> Dict[str, Any]:
            conn = self._get_cloudsql_connection()
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    cur.execute(query)
                    rows = cur.fetchall()
                return {
                    "source": "CloudSQL",
                    "row_count": len(rows),
                    "data": [dict(row) for row in rows]
                }
            finally:
                conn.close()

        return await self._run_blocking(_execute)

    async def _query_bigquery(self, query: str) -> Dict[str, Any]:
        """Execute a query against BigQuery, auto-qualifying bare tables."""
        query = self._qualify_bigquery_query(query)

        def _execute() -> Dict[str, Any]:
            client = self._get_bigquery_client()
            query_job = client.query(query)
            rows = [dict(row) for row in query_job.result()]
            return {
                "source": "BigQuery",
                "row_count": len(rows),
                # total_bytes_processed is a documented QueryJob attribute;
                # reading it off the job is reliable after result() returns.
                "total_bytes_processed": query_job.total_bytes_processed,
                "data": rows
            }

        return await self._run_blocking(_execute)

    async def _list_cloudsql_tables(self) -> Dict[str, Any]:
        """List all public-schema tables in Cloud SQL with their columns."""
        query = """
            SELECT table_name, column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = 'public'
            ORDER BY table_name, ordinal_position
        """

        def _execute() -> Dict[str, Any]:
            conn = self._get_cloudsql_connection()
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    cur.execute(query)
                    rows = cur.fetchall()
                tables: Dict[str, List[Dict[str, Any]]] = {}
                for row in rows:
                    tables.setdefault(row['table_name'], []).append({
                        'column': row['column_name'],
                        'type': row['data_type']
                    })
                return {"source": "CloudSQL", "tables": tables}
            finally:
                conn.close()

        return await self._run_blocking(_execute)

    async def _list_bigquery_tables(self) -> Dict[str, Any]:
        """List all tables in the thelook_ecommerce dataset with schemas."""

        def _execute() -> Dict[str, Any]:
            client = self._get_bigquery_client()
            dataset_ref = client.dataset('thelook_ecommerce', project='bigquery-public-data')
            tables = {}
            for table in client.list_tables(dataset_ref):
                # list_tables returns shallow refs; get_table fetches schema.
                table_obj = client.get_table(dataset_ref.table(table.table_id))
                tables[table.table_id] = [
                    {'column': field.name, 'type': field.field_type}
                    for field in table_obj.schema
                ]
            return {"source": "BigQuery", "tables": tables}

        return await self._run_blocking(_execute)

    async def _get_cloudsql_schema(self, table_name: str) -> Dict[str, Any]:
        """Get column details for one Cloud SQL table (parameterized query)."""
        query = """
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema = 'public' AND table_name = %s
            ORDER BY ordinal_position
        """

        def _execute() -> Dict[str, Any]:
            conn = self._get_cloudsql_connection()
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    cur.execute(query, (table_name,))
                    rows = cur.fetchall()
                return {
                    "source": "CloudSQL",
                    "table": table_name,
                    "columns": [dict(row) for row in rows]
                }
            finally:
                conn.close()

        return await self._run_blocking(_execute)

    async def _get_bigquery_schema(self, table_name: str) -> Dict[str, Any]:
        """Get column details for one table in the thelook_ecommerce dataset."""

        def _execute() -> Dict[str, Any]:
            client = self._get_bigquery_client()
            table = client.get_table(f"bigquery-public-data.thelook_ecommerce.{table_name}")
            return {
                "source": "BigQuery",
                "table": table_name,
                "columns": [
                    {
                        'column_name': field.name,
                        'data_type': field.field_type,
                        'mode': field.mode,
                        'description': field.description
                    }
                    for field in table.schema
                ]
            }

        return await self._run_blocking(_execute)

    async def run(self):
        """Serve MCP requests over stdio until the transport closes."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
async def main() -> None:
    """Entry point: build the server and serve over stdio until shutdown."""
    await DatabaseMCPServer().run()


if __name__ == "__main__":
    asyncio.run(main())