Skip to main content
Glama

run_query

Execute SQL queries on CSV and Parquet data sources using DuckDB syntax to retrieve structured results for data analysis and visualization.

Instructions

Run query against specified source For both csv and parquet sources, use DuckDB SQL syntax Use 'CSV' as the table name for csv sources. Use 'PARQUET' as the table name for parquet sources.

This will return a dataframe with the results.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
source_idYesThe data source to run the query on
queryYesSQL query to run on the data source

Implementation Reference

  • The handler function for the 'run_query' tool. Takes source_id and query, executes the query using query_utils.execute_query, and formats results as a markdown table.
    def run_query(self, source_id: Annotated[ str, Field(description='The data source to run the query on') ], query: Annotated[ str, Field(description='SQL query to run on the data source') ] ) -> str: """ Run query against specified source For both csv and parquet sources, use DuckDB SQL syntax Use 'CSV' as the table name for csv sources. Use 'PARQUET' as the table name for parquet sources. This will return a dataframe with the results. """ try: source = self.data_sources.get(source_id) if not source: return f"Source {source_id} Not Found" df = query_utils.execute_query(source, query) return df.to_markdown(index=False) except Exception as e: return str(e)
  • Registers 'run_query' (included in zaturn_tools.tools) as an MCP tool using FastMCP.add_tool(Tool.from_function(tool_function)) in the ZaturnMCP server setup.
    for tool_function in zaturn_tools.tools: zaturn_mcp.add_tool(Tool.from_function(tool_function))
  • Helper function execute_query that implements the actual query execution logic for various data sources (SQLite, MySQL, PostgreSQL, etc.) in read-only mode, called directly by the run_query handler.
    def execute_query(source: dict, query: str): """Run the query using the appropriate engine and read only config""" url = source['url'] match source['source_type']: case "sqlite": with sqlalchemy.create_engine(url).connect() as conn: conn.execute(sqlalchemy.text('PRAGMA query_only = ON;')) result = conn.execute(sqlalchemy.text(query)) return pd.DataFrame(result) case "mysql": engine = sqlalchemy.create_engine(url) with Session(engine) as session: session.autoflush = False session.autocommit = False session.flush = lambda *args: None session.execute(sqlalchemy.text('SET SESSION TRANSACTION READ ONLY;')) result = session.execute(sqlalchemy.text(query)) return pd.DataFrame(result) case "mssql": engine = sqlalchemy.create_engine(url) with Session(engine) as session: # no known way to ensure read-only here # please use read-only credentials result = session.execute(sqlalchemy.text(query)) return pd.DataFrame(result) case "postgresql": engine = sqlalchemy.create_engine(url) with engine.connect() as conn: conn = conn.execution_options( isolation_level="SERIALIZABLE", postgresql_readonly=True, postgresql_deferrable=True, ) with conn.begin(): result = conn.execute(sqlalchemy.text(query)) return pd.DataFrame(result) case "clickhouse": client = clickhouse_connect.get_client(dsn=url) client.query('SET readonly=1;') return client.query_df(query, use_extended_dtypes=False) case "duckdb": conn = duckdb.connect(url, read_only=True) return conn.execute(query).df() case "csv": conn = duckdb.connect(database=':memory:') conn.execute(f"CREATE VIEW CSV AS SELECT * FROM read_csv('{url}')") return conn.execute(query).df() case "parquet": conn = duckdb.connect(database=':memory:') conn.execute(f"CREATE VIEW PARQUET AS SELECT * FROM read_parquet('{url}')") return conn.execute(query).df() case "bigquery": credentials = None if config.BIGQUERY_SERVICE_ACCOUNT_FILE: credentials = service_account.Credentials.from_service_account_file( config.BIGQUERY_SERVICE_ACCOUNT_FILE, ) chunks = source['url'].split('://')[1].split('/') bq_client = bigquery.Client( credentials = credentials, default_query_job_config = bigquery.QueryJobConfig( default_dataset = f'{chunks[0]}.{chunks[1]}' ) ) query_job = bq_client.query(query) return query_job.result().to_dataframe() case _: raise Exception("Unsupported Source")
  • Initial registration of the run_query method in the Core class tools list, which is later aggregated into higher-level tool collections.
    self.tools = [ self.list_data_sources, self.describe_table, self.run_query, ]
  • Aggregates tools from core.Core (including run_query) into ZaturnTools.tools list, used by MCP server.
    def __init__(self, data_sources): self.tools = [ *core.Core(data_sources).tools, *visualizations.Visualizations(data_sources).tools, ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kdqed/zaturn'

If you have feedback or need assistance with the MCP directory API, please join our Discord server