server.py
"""Dataproc MCP Server implementation.""" import os import structlog from mcp.server.fastmcp import FastMCP from .batch_client import DataprocBatchClient from .dataproc_client import DataprocClient from .gcloud_config import get_default_project, get_default_region logger = structlog.get_logger(__name__) # Get server configuration from environment SERVER_NAME = os.getenv("DATAPROC_MCP_SERVER_NAME", "dataproc-mcp-server") SERVER_VERSION = os.getenv("DATAPROC_MCP_SERVER_VERSION", "1.0.0") # Create FastMCP server mcp = FastMCP(SERVER_NAME) def resolve_project_and_region( project_id: str | None, region: str | None ) -> tuple[str, str] | str: """Resolve project_id and region from parameters or gcloud config defaults. Returns: Tuple of (project_id, region) if successful, error message string if failed. """ # Resolve project_id if project_id is None: project_id = get_default_project() if project_id is None: return "Error: No project_id provided and no default project configured in gcloud. Run 'gcloud config set project PROJECT_ID' or provide project_id parameter." # Resolve region if region is None: region = get_default_region() if region is None: return "Error: No region provided and no default region configured in gcloud. Run 'gcloud config set compute/region REGION' or provide region parameter." return project_id, region # Tools using FastMCP decorators @mcp.tool() async def list_clusters( project_id: str | None = None, region: str | None = None ) -> str: """List Dataproc clusters in a project and region. Args: project_id: Google Cloud project ID (optional, uses gcloud config default) region: Dataproc region (optional, uses gcloud config default) """ resolved = resolve_project_and_region(project_id, region) if isinstance(resolved, str): # Error message return resolved project_id, region = resolved client = DataprocClient() try: result = await client.list_clusters(project_id, region) return str(result) except Exception as e: logger.error("Failed to list clusters", error=str(e)) return f"Error: {str(e)}" @mcp.tool() async def create_cluster( cluster_name: str, project_id: str | None = None, region: str | None = None, num_instances: int = 2, machine_type: str = "n1-standard-4", disk_size_gb: int = 100, image_version: str = "2.1-debian11", ) -> str: """Create a new Dataproc cluster. Args: cluster_name: Name for the new cluster project_id: Google Cloud project ID (optional, uses gcloud config default) region: Dataproc region (optional, uses gcloud config default) num_instances: Number of worker instances machine_type: Machine type for cluster nodes disk_size_gb: Boot disk size in GB image_version: Dataproc image version """ resolved = resolve_project_and_region(project_id, region) if isinstance(resolved, str): # Error message return resolved project_id, region = resolved client = DataprocClient() try: result = await client.create_cluster( project_id=project_id, region=region, cluster_name=cluster_name, num_instances=num_instances, machine_type=machine_type, disk_size_gb=disk_size_gb, image_version=image_version, ) return str(result) except Exception as e: logger.error("Failed to create cluster", error=str(e)) return f"Error: {str(e)}" @mcp.tool() async def delete_cluster( cluster_name: str, project_id: str | None = None, region: str | None = None ) -> str: """Delete a Dataproc cluster. 


# Tools using FastMCP decorators
@mcp.tool()
async def list_clusters(
    project_id: str | None = None, region: str | None = None
) -> str:
    """List Dataproc clusters in a project and region.

    Args:
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
    """
    resolved = resolve_project_and_region(project_id, region)
    if isinstance(resolved, str):  # Error message
        return resolved
    project_id, region = resolved

    client = DataprocClient()
    try:
        result = await client.list_clusters(project_id, region)
        return str(result)
    except Exception as e:
        logger.error("Failed to list clusters", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def create_cluster(
    cluster_name: str,
    project_id: str | None = None,
    region: str | None = None,
    num_instances: int = 2,
    machine_type: str = "n1-standard-4",
    disk_size_gb: int = 100,
    image_version: str = "2.1-debian11",
) -> str:
    """Create a new Dataproc cluster.

    Args:
        cluster_name: Name for the new cluster
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
        num_instances: Number of worker instances
        machine_type: Machine type for cluster nodes
        disk_size_gb: Boot disk size in GB
        image_version: Dataproc image version
    """
    resolved = resolve_project_and_region(project_id, region)
    if isinstance(resolved, str):  # Error message
        return resolved
    project_id, region = resolved

    client = DataprocClient()
    try:
        result = await client.create_cluster(
            project_id=project_id,
            region=region,
            cluster_name=cluster_name,
            num_instances=num_instances,
            machine_type=machine_type,
            disk_size_gb=disk_size_gb,
            image_version=image_version,
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to create cluster", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def delete_cluster(
    cluster_name: str, project_id: str | None = None, region: str | None = None
) -> str:
    """Delete a Dataproc cluster.

    Args:
        cluster_name: Name of the cluster to delete
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
    """
    resolved = resolve_project_and_region(project_id, region)
    if isinstance(resolved, str):  # Error message
        return resolved
    project_id, region = resolved

    client = DataprocClient()
    try:
        result = await client.delete_cluster(project_id, region, cluster_name)
        return str(result)
    except Exception as e:
        logger.error("Failed to delete cluster", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def get_cluster(
    cluster_name: str, project_id: str | None = None, region: str | None = None
) -> str:
    """Get details of a specific Dataproc cluster.

    Args:
        cluster_name: Name of the cluster
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
    """
    resolved = resolve_project_and_region(project_id, region)
    if isinstance(resolved, str):  # Error message
        return resolved
    project_id, region = resolved

    client = DataprocClient()
    try:
        result = await client.get_cluster(project_id, region, cluster_name)
        return str(result)
    except Exception as e:
        logger.error("Failed to get cluster", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def submit_job(
    project_id: str,
    region: str,
    cluster_name: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
) -> str:
    """Submit a job to a Dataproc cluster.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        cluster_name: Target cluster name
        job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
        main_file: Main file/class for the job
        args: Job arguments
        jar_files: JAR files to include
        properties: Job properties
    """
    client = DataprocClient()
    try:
        result = await client.submit_job(
            project_id=project_id,
            region=region,
            cluster_name=cluster_name,
            job_type=job_type,
            main_file=main_file,
            args=args or [],
            jar_files=jar_files or [],
            properties=properties or {},
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to submit job", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def list_jobs(
    project_id: str,
    region: str,
    cluster_name: str | None = None,
    job_states: list[str] | None = None,
) -> str:
    """List jobs in a Dataproc cluster.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        cluster_name: Cluster name (optional)
        job_states: Filter by job states
    """
    client = DataprocClient()
    try:
        result = await client.list_jobs(
            project_id=project_id,
            region=region,
            cluster_name=cluster_name,
            job_states=job_states or [],
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to list jobs", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def get_job(project_id: str, region: str, job_id: str) -> str:
    """Get details of a specific job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        job_id: Job ID
    """
    client = DataprocClient()
    try:
        result = await client.get_job(project_id, region, job_id)
        return str(result)
    except Exception as e:
        logger.error("Failed to get job", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def cancel_job(project_id: str, region: str, job_id: str) -> str:
    """Cancel a running job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        job_id: Job ID to cancel
    """
    client = DataprocClient()
    try:
        result = await client.cancel_job(project_id, region, job_id)
        return str(result)
    except Exception as e:
        logger.error("Failed to cancel job", error=str(e))
        return f"Error: {str(e)}"
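

# Example tool call (hypothetical bucket and cluster names), roughly as an MCP
# client would issue it against the submit_job tool above:
#
#     submit_job(
#         project_id="my-project",
#         region="us-central1",
#         cluster_name="analytics-cluster",
#         job_type="pyspark",
#         main_file="gs://my-bucket/jobs/wordcount.py",
#         args=["gs://my-bucket/input/", "gs://my-bucket/output/"],
#     )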


@mcp.tool()
async def create_batch_job(
    project_id: str,
    region: str,
    batch_id: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
    service_account: str | None = None,
    network_uri: str | None = None,
    subnetwork_uri: str | None = None,
) -> str:
    """Create a Dataproc batch job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        batch_id: Unique identifier for the batch job
        job_type: Type of batch job (spark, pyspark, spark_sql)
        main_file: Main file/class for the job
        args: Job arguments
        jar_files: JAR files to include
        properties: Job properties
        service_account: Service account email
        network_uri: Network URI
        subnetwork_uri: Subnetwork URI
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.create_batch_job(
            project_id=project_id,
            region=region,
            batch_id=batch_id,
            job_type=job_type,
            main_file=main_file,
            args=args or [],
            jar_files=jar_files or [],
            properties=properties or {},
            service_account=service_account,
            network_uri=network_uri,
            subnetwork_uri=subnetwork_uri,
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to create batch job", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def list_batch_jobs(project_id: str, region: str, page_size: int = 100) -> str:
    """List Dataproc batch jobs.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        page_size: Number of results per page
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.list_batch_jobs(project_id, region, page_size)
        return str(result)
    except Exception as e:
        logger.error("Failed to list batch jobs", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def get_batch_job(project_id: str, region: str, batch_id: str) -> str:
    """Get details of a specific batch job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        batch_id: Batch job ID
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.get_batch_job(project_id, region, batch_id)
        return str(result)
    except Exception as e:
        logger.error("Failed to get batch job", error=str(e))
        return f"Error: {str(e)}"


@mcp.tool()
async def delete_batch_job(project_id: str, region: str, batch_id: str) -> str:
    """Delete a batch job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        batch_id: Batch job ID to delete
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.delete_batch_job(project_id, region, batch_id)
        return str(result)
    except Exception as e:
        logger.error("Failed to delete batch job", error=str(e))
        return f"Error: {str(e)}"
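

# Example batch submission (hypothetical names and properties; the docstring
# above only requires that batch_id be unique):
#
#     create_batch_job(
#         project_id="my-project",
#         region="us-central1",
#         batch_id="nightly-etl-2024-01-01",
#         job_type="pyspark",
#         main_file="gs://my-bucket/jobs/etl.py",
#         properties={"spark.executor.instances": "4"},
#     )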


@mcp.tool()
async def compare_batch_jobs(
    batch_id_1: str,
    batch_id_2: str,
    project_id: str | None = None,
    region: str | None = None,
) -> str:
    """Compare two Dataproc batch jobs and return detailed differences.

    Args:
        batch_id_1: First batch job ID to compare
        batch_id_2: Second batch job ID to compare
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
    """
    resolved = resolve_project_and_region(project_id, region)
    if isinstance(resolved, str):  # Error message
        return resolved
    project_id, region = resolved

    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.compare_batches(
            project_id, region, batch_id_1, batch_id_2
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to compare batch jobs", error=str(e))
        return f"Error: {str(e)}"


# Resources using FastMCP decorators
@mcp.resource("dataproc://clusters")
async def get_clusters_resource() -> str:
    """Get list of all Dataproc clusters."""
    return "Resource listing requires project_id and region parameters"


@mcp.resource("dataproc://jobs")
async def get_jobs_resource() -> str:
    """Get list of all Dataproc jobs."""
    return "Resource listing requires project_id and region parameters"


# Export the FastMCP app for use in main module
app = mcp
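
# A minimal sketch of how the exported app might be served (assumption: the
# package's main module uses FastMCP's built-in runner; the actual entry point
# is not shown in this file):
#
#     if __name__ == "__main__":
#         mcp.run()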
