expire_snapshots
Clean up old snapshots from Iceberg tables by defining retention periods to free up storage and improve query performance.
Instructions
Remove old snapshots from an Iceberg table
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| catalog | Yes | catalog name | |
| schema_name | Yes | schema name | |
| retention_threshold | No | Age threshold for snapshot removal (e.g., '7d', '30d') | 7d |
| table | Yes | The name of the table |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/trino_client.py:289-323 (handler)Main implementation of expire_snapshots tool. Contains the function that builds and executes the SQL query 'ALTER TABLE {catalog}.{schema}.{table} EXECUTE expire_snapshots(retention_threshold => ...)' using the Trino client.
def expire_snapshots( self, catalog: str, table: str, schema: str, retention_threshold: str = "7d", ) -> str: """Remove old snapshots from an Iceberg table. This operation removes snapshots older than the specified retention threshold, helping to manage storage and improve performance. Args: table: The name of the table. retention_threshold: Age threshold for snapshot removal (e.g., "7d"). catalog: The catalog name. If None, uses configured default. schema: The schema name. If None, uses configured default. Returns: Success message indicating snapshots were expired. Raises: CatalogSchemaError: If either catalog or schema is not specified and not configured. """ catalog = catalog or self.config.catalog schema = schema or self.config.schema if not catalog or not schema: msg = "Both catalog and schema must be specified" raise CatalogSchemaError(msg) query = ( f"ALTER TABLE {catalog}.{schema}.{table} " f"EXECUTE expire_snapshots(retention_threshold => '{retention_threshold}')" ) self.execute_query(query) return f"Snapshots older than {retention_threshold} expired for table {catalog}.{schema}.{table}" - src/server.py:179-199 (registration)MCP tool registration for expire_snapshots. Decorated with @mcp.tool(), this function defines the tool's interface with Pydantic Field schemas for parameters (catalog, schema_name, table, retention_threshold) and delegates to the Trino client implementation.
@mcp.tool(description="Remove old snapshots from an Iceberg table") def expire_snapshots( catalog: str = Field(description="catalog name "), schema_name: str = Field(description="schema name "), retention_threshold: str = Field( description="Age threshold for snapshot removal (e.g., '7d', '30d')", default="7d" ), table: str = Field(description="The name of the table"), ) -> str: """Remove old snapshots from an Iceberg table. Args: catalog: catalog name schema_name: schema name table: The name of the table retention_threshold: Age threshold for snapshot removal (e.g., "7d", "30d") Returns: str: Confirmation message """ return client.expire_snapshots(catalog, schema_name, table, retention_threshold)