query_sqlite
Execute SELECT queries on SQLite databases to retrieve and analyze tabular data for insights and reporting.
Instructions
Execute a SQL query on a SQLite database.
Args:
db_path: Path to SQLite database file
query: SQL query to execute (SELECT queries only for safety)
limit: Maximum number of rows to return (default 100)
Returns:
Dictionary containing:
- query: The executed query
- row_count: Number of rows returned
- columns: List of column names
- rows: Query results
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| db_path | Yes | ||
| query | Yes | ||
| limit | No |
Implementation Reference
- src/mcp_tabular/server.py:380-430 (handler)The @mcp.tool()-decorated function that implements the 'query_sqlite' MCP tool. It safely executes SELECT queries on SQLite databases (adding LIMIT if missing), loads results into pandas DataFrame, and returns structured JSON with query, row count, columns, and rows.@mcp.tool() def query_sqlite( db_path: str, query: str, limit: int = 100, ) -> dict[str, Any]: """ Execute a SQL query on a SQLite database. Args: db_path: Path to SQLite database file query: SQL query to execute (SELECT queries only for safety) limit: Maximum number of rows to return (default 100) Returns: Dictionary containing: - query: The executed query - row_count: Number of rows returned - columns: List of column names - rows: Query results """ # Basic safety check - only allow SELECT query_upper = query.strip().upper() if not query_upper.startswith("SELECT"): raise ValueError("Only SELECT queries are allowed for safety") path = _resolve_path(db_path) if not path.exists(): raise FileNotFoundError( f"Database not found: {db_path}\n" f"Resolved to: {path}\n" f"Project root: {_PROJECT_ROOT}" ) conn = sqlite3.connect(str(path)) try: # Add LIMIT if not present if "LIMIT" not in query_upper: query = f"{query.rstrip(';')} LIMIT {limit}" df = pd.read_sql_query(query, conn) return { "query": query, "row_count": len(df), "columns": df.columns.tolist(), "rows": df.to_dict(orient="records"), } finally: conn.close()