Skip to main content
Glama
server.py20.8 kB
"""Model Context Protocol server for Trino.

This module provides a Model Context Protocol (MCP) server that exposes Trino
functionality through resources and tools, with special support for Iceberg tables.

Every tool below is a thin wrapper: it declares the MCP-facing schema (names,
types, descriptions) and forwards the call to the shared ``TrinoClient``.
"""

import argparse
import os
import sys

from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
from pydantic import Field

from config import load_config
from trino_client import TrinoClient

# Initialize the MCP server and Trino client
config = load_config()
client = TrinoClient(config)

# Initialize the MCP server with recommended settings for all transports
mcp = FastMCP(
    name="Trino Explorer",
    instructions="This Model Context Protocol (MCP) server provides access to Trino Query Engine.",
    # HTTP transport settings (used by streamable-http and sse)
    host=os.getenv("MCP_HOST", "127.0.0.1"),
    port=int(os.getenv("MCP_PORT", "8000")),
    # Recommended for production HTTP deployments
    stateless_http=True,
    json_response=True,
)


# Tools
@mcp.tool(description="List all available catalogs")
def show_catalogs() -> str:
    """List all available catalogs.

    Returns:
        str: Whatever ``client.list_catalogs()`` produces.
    """
    return client.list_catalogs()


@mcp.tool(description="List all schemas in a catalog")
def show_schemas(catalog: str = Field(description="The name of the catalog")) -> str:
    """List all schemas in a catalog.

    Args:
        catalog: The name of the catalog

    Returns:
        str: List of schemas in the specified catalog
    """
    return client.list_schemas(catalog)


@mcp.tool(description="List all tables in a schema")
def show_tables(
    catalog: str = Field(description="The name of the catalog"),
    schema_name: str = Field(description="The name of the schema"),
) -> str:
    """List all tables in a schema.

    Args:
        catalog: The name of the catalog
        schema_name: The name of the schema

    Returns:
        str: List of tables in the specified schema
    """
    return client.list_tables(catalog, schema_name)


@mcp.tool(description="Describe a table")
def describe_table(
    catalog: str = Field(description="The catalog name"),
    schema_name: str = Field(description="The schema name"),
    table: str = Field(description="The name of the table"),
) -> str:
    """Describe a table.

    Args:
        catalog: The catalog name
        schema_name: The schema name
        table: The name of the table

    Returns:
        str: Table description in JSON format
    """
    return client.describe_table(catalog, schema_name, table)


