# e4ebee1091e7_expand_data_model_info.py (5.4 kB)
"""Expand data model info
Revision ID: e4ebee1091e7
Revises: ab7e313804ae
Create Date: 2025-07-24 13:21:30.738486
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "e4ebee1091e7"
down_revision: Union[str, None] = "ab7e313804ae"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
    """Return the reflected column entry called *name* on *table*, or ``None``.

    The entries come from ``inspector.get_columns(table, schema=schema)``;
    the first one whose ``"name"`` key equals *name* is returned unchanged.
    """
    reflected = inspector.get_columns(table, schema=schema)
    return next((entry for entry in reflected if entry["name"] == name), None)
def _index_exists(inspector, table, name, schema=None):
    """Report whether an index called *name* is reflected on *table*.

    Walks ``inspector.get_indexes(table, schema=schema)`` and stops at the
    first entry whose ``"name"`` key equals *name*.
    """
    for index in inspector.get_indexes(table, schema=schema):
        if index["name"] == name:
            return True
    return False
def upgrade() -> None:
    """Drop the ``file_metadata``/``_dlt_*`` tables and extend ``data``.

    Each step inspects the live schema first and only acts when needed:

    * the four listed tables are dropped only if they are present;
    * ``tenant_id`` (UUID) and ``data_size`` (Integer) are added to ``data``
      as nullable columns when missing, or relaxed to nullable when they
      already exist as NOT NULL;
    * ``ix_data_tenant_id`` is created only when no index of that name is
      reported on ``data``.
    """
    bind = op.get_bind()
    inspector = sa.inspect(bind)

    def ensure_nullable_column(column_name, column_type):
        # Add the column to ``data`` when it is absent; when it is present,
        # touch it only if it is reflected as NOT NULL.
        reflected = _get_column(inspector, "data", column_name)
        if reflected is None:
            op.add_column("data", sa.Column(column_name, column_type, nullable=True))
        elif reflected.get("nullable", True) is False:
            op.alter_column(
                "data",
                column_name,
                existing_type=column_type,
                nullable=True,
            )

    # Table names are read once, before any table is dropped.
    present_tables = set(inspector.get_table_names())
    for table_name in ("file_metadata", "_dlt_loads", "_dlt_version", "_dlt_pipeline_state"):
        if table_name in present_tables:
            op.drop_table(table_name)

    # --- tenant_id ---
    ensure_nullable_column("tenant_id", postgresql.UUID(as_uuid=True))

    # --- data_size ---
    ensure_nullable_column("data_size", sa.Integer())

    # --- index on tenant_id ---
    if not _index_exists(inspector, "data", "ix_data_tenant_id"):
        op.create_index("ix_data_tenant_id", "data", ["tenant_id"], unique=False)
def downgrade() -> None:
    """Undo the changes made to ``data`` and recreate the dropped tables.

    Removes the ``ix_data_tenant_id`` index and the ``data_size`` and
    ``tenant_id`` columns from ``data``, then creates ``_dlt_pipeline_state``,
    ``_dlt_version``, ``_dlt_loads`` and ``file_metadata`` (in that order).
    Only the table structure is created; no rows are inserted.
    """

    # Small column factories; every column here is declared with
    # autoincrement=False, so only the name/type/nullability vary.
    def text(name, nullable):
        return sa.Column(name, sa.TEXT(), autoincrement=False, nullable=nullable)

    def bigint(name):
        return sa.Column(name, sa.BIGINT(), autoincrement=False, nullable=False)

    def timestamptz(name):
        return sa.Column(
            name, postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False
        )

    def dlt_id():
        return sa.Column("_dlt_id", sa.VARCHAR(length=128), autoincrement=False, nullable=False)

    op.drop_index(op.f("ix_data_tenant_id"), table_name="data")
    for column_name in ("data_size", "tenant_id"):
        op.drop_column("data", column_name)

    # Insertion order of this mapping is the creation order.
    tables_to_create = {
        "_dlt_pipeline_state": [
            bigint("version"),
            bigint("engine_version"),
            text("pipeline_name", nullable=False),
            text("state", nullable=False),
            timestamptz("created_at"),
            text("version_hash", nullable=True),
            text("_dlt_load_id", nullable=False),
            dlt_id(),
        ],
        "_dlt_version": [
            bigint("version"),
            bigint("engine_version"),
            timestamptz("inserted_at"),
            text("schema_name", nullable=False),
            text("version_hash", nullable=False),
            text("schema", nullable=False),
        ],
        "_dlt_loads": [
            text("load_id", nullable=False),
            text("schema_name", nullable=True),
            bigint("status"),
            timestamptz("inserted_at"),
            text("schema_version_hash", nullable=True),
        ],
        "file_metadata": [
            text("id", nullable=False),
            text("name", nullable=True),
            text("file_path", nullable=True),
            text("extension", nullable=True),
            text("mime_type", nullable=True),
            text("content_hash", nullable=True),
            text("owner_id", nullable=True),
            text("_dlt_load_id", nullable=False),
            dlt_id(),
            text("node_set", nullable=True),
        ],
    }
    for table_name, columns in tables_to_create.items():
        op.create_table(table_name, *columns)