Glama
kdqed
by kdqed

run_query

Execute SQL queries on CSV and Parquet data sources using DuckDB syntax to retrieve structured results for data analysis and visualization.

Instructions

Run query against specified source. For both csv and parquet sources, use DuckDB SQL syntax. Use 'CSV' as the table name for csv sources. Use 'PARQUET' as the table name for parquet sources.

This will return a dataframe with the results.
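For illustration, a hypothetical tool call might look like the following sketch. The source id `sales` and the column names are assumptions for the example, not part of the schema; real source ids come from the sibling `list_data_sources` tool.

```python
# Hypothetical run_query arguments; 'sales', 'region', and 'amount' are
# illustrative only -- the actual source_id comes from list_data_sources.
args = {
    "source_id": "sales",
    "query": "SELECT region, SUM(amount) AS total FROM CSV GROUP BY region",
}
```

Note that the query references the fixed table name `CSV`, per the instructions above, rather than the file name of the underlying source.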

Input Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| source_id | Yes | The data source to run the query on | |
| query | Yes | SQL query to run on the data source | |

Output Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| result | Yes | | |

Implementation Reference

  • The handler function for the 'run_query' tool. Takes source_id and query, executes the query using query_utils.execute_query, and formats results as a markdown table.
    def run_query(self,
        source_id: Annotated[
            str, Field(description='The data source to run the query on')
        ],  
        query: Annotated[
            str, Field(description='SQL query to run on the data source')
        ]
    ) -> str:
        """
        Run query against specified source
        For both csv and parquet sources, use DuckDB SQL syntax
        Use 'CSV' as the table name for csv sources.
        Use 'PARQUET' as the table name for parquet sources.
    
        This will return a dataframe with the results.
        """
        
        try:
            source = self.data_sources.get(source_id)
            if not source:
                return f"Source {source_id} Not Found"
                
            df = query_utils.execute_query(source, query)
            return df.to_markdown(index=False)
        except Exception as e:
            return str(e)
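Note that the handler never raises: a missing source or a failed query comes back to the agent as a plain string rather than an exception. A minimal stand-in (with a stubbed executor in place of `query_utils.execute_query`) illustrates that contract:

```python
# Minimal stand-in for the handler's error contract; execute_stub
# replaces query_utils.execute_query for illustration only.
def execute_stub(source, query):
    if "bad" in query:
        raise ValueError("syntax error")
    return "| x |\n|---|\n| 1 |"  # markdown table, like df.to_markdown()

def run_query(data_sources, source_id, query):
    try:
        source = data_sources.get(source_id)
        if not source:
            return f"Source {source_id} Not Found"
        return execute_stub(source, query)
    except Exception as e:
        return str(e)  # errors are returned as text, never raised

sources = {"demo": {"source_type": "csv", "url": "demo.csv"}}
```

Because every outcome is a string, a calling agent must inspect the returned text to distinguish results from error messages.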
  • Registers 'run_query' (included in zaturn_tools.tools) as an MCP tool using FastMCP.add_tool(Tool.from_function(tool_function)) in the ZaturnMCP server setup.
    for tool_function in zaturn_tools.tools:
        zaturn_mcp.add_tool(Tool.from_function(tool_function))
  • Helper function execute_query that implements the actual query execution logic for various data sources (SQLite, MySQL, PostgreSQL, etc.) in read-only mode, called directly by the run_query handler.
    def execute_query(source: dict, query: str):
        """Run the query using the appropriate engine and read only config"""
        url = source['url']
                    
        match source['source_type']:
            case "sqlite":
                with sqlalchemy.create_engine(url).connect() as conn:
                    conn.execute(sqlalchemy.text('PRAGMA query_only = ON;'))
                    result = conn.execute(sqlalchemy.text(query))
                    return pd.DataFrame(result)
    
            case "mysql":
                engine = sqlalchemy.create_engine(url)
                with Session(engine) as session:
                    session.autoflush = False
                    session.autocommit = False
                    session.flush = lambda *args: None
                    session.execute(sqlalchemy.text('SET SESSION TRANSACTION READ ONLY;'))
                    result = session.execute(sqlalchemy.text(query))
                    return pd.DataFrame(result)
    
            case "mssql":
                engine = sqlalchemy.create_engine(url)
                with Session(engine) as session:
                    # no known way to ensure read-only here
                    # please use read-only credentials
                    result = session.execute(sqlalchemy.text(query))
                    return pd.DataFrame(result)
    
            case "postgresql":
                engine = sqlalchemy.create_engine(url)
                with engine.connect() as conn:
                    conn = conn.execution_options(
                        isolation_level="SERIALIZABLE",
                        postgresql_readonly=True,
                        postgresql_deferrable=True,
                    )
                    with conn.begin():
                        result = conn.execute(sqlalchemy.text(query))
                        return pd.DataFrame(result)
    
            case "clickhouse":
                client = clickhouse_connect.get_client(dsn=url)
                client.query('SET readonly=1;')
                return client.query_df(query, use_extended_dtypes=False)
    
            case "duckdb":
                conn = duckdb.connect(url, read_only=True)
                return conn.execute(query).df()
    
            case "csv":
                conn = duckdb.connect(database=':memory:')
                conn.execute(f"CREATE VIEW CSV AS SELECT * FROM read_csv('{url}')")
                return conn.execute(query).df()
    
            case "parquet":
                conn = duckdb.connect(database=':memory:')
                conn.execute(f"CREATE VIEW PARQUET AS SELECT * FROM read_parquet('{url}')")
                return conn.execute(query).df()
    
            case "bigquery":
                credentials = None
                if config.BIGQUERY_SERVICE_ACCOUNT_FILE:
                    credentials = service_account.Credentials.from_service_account_file(
                        config.BIGQUERY_SERVICE_ACCOUNT_FILE,
                    )
    
                chunks = source['url'].split('://')[1].split('/')
                bq_client = bigquery.Client(
                    credentials = credentials, 
                    default_query_job_config = bigquery.QueryJobConfig(
                        default_dataset = f'{chunks[0]}.{chunks[1]}'
                    )
                )
                
                query_job = bq_client.query(query)
                return query_job.result().to_dataframe()
                
            case _:
                raise Exception("Unsupported Source")
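The `query_only` technique used in the sqlite branch can be demonstrated with the standard library alone. This sketch (independent of SQLAlchemy) shows that once the pragma is set, reads still succeed while writes are rejected:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")
conn.execute("PRAGMA query_only = ON")  # same pragma as the sqlite branch above

rows = conn.execute("SELECT x FROM t").fetchall()  # reads still work
try:
    conn.execute("INSERT INTO t VALUES (2)")       # writes are now rejected
    write_blocked = False
except sqlite3.OperationalError:
    write_blocked = True
```

This is why the sqlite, mysql, postgresql, clickhouse, and duckdb branches can accept arbitrary SQL: the engine itself enforces read-only mode, whereas the mssql branch must rely on read-only credentials.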
  • Initial registration of the run_query method in the Core class tools list, which is later aggregated into higher-level tool collections.
    self.tools = [
        self.list_data_sources,
        self.describe_table,
        self.run_query,
    ]
  • Aggregates tools from core.Core (including run_query) into ZaturnTools.tools list, used by MCP server.
    def __init__(self, data_sources):
        self.tools = [
            *core.Core(data_sources).tools,
            *visualizations.Visualizations(data_sources).tools,
        ]
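The aggregation is plain list concatenation of bound methods. A simplified stand-in (stub classes, not the real `core.Core` or `visualizations.Visualizations`) illustrates the shape:

```python
# Simplified stand-ins for core.Core and ZaturnTools; method bodies
# are stubs for illustration only.
class Core:
    def __init__(self, data_sources):
        self.data_sources = data_sources
        self.tools = [self.list_data_sources, self.run_query]

    def list_data_sources(self):
        return sorted(self.data_sources)

    def run_query(self, source_id, query):
        return "..."

class ZaturnTools:
    def __init__(self, data_sources):
        # the real code also unpacks Visualizations(data_sources).tools here
        self.tools = [*Core(data_sources).tools]

names = [t.__name__ for t in ZaturnTools({"demo": {}}).tools]
```

Because the entries are bound methods, each registered tool retains access to the shared `data_sources` dict without any global state.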
Behavior: 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden. It discloses that the tool returns a dataframe and specifies SQL syntax requirements, but lacks details on permissions, error handling, rate limits, or data modification behavior (e.g., whether queries can mutate data). The description adds basic behavioral context but misses important operational details.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately sized with four concise sentences. It's front-loaded with the core purpose, followed by syntax details and output information. There's no wasted text, though it could be slightly more structured for clarity.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's moderate complexity (SQL queries on data sources), the description covers the essential purpose, syntax, and output. With an output schema present, it doesn't need to detail return values. However, it lacks information on error cases, data source prerequisites, or integration with sibling tools, leaving some gaps in completeness.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so the schema already documents both parameters well. The description adds minimal value beyond the schema: it implies 'source_id' refers to csv/parquet sources and 'query' uses DuckDB SQL, but doesn't provide additional syntax examples or constraints. This meets the baseline for high schema coverage.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Run query against specified source' and 'This will return a dataframe with the results.' It specifies the action (run query) and resource (data source), but doesn't explicitly differentiate from siblings like 'describe_table' or 'list_data_sources' beyond the query focus.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides some usage context: 'For both csv and parquet sources, use DuckDB SQL syntax' and specifies table naming conventions. However, it doesn't explicitly state when to use this tool versus alternatives like 'describe_table' for metadata or the various plotting tools for visualization, leaving usage somewhat implied rather than clearly defined.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
