# MCPMC (Minecraft MCP)
# by gerred
# - src
# - mcp_server_bigquery
from google.cloud import bigquery
import logging
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from typing import Any
# Module-wide logger: every record goes to a console stream and to a log file.
logger = logging.getLogger('mcp_bigquery_server')
# Emit everything from DEBUG upwards.
logger.setLevel(logging.DEBUG)

# StreamHandler() without an explicit stream writes to sys.stderr (despite the
# variable name); the second handler writes to a fixed file under /tmp.
handler_stdout = logging.StreamHandler()
handler_file = logging.FileHandler('/tmp/mcp_bigquery_server.log')

# One shared record layout for both destinations.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
for _handler in (handler_stdout, handler_file):
    _handler.setFormatter(formatter)
    logger.addHandler(_handler)

logger.info("Starting MCP BigQuery Server")
class BigQueryDatabase:
def __init__(self, project: str, location: str, datasets_filter: list[str]):
"""Initialize a BigQuery database client"""
if not project:
raise ValueError("Project is required")
if not location:
raise ValueError("Location is required")
self.client = bigquery.Client(project=project, location=location)
self.datasets_filter = datasets_filter
def execute_query(self, query: str, params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
"""Execute a SQL query and return results as a list of dictionaries"""
logger.debug(f"Executing query: {query}")
try:
if params:
job = self.client.query(query, job_config=bigquery.QueryJobConfig(query_parameters=params))
else:
job = self.client.query(query)
results = job.result()
rows = [dict(row.items()) for row in results]
logger.debug(f"Query returned {len(rows)} rows")
return rows
except Exception as e:
logger.error(f"Database error executing query: {e}")
raise
def list_tables(self) -> list[str]:
"""List all tables in the BigQuery database"""
logger.debug("Listing all tables")
if self.datasets_filter:
datasets = [self.client.dataset(dataset) for dataset in self.datasets_filter]
else:
datasets = list(self.client.list_datasets())
logger.debug(f"Found {len(datasets)} datasets")
tables = []
for dataset in datasets:
dataset_tables = self.client.list_tables(dataset.dataset_id)
tables.extend([
f"{dataset.dataset_id}.{table.table_id}" for table in dataset_tables
])
logger.debug(f"Found {len(tables)} tables")
return tables
def describe_table(self, table_name: str) -> list[dict[str, Any]]:
"""Describe a table in the BigQuery database"""
logger.debug(f"Describing table: {table_name}")
parts = table_name.split(".")
if len(parts) != 2:
raise ValueError(f"Invalid table name: {table_name}")
dataset_id = parts[0]
table_id = parts[1]
query = f"""
SELECT ddl
FROM {dataset_id}.INFORMATION_SCHEMA.TABLES
WHERE table_name = @table_name;
"""
return self.execute_query(query, params=[
bigquery.ScalarQueryParameter("table_name", "STRING", table_id),
])
async def main(project: str, location: str, datasets_filter: list[str]) -> None:
    """Run the BigQuery MCP server over the stdio transport.

    Creates a ``BigQueryDatabase``, registers the tool-listing and tool-calling
    handlers on an MCP ``Server`` named ``"bigquery-manager"``, then awaits
    ``server.run`` on the stdio streams.

    Args:
        project: Project forwarded to ``BigQueryDatabase``.
        location: Location forwarded to ``BigQueryDatabase``.
        datasets_filter: Dataset IDs forwarded to ``BigQueryDatabase``.

    Raises:
        ValueError: Propagated from ``BigQueryDatabase`` when ``project`` or
            ``location`` is empty.
    """
    logger.info(f"Starting BigQuery MCP Server with project: {project} and location: {location}")

    db = BigQueryDatabase(project, location, datasets_filter)
    server = Server("bigquery-manager")

    # Register handlers
    logger.debug("Registering handlers")

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name="execute-query",
                description="Execute a SELECT query on the BigQuery database",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "SELECT SQL query to execute using BigQuery dialect"},
                    },
                    "required": ["query"],
                },
            ),
            types.Tool(
                name="list-tables",
                description="List all tables in the BigQuery database",
                inputSchema={
                    "type": "object",
                    "properties": {},
                },
            ),
            types.Tool(
                name="describe-table",
                description="Get the schema information for a specific table",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "table_name": {"type": "string", "description": "Name of the table to describe (e.g. my_dataset.my_table)"},
                    },
                    "required": ["table_name"],
                },
            ),
        ]

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict[str, Any] | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Handle tool execution requests.

        Dispatches on ``name`` to the matching ``BigQueryDatabase`` method and
        returns its result as a single text item. Any failure (unknown tool,
        missing argument, database error) is logged and returned as an
        ``"Error: ..."`` text item instead of being raised.
        """
        logger.debug(f"Handling tool execution request: {name}")

        try:
            if name == "list-tables":
                results = db.list_tables()
            elif name == "describe-table":
                if not arguments or "table_name" not in arguments:
                    raise ValueError("Missing table_name argument")
                results = db.describe_table(arguments["table_name"])
            elif name == "execute-query":
                # Validate like describe-table so a missing argument yields a
                # clear message rather than a bare KeyError/TypeError text.
                if not arguments or "query" not in arguments:
                    raise ValueError("Missing query argument")
                results = db.execute_query(arguments["query"])
            else:
                raise ValueError(f"Unknown tool: {name}")
            return [types.TextContent(type="text", text=str(results))]
        except Exception as e:
            # Errors are reported to the client as text; log them so they are
            # also visible on the server side.
            logger.error(f"Error handling tool {name}: {e}")
            return [types.TextContent(type="text", text=f"Error: {str(e)}")]

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="bigquery",
                server_version="0.2.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )