run_query
Execute SQL queries against a registered data source via the Zaturn MCP server. CSV and Parquet sources are queried with DuckDB SQL syntax, and results come back as a dataframe, rendered as a markdown table, for analysis and visualization.
Instructions
Run a query against the specified source. For both CSV and Parquet sources, use DuckDB SQL syntax. Use 'CSV' as the table name for CSV sources and 'PARQUET' as the table name for Parquet sources.
The tool returns a dataframe with the results, serialized as a markdown table.
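A minimal sketch of calling the tool handler directly, assuming a Core instance named `core` (construction is shown under Implementation Reference) and hypothetical source IDs `sales_csv` and `orders_parquet`:

```python
# Hypothetical source IDs and column names; only the fixed table names
# CSV and PARQUET are required by the tool.
csv_result = core.run_query(
    source_id="sales_csv",
    query="SELECT region, SUM(amount) AS total FROM CSV GROUP BY region",
)
parquet_result = core.run_query(
    source_id="orders_parquet",
    query="SELECT status, COUNT(*) AS n FROM PARQUET GROUP BY status",
)
print(csv_result)  # markdown table of aggregated rows
```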
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | SQL query to run on the data source | |
| source_id | Yes | The data source to run the query on | |
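For reference, an argument payload matching this schema might look like the following (values are illustrative):

```python
# Illustrative values only; 'orders_parquet' is a hypothetical source ID.
arguments = {
    "source_id": "orders_parquet",
    "query": "SELECT * FROM PARQUET LIMIT 10",
}
```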
Implementation Reference
- zaturn/tools/core.py:69-95 (handler): The handler function for the `run_query` tool. It retrieves the data source, executes the SQL query through `query_utils.execute_query`, and returns the results as a markdown table. Errors are handled by returning the exception message as a string.

  ```python
  def run_query(self,
      source_id: Annotated[
          str,
          Field(description='The data source to run the query on')
      ],
      query: Annotated[
          str,
          Field(description='SQL query to run on the data source')
      ]
  ) -> str:
      """
      Run query against specified source
      For both csv and parquet sources, use DuckDB SQL syntax
      Use 'CSV' as the table name for csv sources.
      Use 'PARQUET' as the table name for parquet sources.
      This will return a dataframe with the results.
      """
      try:
          source = self.data_sources.get(source_id)
          if not source:
              return f"Source {source_id} Not Found"
          df = query_utils.execute_query(source, query)
          return df.to_markdown(index=False)
      except Exception as e:
          return str(e)
  ```
- zaturn/tools/core.py:10-16 (registration): Registration of the `run_query` tool (along with the others) in the Core class's `self.tools` list during initialization, making it available over MCP; see the construction sketch after this list.

  ```python
  def __init__(self, data_sources):
      self.data_sources = data_sources
      self.tools = [
          self.list_data_sources,
          self.describe_table,
          self.run_query,
      ]
  ```
- zaturn/tools/core.py:69-76 (schema): Input schema for the `run_query` tool, defined with Pydantic's `Annotated` and `Field` for the `source_id` (str) and `query` (str) parameters, each with a description.

  ```python
  def run_query(self,
      source_id: Annotated[
          str,
          Field(description='The data source to run the query on')
      ],
      query: Annotated[
          str,
          Field(description='SQL query to run on the data source')
      ]
  ) -> str:
  ```
- zaturn/tools/query_utils.py:74-154 (helper): The `execute_query` helper that implements the actual query execution for the supported source types (sqlite, mysql, postgresql, mssql, clickhouse, duckdb, csv, parquet, bigquery), enforcing read-only access where possible and returning a pandas DataFrame.

  ```python
  def execute_query(source: dict, query: str):
      """Run the query using the appropriate engine and read only config"""
      url = source['url']
      match source['source_type']:
          case "sqlite":
              with sqlalchemy.create_engine(url).connect() as conn:
                  conn.execute(sqlalchemy.text('PRAGMA query_only = ON;'))
                  result = conn.execute(sqlalchemy.text(query))
                  return pd.DataFrame(result)
          case "mysql":
              engine = sqlalchemy.create_engine(url)
              with Session(engine) as session:
                  session.autoflush = False
                  session.autocommit = False
                  session.flush = lambda *args: None
                  session.execute(sqlalchemy.text('SET SESSION TRANSACTION READ ONLY;'))
                  result = session.execute(sqlalchemy.text(query))
                  return pd.DataFrame(result)
          case "mssql":
              engine = sqlalchemy.create_engine(url)
              with Session(engine) as session:
                  # no known way to ensure read-only here
                  # please use read-only credentials
                  result = session.execute(sqlalchemy.text(query))
                  return pd.DataFrame(result)
          case "postgresql":
              engine = sqlalchemy.create_engine(url)
              with engine.connect() as conn:
                  conn = conn.execution_options(
                      isolation_level="SERIALIZABLE",
                      postgresql_readonly=True,
                      postgresql_deferrable=True,
                  )
                  with conn.begin():
                      result = conn.execute(sqlalchemy.text(query))
                      return pd.DataFrame(result)
          case "clickhouse":
              client = clickhouse_connect.get_client(dsn=url)
              client.query('SET readonly=1;')
              return client.query_df(query, use_extended_dtypes=False)
          case "duckdb":
              conn = duckdb.connect(url, read_only=True)
              return conn.execute(query).df()
          case "csv":
              conn = duckdb.connect(database=':memory:')
              conn.execute(f"CREATE VIEW CSV AS SELECT * FROM read_csv('{url}')")
              return conn.execute(query).df()
          case "parquet":
              conn = duckdb.connect(database=':memory:')
              conn.execute(f"CREATE VIEW PARQUET AS SELECT * FROM read_parquet('{url}')")
              return conn.execute(query).df()
          case "bigquery":
              credentials = None
              if config.BIGQUERY_SERVICE_ACCOUNT_FILE:
                  credentials = service_account.Credentials.from_service_account_file(
                      config.BIGQUERY_SERVICE_ACCOUNT_FILE,
                  )
              chunks = source['url'].split('://')[1].split('/')
              bq_client = bigquery.Client(
                  credentials = credentials,
                  default_query_job_config = bigquery.QueryJobConfig(
                      default_dataset = f'{chunks[0]}.{chunks[1]}'
                  )
              )
              query_job = bq_client.query(query)
              return query_job.result().to_dataframe()
          case _:
              raise Exception("Unsupported Source")
  ```
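How the pieces fit together can be sketched as follows. This is illustrative only: the source-record shape is inferred from `execute_query` (it reads `source_type` and `url`), real Zaturn source records may carry additional fields, and the file path and source IDs are hypothetical.

```python
from zaturn.tools.core import Core

# Assumed minimal registry; real registrations may include more metadata.
core = Core(data_sources={
    "sales_csv": {"source_type": "csv", "url": "/data/sales.csv"},
})

# Successful query: returns a markdown table string.
print(core.run_query("sales_csv", "SELECT COUNT(*) AS n FROM CSV"))

# Unknown source ID: the handler returns an error string instead of raising.
print(core.run_query("missing", "SELECT 1"))  # -> "Source missing Not Found"
```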
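The `csv` and `parquet` branches explain the fixed table names in the tool's instructions: the file is exposed through an in-memory DuckDB view named `CSV` or `PARQUET`, and the user query runs against that view. A standalone sketch of the CSV case (file path and column names are illustrative):

```python
import duckdb

# In-memory DuckDB connection with a view named CSV over the file,
# mirroring what execute_query does for 'csv' sources.
conn = duckdb.connect(database=":memory:")
conn.execute("CREATE VIEW CSV AS SELECT * FROM read_csv('/data/sales.csv')")
df = conn.execute("SELECT region, COUNT(*) AS n FROM CSV GROUP BY region").df()
print(df)
```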