MCP Iceberg Catalog

by ahodroj

execute_query

Run SQL queries on Apache Iceberg tables directly from Claude desktop. Supports LIST TABLES, DESCRIBE TABLE, SELECT, and CREATE TABLE operations.

Instructions

Execute a query on Iceberg tables

Input Schema

| Name | Required | Description | Default |
| ----- | -------- | ----------- | ------- |
| query | Yes | Query to execute (supports: LIST TABLES, DESCRIBE TABLE, SELECT, CREATE TABLE) | |
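A tool call passes a single required `query` string. A minimal arguments payload matching this schema might look like the following (the query text itself is illustrative, not taken from the server docs):

```python
import json

# Illustrative arguments payload for the execute_query tool.
# 'analytics.users' is a hypothetical namespace.table identifier.
arguments = {"query": "SELECT id, name FROM analytics.users"}

# The input schema requires only the 'query' property.
payload = json.dumps(arguments)
print(payload)
```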

Implementation Reference

  • Core implementation of execute_query in the IcebergConnection class. Handles LIST TABLES, DESCRIBE TABLE, SELECT with column projection, single-row INSERT, and CREATE TABLE. It parses the SQL, executes it with PyIceberg, and converts results to a list of dicts.
    def execute_query(self, query: str) -> list[dict[str, Any]]:
        """
        Execute query on Iceberg tables
        
        Args:
            query (str): Query to execute
            
        Returns:
            list[dict[str, Any]]: Query results
        """
        start_time = time.time()
        logger.info(f"Executing query: {query[:200]}...")
        
        try:
            catalog = self.ensure_connection()
            query_upper = query.strip().upper()
            
            # Handle special commands
            if query_upper.startswith("LIST TABLES"):
                results = []
                for namespace in catalog.list_namespaces():
                    for table in catalog.list_tables(namespace):
                        results.append({
                            "namespace": ".".join(namespace),
                            "table": table
                        })
                logger.info(f"Listed {len(results)} tables in {time.time() - start_time:.2f}s")
                return results
            
            elif query_upper.startswith("DESCRIBE TABLE"):
                table_name = query[len("DESCRIBE TABLE"):].strip()
                table = catalog.load_table(table_name)
                schema_dict = {
                    "schema": str(table.schema()),
                    "partition_spec": [str(field) for field in (table.spec().fields if table.spec() else [])],
                    "sort_order": [str(field) for field in (table.sort_order().fields if table.sort_order() else [])],
                    "properties": table.properties
                }
                return [schema_dict]
            
            # Handle SQL queries
            parsed = self.parse_sql(query)
            
            if parsed["type"] == "SELECT":
                table = catalog.load_table(parsed["table"])
                scan = table.scan()
                
                # Apply column projection if specified
                if parsed["columns"] and "*" not in parsed["columns"]:
                    scan = scan.select(*parsed["columns"])
                
                # Convert results to dicts
                results = []
                arrow_table = scan.to_arrow()
                
                # Convert PyArrow Table to list of dicts
                for batch in arrow_table.to_batches():
                    for row_idx in range(len(batch)):
                        row_dict = {}
                        for col_name in batch.schema.names:
                            val = batch[col_name][row_idx].as_py()
                            row_dict[col_name] = val
                        results.append(row_dict)
                
                logger.info(f"Query returned {len(results)} rows in {time.time() - start_time:.2f}s")
                return results
            
            elif parsed["type"] == "INSERT":
                # Extract table name and values
                table_name = None
                values = []
                
                # Parse INSERT INTO table_name VALUES (...) syntax
                parsed_stmt = sqlparse.parse(query)[0]
                logger.info(f"Parsed statement: {parsed_stmt}")
                
                # Find the VALUES token and extract values
                values_token = None
                table_identifier = None
                
                for token in parsed_stmt.tokens:
                    logger.info(f"Token: {token}, Type: {token.ttype}, Value: {token.value}")
                    if isinstance(token, sqlparse.sql.Identifier):
                        table_identifier = token
                    elif token.value.upper() == 'VALUES':
                        values_token = token
                        break
                
                if table_identifier:
                    # Handle multi-part identifiers (e.g., schema.table)
                    table_name = str(table_identifier)
                    logger.info(f"Found table name: {table_name}")
                
                if values_token and len(parsed_stmt.tokens) > parsed_stmt.tokens.index(values_token) + 1:
                    next_token = parsed_stmt.tokens[parsed_stmt.tokens.index(values_token) + 1]
                    if isinstance(next_token, sqlparse.sql.Parenthesis):
                        values_str = next_token.value.strip('()').split(',')
                        values = []
                        for v in values_str:
                            v = v.strip()
                            if v.startswith("'") and v.endswith("'"):
                                values.append(v.strip("'"))
                            elif v.lower() == 'true':
                                values.append(True)
                            elif v.lower() == 'false':
                                values.append(False)
                            elif v.lower() == 'null':
                                values.append(None)
                            else:
                                try:
                                    values.append(int(v))
                                except ValueError:
                                    try:
                                        values.append(float(v))
                                    except ValueError:
                                        values.append(v)
                        logger.info(f"Extracted values: {values}")
                
                if not table_name or not values:
                    raise ValueError(f"Invalid INSERT statement format. Table: {table_name}, Values: {values}")
                
                logger.info(f"Inserting into table: {table_name}")
                logger.info(f"Values: {values}")
                
                # Load table and schema
                table = catalog.load_table(table_name)
                schema = table.schema()
                
                # Create PyArrow arrays for each field
                arrays = []
                names = []
                for i, field in enumerate(schema.fields):
                    names.append(field.name)
                    value = values[i] if i < len(values) else None
                    if isinstance(field.field_type, IntegerType):
                        arrays.append(pa.array([value], type=pa.int32()))
                    elif isinstance(field.field_type, StringType):
                        arrays.append(pa.array([value], type=pa.string()))
                    elif isinstance(field.field_type, BooleanType):
                        arrays.append(pa.array([value], type=pa.bool_()))
                    elif isinstance(field.field_type, DoubleType):
                        arrays.append(pa.array([value], type=pa.float64()))
                    elif isinstance(field.field_type, TimestampType):
                        arrays.append(pa.array([value], type=pa.timestamp('us')))
                    else:
                        arrays.append(pa.array([value], type=pa.string()))
                
                # Create PyArrow table
                pa_table = pa.Table.from_arrays(arrays, names=names)
                
                # Append the PyArrow table directly to the Iceberg table
                table.append(pa_table)
                
                return [{"status": "Inserted 1 row successfully"}]
            
            elif parsed["type"] == "CREATE":
                # Basic CREATE TABLE support
                if "CREATE TABLE" in query_upper:
                    # Extract table name and schema
                    parts = query.split("(", 1)
                    table_name = parts[0].replace("CREATE TABLE", "").strip()
                    schema_str = parts[1].strip()[:-1]  # Remove trailing )
                    
                    # Parse schema definition
                    schema_fields = []
                    for field in schema_str.split(","):
                        name, type_str = field.strip().split(" ", 1)
                        type_str = type_str.upper()
                        if "STRING" in type_str:
                            field_type = StringType()
                        elif "INT" in type_str:
                            field_type = IntegerType()
                        elif "DOUBLE" in type_str:
                            field_type = DoubleType()
                        elif "TIMESTAMP" in type_str:
                            field_type = TimestampType()
                        else:
                            field_type = StringType()
                        schema_fields.append(NestedField(len(schema_fields), name, field_type, required=False))
                    
                    schema = Schema(*schema_fields)
                    catalog.create_table(table_name, schema)
                    return [{"status": "Table created successfully"}]
            
            else:
                raise ValueError(f"Unsupported query type: {parsed['type']}")
                
        except Exception as e:
            logger.error(f"Query error: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            raise
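The literal-coercion rules in the INSERT branch (quoted strings, booleans, NULL, then int, float, and a raw-text fallback) can be pulled out as a standalone helper. This is a sketch of the same rules, not code from the server:

```python
def coerce_literal(v: str):
    """Coerce a raw SQL literal token using the same rules as the INSERT branch."""
    v = v.strip()
    if v.startswith("'") and v.endswith("'"):
        return v.strip("'")          # quoted string
    if v.lower() == 'true':
        return True
    if v.lower() == 'false':
        return False
    if v.lower() == 'null':
        return None
    try:
        return int(v)                # integer literal
    except ValueError:
        try:
            return float(v)          # float literal
        except ValueError:
            return v                 # fall back to raw text

print([coerce_literal(t) for t in ["'alice'", "42", "3.5", "true", "null"]])
```

Note that, like the original, this naive comma split breaks on quoted strings that themselves contain commas.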
  • Input schema definition for the execute_query tool: requires a 'query' string parameter.
    inputSchema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Query to execute (supports: LIST TABLES, DESCRIBE TABLE, SELECT, CREATE TABLE)"
            }
        },
        "required": ["query"]
    }
  • Registration of the execute_query tool via the list_tools() decorator method, defining name, description, and schema.
        Tool(
            name="execute_query",
            description="Execute a query on Iceberg tables",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Query to execute (supports: LIST TABLES, DESCRIBE TABLE, SELECT, CREATE TABLE)"
                    }
                },
                "required": ["query"]
            }
        )
    ]
  • MCP call_tool handler that invokes the execute_query implementation and formats the response as TextContent.
    if name == "execute_query":
        start_time = time.time()
        try:
            result = self.db.execute_query(arguments["query"])
            execution_time = time.time() - start_time
            
            return [TextContent(
                type="text",
                text=f"Results (execution time: {execution_time:.2f}s):\n{json.dumps(result, indent=2)}"
            )]
        except Exception as e:
            error_message = f"Error executing query: {str(e)}"
            logger.error(error_message)
            return [TextContent(
                type="text",
                text=error_message
            )]
  • Helper method parse_sql used by execute_query to parse SQL queries and extract type, table, columns.
    def parse_sql(self, query: str) -> Dict:
        """
        Parse SQL query and extract relevant information
        
        Args:
            query (str): SQL query to parse
            
        Returns:
            Dict: Parsed query information
        """
        parsed = sqlparse.parse(query)[0]
        tokens = [token for token in parsed.tokens if not token.is_whitespace]
        
        result = {
            "type": None,
            "table": None,
            "columns": None,
            "where": None,
            "order_by": None,
            "limit": None
        }
        
        # Determine query type
        for token in tokens:
            if token.ttype is DML:
                result["type"] = token.value.upper()
                break
        
        # Extract table name
        for i, token in enumerate(tokens):
            if token.value.upper() == "FROM":
                if i + 1 < len(tokens):
                    result["table"] = tokens[i + 1].value
                break
        
        # Extract columns for SELECT
        if result["type"] == "SELECT":
            for i, token in enumerate(tokens):
                if token.value.upper() == "SELECT":
                    if i + 1 < len(tokens):
                        cols = tokens[i + 1].value
                        result["columns"] = [c.strip() for c in cols.split(",")]
                    break
        
        return result
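parse_sql relies on sqlparse token streams; the same SELECT extraction can be approximated without the library. A simplified, regex-based sketch (illustrative only, not the server's implementation):

```python
import re

def parse_select(query: str) -> dict:
    """Approximate parse_sql for SELECT statements: extract columns and table."""
    m = re.match(r"\s*SELECT\s+(?P<cols>.+?)\s+FROM\s+(?P<table>\S+)",
                 query, re.IGNORECASE | re.DOTALL)
    if not m:
        # Non-SELECT (or unparseable) input mirrors parse_sql's None defaults.
        return {"type": None, "table": None, "columns": None}
    return {
        "type": "SELECT",
        "table": m.group("table"),
        "columns": [c.strip() for c in m.group("cols").split(",")],
    }

print(parse_select("SELECT id, name FROM db.users"))
```

Like the real parse_sql, this returns the raw table token (e.g. `db.users`) and a comma-split column list; WHERE, ORDER BY, and LIMIT are left unhandled in both.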
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure but offers minimal information. It names the supported query types (LIST TABLES, DESCRIBE TABLE, SELECT, CREATE TABLE), which adds some context, but omits critical aspects such as required permissions, whether the tool is read-only or mutating (CREATE TABLE is destructive-adjacent), error handling, and expected output format.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that directly states the tool's purpose without unnecessary words. It is appropriately sized and front-loaded, making it easy to understand at a glance.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of a query execution tool with no annotations and no output schema, the description is insufficient. It lacks details on behavioral traits, error handling, permissions, or what to expect from results, leaving significant gaps for an AI agent to operate effectively.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The input schema has 100% description coverage, explicitly documenting the 'query' parameter with supported query types. The description does not add any additional semantic details beyond what the schema already provides, so it meets the baseline for adequate but unremarkable coverage.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('Execute a query') and target resource ('on Iceberg tables'), providing a specific verb+resource combination. However, with no sibling tools mentioned, it cannot demonstrate differentiation from alternatives, so it falls short of a perfect score.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives, prerequisites, or contextual constraints. It merely states what the tool does without indicating appropriate scenarios or limitations.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
