# run_query
Execute SQL queries on CSV and Parquet data sources using DuckDB syntax to retrieve structured results for data analysis and visualization.
## Instructions
Run a query against the specified source. For both csv and parquet sources, use DuckDB SQL syntax: use 'CSV' as the table name for csv sources, and 'PARQUET' as the table name for parquet sources.
The query results are returned as a dataframe, rendered as a markdown table.
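For illustration, a hedged sketch of the arguments an agent might pass. The source id and column names are hypothetical, not taken from the codebase:

```python
# Hypothetical run_query arguments. 'CSV' is the fixed view name the tool
# exposes over a csv file; queries against parquet sources use 'PARQUET'.
source_id = "sales_csv"  # assumed source id
query = """
SELECT region, SUM(amount) AS total_amount
FROM CSV
GROUP BY region
ORDER BY total_amount DESC
"""
```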
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| source_id | Yes | The data source to run the query on | |
| query | Yes | SQL query to run on the data source | |
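As an illustration, an arguments payload conforming to this schema might look like the following (the values are hypothetical):

```python
# Hypothetical payload matching the input schema above.
arguments = {
    "source_id": "sales_csv",
    "query": "SELECT COUNT(*) AS row_count FROM CSV",
}
```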
## Implementation Reference
- `zaturn/tools/core.py:69-95` (handler): the handler function for the `run_query` tool. It takes `source_id` and `query`, executes the query via `query_utils.execute_query`, and formats the results as a markdown table.

  ```python
  def run_query(self,
      source_id: Annotated[
          str, Field(description='The data source to run the query on')
      ],
      query: Annotated[
          str, Field(description='SQL query to run on the data source')
      ]
  ) -> str:
      """
      Run query against specified source
      For both csv and parquet sources, use DuckDB SQL syntax
      Use 'CSV' as the table name for csv sources.
      Use 'PARQUET' as the table name for parquet sources.
      This will return a dataframe with the results.
      """
      try:
          source = self.data_sources.get(source_id)
          if not source:
              return f"Source {source_id} Not Found"
          df = query_utils.execute_query(source, query)
          return df.to_markdown(index=False)
      except Exception as e:
          return str(e)
  ```
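  A minimal sketch of exercising the handler directly, assuming `Core` is importable from `zaturn.tools.core` and that `data_sources` maps source ids to dicts with `url` and `source_type` keys (the shape `execute_query` expects); the file path is hypothetical:

  ```python
  from zaturn.tools.core import Core

  # Hypothetical source registry; execute_query reads 'url' and 'source_type'.
  data_sources = {
      "sales_csv": {"url": "/data/sales.csv", "source_type": "csv"},
  }

  core = Core(data_sources)
  # Returns a markdown table string on success, or an error message string.
  print(core.run_query("sales_csv", "SELECT COUNT(*) AS row_count FROM CSV"))
  ```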
- `zaturn/mcp/__init__.py:92-93` (registration): registers `run_query` (included in `zaturn_tools.tools`) as an MCP tool via FastMCP's `add_tool(Tool.from_function(...))` during ZaturnMCP server setup.

  ```python
  for tool_function in zaturn_tools.tools:
      zaturn_mcp.add_tool(Tool.from_function(tool_function))
  ```
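  The same pattern in isolation, as a sketch that registers a plain function as an MCP tool. The import paths here are assumptions about the fastmcp package layout, not confirmed by the snippet above:

  ```python
  # Sketch only: assumes FastMCP and Tool live at these import paths.
  from fastmcp import FastMCP
  from fastmcp.tools import Tool

  zaturn_mcp = FastMCP("zaturn")

  def echo_query(query: str) -> str:
      """Trivial stand-in for a real tool function."""
      return query

  zaturn_mcp.add_tool(Tool.from_function(echo_query))
  ```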
- `zaturn/tools/query_utils.py:74-155` (helper): the `execute_query` helper implements the actual query execution for each supported source type (SQLite, MySQL, MSSQL, PostgreSQL, ClickHouse, DuckDB, csv, parquet, BigQuery), enforcing read-only access where the engine allows it. It is called directly by the `run_query` handler.

  ```python
  def execute_query(source: dict, query: str):
      """Run the query using the appropriate engine and read only config"""
      url = source['url']

      match source['source_type']:
          case "sqlite":
              with sqlalchemy.create_engine(url).connect() as conn:
                  conn.execute(sqlalchemy.text('PRAGMA query_only = ON;'))
                  result = conn.execute(sqlalchemy.text(query))
                  return pd.DataFrame(result)

          case "mysql":
              engine = sqlalchemy.create_engine(url)
              with Session(engine) as session:
                  session.autoflush = False
                  session.autocommit = False
                  session.flush = lambda *args: None
                  session.execute(sqlalchemy.text('SET SESSION TRANSACTION READ ONLY;'))
                  result = session.execute(sqlalchemy.text(query))
                  return pd.DataFrame(result)

          case "mssql":
              engine = sqlalchemy.create_engine(url)
              with Session(engine) as session:
                  # no known way to ensure read-only here
                  # please use read-only credentials
                  result = session.execute(sqlalchemy.text(query))
                  return pd.DataFrame(result)

          case "postgresql":
              engine = sqlalchemy.create_engine(url)
              with engine.connect() as conn:
                  conn = conn.execution_options(
                      isolation_level="SERIALIZABLE",
                      postgresql_readonly=True,
                      postgresql_deferrable=True,
                  )
                  with conn.begin():
                      result = conn.execute(sqlalchemy.text(query))
                      return pd.DataFrame(result)

          case "clickhouse":
              client = clickhouse_connect.get_client(dsn=url)
              client.query('SET readonly=1;')
              return client.query_df(query, use_extended_dtypes=False)

          case "duckdb":
              conn = duckdb.connect(url, read_only=True)
              return conn.execute(query).df()

          case "csv":
              conn = duckdb.connect(database=':memory:')
              conn.execute(f"CREATE VIEW CSV AS SELECT * FROM read_csv('{url}')")
              return conn.execute(query).df()

          case "parquet":
              conn = duckdb.connect(database=':memory:')
              conn.execute(f"CREATE VIEW PARQUET AS SELECT * FROM read_parquet('{url}')")
              return conn.execute(query).df()

          case "bigquery":
              credentials = None
              if config.BIGQUERY_SERVICE_ACCOUNT_FILE:
                  credentials = service_account.Credentials.from_service_account_file(
                      config.BIGQUERY_SERVICE_ACCOUNT_FILE,
                  )
              chunks = source['url'].split('://')[1].split('/')
              bq_client = bigquery.Client(
                  credentials=credentials,
                  default_query_job_config=bigquery.QueryJobConfig(
                      default_dataset=f'{chunks[0]}.{chunks[1]}'
                  )
              )
              query_job = bq_client.query(query)
              return query_job.result().to_dataframe()

          case _:
              raise Exception("Unsupported Source")
  ```
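  The csv and parquet branches share one pattern: open an in-memory DuckDB connection, expose the file as a fixed-name view (`CSV` or `PARQUET`), then run the user's query against that view. A standalone sketch of the pattern, with a hypothetical file path:

  ```python
  import duckdb

  # Hypothetical csv path; read_csv is DuckDB's built-in CSV reader.
  url = "/data/sales.csv"

  conn = duckdb.connect(database=":memory:")
  conn.execute(f"CREATE VIEW CSV AS SELECT * FROM read_csv('{url}')")

  # Any user query can now reference the view by its fixed name.
  df = conn.execute("SELECT * FROM CSV LIMIT 5").df()
  print(df)
  ```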
- `zaturn/tools/core.py:12-16` (registration): initial registration of the `run_query` method in the `Core` class tools list, which is later aggregated into higher-level tool collections.

  ```python
  self.tools = [
      self.list_data_sources,
      self.describe_table,
      self.run_query,
  ]
  ```
- `zaturn/tools/__init__.py:6-10` (registration): aggregates tools from `core.Core` (including `run_query`) into the `ZaturnTools.tools` list, which the MCP server consumes.

  ```python
  def __init__(self, data_sources):
      self.tools = [
          *core.Core(data_sources).tools,
          *visualizations.Visualizations(data_sources).tools,
      ]
  ```