"""Excel / CSV parsing, header detection, and SQLite loading."""
from __future__ import annotations
import re
from pathlib import Path
import pandas as pd
from . import config, database
# ---------------------------------------------------------------------------
# Column name cleaning
# ---------------------------------------------------------------------------
# Characters not allowed in a cleaned identifier (applied after lowercasing).
_NON_ALNUM = re.compile(r"[^a-z0-9_]+")
# A single whitespace character of any kind (space, tab, newline, ...).
_WHITESPACE = re.compile(r"\s")


def clean_column_name(name: str) -> str:
    """Normalize *name* into a SQLite-safe identifier fragment.

    Strips, lowercases, turns each whitespace character into an underscore,
    and removes any remaining non-alphanumeric characters. Returns
    ``"column"`` when nothing survives cleaning.
    """
    name = str(name).strip().lower()
    # Bug fix: treat tabs/newlines like spaces instead of silently deleting
    # them (the old code only replaced literal space characters). Plain
    # spaces map to underscores exactly as before.
    name = _WHITESPACE.sub("_", name)
    name = _NON_ALNUM.sub("", name)
    return name or "column"
def _deduplicate_columns(cols: list[str]) -> list[str]:
"""Append _2, _3, … for duplicate column names."""
seen: dict[str, int] = {}
result: list[str] = []
for c in cols:
if c in seen:
seen[c] += 1
result.append(f"{c}_{seen[c]}")
else:
seen[c] = 1
result.append(c)
return result
# ---------------------------------------------------------------------------
# Header detection
# ---------------------------------------------------------------------------
def detect_header_row(df_raw: pd.DataFrame) -> int:
    """Return the index of the first plausible header row.

    Scans at most the first 20 rows of *df_raw* and returns the index of the
    first row whose fraction of non-null cells is at least
    ``config.HEADER_THRESHOLD`` (presumably ~0.6 — confirm in config).
    Returns 0 when no row in the scanned window qualifies.
    """
    for idx in range(min(20, len(df_raw))):
        row = df_raw.iloc[idx]
        # max(..., 1) guards against division by zero on a zero-column frame.
        non_null_ratio = row.notna().sum() / max(len(row), 1)
        if non_null_ratio >= config.HEADER_THRESHOLD:
            return idx
    return 0
# ---------------------------------------------------------------------------
# Table name from filename + sheet
# ---------------------------------------------------------------------------
def make_table_name(filename: str, sheet_name: str) -> str:
    """Build a SQLite-safe table name of the form ``filename__sheet``.

    Both halves are run through :func:`clean_column_name`; the file's
    extension is dropped first.
    """
    file_part = clean_column_name(Path(filename).stem)
    sheet_part = clean_column_name(sheet_name)
    return f"{file_part}__{sheet_part}"
# ---------------------------------------------------------------------------
# Ingest a single DataFrame into SQLite
# ---------------------------------------------------------------------------
def _load_df_to_sqlite(
    df: pd.DataFrame,
    table_name: str,
    source_file: str,
    sheet_name: str,
) -> dict:
    """Persist *df* as *table_name* in SQLite and record its metadata.

    Replaces any existing table of the same name, then upserts one metadata
    row (source file, sheet, row/column counts) via the ``database`` module.
    Returns a summary dict describing what was loaded.
    """
    database.ensure_meta_table()
    n_rows = len(df)
    n_cols = len(df.columns)
    df.to_sql(table_name, database.get_connection(), if_exists="replace", index=False)
    database.upsert_meta(
        table_name=table_name,
        source_file=source_file,
        sheet_name=sheet_name,
        row_count=n_rows,
        col_count=n_cols,
    )
    return {
        "table": table_name,
        "rows": n_rows,
        "columns": n_cols,
        "column_names": list(df.columns),
    }
# ---------------------------------------------------------------------------
# Type coercion
# ---------------------------------------------------------------------------
def _coerce_numeric(df: pd.DataFrame) -> pd.DataFrame:
"""Try to convert object columns to numeric types where >80% of values parse."""
for col in df.columns:
if df[col].dtype == object:
converted = pd.to_numeric(df[col], errors="coerce")
non_null_orig = df[col].notna().sum()
non_null_conv = converted.notna().sum()
if non_null_orig > 0 and non_null_conv / non_null_orig >= 0.8:
df[col] = converted
return df
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def ingest_excel(filepath: str | Path) -> list[dict]:
    """Ingest an Excel workbook (.xlsx / .xls). Returns one info dict per sheet.

    For each non-empty sheet: detects the header row, cleans and
    de-duplicates column names, drops fully-empty rows, coerces
    numeric-looking columns, and loads the result into SQLite.
    Empty sheets are skipped entirely.
    """
    filepath = Path(filepath)
    results: list[dict] = []
    # Bug fix: close the workbook handle when done — the old code created
    # pd.ExcelFile without ever closing it, leaking the file handle.
    with pd.ExcelFile(filepath) as xls:
        for sheet_name in xls.sheet_names:
            # Read without assuming headers so we can detect them ourselves.
            df_raw = pd.read_excel(xls, sheet_name=sheet_name, header=None)
            if df_raw.empty:
                continue
            header_idx = detect_header_row(df_raw)
            # Use the detected row as the header, cleaned and de-duplicated.
            headers = _deduplicate_columns(
                [clean_column_name(str(v)) for v in df_raw.iloc[header_idx]]
            )
            df = df_raw.iloc[header_idx + 1:].reset_index(drop=True)
            df.columns = headers
            # Drop fully-empty rows left over from spreadsheet formatting.
            df = df.dropna(how="all").reset_index(drop=True)
            # Coerce columns to numeric where possible.
            df = _coerce_numeric(df)
            table_name = make_table_name(filepath.name, sheet_name)
            info = _load_df_to_sqlite(df, table_name, filepath.name, sheet_name)
            results.append(info)
    return results
def ingest_csv(filepath: str | Path) -> list[dict]:
    """Ingest a CSV file. Returns a single-item list for consistency."""
    filepath = Path(filepath)
    # No encoding sniffing happens here: undecodable bytes are simply
    # replaced instead of raising.
    df_raw = pd.read_csv(filepath, header=None, encoding_errors="replace")
    if df_raw.empty:
        return []
    header_idx = detect_header_row(df_raw)
    raw_header = df_raw.iloc[header_idx]
    headers = _deduplicate_columns([clean_column_name(str(v)) for v in raw_header])
    # Everything below the header row becomes the data body.
    df = df_raw.iloc[header_idx + 1:].reset_index(drop=True)
    df.columns = headers
    df = df.dropna(how="all").reset_index(drop=True)
    # Coerce columns to numeric where possible.
    df = _coerce_numeric(df)
    table_name = make_table_name(filepath.name, "data")
    return [_load_df_to_sqlite(df, table_name, filepath.name, "data")]
def ingest_file(filepath: str | Path) -> list[dict]:
    """Auto-detect format from the file extension and ingest.

    Dispatches to :func:`ingest_excel` or :func:`ingest_csv`; raises
    ``ValueError`` for any other extension.
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    if suffix == ".csv":
        return ingest_csv(path)
    if suffix in {".xlsx", ".xls"}:
        return ingest_excel(path)
    raise ValueError(f"Unsupported file format: {suffix}. Use .xlsx, .xls, or .csv")