execute_query
Execute SQL queries on Trino databases to retrieve and analyze data directly from distributed storage systems.
Instructions
Execute a SQL query against Trino.
Args:
sql: The SQL query to execute.
catalog: Optional catalog name to use for the query.
schema: Optional schema name to use for the query.
Returns:
Dict[str, Any]: Query results including metadata.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | | |
| catalog | No | | |
| schema | No | | |
Implementation Reference
# src/trino_mcp/tools/__init__.py:22-76 (handler)
# The primary handler function for the 'execute_query' MCP tool. It is
# decorated with @mcp.tool() and formats query results from the TrinoClient,
# providing preview rows and metadata.
@mcp.tool()
def execute_query(
    sql: str,
    catalog: Optional[str] = None,
    schema: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute a SQL query against Trino.

    Args:
        sql: The SQL query to execute.
        catalog: Optional catalog name to use for the query.
        schema: Optional schema name to use for the query.

    Returns:
        Dict[str, Any]: Query results including metadata.
    """
    # NOTE(review): the docstring text above matches the tool's published
    # "Instructions", so it looks runtime-visible and is left word-for-word.
    logger.info(f"Executing query: {sql}")

    # Only the leading rows are returned inline; the full result is referenced
    # through the resource path instead.
    preview_limit = 20

    try:
        outcome = client.execute_query(sql, catalog, schema)

        # Turn each previewed row into a {column name: value} mapping. Values
        # are looked up by column position, so a row shorter than the column
        # list raises and is reported through the error branch below.
        head_rows = outcome.rows[:min(preview_limit, len(outcome.rows))]
        preview = [
            {name: record[pos] for pos, name in enumerate(outcome.columns)}
            for record in head_rows
        ]

        # Key order: metadata first, then the preview, then the resource path.
        return {
            "query_id": outcome.query_id,
            "columns": outcome.columns,
            "row_count": outcome.row_count,
            "query_time_ms": outcome.query_time_ms,
            "preview_rows": preview,
            "resource_path": f"trino://query/{outcome.query_id}",
        }
    except Exception as exc:
        # Tool boundary: failures are logged and handed back to the caller as
        # data (message plus the offending SQL) rather than propagated.
        failure_text = str(exc)
        logger.error(f"Query execution failed: {failure_text}")
        return {"error": failure_text, "query": sql}
- src/trino_mcp/server.py:83-85 (registration) The registration point where register_trino_tools is called on the FastMCP instance, which defines and registers the execute_query tool.
  logger.info("Registering resources and tools")
  register_trino_resources(mcp, trino_client)
  register_trino_tools(mcp, trino_client)
- src/trino_mcp/trino_client.py:74-152 (helper)The underlying TrinoClient.execute_query method that performs the actual database query execution using the trino-python-client library.self, sql: str, catalog: Optional[str] = None, schema: Optional[str] = None ) -> TrinoQueryResult: """ Execute a SQL query against Trino. Important note on catalog handling: This method properly sets the catalog by updating the connection parameters, rather than using unreliable "USE catalog" statements. The catalog is passed directly to the connection, which is more reliable than SQL-based catalog switching. Args: sql: The SQL query to execute. catalog: Optional catalog name to use for the query. schema: Optional schema name to use for the query. Returns: TrinoQueryResult: The result of the query. """ # If we're switching catalogs or don't have a connection, we need to reconnect use_catalog = catalog or self.current_catalog if self.conn and (use_catalog != self.current_catalog): logger.info(f"Switching catalog from {self.current_catalog} to {use_catalog}, reconnecting...") self.disconnect() # Update current catalog and schema self.current_catalog = use_catalog if schema: self.current_schema = schema # Update the config catalog before connecting if use_catalog: self.config.catalog = use_catalog # Ensure connection with updated catalog self.ensure_connection() # Create a cursor cursor = self.conn.cursor() # If we have a schema, try to set it # This still uses a USE statement, but catalogs are now set in the connection if self.current_schema: try: logger.debug(f"Setting schema to {self.current_schema}") # Make sure to include catalog with schema to avoid errors if self.current_catalog: cursor.execute(f"USE {self.current_catalog}.{self.current_schema}") else: logger.warning("Cannot set schema without catalog") except Exception as e: logger.warning(f"Failed to set schema: {e}") try: # Execute the query and time it logger.debug(f"Executing query: {sql}") start_time = time.time() 
cursor.execute(sql) query_time = time.time() - start_time # Fetch the query ID, metadata and results query_id = cursor.stats.get("queryId", "unknown") columns = [desc[0] for desc in cursor.description] if cursor.description else [] rows = cursor.fetchall() if cursor.description else [] return TrinoQueryResult( query_id=query_id, columns=columns, rows=rows, query_time_ms=query_time * 1000, row_count=len(rows) ) except Exception as e: logger.error(f"Query execution failed: {e}") raise