
DataPilot MCP Server

by rickyb30

execute_sql

Execute SQL queries on Snowflake databases to retrieve data, manage warehouses, and perform data analysis operations through AI-guided interactions.

Instructions

Execute a SQL query on Snowflake and return results

Input Schema

Name     | Required | Description | Default
request  | Yes      | —           | —
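
For illustration, a request payload matching this schema might look like the following sketch (field names come from the `SQLQueryRequest` model shown under Implementation Reference; the values are hypothetical):

```python
# Hypothetical example input for the execute_sql tool. The "request"
# argument carries the fields defined by SQLQueryRequest: a required
# SQL string plus an optional row limit and warehouse override.
request = {
    "query": "SELECT order_id, total FROM orders",  # required
    "limit": 100,                                   # optional row cap
    "warehouse": "COMPUTE_WH",                      # optional override
}
```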

Implementation Reference

  • MCP tool handler for execute_sql: executes SQL query via SnowflakeClient, handles errors, provides context logging
    @mcp.tool()
    async def execute_sql(request: SQLQueryRequest, ctx: Context) -> QueryResult:
        """Execute a SQL query on Snowflake and return results"""
        await ctx.info(f"Executing SQL query: {request.query[:100]}...")
        
        try:
            client = await get_snowflake_client()
            result = await client.execute_query(
                request.query, 
                request.limit, 
                request.warehouse
            )
            
            if result.success:
                await ctx.info(f"Query executed successfully. {result.row_count} rows returned.")
            else:
                await ctx.error(f"Query failed: {result.error}")
            
            return result
            
        except Exception as e:
            logger.error(f"Error executing SQL: {str(e)}")
            await ctx.error(f"Failed to execute query: {str(e)}")
            return QueryResult(
                success=False,
                data=[],
                columns=[],
                row_count=0,
                error=str(e)
            )
  • Input schema for execute_sql tool: defines query, optional limit and warehouse
    class SQLQueryRequest(BaseModel):
        """Request model for SQL query execution"""
        query: str = Field(..., description="SQL query to execute")
        limit: Optional[int] = Field(None, description="Maximum number of rows to return")
        warehouse: Optional[str] = Field(None, description="Warehouse to use for this query")
  • Output schema for execute_sql tool: query results including data, columns, metadata, and error handling
    class QueryResult(BaseModel):
        """Result of a SQL query execution"""
        success: bool
        data: List[Dict[str, Any]]
        columns: List[str]
        row_count: int
        execution_time_ms: Optional[int] = None
        query_id: Optional[str] = None
        warehouse_used: Optional[str] = None
        error: Optional[str] = None
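A caller might consume a `QueryResult`-shaped response as in the sketch below (the data is made up; the branch simply mirrors the `success`/`error` fields defined above):

```python
# Sketch of consuming a QueryResult-shaped dict: on success, rows come
# back as dicts keyed by column name; on failure, surface the error.
result = {
    "success": True,
    "data": [{"ID": 1, "NAME": "alice"}],
    "columns": ["ID", "NAME"],
    "row_count": 1,
    "error": None,
}

if result["success"]:
    # Re-order each row's values to match the declared column order
    rows = [[row[col] for col in result["columns"]] for row in result["data"]]
else:
    raise RuntimeError(f"query failed: {result['error']}")
```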
  • src/main.py:73-73 (registration)
    MCP tool registration decorator for execute_sql
    @mcp.tool()
  • Supporting helper: SnowflakeClient.execute_query method that performs the actual database query execution and formats results
    async def execute_query(
        self, 
        query: str, 
        limit: Optional[int] = None,
        warehouse: Optional[str] = None
    ) -> QueryResult:
        """Execute a SQL query and return results"""
        start_time = time.time()
        
        try:
            # Switch warehouse if specified
            if warehouse:
                await self._use_warehouse(warehouse)
            
            # Append LIMIT only to SELECT queries that do not already contain one
            if limit and not query.strip().upper().startswith('SELECT'):
                logger.warning("LIMIT can only be applied to SELECT queries")
            elif limit and 'LIMIT' not in query.upper():
                query = f"{query.rstrip(';')} LIMIT {limit}"
            
            async with self.get_cursor() as cursor:
                logger.info(f"Executing query: {query[:100]}...")
                
                await asyncio.to_thread(cursor.execute, query)
                results = await asyncio.to_thread(cursor.fetchall)
                
                # Get column names
                columns = [desc[0] for desc in cursor.description] if cursor.description else []
                
                # Convert results to list of dictionaries
                data = []
                for row in results:
                    if isinstance(row, dict):
                        data.append(row)
                    else:
                        data.append(dict(zip(columns, row)))
                
                execution_time_ms = int((time.time() - start_time) * 1000)
                
                return QueryResult(
                    success=True,
                    data=data,
                    columns=columns,
                    row_count=len(data),
                    execution_time_ms=execution_time_ms,
                    query_id=cursor.sfqid if hasattr(cursor, 'sfqid') else None,
                    warehouse_used=warehouse or self.config.warehouse
                )
                
        except Exception as e:
            logger.error(f"Query execution failed: {str(e)}")
            execution_time_ms = int((time.time() - start_time) * 1000)
            
            return QueryResult(
                success=False,
                data=[],
                columns=[],
                row_count=0,
                execution_time_ms=execution_time_ms,
                error=str(e)
            )
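The LIMIT-handling branch above can be isolated as a small helper for testing. This is a sketch only (`apply_limit` is not part of the actual client); note that the substring check can false-positive on queries whose identifiers contain the word LIMIT:

```python
from typing import Optional

def apply_limit(query: str, limit: Optional[int]) -> str:
    """Append LIMIT only to SELECT queries that lack one, mirroring
    the branch in SnowflakeClient.execute_query (hypothetical helper)."""
    if limit and not query.strip().upper().startswith('SELECT'):
        # Non-SELECT statement: leave untouched (the client logs a warning)
        return query
    if limit and 'LIMIT' not in query.upper():
        return f"{query.rstrip(';')} LIMIT {limit}"
    return query
```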

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rickyb30/datapilot-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.