expire_snapshots
Remove outdated snapshots from Iceberg tables to free storage space and maintain performance using a configurable retention threshold.
Instructions
Remove old snapshots from an Iceberg table
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| catalog | Yes | catalog name | |
| schema_name | Yes | schema name | |
| retention_threshold | No | Age threshold for snapshot removal (e.g., '7d', '30d') | 7d |
| table | Yes | The name of the table | |
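
As a rough illustration of how these inputs end up in SQL, the sketch below mirrors the statement the handler builds (ALTER TABLE ... EXECUTE expire_snapshots). The helper name `build_expire_snapshots_query` and both validation patterns are assumptions made for this example and are not part of the project. The duration pattern assumes the usual Trino unit suffixes (ns, us, ms, s, m, h, d).

```python
import re

# Hypothetical, deliberately conservative patterns (not taken from the project).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_DURATION = re.compile(r"\d+(\.\d+)?(ns|us|ms|s|m|h|d)")


def build_expire_snapshots_query(
    catalog: str,
    schema_name: str,
    table: str,
    retention_threshold: str = "7d",
) -> str:
    """Build the expire_snapshots statement after basic input checks."""
    for label, value in (("catalog", catalog), ("schema_name", schema_name), ("table", table)):
        # Reject anything that is not a plain identifier before interpolating it.
        if not _IDENTIFIER.fullmatch(value):
            raise ValueError(f"Invalid {label}: {value!r}")
    if not _DURATION.fullmatch(retention_threshold):
        raise ValueError(f"Invalid retention_threshold: {retention_threshold!r}")
    return (
        f"ALTER TABLE {catalog}.{schema_name}.{table} "
        f"EXECUTE expire_snapshots(retention_threshold => '{retention_threshold}')"
    )


print(build_expire_snapshots_query("iceberg", "analytics", "events", "30d"))
# ALTER TABLE iceberg.analytics.events EXECUTE expire_snapshots(retention_threshold => '30d')
```

Checking the inputs before interpolation is a design choice of this sketch. The handler shown under Implementation Reference interpolates the values directly.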
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Confirmation message (string) | |
Implementation Reference
- src/server.py:179-200 (registration): Tool registration for expire_snapshots in the MCP server. Defines the tool with required parameters (catalog, schema_name, table) and an optional retention_threshold (default '7d'), then delegates to the client method.
```python
@mcp.tool(description="Remove old snapshots from an Iceberg table")
def expire_snapshots(
    catalog: str = Field(description="catalog name"),
    schema_name: str = Field(description="schema name"),
    retention_threshold: str = Field(
        description="Age threshold for snapshot removal (e.g., '7d', '30d')", default="7d"
    ),
    table: str = Field(description="The name of the table"),
) -> str:
    """Remove old snapshots from an Iceberg table.

    Args:
        catalog: catalog name
        schema_name: schema name
        table: The name of the table
        retention_threshold: Age threshold for snapshot removal (e.g., "7d", "30d")

    Returns:
        str: Confirmation message
    """
    # Pass by keyword: the client method takes (catalog, table, schema, ...),
    # so a positional call would swap the schema and table arguments.
    return client.expire_snapshots(
        catalog=catalog,
        schema=schema_name,
        table=table,
        retention_threshold=retention_threshold,
    )
```

- src/trino_client.py:289-323 (handler): Core handler that builds and executes the SQL statement (ALTER TABLE ... EXECUTE expire_snapshots) against Trino. It falls back to the configured defaults for catalog and schema and raises CatalogSchemaError if either is still missing.
```python
def expire_snapshots(
    self,
    catalog: str,
    table: str,
    schema: str,
    retention_threshold: str = "7d",
) -> str:
    """Remove old snapshots from an Iceberg table.

    This operation removes snapshots older than the specified retention threshold,
    helping to manage storage and improve performance.

    Args:
        catalog: The catalog name. If None, uses configured default.
        table: The name of the table.
        schema: The schema name. If None, uses configured default.
        retention_threshold: Age threshold for snapshot removal (e.g., "7d").

    Returns:
        Success message indicating snapshots were expired.

    Raises:
        CatalogSchemaError: If either catalog or schema is not specified and not configured.
    """
    # Fall back to the configured defaults when a value is missing or empty.
    catalog = catalog or self.config.catalog
    schema = schema or self.config.schema
    if not catalog or not schema:
        # CatalogSchemaError takes no arguments; it supplies its own message.
        raise CatalogSchemaError()
    query = (
        f"ALTER TABLE {catalog}.{schema}.{table} "
        f"EXECUTE expire_snapshots(retention_threshold => '{retention_threshold}')"
    )
    self.execute_query(query)
    return f"Snapshots older than {retention_threshold} expired for table {catalog}.{schema}.{table}"
```

- src/trino_client.py:67-85 (helper): Helper method execute_query, used by expire_snapshots to run the generated SQL and return the results.
```python
def execute_query(self, query: str) -> str:
    """Execute a SQL query against Trino and return results as a formatted string.

    Args:
        query (str): The SQL query to execute.

    Returns:
        str: JSON-formatted string containing query results or success message.
    """
    cur: trino.dbapi.Cursor = self.client.cursor()
    cur.execute(query)
    # cur.description is only set when the statement produces a result set.
    if cur.description:
        return json.dumps(
            [dict(zip([col[0] for col in cur.description], row, strict=True)) for row in cur.fetchall()],
            default=str,
        )
    return "Query executed successfully (no results to display)"
```

- src/trino_client.py:23-27 (helper): Error class raised by expire_snapshots when the catalog or schema is missing.
```python
class CatalogSchemaError(TrinoError):
    """Error raised when catalog or schema information is missing."""

    def __init__(self):
        super().__init__("Both catalog and schema must be specified")
```

- src/config.py:10-40 (schema): Configuration schema. TrinoConfig provides the default catalog and schema values that expire_snapshots falls back to when they are not explicitly provided.
```python
@dataclass
class TrinoConfig:
    """Configuration class for Trino connection settings."""

    host: str
    port: int
    user: str
    catalog: str | None = None
    schema: str | None = None
    http_scheme: str = "http"
    auth: trino.auth.BasicAuthentication | None = None
    source: str = "mcp-trino-python"


def load_config() -> TrinoConfig:
    """Load Trino configuration from environment variables."""
    load_dotenv(override=True)
    return TrinoConfig(
        host=os.getenv("TRINO_HOST", "localhost"),
        port=int(os.getenv("TRINO_PORT", "8080")),
        user=os.getenv("TRINO_USER", os.getenv("USER", "trino")),
        catalog=os.getenv("TRINO_CATALOG"),
        schema=os.getenv("TRINO_SCHEMA"),
        http_scheme=os.getenv("TRINO_HTTP_SCHEME", "http"),
        auth=None
        if os.getenv("TRINO_PASSWORD", None) is None
        else trino.auth.BasicAuthentication(os.getenv("TRINO_USER", None), os.getenv("TRINO_PASSWORD", None)),
        source="mcp-trino-python",
    )
```
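
To make the fallback concrete, here is a minimal sketch of how explicit arguments might take precedence over the TRINO_CATALOG and TRINO_SCHEMA defaults. `Defaults` and `resolve_names` are hypothetical stand-ins for TrinoConfig and the handler logic. The sketch raises ValueError rather than the project's CatalogSchemaError so that it stays self-contained.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Defaults:
    """Hypothetical stand-in for the catalog/schema fields of TrinoConfig."""

    catalog: Optional[str] = None
    schema: Optional[str] = None


def resolve_names(
    catalog: Optional[str],
    schema: Optional[str],
    defaults: Defaults,
) -> tuple:
    """Prefer explicit values, then configured defaults; fail if either is missing."""
    catalog = catalog or defaults.catalog
    schema = schema or defaults.schema
    if not catalog or not schema:
        raise ValueError("Both catalog and schema must be specified")
    return catalog, schema


# Only a default catalog is configured; the schema is passed explicitly.
defaults = Defaults(catalog="iceberg")
print(resolve_names(None, "analytics", defaults))  # ('iceberg', 'analytics')
```

Because the check uses truthiness, an empty string is treated the same as a missing value, which matches the `catalog or self.config.catalog` pattern in the handler.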