
run_query

Execute SQL queries on CSV or Parquet sources using DuckDB syntax via the Zaturn MCP server, returning results as a dataframe for analysis and visualization.

Instructions

Run a query against the specified source. For both CSV and Parquet sources, use DuckDB SQL syntax: use 'CSV' as the table name for CSV sources, and 'PARQUET' as the table name for Parquet sources.

This will return a dataframe with the results.
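As an illustration, a call against a CSV source might pass arguments like these (a sketch only: `sales_csv` is a hypothetical source_id and `region`/`amount` are hypothetical columns; real source IDs come from the list_data_sources tool):

```python
# Hypothetical arguments for a run_query tool call; "sales_csv" is an
# assumed source_id obtained from list_data_sources, not a real one.
arguments = {
    "source_id": "sales_csv",
    "query": (
        "SELECT region, SUM(amount) AS total "
        "FROM CSV "  # 'CSV' is the fixed table name for CSV sources
        "GROUP BY region ORDER BY total DESC"
    ),
}
```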

Input Schema

| Name      | Required | Description                         | Default |
|-----------|----------|-------------------------------------|---------|
| query     | Yes      | SQL query to run on the data source |         |
| source_id | Yes      | The data source to run the query on |         |

Implementation Reference

  • The handler function for the 'run_query' tool. It retrieves the data source, executes the SQL query using query_utils.execute_query, and returns the results as a markdown table. Handles errors by returning the exception string.
```python
def run_query(self,
    source_id: Annotated[
        str, Field(description='The data source to run the query on')
    ],
    query: Annotated[
        str, Field(description='SQL query to run on the data source')
    ]
) -> str:
    """
    Run query against specified source
    For both csv and parquet sources, use DuckDB SQL syntax
    Use 'CSV' as the table name for csv sources.
    Use 'PARQUET' as the table name for parquet sources.

    This will return a dataframe with the results.
    """
    try:
        source = self.data_sources.get(source_id)
        if not source:
            return f"Source {source_id} Not Found"
        df = query_utils.execute_query(source, query)
        return df.to_markdown(index=False)
    except Exception as e:
        return str(e)
```
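Because the handler serializes results with pandas' `DataFrame.to_markdown(index=False)`, the string the MCP client receives is a pipe-style markdown table. A minimal illustration (`to_markdown` relies on the `tabulate` package being installed):

```python
import pandas as pd

# Illustration of the return format only; the data is made up.
df = pd.DataFrame({"region": ["EU", "US"], "total": [1200, 3400]})
print(df.to_markdown(index=False))
# | region   |   total |
# |:---------|--------:|
# | EU       |    1200 |
# | US       |    3400 |
```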
  • Registration of the 'run_query' tool (along with others) in the Core class's self.tools list during initialization, making it available for MCP.
```python
def __init__(self, data_sources):
    self.data_sources = data_sources
    self.tools = [
        self.list_data_sources,
        self.describe_table,
        self.run_query,
    ]
```
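The excerpt above only builds the tools list; how it is exposed over MCP is not shown. One plausible wiring, sketched under the assumption that the server uses the FastMCP class from the official MCP Python SDK (Zaturn's actual server setup may differ):

```python
# A sketch only: assumes mcp.server.fastmcp.FastMCP from the official
# MCP Python SDK; Zaturn's real server wiring is not shown in this excerpt.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("zaturn")
core = Core(data_sources=my_data_sources)  # my_data_sources: placeholder

for tool in core.tools:
    mcp.add_tool(tool)  # register each bound method as an MCP tool

mcp.run()  # serve over stdio by default
```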
  • Input schema for the 'run_query' tool, defined with typing.Annotated and Pydantic's Field to attach descriptions to the source_id (str) and query (str) parameters.
```python
def run_query(self,
    source_id: Annotated[
        str, Field(description='The data source to run the query on')
    ],
    query: Annotated[
        str, Field(description='SQL query to run on the data source')
    ]
) -> str:
```
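A quick way to see what those annotations contribute: in Pydantic v2, the Field description surfaces in the generated JSON schema, which is roughly what an MCP client sees as the tool's input schema (a sketch, not Zaturn's actual schema-generation code):

```python
from typing import Annotated
from pydantic import Field, TypeAdapter

# The Field description ends up in the JSON schema for the parameter.
QueryParam = Annotated[str, Field(description='SQL query to run on the data source')]
print(TypeAdapter(QueryParam).json_schema())
# {'description': 'SQL query to run on the data source', 'type': 'string'}
```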
  • Core helper function execute_query that implements the actual query execution logic for various data source types (sqlite, mysql, postgresql, mssql, clickhouse, duckdb, csv, parquet, bigquery), ensuring read-only access where possible and returning pandas DataFrames.
```python
import clickhouse_connect
import duckdb
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import Session
from google.cloud import bigquery
from google.oauth2 import service_account

# `config` (providing BIGQUERY_SERVICE_ACCOUNT_FILE) is the project's
# settings module; its import is not shown in this excerpt.


def execute_query(source: dict, query: str):
    """Run the query using the appropriate engine and read only config"""
    url = source['url']
    match source['source_type']:
        case "sqlite":
            with sqlalchemy.create_engine(url).connect() as conn:
                conn.execute(sqlalchemy.text('PRAGMA query_only = ON;'))
                result = conn.execute(sqlalchemy.text(query))
                return pd.DataFrame(result)
        case "mysql":
            engine = sqlalchemy.create_engine(url)
            with Session(engine) as session:
                session.autoflush = False
                session.autocommit = False
                session.flush = lambda *args: None
                session.execute(sqlalchemy.text('SET SESSION TRANSACTION READ ONLY;'))
                result = session.execute(sqlalchemy.text(query))
                return pd.DataFrame(result)
        case "mssql":
            engine = sqlalchemy.create_engine(url)
            with Session(engine) as session:
                # no known way to ensure read-only here
                # please use read-only credentials
                result = session.execute(sqlalchemy.text(query))
                return pd.DataFrame(result)
        case "postgresql":
            engine = sqlalchemy.create_engine(url)
            with engine.connect() as conn:
                conn = conn.execution_options(
                    isolation_level="SERIALIZABLE",
                    postgresql_readonly=True,
                    postgresql_deferrable=True,
                )
                with conn.begin():
                    result = conn.execute(sqlalchemy.text(query))
                    return pd.DataFrame(result)
        case "clickhouse":
            client = clickhouse_connect.get_client(dsn=url)
            client.query('SET readonly=1;')
            return client.query_df(query, use_extended_dtypes=False)
        case "duckdb":
            conn = duckdb.connect(url, read_only=True)
            return conn.execute(query).df()
        case "csv":
            conn = duckdb.connect(database=':memory:')
            conn.execute(f"CREATE VIEW CSV AS SELECT * FROM read_csv('{url}')")
            return conn.execute(query).df()
        case "parquet":
            conn = duckdb.connect(database=':memory:')
            conn.execute(f"CREATE VIEW PARQUET AS SELECT * FROM read_parquet('{url}')")
            return conn.execute(query).df()
        case "bigquery":
            credentials = None
            if config.BIGQUERY_SERVICE_ACCOUNT_FILE:
                credentials = service_account.Credentials.from_service_account_file(
                    config.BIGQUERY_SERVICE_ACCOUNT_FILE,
                )
            chunks = source['url'].split('://')[1].split('/')
            bq_client = bigquery.Client(
                credentials=credentials,
                default_query_job_config=bigquery.QueryJobConfig(
                    default_dataset=f'{chunks[0]}.{chunks[1]}'
                )
            )
            query_job = bq_client.query(query)
            return query_job.result().to_dataframe()
        case _:
            raise Exception("Unsupported Source")
```
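For reference, a minimal sketch of calling execute_query directly with a CSV source; the dict keys mirror what the function reads ('source_type' and 'url'), and the path is a placeholder:

```python
# Hypothetical direct call; '/data/sales.csv' is a placeholder path.
source = {"source_type": "csv", "url": "/data/sales.csv"}
df = execute_query(source, "SELECT COUNT(*) AS n_rows FROM CSV")
print(df)  # a pandas DataFrame with a single n_rows column
```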


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kdqed/zaturn'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.