Skip to main content
Glama

execute_sql

Execute SQL queries on Snowflake databases to retrieve data, manage warehouses, and perform data analysis operations through AI-guided interactions.

Instructions

Execute a SQL query on Snowflake and return results

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
requestYes

Implementation Reference

  • MCP tool handler for execute_sql: executes SQL query via SnowflakeClient, handles errors, provides context logging
    @mcp.tool() async def execute_sql(request: SQLQueryRequest, ctx: Context) -> QueryResult: """Execute a SQL query on Snowflake and return results""" await ctx.info(f"Executing SQL query: {request.query[:100]}...") try: client = await get_snowflake_client() result = await client.execute_query( request.query, request.limit, request.warehouse ) if result.success: await ctx.info(f"Query executed successfully. {result.row_count} rows returned.") else: await ctx.error(f"Query failed: {result.error}") return result except Exception as e: logger.error(f"Error executing SQL: {str(e)}") await ctx.error(f"Failed to execute query: {str(e)}") return QueryResult( success=False, data=[], columns=[], row_count=0, error=str(e) )
  • Input schema for execute_sql tool: defines query, optional limit and warehouse
    class SQLQueryRequest(BaseModel): """Request model for SQL query execution""" query: str = Field(..., description="SQL query to execute") limit: Optional[int] = Field(None, description="Maximum number of rows to return") warehouse: Optional[str] = Field(None, description="Warehouse to use for this query")
  • Output schema for execute_sql tool: query results including data, columns, metadata, and error handling
    class QueryResult(BaseModel): """Result of a SQL query execution""" success: bool data: List[Dict[str, Any]] columns: List[str] row_count: int execution_time_ms: Optional[int] = None query_id: Optional[str] = None warehouse_used: Optional[str] = None error: Optional[str] = None
  • src/main.py:73-73 (registration)
    MCP tool registration decorator for execute_sql
    @mcp.tool()
  • Supporting helper: SnowflakeClient.execute_query method that performs the actual database query execution and formats results
    async def execute_query( self, query: str, limit: Optional[int] = None, warehouse: Optional[str] = None ) -> QueryResult: """Execute a SQL query and return results""" start_time = time.time() try: # Switch warehouse if specified if warehouse: await self._use_warehouse(warehouse) # Add limit if specified if limit and not query.strip().upper().startswith('SELECT'): logger.warning("LIMIT can only be applied to SELECT queries") elif limit and 'LIMIT' not in query.upper(): query = f"{query.rstrip(';')} LIMIT {limit}" async with self.get_cursor() as cursor: logger.info(f"Executing query: {query[:100]}...") await asyncio.to_thread(cursor.execute, query) results = await asyncio.to_thread(cursor.fetchall) # Get column names columns = [desc[0] for desc in cursor.description] if cursor.description else [] # Convert results to list of dictionaries data = [] for row in results: if isinstance(row, dict): data.append(row) else: data.append(dict(zip(columns, row))) execution_time_ms = int((time.time() - start_time) * 1000) return QueryResult( success=True, data=data, columns=columns, row_count=len(data), execution_time_ms=execution_time_ms, query_id=cursor.sfqid if hasattr(cursor, 'sfqid') else None, warehouse_used=warehouse or self.config.warehouse ) except Exception as e: logger.error(f"Query execution failed: {str(e)}") execution_time_ms = int((time.time() - start_time) * 1000) return QueryResult( success=False, data=[], columns=[], row_count=0, execution_time_ms=execution_time_ms, error=str(e) )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rickyb30/datapilot-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server