@mcp.tool(description="Show the CREATE TABLE statement for a specific table")
def show_create_table(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show the CREATE TABLE statement for a table.

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: The CREATE TABLE statement
    """
    return client.show_create_table(catalog, schema_name, table)


@mcp.tool(description="Show the CREATE VIEW statement for a specific view")
def show_create_view(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    view: str = Field(description="The name of the view"),
) -> str:
    """Show the CREATE VIEW statement for a view.

    Args:
        catalog: catalog name
        schema_name: schema name
        view: The name of the view

    Returns:
        str: The CREATE VIEW statement
    """
    return client.show_create_view(catalog, schema_name, view)


@mcp.tool(description="Execute a SQL query and return results in a readable format")
def execute_query(query: str = Field(description="The SQL query to execute")) -> str:
    """Execute a SQL query and return formatted results.

    The query text is forwarded to the client unchanged; no validation or
    rewriting happens in this wrapper.

    Args:
        query: The SQL query to execute

    Returns:
        str: Query results formatted as a JSON string
    """
    return client.execute_query(query)


@mcp.tool(description="Optimize an Iceberg table's data files")
def optimize(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table to optimize"),
) -> str:
    """Optimize an Iceberg table by compacting small files.

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table to optimize

    Returns:
        str: Confirmation message
    """
    return client.optimize(catalog, schema_name, table)


@mcp.tool(description="Optimize manifest files for an Iceberg table")
def optimize_manifests(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Optimize manifest files for an Iceberg table.

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: Confirmation message
    """
    return client.optimize_manifests(catalog, schema_name, table)


@mcp.tool(description="Remove old snapshots from an Iceberg table")
def expire_snapshots(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    retention_threshold: str = Field(
        description="Age threshold for snapshot removal (e.g., '7d', '30d')", default="7d"
    ),
    table: str = Field(description="The name of the table"),
) -> str:
    """Remove old snapshots from an Iceberg table.

    Args:
        catalog: catalog name
        schema_name: schema name
        retention_threshold: Age threshold for snapshot removal (e.g., "7d", "30d")
        table: The name of the table

    Returns:
        str: Confirmation message
    """
    # NOTE: the client takes (catalog, schema, table, threshold), which is a
    # different order from this tool's own signature.
    return client.expire_snapshots(catalog, schema_name, table, retention_threshold)


@mcp.tool(description="Show statistics for a table")
def show_stats(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show statistics for a table.

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: Table statistics in JSON format
    """
    return client.show_stats(catalog, schema_name, table)


@mcp.tool(name="show_query_history", description="Get the history of executed queries")
def show_query_history(
    # Annotated as optional so the generated tool schema accepts an explicit
    # null as well as an omitted argument; both mean "no limit".
    limit: int | None = Field(description="maximum number of history entries to return", default=None),
) -> str:
    """Get the history of executed queries.

    Args:
        limit: maximum number of history entries to return. If None, returns all entries.

    Returns:
        str: JSON-formatted string containing query history.
    """
    return client.get_query_history(limit)


@mcp.tool(description="Show a hierarchical tree view of catalogs, schemas, and tables")
def show_catalog_tree() -> str:
    """Get a hierarchical tree view showing the full structure of catalogs, schemas, and tables.

    Returns:
        str: A formatted string showing the catalog > schema > table hierarchy with visual indicators
    """
    return client.show_catalog_tree()


@mcp.tool(description="Show Iceberg table properties")
def show_table_properties(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table properties.

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted table properties
    """
    return client.show_table_properties(catalog, schema_name, table)


@mcp.tool(description="Show Iceberg table history/changelog")
def show_table_history(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table history/changelog.

    The history contains:
    - made_current_at: When snapshot became active
    - snapshot_id: Identifier of the snapshot
    - parent_id: Identifier of the parent snapshot
    - is_current_ancestor: Whether snapshot is an ancestor of current

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted table history
    """
    return client.show_table_history(catalog, schema_name, table)


@mcp.tool(description="Show metadata for the table")
def show_metadata_log_entries(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table metadata log entries.

    The metadata log contains:
    - timestamp: When metadata was created
    - file: Location of the metadata file
    - latest_snapshot_id: ID of latest snapshot when metadata was updated
    - latest_schema_id: ID of latest schema when metadata was updated
    - latest_sequence_number: Data sequence number of metadata file

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted metadata log entries
    """
    return client.show_metadata_log_entries(catalog, schema_name, table)


@mcp.tool(description="Show Iceberg table snapshots")
def show_snapshots(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table snapshots.

    The snapshots table contains:
    - committed_at: When snapshot became active
    - snapshot_id: Identifier for the snapshot
    - parent_id: Identifier for the parent snapshot
    - operation: Type of operation (append/replace/overwrite/delete)
    - manifest_list: List of Avro manifest files
    - summary: Summary of changes from previous snapshot

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted table snapshots
    """
    return client.show_snapshots(catalog, schema_name, table)


@mcp.tool(description="Show Iceberg table manifests")
def show_manifests(
    catalog: str = Field(description="catalog name"),
    schema_name: str = Field(description="schema name"),
    table: str = Field(description="The name of the table"),
    all_snapshots: bool = False,
) -> str:
    """Show Iceberg table manifests for current or all snapshots.

    The manifests table contains:
    - path: Manifest file location
    - length: Manifest file length
    - partition_spec_id: ID of partition spec used
    - added_snapshot_id: ID of snapshot when manifest was added
    - added_data_files_count: Number of data files with status ADDED
    - added_rows_count: Total rows in ADDED files
    - existing_data_files_count: Number of EXISTING files
    - existing_rows_count: Total rows in EXISTING files
    - deleted_data_files_count: Number of DELETED files
    - deleted_rows_count: Total rows in DELETED files
    - partition_summaries: Partition range metadata

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table
        all_snapshots: If True, show manifests from all snapshots

    Returns:
        str: JSON-formatted table manifests
    """
    return client.show_manifests(catalog, schema_name, table, all_snapshots)


@mcp.tool(description="Show Iceberg table partitions")
def show_partitions(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table partitions.

    The partitions table contains:
    - partition: Mapping of partition column names to values
    - record_count: Number of records in partition
    - file_count: Number of files in partition
    - total_size: Total size of files in partition
    - data: Partition range metadata with min/max values and null/nan counts

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted table partitions
    """
    return client.show_partitions(catalog, schema_name, table)


@mcp.tool(description="Show Iceberg table data files")
def show_files(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table data files in current snapshot.

    The files table contains:
    - content: Type of content (0=DATA, 1=POSITION_DELETES, 2=EQUALITY_DELETES)
    - file_path: Data file location
    - file_format: Format of the data file
    - record_count: Number of records in file
    - file_size_in_bytes: File size
    - column_sizes: Column ID to size mapping
    - value_counts: Column ID to value count mapping
    - null_value_counts: Column ID to null count mapping
    - nan_value_counts: Column ID to NaN count mapping
    - lower_bounds: Column ID to lower bound mapping
    - upper_bounds: Column ID to upper bound mapping
    - key_metadata: Encryption key metadata
    - split_offsets: Recommended split locations
    - equality_ids: Field IDs for equality deletes

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted table files info
    """
    return client.show_files(catalog, schema_name, table)


@mcp.tool(description="Show Iceberg table manifest entries")
def show_entries(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
    all_snapshots: bool = False,
) -> str:
    """Show Iceberg table manifest entries for current or all snapshots.

    The entries table contains:
    - status: Status of entry (0=EXISTING, 1=ADDED, 2=DELETED)
    - snapshot_id: ID of the snapshot
    - sequence_number: Data sequence number
    - file_sequence_number: File sequence number
    - data_file: File metadata including path, format, size etc
    - readable_metrics: Human-readable file metrics

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table
        all_snapshots: If True, show entries from all snapshots

    Returns:
        str: JSON-formatted manifest entries
    """
    return client.show_entries(catalog, schema_name, table, all_snapshots)


@mcp.tool(description="Show Iceberg table references (branches and tags)")
def show_refs(
    catalog: str = Field(description="catalog name "),
    schema_name: str = Field(description="schema name "),
    table: str = Field(description="The name of the table"),
) -> str:
    """Show Iceberg table references (branches and tags).

    The refs table contains:
    - name: Name of the reference
    - type: Type of reference (BRANCH or TAG)
    - snapshot_id: ID of referenced snapshot
    - max_reference_age_in_ms: Max age before reference expiry
    - min_snapshots_to_keep: Min snapshots to keep (branches only)
    - max_snapshot_age_in_ms: Max snapshot age in branch

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table

    Returns:
        str: JSON-formatted table references
    """
    return client.show_refs(catalog, schema_name, table)


# Prompts
@mcp.prompt(title="Explore Trino Data")
def explore_data(catalog: str = "", schema_name: str = "") -> list[base.Message]:
    """Interactive prompt to explore Trino data.

    Args:
        catalog: The name of the catalog to explore (optional)
        schema_name: The name of the schema to explore (optional)

    Returns:
        list[base.Message]: A list of messages to guide the conversation
    """
    messages = [
        base.AssistantMessage(
            "I'll help you explore data in Trino. I can show you available catalogs, "
            "schemas, and tables, and help you query the data."
        )
    ]

    # Pick the most specific opening request the supplied arguments allow.
    # Both arguments default to "" so the catalog-only and no-argument
    # branches are reachable through the prompt API.
    if catalog and schema_name:
        messages.append(
            base.UserMessage(
                f"Show me what tables are available in the {catalog}.{schema_name} schema and help me query them."
            )
        )
    elif catalog:
        messages.append(base.UserMessage(f"Show me what schemas are available in the {catalog} catalog."))
    else:
        messages.append(base.UserMessage("Show me what catalogs are available."))

    return messages


@mcp.prompt(title="Maintain Iceberg Table")
def maintain_iceberg(table: str, catalog: str = "", schema_name: str = "") -> list[base.Message]:
    """Interactive prompt for Iceberg table maintenance."""
    # Empty catalog / schema parts are simply omitted from the qualified name.
    return [
        base.AssistantMessage(
            "I'll help you maintain an Iceberg table. I can help with optimization, "
            "cleaning up snapshots and orphan files, and viewing table metadata."
        ),
        base.UserMessage(
            f"What maintenance operations should we perform on the Iceberg table "
            f"{catalog + '.' if catalog else ''}{schema_name + '.' if schema_name else ''}{table}?"
        ),
    ]


_CLI_EPILOG = """
Transport modes:
  stdio             Standard I/O (default) - for local MCP clients like VS Code
  streamable-http   HTTP with streaming support (recommended for remote/web access)
  sse               Server-Sent Events (legacy HTTP transport)

Examples:
  # Run with stdio (default, for VS Code integration)
  python server.py

  # Run with Streamable HTTP on port 8000
  python server.py --transport streamable-http --host 0.0.0.0 --port 8000

  # Run with SSE transport
  python server.py --transport sse --host 127.0.0.1 --port 8001

Environment variables:
  MCP_HOST    Default host for HTTP transports (default: 127.0.0.1)
  MCP_PORT    Default port for HTTP transports (default: 8000)
        """


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for selecting transport, host and port."""
    parser = argparse.ArgumentParser(
        description="MCP Trino Server - Model Context Protocol connector for Trino",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_CLI_EPILOG,
    )
    parser.add_argument(
        "--host",
        default=None,
        help="Host to bind to (default: 127.0.0.1, use 0.0.0.0 for all interfaces)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=None,
        help="Port to listen on (default: 8000)",
    )
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http", "sse"],
        default="stdio",
        help="Transport type (default: stdio)",
    )
    return parser


def main(argv: list[str] | None = None) -> None:
    """Entry point for the MCP Trino server.

    Parses the command line, applies any host/port overrides to the server
    settings and runs the server on the selected transport. With no arguments
    the server runs on stdio.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    # Imported here so that merely importing this module does not require
    # loguru.
    from loguru import logger

    # Configure logging: stderr only, so stdout stays free for the stdio
    # transport.
    logger.remove()
    logger.add(sys.stderr, level="INFO", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")

    args = _build_arg_parser().parse_args(argv)

    # Update settings if CLI args provided (override env vars)
    if args.host:
        mcp.settings.host = args.host
    # ``is not None`` rather than truthiness so an explicit ``--port 0`` is
    # not silently ignored.
    if args.port is not None:
        mcp.settings.port = args.port

    logger.info(f"Starting Trino MCP server with {args.transport} transport")

    if args.transport == "stdio":
        logger.info("Using stdio transport for local MCP communication")
        mcp.run(transport="stdio")
    elif args.transport == "streamable-http":
        logger.info(f"Starting Streamable HTTP server on http://{mcp.settings.host}:{mcp.settings.port}/mcp")
        mcp.run(transport="streamable-http")
    elif args.transport == "sse":
        logger.info(f"Starting SSE server on http://{mcp.settings.host}:{mcp.settings.port}/sse")
        mcp.run(transport="sse")


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/alaturqua/mcp-trino-python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.