"""Morning brief — data logic for the /brief dashboard.
Provides six sections:
1. What Changed — status changes on watched permits
2. Permit Health — are watched permits on track vs statistical norms?
3. Inspection Results — recent pass/fail on watched permits
4. New Filings — new permits at watched addresses/parcels
5. Team Activity — watched contractors/architects appearing on new permits
6. Expiring Permits — permits approaching Table B expiration deadline
"""
from __future__ import annotations

import logging
from datetime import date, datetime, timedelta

from src.db import BACKEND, query, query_one, get_connection
logger = logging.getLogger(__name__)
# Table B (SFBC Section 106A.4.4) — permit expiration by valuation tier.
# Demolition permits have a flat 180-day limit regardless of valuation.
EXPIRATION_WARNING_DAYS = 30
def _validity_days(permit: dict) -> int:
"""Look up Table B expiration period based on permit valuation.
SFBC Section 106A.4.4 — Maximum Time Allowed to Complete Work:
- $1 to $100,000: 360 days (extension: 360 days)
- $100,001 to $2,499,999: 1,080 days (extension: 720 days)
- $2,500,000 and above: 1,440 days (extension: 720 days)
- Demolition permits: 180 days (extension: 180 days)
"""
ptype = (permit.get("permit_type_definition") or "").lower()
if "demolition" in ptype:
return 180
cost = permit.get("revised_cost") or permit.get("estimated_cost") or 0
try:
cost = float(cost)
except (ValueError, TypeError):
cost = 0
if cost >= 2_500_000:
return 1440
if cost >= 100_001:
return 1080
return 360
def _ph() -> str:
    """Return the SQL parameter placeholder for the active DB backend.

    Postgres uses ``%s`` (format style); DuckDB uses ``?`` (qmark style).
    """
    if BACKEND == "postgres":
        return "%s"
    return "?"
def _parse_date(text) -> date | None:
"""Parse a TEXT/date field to a Python date."""
if not text:
return None
if isinstance(text, date):
return text
try:
return date.fromisoformat(str(text)[:10])
except (ValueError, TypeError):
return None
# ── Main entry point ──────────────────────────────────────────────
def get_morning_brief(user_id: int, lookback_days: int = 1,
                      primary_address: dict | None = None) -> dict:
    """Build the complete morning brief data structure.

    Orchestrates every section of the brief: the six core sections plus
    plan reviews, regulatory alerts, property cards, pipeline health, and
    assorted context sections. Each section helper is independently
    fault-tolerant, so one failing data source does not break the brief.

    Args:
        user_id: Current user's ID.
        lookback_days: How many days back to look for changes (1=today, 7=week).
        primary_address: Optional dict with ``street_number`` and ``street_name``
            for the user's primary (home) address. When provided, a property
            synopsis section is included in the brief.

    Returns:
        Dict with keys: changes, health, inspections, new_filings,
        team_activity, expiring, property_synopsis, summary, lookback_days,
        plus the supplementary section keys assembled below.
    """
    since = date.today() - timedelta(days=lookback_days)
    changes = _get_watched_changes(user_id, since)
    plan_reviews = _get_plan_review_activity(user_id, since)
    health = _get_predictability(user_id)
    inspections = _get_inspection_results(user_id, since)
    new_filings = _get_new_filings(user_id, since)
    team_activity = _get_team_activity(user_id, since)
    expiring = _get_expiring_permits(user_id)
    regulatory_alerts = _get_regulatory_alerts()
    property_cards = _get_property_snapshot(user_id, lookback_days)
    # Property synopsis for primary address (only when caller supplied one).
    property_synopsis = None
    if primary_address:
        property_synopsis = _get_property_synopsis(
            primary_address["street_number"],
            primary_address["street_name"],
        )
    # Count watches (active only).
    watch_count_row = query(
        f"SELECT COUNT(*) FROM watch_items WHERE user_id = {_ph()} AND is_active = TRUE",
        (user_id,),
    )
    total_watches = watch_count_row[0][0] if watch_count_row else 0
    at_risk = sum(1 for h in health if h.get("status") in ("behind", "at_risk"))
    enforcement_count = sum(
        1 for p in property_cards
        if p.get("enforcement_total") and p["enforcement_total"] > 0
    )
    # Merge property-level activity into changes so "What Changed" section
    # and the "Changed" summary count are consistent. The permit_changes
    # detection log is sparse (only captures nightly diffs), but property
    # cards have accurate days_since_activity from SODA status_date.
    # Build activity entries for recently-active properties not already in
    # the permit_changes list, then use the combined list for the count.
    changes_addresses = set()
    for c in changes:
        if c.get("street_number") and c.get("street_name"):
            changes_addresses.add(
                f"{c['street_number']} {c['street_name']}".strip().upper()
            )
    for p in property_cards:
        dsa = p.get("days_since_activity")
        if dsa is None or dsa > lookback_days:
            # No recent activity at this property within the window.
            continue
        addr_key = p.get("address", "").upper()
        if addr_key in changes_addresses:
            # Already represented by a permit_changes entry — skip.
            continue
        changes_addresses.add(addr_key)
        # Look up which specific permits changed at this property to show
        # a meaningful description instead of just "activity Xd ago".
        activity_permits = _get_recent_permit_activity(p, since)
        if activity_permits:
            # Add one change card per permit that had a status change
            for ap in activity_permits:
                changes.append(ap)
        else:
            # Fallback: generic activity entry (no specific permit identified)
            changes.append({
                "permit_number": p.get("latest_permit", ""),
                "change_date": p.get("latest_activity", ""),
                "old_status": None,
                "new_status": "activity",
                "change_type": "activity",
                "permit_type": p.get("latest_permit_type", ""),
                "latest_permit_status": p.get("latest_permit_status", ""),
                # Split "123 MAIN ST" into number + name for the card layout.
                "street_number": p.get("address", "").split()[0] if p.get("address") else "",
                "street_name": " ".join(p.get("address", "").split()[1:]) if p.get("address") else "",
                "neighborhood": p.get("neighborhood", ""),
                "label": p.get("label", "") or p.get("address", ""),
                "watch_type": "property",
                "total_permits": p.get("total_permits", 0),
                "active_permits": p.get("active_permits", 0),
                "health": p.get("worst_health", "on_track"),
                "health_reason": p.get("health_reason", ""),
                "days_since_activity": dsa,
            })
    # Summary "Changed" count is per-property (cards with recent activity),
    # not per change card appended above.
    changed_count = sum(
        1 for p in property_cards
        if p.get("days_since_activity") is not None
        and p["days_since_activity"] <= lookback_days
    )
    # Data freshness from cron_log
    last_refresh = _get_last_refresh()
    # Sprint 53 Session C: pipeline health section
    pipeline_health = get_pipeline_health_for_brief()
    # Sprint 55 Session D: planning context, compliance calendar, data quality
    planning_context = _get_planning_context(user_id)
    compliance_calendar = _get_compliance_calendar(user_id)
    data_quality = _get_data_quality()
    # Sprint 56 Session C: street use activity + nearby development
    street_use_activity = get_street_use_activity_for_user(user_id)
    nearby_development = get_nearby_development_for_user(user_id)
    # Sprint 64: change velocity breakdown
    change_velocity = _get_change_velocity(since)
    # QS3-A: Permit Prep summary
    prep_summary = _get_prep_summary(user_id)
    # QS8-T1-B: Pipeline stats (last 5 nightly durations + 24h success/fail)
    pipeline_stats = _get_pipeline_stats()
    return {
        "changes": changes,
        "plan_reviews": plan_reviews,
        "health": health,
        "inspections": inspections,
        "new_filings": new_filings,
        "team_activity": team_activity,
        "expiring": expiring,
        "regulatory_alerts": regulatory_alerts,
        "property_cards": property_cards,
        "property_synopsis": property_synopsis,
        "last_refresh": last_refresh,
        "planning_context": planning_context,
        "compliance_calendar": compliance_calendar,
        "data_quality": data_quality,
        "street_use_activity": street_use_activity,
        "nearby_development": nearby_development,
        "change_velocity": change_velocity,
        "prep_summary": prep_summary,
        "pipeline_stats": pipeline_stats,
        "summary": {
            "total_watches": total_watches,
            "total_properties": len(property_cards),
            "changes_count": changed_count,
            "plan_reviews_count": len(plan_reviews),
            "at_risk_count": at_risk,
            "enforcement_count": enforcement_count,
            "inspections_count": len(inspections),
            "new_filings_count": len(new_filings),
            "team_count": len(team_activity),
            "expiring_count": len(expiring),
            "regulatory_count": len(regulatory_alerts),
            "planning_context_count": len(planning_context),
            "compliance_calendar_count": len(compliance_calendar),
            "street_use_count": len(street_use_activity),
            "nearby_development_count": len(nearby_development),
        },
        "lookback_days": lookback_days,
        "pipeline_health": pipeline_health,
    }
# ── Section 1: What Changed ──────────────────────────────────────
def _get_watched_changes(user_id: int, since: date) -> list[dict]:
    """Get permit status changes matching any of the user's watches.

    Runs one query per watch type (permit, address, parcel, neighborhood)
    against the nightly ``permit_changes`` diff log, excluding brand-new
    permits (``is_new_permit = FALSE`` — those belong to New Filings),
    then deduplicates because one change can match multiple watches.

    Args:
        user_id: Owner of the watch items.
        since: Only include changes on or after this date.

    Returns:
        List of change dicts (see ``_rows_to_changes``), newest-first
        within each watch type; neighborhood matches are capped at 20.
    """
    ph = _ph()
    results: list[dict] = []
    # Permit watches — direct match on permit_number
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.old_status, pc.new_status, "
        f"pc.change_type, pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.permit_number = pc.permit_number "
        f" AND wi.watch_type = 'permit' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = FALSE "
        f"ORDER BY pc.change_date DESC",
        (user_id, since),
    )
    results.extend(_rows_to_changes(rows, "permit"))
    # Address watches — match on street number + case-insensitive street name
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.old_status, pc.new_status, "
        f"pc.change_type, pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.street_number = pc.street_number "
        f" AND UPPER(wi.street_name) = UPPER(pc.street_name) "
        f" AND wi.watch_type = 'address' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = FALSE "
        f"ORDER BY pc.change_date DESC",
        (user_id, since),
    )
    results.extend(_rows_to_changes(rows, "address"))
    # Parcel watches — match on block + lot
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.old_status, pc.new_status, "
        f"pc.change_type, pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.block = pc.block AND wi.lot = pc.lot "
        f" AND wi.watch_type = 'parcel' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = FALSE "
        f"ORDER BY pc.change_date DESC",
        (user_id, since),
    )
    results.extend(_rows_to_changes(rows, "parcel"))
    # Neighborhood watches (capped — a whole neighborhood can be noisy)
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.old_status, pc.new_status, "
        f"pc.change_type, pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.neighborhood = pc.neighborhood "
        f" AND wi.watch_type = 'neighborhood' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = FALSE "
        f"ORDER BY pc.change_date DESC "
        f"LIMIT 20",
        (user_id, since),
    )
    results.extend(_rows_to_changes(rows, "neighborhood"))
    # Deduplicate (a permit could match multiple watches); first occurrence
    # wins, so the more specific watch type (queried first) is kept.
    seen = set()
    unique = []
    for r in results:
        key = (r["permit_number"], str(r["change_date"]), r["new_status"])
        if key not in seen:
            seen.add(key)
            unique.append(r)
    return unique
def _rows_to_changes(rows: list[tuple], watch_type: str) -> list[dict]:
return [
{
"permit_number": r[0],
"change_date": r[1],
"old_status": r[2],
"new_status": r[3],
"change_type": r[4],
"permit_type": r[5],
"street_number": r[6],
"street_name": r[7],
"neighborhood": r[8],
"label": r[9],
"watch_type": watch_type,
}
for r in rows
]
def _get_recent_permit_activity(prop: dict, since: date) -> list[dict]:
    """Look up specific permits at a property that changed since ``since``.

    When the nightly permit_changes log doesn't capture a specific transition
    (e.g., SODA status_date updated but our diff didn't fire), this function
    queries the permits table directly to find permits with recent status_date
    changes. Returns permit-level change dicts with current status so the
    "What Changed" UI can show meaningful info instead of generic "activity".

    Args:
        prop: Property card dict; ``block``/``lot`` are preferred for the
            match, falling back to a fuzzy ``address`` match.
        since: Only include permits whose status_date is on/after this date.

    Returns:
        Up to 10 change dicts (newest first), or [] on no match / any
        query failure (best-effort enrichment — never raises).
    """
    ph = _ph()
    block = prop.get("block", "")
    lot = prop.get("lot", "")
    addr = prop.get("address", "")
    # Build WHERE clause for this property: exact parcel match when we have
    # block+lot, otherwise street-number + fuzzy street-name match.
    conditions = []
    params: list = []
    if block and lot:
        conditions.append(f"(p.block = {ph} AND p.lot = {ph})")
        params.extend([block, lot])
    elif addr:
        # Split "123 MAIN ST" into number and the rest of the name once.
        parts = addr.split(None, 1)
        if len(parts) >= 2:
            conditions.append(f"(p.street_number = {ph} AND UPPER(p.street_name) LIKE UPPER({ph}))")
            params.extend([parts[0], f"%{parts[1]}%"])
    if not conditions:
        # Nothing identifiable to match on.
        return []
    try:
        rows = query(
            f"SELECT p.permit_number, p.status, p.status_date, "
            f"p.permit_type_definition, p.street_number, p.street_name, "
            f"p.neighborhood "
            f"FROM permits p "
            f"WHERE ({' OR '.join(conditions)}) "
            f" AND p.status_date >= {ph} "
            f"ORDER BY p.status_date DESC "
            f"LIMIT 10",
            (*params, str(since)),
        )
    except Exception:
        # permits table may not exist in this environment; stay best-effort.
        logger.debug("_get_recent_permit_activity query failed", exc_info=True)
        return []
    if not rows:
        return []
    label = prop.get("label", "") or prop.get("address", "")
    results = []
    for r in rows:
        pn, status, status_date, ptype, snum, sname, neighborhood = r
        results.append({
            "permit_number": pn,
            "change_date": status_date,
            "old_status": None,  # We don't know the previous status
            "new_status": (status or "").upper(),
            "change_type": "status",
            "permit_type": ptype or "",
            "street_number": snum or "",
            "street_name": sname or "",
            "neighborhood": neighborhood or "",
            "label": label,
            "watch_type": "property",
        })
    return results
# ── Section 2: Permit Health / Predictability ─────────────────────
def _get_predictability(user_id: int) -> list[dict]:
    """Compare watched permits' elapsed time against statistical benchmarks.

    For each actively-watched permit still in a live status, computes days
    elapsed since filing and classifies it against p50/p75/p90 timeline
    percentiles obtained via the ``estimate_timeline`` helpers.

    Args:
        user_id: Owner of the watch items.

    Returns:
        Health dicts sorted worst-first (at_risk, behind, slower, on_track).
        Empty when the ``permits`` table or benchmarks are unavailable.
    """
    ph = _ph()
    # Get watched permits in active status
    # NOTE: The `permits` table only exists in DuckDB (local), not in PostgreSQL (prod).
    # Gracefully return empty if the table doesn't exist.
    try:
        rows = query(
            f"SELECT p.permit_number, p.status, p.filed_date, p.issued_date, "
            f"p.permit_type_definition, p.neighborhood, p.estimated_cost, "
            f"p.street_number, p.street_name, wi.label "
            f"FROM watch_items wi "
            f"JOIN permits p ON wi.permit_number = p.permit_number "
            f"WHERE wi.user_id = {ph} AND wi.watch_type = 'permit' "
            f" AND wi.is_active = TRUE "
            f" AND p.status IN ('filed', 'approved', 'issued', 'reinstated')",
            (user_id,),
        )
    except Exception:
        logger.debug("Permit health query failed (permits table may not exist)", exc_info=True)
        return []
    results = []
    conn = get_connection()
    try:
        for row in rows:
            (permit_number, status, filed_date, issued_date,
             permit_type, neighborhood, estimated_cost,
             street_number, street_name, label) = row
            filed = _parse_date(filed_date)
            if not filed:
                # Cannot measure elapsed time without a filed date.
                continue
            elapsed_days = (date.today() - filed).days
            if elapsed_days <= 0:
                # Filed today or (bad data) in the future — nothing to judge.
                continue
            # Get benchmarks using the same logic as estimate_timeline.py
            try:
                from src.tools.estimate_timeline import _query_timeline, _cost_bracket
                review_path = "otc" if permit_type and "otc" in permit_type.lower() else "in_house"
                bracket = _cost_bracket(estimated_cost)
                # Fallback cascade: full specificity → drop neighborhood →
                # drop cost bracket → global baseline.
                benchmarks = _query_timeline(conn, review_path, neighborhood, bracket, permit_type)
                if not benchmarks and neighborhood:
                    benchmarks = _query_timeline(conn, review_path, None, bracket, permit_type)
                if not benchmarks and bracket:
                    benchmarks = _query_timeline(conn, review_path, None, None, permit_type)
                if not benchmarks:
                    benchmarks = _query_timeline(conn, review_path, None, None, None)
            except Exception:
                # timeline_stats table may not exist in all environments
                benchmarks = None
            if not benchmarks:
                continue
            # Guard against NULL/zero percentiles; each tier defaults to the
            # previous one so thresholds stay monotonically non-decreasing.
            p50 = benchmarks["p50_days"] or 1
            p75 = benchmarks["p75_days"] or p50
            p90 = benchmarks["p90_days"] or p75
            if elapsed_days <= p50:
                health_status = "on_track"
            elif elapsed_days <= p75:
                health_status = "slower"
            elif elapsed_days <= p90:
                health_status = "behind"
            else:
                health_status = "at_risk"
            # e.g. 150% means the permit has taken 1.5x the median time.
            pct_of_typical = round(elapsed_days / p50 * 100) if p50 else 0
            results.append({
                "permit_number": permit_number,
                "status": health_status,
                "permit_status": status,
                "elapsed_days": elapsed_days,
                "p50": p50,
                "p75": p75,
                "p90": p90,
                "pct_of_typical": pct_of_typical,
                "permit_type": permit_type,
                "street_number": street_number,
                "street_name": street_name,
                "label": label,
                "sample_size": benchmarks["sample_size"],
            })
    finally:
        conn.close()
    # Sort: at_risk first, then behind, then slower, then on_track
    status_order = {"at_risk": 0, "behind": 1, "slower": 2, "on_track": 3}
    results.sort(key=lambda x: status_order.get(x["status"], 4))
    return results
# ── Section 3: Inspection Results ─────────────────────────────────
def _get_inspection_results(user_id: int, since: date) -> list[dict]:
    """Get recent inspection results for watched permits.

    Args:
        user_id: Owner of the watch items.
        since: Only include inspections scheduled on/after this date.

    Returns:
        Up to 50 inspection dicts (newest first) with ``is_pass``/``is_fail``
        convenience booleans. Empty when the table is unavailable.
    """
    ph = _ph()
    # NOTE: The `inspections` table only exists in DuckDB (local), not in PostgreSQL (prod).
    try:
        rows = query(
            f"SELECT i.reference_number, i.scheduled_date, i.result, "
            f"i.inspection_description, i.inspector, wi.label "
            f"FROM inspections i "
            f"JOIN watch_items wi ON wi.permit_number = i.reference_number "
            f" AND wi.watch_type = 'permit' AND wi.is_active = TRUE "
            f"WHERE wi.user_id = {ph} AND i.scheduled_date >= {ph} "
            f"ORDER BY i.scheduled_date DESC "
            f"LIMIT 50",
            (user_id, str(since)),
        )
    except Exception:
        logger.debug("Inspection results query failed (inspections table may not exist)", exc_info=True)
        return []
    pass_results = ("approved", "ok", "pass", "passed")
    fail_results = ("disapproved", "fail", "failed", "not approved")
    results = []
    for r in rows:
        # Normalize once; a NULL result must yield proper False flags
        # (previously `r[2] and ...` leaked None into is_pass/is_fail).
        outcome = (r[2] or "").lower()
        results.append({
            "permit_number": r[0],
            "date": r[1],
            "result": r[2],
            "description": r[3],
            "inspector": r[4],
            "label": r[5],
            "is_pass": outcome in pass_results,
            "is_fail": outcome in fail_results,
        })
    return results
# ── Section 4: New Filings ────────────────────────────────────────
def _get_new_filings(user_id: int, since: date) -> list[dict]:
    """Get new permits filed at watched addresses/parcels/neighborhoods.

    Mirrors ``_get_watched_changes`` but selects ``is_new_permit = TRUE``
    rows from the permit_changes log. Permit watches are skipped — a brand
    new permit cannot already be watched by number.

    Args:
        user_id: Owner of the watch items.
        since: Only include filings on or after this date.

    Returns:
        List of filing dicts (see ``_rows_to_filings``); neighborhood
        matches are capped at 20. Not deduplicated across watch types.
    """
    ph = _ph()
    results: list[dict] = []
    # Address watches
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.new_status, "
        f"pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.street_number = pc.street_number "
        f" AND UPPER(wi.street_name) = UPPER(pc.street_name) "
        f" AND wi.watch_type = 'address' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = TRUE "
        f"ORDER BY pc.change_date DESC",
        (user_id, since),
    )
    results.extend(_rows_to_filings(rows))
    # Parcel watches
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.new_status, "
        f"pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.block = pc.block AND wi.lot = pc.lot "
        f" AND wi.watch_type = 'parcel' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = TRUE "
        f"ORDER BY pc.change_date DESC",
        (user_id, since),
    )
    results.extend(_rows_to_filings(rows))
    # Neighborhood watches (capped — whole neighborhoods can be noisy)
    rows = query(
        f"SELECT pc.permit_number, pc.change_date, pc.new_status, "
        f"pc.permit_type, pc.street_number, pc.street_name, "
        f"pc.neighborhood, wi.label "
        f"FROM permit_changes pc "
        f"JOIN watch_items wi ON wi.neighborhood = pc.neighborhood "
        f" AND wi.watch_type = 'neighborhood' AND wi.is_active = TRUE "
        f"WHERE wi.user_id = {ph} AND pc.change_date >= {ph} "
        f" AND pc.is_new_permit = TRUE "
        f"ORDER BY pc.change_date DESC "
        f"LIMIT 20",
        (user_id, since),
    )
    results.extend(_rows_to_filings(rows))
    return results
def _rows_to_filings(rows: list[tuple]) -> list[dict]:
return [
{
"permit_number": r[0],
"change_date": r[1],
"status": r[2],
"permit_type": r[3],
"street_number": r[4],
"street_name": r[5],
"neighborhood": r[6],
"label": r[7],
}
for r in rows
]
# ── Section 5: Team Activity ─────────────────────────────────────
def _get_team_activity(user_id: int, since: date) -> list[dict]:
    """Get new permits involving watched entities (contractors/architects).

    Joins watched entities through their permit contacts to permits filed
    since ``since`` (newest first, capped at 30 rows).

    NOTE: The `permits`, `entities`, and `contacts` tables only exist in
    DuckDB (local), not in PostgreSQL (prod) — returns [] when missing.
    """
    ph = _ph()
    try:
        records = query(
            f"SELECT p.permit_number, p.permit_type_definition, p.status, "
            f"p.filed_date, p.street_number, p.street_name, p.neighborhood, "
            f"c.role, e.canonical_name, wi.label "
            f"FROM watch_items wi "
            f"JOIN entities e ON wi.entity_id = e.entity_id "
            f"JOIN contacts c ON e.entity_id = c.entity_id "
            f"JOIN permits p ON c.permit_number = p.permit_number "
            f"WHERE wi.user_id = {ph} AND wi.watch_type = 'entity' "
            f" AND wi.is_active = TRUE AND p.filed_date >= {ph} "
            f"ORDER BY p.filed_date DESC "
            f"LIMIT 30",
            (user_id, str(since)),
        )
    except Exception:
        logger.debug("Team activity query failed (permits/entities tables may not exist)", exc_info=True)
        return []
    keys = (
        "permit_number", "permit_type", "status", "filed_date",
        "street_number", "street_name", "neighborhood", "role",
        "entity_name", "label",
    )
    return [dict(zip(keys, record)) for record in records]
# ── Section 5.5: Plan Review Activity ────────────────────────────
def _get_plan_review_activity(user_id: int, since: date) -> list[dict]:
    """Get recent plan review routing activity for watched permits.

    Queries addenda_changes for reviews completed on watched permits
    since the given date. Matches permit, address, and parcel watches,
    deduplicates (one review may match several watches), then enriches
    results with routing progress and station-velocity context.

    Args:
        user_id: Owner of the watch items.
        since: Only include review changes on or after this date.

    Returns:
        Deduplicated plan-review dicts (see ``_rows_to_plan_reviews``),
        enriched in-place by ``_enrich_plan_reviews_with_routing``.
        Empty when addenda_changes is unavailable.
    """
    ph = _ph()
    results: list[dict] = []
    # Permit watches — direct match
    try:
        rows = query(
            f"SELECT ac.application_number, ac.change_date, ac.station, "
            f"ac.plan_checked_by, ac.new_review_results, ac.hold_description, "
            f"ac.change_type, ac.department, ac.finish_date, "
            f"ac.permit_type, ac.street_number, ac.street_name, "
            f"ac.neighborhood, wi.label "
            f"FROM addenda_changes ac "
            f"JOIN watch_items wi ON wi.permit_number = ac.application_number "
            f" AND wi.watch_type = 'permit' AND wi.is_active = TRUE "
            f"WHERE wi.user_id = {ph} AND ac.change_date >= {ph} "
            f"ORDER BY ac.change_date DESC, ac.finish_date DESC "
            f"LIMIT 50",
            (user_id, since),
        )
    except Exception:
        # First query failing implies the table is missing — bail out early.
        logger.debug("Plan review activity query failed (addenda_changes may not exist)", exc_info=True)
        return []
    results.extend(_rows_to_plan_reviews(rows))
    # Address watches (best-effort: failures here don't void permit matches)
    try:
        rows = query(
            f"SELECT ac.application_number, ac.change_date, ac.station, "
            f"ac.plan_checked_by, ac.new_review_results, ac.hold_description, "
            f"ac.change_type, ac.department, ac.finish_date, "
            f"ac.permit_type, ac.street_number, ac.street_name, "
            f"ac.neighborhood, wi.label "
            f"FROM addenda_changes ac "
            f"JOIN watch_items wi ON wi.street_number = ac.street_number "
            f" AND UPPER(wi.street_name) = UPPER(ac.street_name) "
            f" AND wi.watch_type = 'address' AND wi.is_active = TRUE "
            f"WHERE wi.user_id = {ph} AND ac.change_date >= {ph} "
            f"ORDER BY ac.change_date DESC "
            f"LIMIT 30",
            (user_id, since),
        )
        results.extend(_rows_to_plan_reviews(rows))
    except Exception:
        pass
    # Parcel watches (best-effort as well)
    try:
        rows = query(
            f"SELECT ac.application_number, ac.change_date, ac.station, "
            f"ac.plan_checked_by, ac.new_review_results, ac.hold_description, "
            f"ac.change_type, ac.department, ac.finish_date, "
            f"ac.permit_type, ac.street_number, ac.street_name, "
            f"ac.neighborhood, wi.label "
            f"FROM addenda_changes ac "
            f"JOIN watch_items wi ON wi.block = ac.block AND wi.lot = ac.lot "
            f" AND wi.watch_type = 'parcel' AND wi.is_active = TRUE "
            f"WHERE wi.user_id = {ph} AND ac.change_date >= {ph} "
            f"ORDER BY ac.change_date DESC "
            f"LIMIT 30",
            (user_id, since),
        )
        results.extend(_rows_to_plan_reviews(rows))
    except Exception:
        pass
    # Deduplicate by (permit_number, station, change_date)
    seen = set()
    unique = []
    for r in results:
        key = (r["permit_number"], r["station"], str(r["change_date"]))
        if key not in seen:
            seen.add(key)
            unique.append(r)
    # Enrich with routing progress and station velocity context
    if unique:
        _enrich_plan_reviews_with_routing(unique)
    return unique
def _enrich_plan_reviews_with_routing(reviews: list[dict]) -> None:
    """Add routing completion context and station velocity to plan review items.

    Modifies items in-place, adding:
    - routing_completion_pct, routing_total_stations, routing_completed_stations
    - routing_pending_stations (list of station names)
    - station_typical_label (e.g. "~12 days")
    - predicted_next / predicted_next_days / predicted_remaining_days

    All lookups are best-effort: any failure leaves the corresponding
    fields as None rather than raising.
    """
    permit_numbers = list({r["permit_number"] for r in reviews})
    # Batch routing progress (one call for all permits in this brief).
    routing_map: dict = {}
    try:
        from web.routing import get_routing_progress_batch
        routing_map = get_routing_progress_batch(permit_numbers)
    except Exception:
        logger.debug("Plan review routing enrichment failed", exc_info=True)
    # Per-station caches so each station is looked up at most once,
    # even when many reviews share the same station.
    velocity_cache: dict[str, str | None] = {}
    prediction_cache: dict = {}
    for pr in reviews:
        rp = routing_map.get(pr["permit_number"])
        if rp:
            pr["routing_completion_pct"] = rp.completion_pct
            pr["routing_total_stations"] = rp.total_stations
            pr["routing_completed_stations"] = rp.completed_stations
            pr["routing_pending_stations"] = rp.pending_station_names
        else:
            pr["routing_completion_pct"] = None
        # Station velocity for this specific station
        station = pr.get("station", "")
        if station and station not in velocity_cache:
            try:
                from web.station_velocity import get_station_baseline
                baseline = get_station_baseline(station)
                velocity_cache[station] = baseline.label if baseline else None
            except Exception:
                velocity_cache[station] = None
        pr["station_typical_label"] = velocity_cache.get(station)
        # Sprint 60B: Station path prediction
        if station and station not in prediction_cache:
            try:
                from src.tools.station_predictor import predict_total_remaining_days
                prediction = predict_total_remaining_days(station)
                prediction_cache[station] = prediction
            except Exception:
                prediction_cache[station] = None
        pred = prediction_cache.get(station)
        if pred:
            pr["predicted_next"] = pred.get("next_station")
            pr["predicted_next_days"] = round(pred["next_p50_days"]) if pred.get("next_p50_days") else None
            pr["predicted_remaining_days"] = pred.get("p50_remaining_days")
        else:
            pr["predicted_next"] = None
            pr["predicted_next_days"] = None
            pr["predicted_remaining_days"] = None
def _rows_to_plan_reviews(rows: list[tuple]) -> list[dict]:
return [
{
"permit_number": r[0],
"change_date": r[1],
"station": r[2],
"reviewer": r[3],
"result": r[4],
"notes": (r[5] or "")[:120],
"change_type": r[6],
"department": r[7],
"finish_date": r[8],
"permit_type": r[9],
"street_number": r[10],
"street_name": r[11],
"neighborhood": r[12],
"label": r[13],
}
for r in rows
]
# ── Section 6: Expiring Permits ──────────────────────────────────
def _get_expiring_permits(user_id: int) -> list[dict]:
    """Flag watched permits approaching Table B expiration deadline.

    Considers issued, uncompleted permits on active watches; any permit
    within EXPIRATION_WARNING_DAYS of (or past) its Table B validity
    window is included.

    NOTE: The `permits` table only exists in DuckDB (local), not in
    PostgreSQL (prod) — returns [] when missing.
    """
    ph = _ph()
    try:
        rows = query(
            f"SELECT p.permit_number, p.issued_date, p.status, "
            f"p.permit_type_definition, p.street_number, p.street_name, "
            f"p.neighborhood, wi.label, p.revised_cost, p.estimated_cost "
            f"FROM watch_items wi "
            f"JOIN permits p ON wi.permit_number = p.permit_number "
            f"WHERE wi.user_id = {ph} AND wi.watch_type = 'permit' "
            f" AND wi.is_active = TRUE "
            f" AND p.issued_date IS NOT NULL "
            f" AND p.completed_date IS NULL "
            f" AND p.status NOT IN ('completed', 'expired', 'cancelled', 'withdrawn')",
            (user_id,),
        )
    except Exception:
        logger.debug("Expiring permits query failed (permits table may not exist)", exc_info=True)
        return []
    today = date.today()
    expiring: list[dict] = []
    for (permit_number, issued_date, status, permit_type,
         street_number, street_name, neighborhood, label,
         revised_cost, estimated_cost) in rows:
        issued = _parse_date(issued_date)
        if issued is None:
            # Can't compute an expiration without a parseable issue date.
            continue
        validity = _validity_days({
            "permit_type_definition": permit_type,
            "revised_cost": revised_cost,
            "estimated_cost": estimated_cost,
        })
        age_days = (today - issued).days
        remaining = validity - age_days
        if remaining > EXPIRATION_WARNING_DAYS:
            # Comfortably inside its Table B window — not worth flagging.
            continue
        expiring.append({
            "permit_number": permit_number,
            "issued_date": issued_date,
            "status": status,
            "permit_type": permit_type,
            "street_number": street_number,
            "street_name": street_name,
            "neighborhood": neighborhood,
            "label": label,
            "days_since_issued": age_days,
            "validity_days": validity,
            "expires_in": remaining,
            "is_expired": remaining <= 0,
        })
    # Expired first, then soonest-to-expire.
    expiring.sort(key=lambda item: item["expires_in"])
    return expiring
# ── Section 7: Property Synopsis ─────────────────────────────────
def _get_property_synopsis(street_number: str, street_name: str) -> dict | None:
    """Build a property overview from permits at the user's primary address.

    Returns a dict with total counts, status breakdown, latest permit info,
    neighborhood, and parcel identifier — or None if no permits found.

    Args:
        street_number: House number to match exactly.
        street_name: Street name; matched fuzzily (base name, full name
            with suffix, and a space-stripped variant).
    """
    ph = _ph()
    # Match the same way permit_lookup does — base name + full name+suffix
    # Includes fuzzy space matching: "robin hood" matches "ROBINHOOD"
    # NOTE: The `permits` table only exists in DuckDB (local), not in PostgreSQL (prod).
    try:
        from src.tools.permit_lookup import _strip_suffix
        base_name, _suffix = _strip_suffix(street_name)
        base_pattern = f"%{base_name}%"
        full_pattern = f"%{street_name}%"
        nospace_pattern = f"%{base_name.replace(' ', '')}%"
        rows = query(
            f"SELECT permit_number, permit_type_definition, status, "
            f"filed_date, issued_date, completed_date, estimated_cost, "
            f"description, neighborhood, block, lot, street_suffix "
            f"FROM permits "
            f"WHERE street_number = {ph} "
            f" AND ("
            f" UPPER(street_name) LIKE UPPER({ph})"
            f" OR UPPER(street_name) LIKE UPPER({ph})"
            f" OR UPPER(COALESCE(street_name, '') || ' ' || COALESCE(street_suffix, '')) LIKE UPPER({ph})"
            f" OR REPLACE(UPPER(COALESCE(street_name, '')), ' ', '') LIKE UPPER({ph})"
            f" ) "
            f"ORDER BY filed_date DESC",
            (street_number, base_pattern, full_pattern, full_pattern, nospace_pattern),
        )
    except Exception:
        logger.debug("Property synopsis query failed (permits table may not exist)", exc_info=True)
        return None
    if not rows:
        return None
    total = len(rows)
    # Status breakdown (lowercased; NULL statuses grouped as "unknown")
    status_counts: dict[str, int] = {}
    for r in rows:
        st = (r[2] or "unknown").lower()
        status_counts[st] = status_counts.get(st, 0) + 1
    active_statuses = {"filed", "approved", "issued", "reinstated"}
    active_count = sum(v for k, v in status_counts.items() if k in active_statuses)
    # Both spellings appear in the data — count either as completed.
    completed_count = status_counts.get("complete", 0) + status_counts.get("completed", 0)
    # Most recent permit (rows are ordered by filed_date DESC)
    latest = rows[0]
    latest_info = {
        "permit_number": latest[0],
        "type": latest[1] or "Unknown",
        "status": latest[2] or "Unknown",
        "filed_date": latest[3],
        "description": (latest[7] or "")[:120],
    }
    # Collect unique permit types, keep the five most common
    type_counts: dict[str, int] = {}
    for r in rows:
        pt = r[1] or "Other"
        type_counts[pt] = type_counts.get(pt, 0) + 1
    top_types = sorted(type_counts.items(), key=lambda x: -x[1])[:5]
    # Neighborhood + parcel from first row
    neighborhood = latest[8]
    block = latest[9]
    lot = latest[10]
    street_suffix = latest[11] or ""
    # Full display address (append suffix only if not already in the name)
    display_address = f"{street_number} {street_name}"
    if street_suffix and street_suffix.lower() not in street_name.lower():
        display_address = f"{street_number} {street_name} {street_suffix}"
    # Date range of all filings at this address
    dates = [r[3] for r in rows if r[3]]
    earliest_date = min(dates) if dates else None
    latest_date = max(dates) if dates else None
    return {
        "address": display_address,
        "neighborhood": neighborhood,
        "block": block,
        "lot": lot,
        "total_permits": total,
        "active_count": active_count,
        "completed_count": completed_count,
        "status_counts": status_counts,
        "latest_permit": latest_info,
        "top_types": top_types,
        "earliest_date": earliest_date,
        "latest_date": latest_date,
    }
# ── Section 8: Regulatory Alerts ──────────────────────────────────
def _get_regulatory_alerts() -> list[dict]:
    """Get active regulatory watch items for the morning brief.

    Returns items with status 'monitoring' or 'passed' (not 'effective' or
    'withdrawn') so users stay aware of pending changes.
    """
    alerts: list[dict] = []
    try:
        from web.regulatory_watch import get_regulatory_alerts
        alerts = get_regulatory_alerts()
    except Exception:
        # Best-effort section: the brief renders fine without it.
        logger.debug("Regulatory watch query failed (non-fatal)", exc_info=True)
    return alerts
# ── Section 9: Property Snapshot (always visible) ────────────────
def _get_property_snapshot(user_id: int, lookback_days: int = 30) -> list[dict]:
    """Build always-visible property cards from the user's watch list.

    For each watched property (grouped by normalized address, falling back
    to block/lot):
    - Active/total permit counts
    - Worst health status (v2 signal-based from property_health, with a
      v1 per-permit severity-scoring fallback)
    - Open violation + complaint counts
    - Routing progress for the primary permit in plan check

    Args:
        user_id: Owner of the watch list.
        lookback_days: Window (in days) used to float recently-changed
            properties to the top of the result.

    Returns:
        List of property snapshot dicts, sorted recently-changed first
        (within the lookback window), then worst health descending.
    """
    from web.auth import get_watches
    ph = _ph()
    watches = get_watches(user_id)
    if not watches:
        return []
    # Collect watch targets by type (same pattern as portfolio.py)
    watch_permits: set[str] = set()
    watch_addresses: set[tuple[str, str]] = set()
    watch_parcels: set[tuple[str, str]] = set()
    for w in watches:
        wt = w["watch_type"]
        if wt == "permit" and w.get("permit_number"):
            watch_permits.add(w["permit_number"])
        elif wt == "address" and w.get("street_number") and w.get("street_name"):
            watch_addresses.add((w["street_number"], w["street_name"].upper()))
        elif wt == "parcel" and w.get("block") and w.get("lot"):
            watch_parcels.add((w["block"], w["lot"]))
        # Skip neighborhood and entity watches (too broad for property cards)
    # Build SQL conditions (same pattern as portfolio.py)
    conditions: list[str] = []
    params: list = []
    if watch_permits:
        placeholders = ",".join([ph] * len(watch_permits))
        conditions.append(f"p.permit_number IN ({placeholders})")
        params.extend(watch_permits)
    if watch_addresses:
        addr_conds = []
        for sn, st in watch_addresses:
            addr_conds.append(f"(p.street_number = {ph} AND UPPER(p.street_name) = {ph})")
            params.extend([sn, st])
        conditions.append("(" + " OR ".join(addr_conds) + ")")
    if watch_parcels:
        parcel_conds = []
        for b, l in watch_parcels:
            parcel_conds.append(f"(p.block = {ph} AND p.lot = {ph})")
            params.extend([b, l])
        conditions.append("(" + " OR ".join(parcel_conds) + ")")
    if not conditions:
        return []
    where = " OR ".join(conditions)
    try:
        rows = query(
            f"SELECT p.permit_number, p.status, p.filed_date, p.issued_date, "
            f"p.street_number, p.street_name, p.block, p.lot, p.neighborhood, "
            f"p.permit_type_definition, p.street_suffix, p.status_date, "
            f"p.revised_cost, p.estimated_cost "
            f"FROM permits p WHERE {where} "
            f"ORDER BY p.status_date DESC",
            params,
        )
    except Exception:
        logger.debug("_get_property_snapshot permits query failed", exc_info=True)
        return []
    if not rows:
        return []
    # Group by address (normalized) so multiple lots at the same address
    # become one card. Track all block/lot pairs for enforcement queries.
    today = date.today()
    property_map: dict[str, dict] = {}
    health_order = {"on_track": 0, "slower": 1, "behind": 2, "at_risk": 3, "high_risk": 4}
    # Loop-invariant lookup tables (previously rebuilt on every row):
    # severity tier -> health bucket (v1 fallback), and health tier ->
    # synthetic score used only for sorting v2-scored properties.
    _SEVERITY_TO_HEALTH = {
        "CRITICAL": "at_risk",
        "HIGH": "behind",
        "MEDIUM": "slower",
        "LOW": "on_track",
        "GREEN": "on_track",
    }
    _TIER_SCORE = {"on_track": 0, "slower": 25, "behind": 50, "at_risk": 75, "high_risk": 100}
    # v2 signal-based health: try to load pre-computed property_health table.
    # Falls back to v1 per-permit scoring if table doesn't exist or is empty.
    v2_health: dict[str, dict] = {}
    try:
        v2_rows = query(
            "SELECT block_lot, tier, signal_count, at_risk_count "
            "FROM property_health"
        )
        for vr in (v2_rows or []):
            v2_health[vr[0]] = {
                "tier": vr[1],
                "signal_count": vr[2],
                "at_risk_count": vr[3],
            }
    except Exception:
        # Table doesn't exist or query failed — v1 fallback
        logger.debug("property_health table not available, using v1 scoring", exc_info=True)
    for row in rows:
        pn, status = row[0], (row[1] or "").lower()
        filed_str, issued_str = row[2], row[3]
        snum, sname = row[4] or "", row[5] or ""
        block, lot = row[6] or "", row[7] or ""
        neighborhood = row[8] or ""
        ptype_def = row[9] or ""
        street_suffix = (row[10] or "") if len(row) > 10 else ""
        status_date_str = row[11] if len(row) > 11 else None
        status_date = _parse_date(status_date_str)
        revised_cost = row[12] if len(row) > 12 else None
        estimated_cost = row[13] if len(row) > 13 else None
        # Build full address with suffix: "125 MASON ST"
        full_name = sname
        if street_suffix and street_suffix.upper() not in sname.upper():
            full_name = f"{sname} {street_suffix}"
        # Group by normalized address so 125 MASON on lots 018/003/004
        # becomes a single property card
        addr = f"{snum} {full_name}".strip().upper()
        key = addr if addr else f"{block}/{lot}"
        # Health calculation: v2 signal-based (pre-computed) with v1 fallback.
        filed_date = _parse_date(filed_str)
        issued_date = _parse_date(issued_str)
        block_lot_key = f"{block}/{lot}" if block and lot else ""
        # Try v2 signal-based health first
        v2_entry = v2_health.get(block_lot_key) if block_lot_key else None
        if v2_entry:
            health = v2_entry["tier"]
            sig_ct = v2_entry["signal_count"]
            ar_ct = v2_entry["at_risk_count"]
            health_reason = f"{sig_ct} signal(s), {ar_ct} at-risk" if health != "on_track" else ""
        else:
            # v1 fallback: per-permit severity scoring (lazy import so the
            # v2 path still works if src.severity is unavailable)
            from src.severity import PermitInput, score_permit
            sev_input = PermitInput(
                permit_number=pn,
                status=status,
                permit_type_definition=ptype_def,
                description="",  # brief doesn't carry description
                filed_date=filed_date,
                issued_date=issued_date,
                status_date=status_date,
                estimated_cost=float(estimated_cost) if estimated_cost else 0.0,
                revised_cost=float(revised_cost) if revised_cost else None,
                inspection_count=0,
            )
            sev_result = score_permit(sev_input, today=today)
            health = _SEVERITY_TO_HEALTH.get(sev_result.tier, "on_track")
            health_reason = sev_result.explanation if health != "on_track" else ""
        if key not in property_map:
            # Title-case for display: "125 MASON ST" -> "125 Mason St"
            display_addr = addr.title() if addr else f"{block}/{lot}"
            property_map[key] = {
                "address": display_addr,
                "block": block,
                "lot": lot,
                "parcels": set(),  # all block/lot pairs at this address
                "neighborhood": neighborhood,
                "label": "",
                "total_permits": 0,
                "active_permits": 0,
                "worst_health": "on_track",
                "health_reason": "",
                "plancheck_permits": [],
                "open_violations": None,
                "open_complaints": None,
                "enforcement_total": None,
                "routing": None,
                # FIX: start empty so the strict ">" comparison below fires
                # for the first (newest, DESC-ordered) row and populates
                # latest_permit/_status/_type. Previously this was seeded
                # with the first row's status_date, so those fields stayed "".
                "latest_activity": "",
                "latest_permit": "",
                "latest_permit_status": "",
                "latest_permit_type": "",
                "days_since_activity": None,
                "severity_score": None,
                "severity_tier": None,
                "search_url": f"/?q={addr.replace(' ', '+')}" if addr else f"/?q={block}/{lot}",
            }
        prop = property_map[key]
        prop["total_permits"] += 1
        # Track worst severity score across permits at this property
        if v2_entry:
            # v2: map tier to a synthetic score for sorting (higher = worse)
            v2_score = _TIER_SCORE.get(health, 0)
            if prop["severity_score"] is None or v2_score > prop["severity_score"]:
                prop["severity_score"] = v2_score
                prop["severity_tier"] = health.upper()
        elif prop["severity_score"] is None or sev_result.score > prop["severity_score"]:
            prop["severity_score"] = sev_result.score
            prop["severity_tier"] = sev_result.tier
        # Track latest status_date across all permits at this address
        # (ISO-format strings compare correctly lexically)
        if status_date and str(status_date) > prop["latest_activity"]:
            prop["latest_activity"] = str(status_date)
            prop["latest_permit"] = pn
            prop["latest_permit_status"] = status
            prop["latest_permit_type"] = ptype_def
        # Track all block/lot pairs for this address
        if block and lot:
            prop["parcels"].add((block, lot))
            # Keep first non-empty block/lot as primary display parcel
            if not prop["block"]:
                prop["block"] = block
                prop["lot"] = lot
        active_statuses = ("filed", "issued", "approved", "reinstated")
        if status in active_statuses:
            prop["active_permits"] += 1
            if status == "filed":
                prop["plancheck_permits"].append(pn)
        if health_order.get(health, 0) > health_order.get(prop["worst_health"], 0):
            prop["worst_health"] = health
            prop["health_reason"] = health_reason
    # Assign watch labels
    for w in watches:
        label = w.get("label", "")
        if not label:
            continue
        # Try to match by address first. Watch items store street_name
        # without suffix (e.g. "MASON") while keys may include it
        # (e.g. "125 MASON ST"), so use startswith matching.
        matched = False
        if w.get("street_number") and w.get("street_name"):
            addr_prefix = f"{w['street_number']} {w['street_name']}".strip().upper()
            for pkey, pprop in property_map.items():
                if pkey.startswith(addr_prefix) and not pprop["label"]:
                    pprop["label"] = label
                    matched = True
                    break
        # Fall back to finding by block/lot in parcels sets
        if not matched and w.get("block") and w.get("lot"):
            parcel = (w["block"], w["lot"])
            for prop in property_map.values():
                if parcel in prop.get("parcels", set()) and not prop["label"]:
                    prop["label"] = label
                    break
    # Enforcement: violations + complaints per property.
    # Query across ALL parcels at each address for complete enforcement picture.
    for key, prop in property_map.items():
        parcels = prop.get("parcels", set())
        if not parcels:
            continue
        try:
            total_v = 0
            total_c = 0
            for b, l in parcels:
                v_rows = query(
                    f"SELECT COUNT(*) FROM violations "
                    f"WHERE block = {ph} AND lot = {ph} AND LOWER(status) = 'open'",
                    (b, l),
                )
                c_rows = query(
                    f"SELECT COUNT(*) FROM complaints "
                    f"WHERE block = {ph} AND lot = {ph} AND LOWER(status) = 'open'",
                    (b, l),
                )
                total_v += v_rows[0][0] if v_rows else 0
                total_c += c_rows[0][0] if c_rows else 0
            prop["open_violations"] = total_v
            prop["open_complaints"] = total_c
            prop["enforcement_total"] = total_v + total_c
        except Exception:
            logger.debug("Property snapshot enforcement query failed for %s", key, exc_info=True)
            # Leave as None (data unavailable)
    # Routing progress: batch query for all plancheck permits
    all_plancheck = []
    for prop in property_map.values():
        all_plancheck.extend(prop["plancheck_permits"])
    routing_map: dict = {}
    if all_plancheck:
        try:
            from web.routing import get_routing_progress_batch
            routing_map = get_routing_progress_batch(all_plancheck)
        except Exception:
            logger.debug("Property snapshot routing batch failed", exc_info=True)
    # Station velocity cache (each station name is looked up at most once)
    velocity_cache: dict = {}

    def _get_velocity(station_name: str) -> str | None:
        if station_name in velocity_cache:
            return velocity_cache[station_name]
        try:
            from web.station_velocity import get_station_baseline
            baseline = get_station_baseline(station_name)
            label = baseline.label if baseline else None
        except Exception:
            label = None
        velocity_cache[station_name] = label
        return label
    # Assign routing to properties
    for prop in property_map.values():
        best_routing = None
        for pn in prop["plancheck_permits"]:
            rp = routing_map.get(pn)
            if rp is None:
                continue
            # Pick permit with lowest completion (most interesting to show)
            if best_routing is None or rp.completion_pct < best_routing.completion_pct:
                best_routing = rp
        if best_routing:
            pending_names = best_routing.pending_station_names
            stalled = [s.station for s in best_routing.stalled_stations]
            held = [s.station for s in best_routing.held_stations]
            velocity = []
            for sn in pending_names[:5]:  # Cap velocity lookups
                typical = _get_velocity(sn)
                if typical:
                    velocity.append({"station": sn, "typical": typical})
            prop["routing"] = {
                "permit_number": best_routing.permit_number,
                "completion_pct": best_routing.completion_pct,
                "total_stations": best_routing.total_stations,
                "completed_stations": best_routing.completed_stations,
                "pending_station_names": pending_names,
                "stalled_stations": stalled,
                "held_stations": held,
                "velocity": velocity,
            }
            # Active holds are a real action signal — always set reason so
            # post-processing doesn't accidentally downgrade this property.
            if held:
                prop["worst_health"] = "at_risk"
                prop["health_reason"] = f"Hold at {', '.join(held[:2])}"
        # Upgrade health if property has open enforcement
        enf_total = prop.get("enforcement_total")
        if enf_total and enf_total > 0 and health_order.get(prop["worst_health"], 0) < health_order["at_risk"]:
            parts = []
            if prop.get("open_violations"):
                parts.append(f"{prop['open_violations']} violation{'s' if prop['open_violations'] != 1 else ''}")
            if prop.get("open_complaints"):
                parts.append(f"{prop['open_complaints']} complaint{'s' if prop['open_complaints'] != 1 else ''}")
            prop["worst_health"] = "at_risk"
            prop["health_reason"] = f"Open enforcement: {', '.join(parts)}"
        # Remove internal-only fields (parcels set is not JSON-serializable)
        del prop["plancheck_permits"]
        parcels_list = sorted(prop.get("parcels", set()))
        del prop["parcels"]
        # Store parcels as display string: "0331/018, 0331/003" for multi-lot
        if len(parcels_list) > 1:
            prop["parcels_display"] = ", ".join(f"{b}/{l}" for b, l in parcels_list)
        elif parcels_list:
            prop["parcels_display"] = f"{parcels_list[0][0]}/{parcels_list[0][1]}"
        else:
            prop["parcels_display"] = ""
    # Compute days_since_activity from latest_activity date
    for prop in property_map.values():
        la = _parse_date(prop.get("latest_activity", ""))
        prop["days_since_activity"] = (today - la).days if la else None
    # Post-process: downgrade expired-permit AT RISK → BEHIND for active sites.
    # Per-permit scoring can't see property-level activity (latest across ALL
    # permits at the address), so we reconcile here. An expired permit at a
    # site with recent activity (≤90d) OR multiple active permits is
    # administrative paperwork — the contractor needs a recommencement
    # application (SFBICC §106A.4.4), not an emergency response.
    # NEVER downgrade if there are active holds or open enforcement — those
    # are real action signals regardless of how many permits are active.
    for prop in property_map.values():
        dsa = prop.get("days_since_activity")
        reason = prop.get("health_reason", "")
        has_holds = prop.get("routing") and prop["routing"].get("held_stations")
        has_enforcement = prop.get("enforcement_total") and prop["enforcement_total"] > 0
        if (
            prop["worst_health"] == "at_risk"
            and "permit expired" in reason
            and not has_holds
            and not has_enforcement
        ):
            recent_activity = dsa is not None and dsa <= 90
            active = prop.get("active_permits", 0)
            has_other_active = active > 1
            if recent_activity or has_other_active:
                # Active site + expired permit = normal administrative status
                prop["worst_health"] = "on_track"
                prop["health_reason"] = ""
            else:
                # No recent activity AND no other active permits — genuinely stale
                prop["worst_health"] = "at_risk"
                prop["health_reason"] += " (no recent activity)"
    # Sort: recently-changed properties first, then by worst health desc.
    # FIX: explicit None check — the old `(dsa or 999)` treated a
    # days_since_activity of 0 (changed today) as "not recent".
    def _sort_key(p: dict) -> tuple:
        dsa = p.get("days_since_activity")
        recently_changed = dsa is not None and dsa <= lookback_days
        return (0 if recently_changed else 1, -health_order.get(p["worst_health"], 0))

    return sorted(property_map.values(), key=_sort_key)
# ── Section 10: Data Freshness ───────────────────────────────────
def _get_last_refresh() -> dict | None:
    """Get data freshness info from cron_log.

    Returns:
        Dict with last_success timestamp strings, hours_ago, an is_stale
        flag (>36h since the last successful nightly run), was_catchup,
        and best-effort 24h pipeline stats (changes_detected,
        inspections_updated). None if cron_log doesn't exist or has no
        successful nightly entries.
    """
    # Single import — was previously duplicated three times in this body
    # (including _dt/_tz aliases).
    from datetime import datetime, timezone
    try:
        row = query_one(
            "SELECT started_at, completed_at, was_catchup "
            "FROM cron_log "
            "WHERE job_type = 'nightly' AND status = 'success' "
            "ORDER BY started_at DESC LIMIT 1"
        )
    except Exception:
        # Table doesn't exist yet (first deploy before any cron run)
        return None
    if not row:
        return None
    started_at = row[0]
    was_catchup = row[2] if len(row) > 2 else False
    # Parse timestamp — may be TEXT (DuckDB) or a datetime (Postgres)
    if isinstance(started_at, str):
        try:
            ts = datetime.fromisoformat(started_at.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return None
    else:
        ts = started_at
    # Calculate hours ago; naive timestamps are assumed to be UTC
    now = datetime.now(timezone.utc)
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    delta = now - ts
    hours_ago = delta.total_seconds() / 3600
    result = {
        "last_success": ts.strftime("%b %d, %Y at %I:%M %p UTC"),
        "last_success_date": ts.strftime("%b %d"),
        "hours_ago": round(hours_ago, 1),
        "is_stale": hours_ago > 36,  # Allow some buffer beyond 24h
        "was_catchup": bool(was_catchup),
    }
    # Enrich with nightly pipeline stats (non-fatal)
    try:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
        cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
        ph = _ph()
        changes_row = query_one(
            f"SELECT COUNT(*) FROM permit_changes WHERE detected_at >= {ph}",
            (cutoff_str,),
        )
        result["changes_detected"] = changes_row[0] if changes_row else 0
        insp_row = query_one(
            "SELECT inspections_updated FROM cron_log "
            "WHERE job_type = 'nightly' AND status = 'success' "
            "ORDER BY started_at DESC LIMIT 1"
        )
        result["inspections_updated"] = insp_row[0] if insp_row and insp_row[0] else 0
    except Exception:
        pass  # Non-fatal — basic last_refresh still returned
    return result
# ── Change velocity breakdown (Sprint 64) ────────────────────────
def _get_change_velocity(since: date) -> dict:
    """Count permit_changes grouped by change_type since *since*.

    Returns a dict like ``{"status_change": 5, "new_permit": 3, ...}``.
    Fails silently — returns empty dict on error.
    """
    placeholder = _ph()
    sql = (
        f"SELECT change_type, COUNT(*) FROM permit_changes "
        f"WHERE detected_at >= {placeholder} GROUP BY change_type"
    )
    try:
        rows = query(sql, (str(since),))
        if not rows:
            return {}
        return {row[0]: row[1] for row in rows}
    except Exception:
        return {}
# ── Pipeline health section (Sprint 53 Session C) ────────────────
def get_pipeline_health_for_brief() -> dict:
    """Compact pipeline-health summary for the morning brief.

    Delegates to web.pipeline_health and swallows any failure so the
    brief always renders; on error the status is reported as "unknown"
    with the exception text in the issues list.
    """
    try:
        from web.pipeline_health import get_pipeline_health_brief as _brief
        summary = _brief()
    except Exception as exc:
        logger.warning("Pipeline health check failed (non-fatal): %s", exc)
        summary = {"status": "unknown", "issues": [str(exc)], "checks": []}
    return summary
# ── Pipeline stats (QS8-T1-B) ────────────────────────────────────
def _get_pipeline_stats() -> dict:
    """Compute pipeline stats from cron_log for inclusion in morning brief.

    Returns:
        Dict with:
        - recent_jobs: list of last 5 nightly job dicts (job_type, status,
          started_at, elapsed_seconds)
        - avg_duration_seconds: average duration of recent successful
          nightly jobs (None if insufficient data)
        - last_24h_success: count of jobs with status='success' in last 24h
        - last_24h_failed: count of jobs with status='failed' in last 24h
        - last_24h_jobs: count of all jobs in last 24h

    Never raises — returns empty dict on error.
    """
    try:
        # (Removed unused local `ph` and unused `timezone` import.)
        from datetime import datetime
        # Last 5 nightly jobs for duration computation
        rows = query(
            "SELECT job_type, started_at, completed_at, status "
            "FROM cron_log "
            "WHERE job_type = 'nightly' "
            "ORDER BY started_at DESC "
            "LIMIT 5"
        )
        recent_jobs = []
        durations = []
        for r in rows:
            job_type, started_at, completed_at, status = r[0], r[1], r[2], r[3]
            elapsed = None
            if started_at and completed_at:
                try:
                    s = started_at if isinstance(started_at, datetime) else datetime.fromisoformat(str(started_at))
                    c = completed_at if isinstance(completed_at, datetime) else datetime.fromisoformat(str(completed_at))
                    elapsed = round((c - s).total_seconds(), 2)
                    # Only clean successful runs count toward the average
                    if status == "success" and elapsed >= 0:
                        durations.append(elapsed)
                except Exception:
                    pass  # Unparseable timestamps — leave elapsed as None
            recent_jobs.append({
                "job_type": job_type,
                "status": status,
                "started_at": str(started_at) if started_at else None,
                "elapsed_seconds": elapsed,
            })
        avg_duration = round(sum(durations) / len(durations), 2) if durations else None
        # 24h success/failed counts.
        # NOTE(review): NOW() - INTERVAL '24 hours' is Postgres-flavored SQL;
        # confirm the DuckDB backend accepts it and that started_at is a
        # comparable timestamp type — on failure the outer except returns {}.
        cutoff_rows = query(
            "SELECT status, COUNT(*) FROM cron_log "
            "WHERE started_at >= NOW() - INTERVAL '24 hours' "
            "GROUP BY status"
        )
        status_counts: dict[str, int] = {}
        for r in (cutoff_rows or []):
            status_counts[str(r[0])] = int(r[1])
        return {
            "recent_jobs": recent_jobs,
            "avg_duration_seconds": avg_duration,
            "last_24h_success": status_counts.get("success", 0),
            "last_24h_failed": status_counts.get("failed", 0),
            "last_24h_jobs": sum(status_counts.values()),
        }
    except Exception as e:
        logger.warning("_get_pipeline_stats failed (non-fatal): %s", e)
        return {}
# ── Section 11: Planning & Zoning Context ────────────────────────
def _get_planning_context(user_id: int) -> list[dict]:
    """Get active planning entitlements and zoning for watched parcels.

    For every active parcel watch (block + lot) this looks up:
    - planning_records that are not withdrawn/closed (max 10, newest first)
    - the zoning_code from the most recent tax_rolls entry

    Parcels with neither planning records nor zoning info are omitted.
    Fails silently — never raises, so brief still renders if query fails.
    """
    placeholder = _ph()
    # Fetch parcel watches for this user
    try:
        parcel_rows = query(
            f"SELECT block, lot, label FROM watch_items "
            f"WHERE user_id = {placeholder} AND watch_type = 'parcel' AND is_active = TRUE",
            (user_id,),
        )
    except Exception:
        logger.debug("Planning context: watch_items query failed", exc_info=True)
        return []
    if not parcel_rows:
        return []
    out: list[dict] = []
    for wrow in parcel_rows:
        blk, lt = wrow[0], wrow[1]
        lbl = wrow[2] if len(wrow) > 2 else None
        if not blk or not lt:
            continue
        parcel_key = f"{blk}-{lt}"
        # Active planning records for this parcel (newest first)
        try:
            planning_rows = query(
                f"SELECT record_id, record_type, description, open_date, status "
                f"FROM planning_records "
                f"WHERE block = {placeholder} AND lot = {placeholder} "
                f" AND COALESCE(status, '') NOT IN ('withdrawn', 'closed') "
                f"ORDER BY open_date DESC "
                f"LIMIT 10",
                (blk, lt),
            )
        except Exception:
            logger.debug(
                "Planning context: planning_records query failed for %s", parcel_key,
                exc_info=True,
            )
            planning_rows = []
        # Zoning code from the newest tax-roll entry, if any
        zoning = None
        try:
            tax_row = query_one(
                f"SELECT zoning_code FROM tax_rolls "
                f"WHERE block = {placeholder} AND lot = {placeholder} "
                f"ORDER BY tax_year DESC LIMIT 1",
                (blk, lt),
            )
            if tax_row:
                zoning = tax_row[0]
        except Exception:
            logger.debug(
                "Planning context: tax_rolls query failed for %s", parcel_key,
                exc_info=True,
            )
        # Only include parcels that have active planning records or zoning info
        if not planning_rows and not zoning:
            continue
        out.append({
            "block_lot": parcel_key,
            "block": blk,
            "lot": lt,
            "label": lbl,
            "zoning_code": zoning,
            "planning_records": [
                {
                    "record_id": pr[0],
                    "record_type": pr[1],
                    "description": (pr[2] or "")[:120],
                    "open_date": pr[3],
                    "status": pr[4],
                }
                for pr in planning_rows
            ],
        })
    return out
# ── Section 12: Compliance Calendar ──────────────────────────────
COMPLIANCE_WARNING_DAYS = 90  # Warn this many days before expiration


def _get_compliance_calendar(user_id: int) -> list[dict]:
    """Get boiler permits approaching expiration for watched parcels.

    Scans each of the user's active parcel watches and collects boiler
    permits whose expiration falls within COMPLIANCE_WARNING_DAYS (already
    expired permits are included and sort first).
    Fails silently — never raises, so brief still renders if query fails.
    """
    placeholder = _ph()
    # Fetch parcel watches for this user
    try:
        parcel_rows = query(
            f"SELECT block, lot FROM watch_items "
            f"WHERE user_id = {placeholder} AND watch_type = 'parcel' AND is_active = TRUE",
            (user_id,),
        )
    except Exception:
        logger.debug("Compliance calendar: watch_items query failed", exc_info=True)
        return []
    if not parcel_rows:
        return []
    today = date.today()
    horizon = today + timedelta(days=COMPLIANCE_WARNING_DAYS)
    entries: list[dict] = []
    for prow in parcel_rows:
        blk, lt = prow[0], prow[1]
        if not blk or not lt:
            continue
        try:
            boiler_rows = query(
                f"SELECT permit_number, boiler_type, expiration_date "
                f"FROM boiler_permits "
                f"WHERE block = {placeholder} AND lot = {placeholder} "
                f" AND expiration_date IS NOT NULL",
                (blk, lt),
            )
        except Exception:
            logger.debug(
                "Compliance calendar: boiler_permits query failed for %s-%s", blk, lt,
                exc_info=True,
            )
            continue
        for brow in boiler_rows:
            exp = _parse_date(brow[2])
            if not exp or exp > horizon:
                continue  # Unparseable, or not expiring soon enough
            remaining = (exp - today).days
            entries.append({
                "permit_number": brow[0],
                "boiler_type": brow[1],
                "expiration_date": brow[2],
                "days_until": remaining,
                "block": blk,
                "lot": lt,
                "is_expired": remaining < 0,
            })
    # Sort: expired first, then soonest to expire
    entries.sort(key=lambda item: item["days_until"])
    return entries
# ── Section 13: Data Quality Footer ──────────────────────────────
DATA_QUALITY_WARN_THRESHOLD = 5.0  # Warn if match rate below this percent


def _get_data_quality() -> dict:
    """Compute cross-reference match rates for the morning brief footer.

    Runs the same queries as /cron/cross-ref-check to show data health:
    what fraction of planning_records, tax_rolls and boiler_permits rows
    join to a permits row by block/lot.

    Returns a dict with match percentages and a warnings list.
    Fails silently — returns empty dict on error.
    """
    try:
        conn = get_connection()
        try:
            # One scalar-query helper per backend, so each cross-ref query
            # is written exactly once (previously the same SQL was
            # duplicated across the Postgres and DuckDB branches).
            if BACKEND == "postgres":
                conn.autocommit = True
                cur = conn.cursor()

                def _scalar(sql: str) -> int:
                    cur.execute(sql)
                    return cur.fetchone()[0]
            else:
                cur = None

                def _scalar(sql: str) -> int:
                    return conn.execute(sql).fetchone()[0]

            planning_matched = _scalar(
                "SELECT COUNT(DISTINCT pr.record_id) "
                "FROM planning_records pr "
                "JOIN permits p ON pr.block = p.block AND pr.lot = p.lot"
            )
            total_planning = _scalar("SELECT COUNT(*) FROM planning_records")
            tax_matched = _scalar(
                "SELECT COUNT(DISTINCT tr.block || '-' || tr.lot) "
                "FROM tax_rolls tr "
                "JOIN permits p ON tr.block = p.block AND tr.lot = p.lot "
                "WHERE p.status IN ('issued', 'complete', 'filed', 'approved')"
            )
            total_tax = _scalar("SELECT COUNT(*) FROM tax_rolls")
            boiler_matched = _scalar(
                "SELECT COUNT(DISTINCT bp.permit_number) "
                "FROM boiler_permits bp "
                "JOIN permits p ON bp.block = p.block AND bp.lot = p.lot"
            )
            total_boiler = _scalar("SELECT COUNT(*) FROM boiler_permits")
            if cur is not None:
                cur.close()
        finally:
            conn.close()

        def _pct(matched: int, total: int) -> float:
            # Guard divide-by-zero on empty tables
            return round(matched * 100 / total, 1) if total > 0 else 0.0

        planning_pct = _pct(planning_matched, total_planning)
        tax_pct = _pct(tax_matched, total_tax)
        boiler_pct = _pct(boiler_matched, total_boiler)
        warnings = []
        if planning_pct < DATA_QUALITY_WARN_THRESHOLD and total_planning > 0:
            warnings.append(
                f"Planning match rate low: {planning_pct}% "
                f"({planning_matched}/{total_planning})"
            )
        if tax_pct < DATA_QUALITY_WARN_THRESHOLD and total_tax > 0:
            warnings.append(
                f"Tax roll match rate low: {tax_pct}% "
                f"({tax_matched}/{total_tax})"
            )
        if boiler_pct < DATA_QUALITY_WARN_THRESHOLD and total_boiler > 0:
            warnings.append(
                f"Boiler match rate low: {boiler_pct}% "
                f"({boiler_matched}/{total_boiler})"
            )
        for w in warnings:
            logger.warning("DATA QUALITY: %s", w)
        return {
            "planning_match_pct": planning_pct,
            "planning_matched": planning_matched,
            "total_planning": total_planning,
            "tax_match_pct": tax_pct,
            "tax_matched": tax_matched,
            "total_tax_rolls": total_tax,
            "boiler_match_pct": boiler_pct,
            "boiler_matched": boiler_matched,
            "total_boiler": total_boiler,
            "warnings": warnings,
        }
    except Exception as e:
        logger.debug("Data quality check failed (non-fatal): %s", e, exc_info=True)
        return {}
# ── Section 14: Street Use Activity ──────────────────────────────
def _get_street_use_activity(conn, watched_addresses: list[tuple[str, str]]) -> list[dict]:
    """Get recent street-use permits at watched addresses.

    Queries street_use_permits for permits whose street_name contains a
    watched street name and whose status is not expired/cancelled (up to
    10 per watched address, newest approval first).

    Args:
        conn: Database connection. NOTE(review): currently unused — all
            queries go through the module-level query() helper; kept so
            the call signature stays stable for existing callers.
        watched_addresses: List of (street_number, street_name) tuples.

    Returns:
        List of dicts with street-use permit info, deduplicated by
        permit_number (first occurrence wins). Returns empty list on
        error or when no matches found.
    """
    if not watched_addresses:
        return []
    ph = _ph()  # Use the shared placeholder helper (was duplicated inline)
    results: list[dict] = []
    seen: set[str] = set()
    try:
        for street_number, street_name in watched_addresses:
            rows = query(
                f"SELECT permit_number, permit_type, permit_purpose, status, "
                f"agent, street_name, cross_street_1, cross_street_2, "
                f"approved_date, expiration_date, neighborhood "
                f"FROM street_use_permits "
                f"WHERE UPPER(street_name) LIKE UPPER({ph}) "
                f" AND COALESCE(status, '') NOT IN ('expired', 'cancelled') "
                f"ORDER BY approved_date DESC "
                f"LIMIT 10",
                (f"%{street_name.strip()}%",),
            )
            for r in rows:
                # Deduplicate by permit_number across watched addresses
                if r[0] in seen:
                    continue
                seen.add(r[0])
                results.append({
                    "permit_number": r[0],
                    "permit_type": r[1],
                    "permit_purpose": r[2],
                    "status": r[3],
                    "agent": r[4],
                    "street_name": r[5],
                    "cross_street_1": r[6],
                    "cross_street_2": r[7],
                    "approved_date": r[8],
                    "expiration_date": r[9],
                    "neighborhood": r[10],
                    "watched_address": f"{street_number} {street_name}",
                })
    except Exception:
        logger.debug("Street use activity query failed (non-fatal)", exc_info=True)
        return []
    return results
def get_street_use_activity_for_user(user_id: int) -> list[dict]:
    """Get street-use permits at addresses watched by a user.

    Loads the user's active address watches and delegates the actual
    permit lookup to _get_street_use_activity.
    Returns empty list on error or when no watched addresses.
    """
    placeholder = _ph()
    sql = (
        f"SELECT street_number, street_name FROM watch_items "
        f"WHERE user_id = {placeholder} AND watch_type = 'address' "
        f" AND is_active = TRUE AND street_name IS NOT NULL"
    )
    try:
        watch_rows = query(sql, (user_id,))
    except Exception:
        logger.debug("Street use activity: watch_items query failed", exc_info=True)
        return []
    if not watch_rows:
        return []
    targets = [(r[0] or "", r[1] or "") for r in watch_rows if r[1]]
    conn = get_connection()
    try:
        return _get_street_use_activity(conn, targets)
    finally:
        conn.close()
# ── Section 15: Nearby Development ───────────────────────────────
def _get_nearby_development(conn, watched_parcels: list[tuple[str, str]]) -> list[dict]:
    """Get development pipeline projects near watched parcels.

    Uses block-level proximity: any development_pipeline row whose
    block_lot starts with a watched block (and is not withdrawn/cancelled)
    is considered nearby; up to 10 per parcel, newest approval first.

    Args:
        conn: Database connection. NOTE(review): currently unused — all
            queries go through the module-level query() helper; kept so
            the call signature stays stable for existing callers.
        watched_parcels: List of (block, lot) tuples.

    Returns:
        List of dicts with development pipeline project info, deduplicated
        by record_id. Returns empty list on error or when no matches found.
    """
    if not watched_parcels:
        return []
    ph = _ph()  # Use the shared placeholder helper (was duplicated inline)
    results: list[dict] = []
    seen_records: set[str] = set()
    try:
        for block, lot in watched_parcels:
            if not block:
                continue
            rows = query(
                f"SELECT record_id, name_address, current_status, "
                f"proposed_units, net_pipeline_units, affordable_units, "
                f"neighborhood, block_lot, description_planning, description_dbi, "
                f"approved_date_planning "
                f"FROM development_pipeline "
                f"WHERE block_lot LIKE {ph} "
                f" AND COALESCE(current_status, '') NOT IN ('withdrawn', 'cancelled') "
                f"ORDER BY approved_date_planning DESC "
                f"LIMIT 10",
                (f"{block}%",),
            )
            for r in rows:
                rec_id = r[0] or ""
                if rec_id in seen_records:
                    continue
                seen_records.add(rec_id)
                # Prefer the Planning description, fall back to DBI's
                description = (r[8] or r[9] or "")[:150]
                results.append({
                    "record_id": rec_id,
                    "name_address": r[1],
                    "current_status": r[2],
                    "proposed_units": r[3],
                    "net_pipeline_units": r[4],
                    "affordable_units": r[5],
                    "neighborhood": r[6],
                    "block_lot": r[7],
                    "description": description,
                    "approved_date_planning": r[10],
                    "watched_parcel": f"{block}/{lot}",
                })
    except Exception:
        logger.debug("Nearby development query failed (non-fatal)", exc_info=True)
        return []
    return results
def get_nearby_development_for_user(user_id: int) -> list[dict]:
    """Get development pipeline projects near parcels watched by a user.

    Loads the user's active parcel watches and delegates to
    _get_nearby_development, which matches projects at block level.
    Returns empty list on error or when no watched parcels.
    """
    placeholder = _ph()
    sql = (
        f"SELECT block, lot FROM watch_items "
        f"WHERE user_id = {placeholder} AND watch_type = 'parcel' "
        f" AND is_active = TRUE AND block IS NOT NULL AND lot IS NOT NULL"
    )
    try:
        watch_rows = query(sql, (user_id,))
    except Exception:
        logger.debug("Nearby development: watch_items query failed", exc_info=True)
        return []
    if not watch_rows:
        return []
    parcels = [(r[0] or "", r[1] or "") for r in watch_rows]
    conn = get_connection()
    try:
        return _get_nearby_development(conn, parcels)
    finally:
        conn.close()
# ── QS3-A: Permit Prep Summary ───────────────────────────────────
def _get_prep_summary(user_id: int) -> list[dict]:
    """Return prep checklists for this user with progress counts.

    Used in the morning brief to highlight permits with incomplete
    checklists (up to 10, most recently updated first).
    """
    placeholder = _ph()
    try:
        checklist_rows = query(
            f"SELECT pc.checklist_id, pc.permit_number, pc.updated_at "
            f"FROM prep_checklists pc WHERE pc.user_id = {placeholder} "
            f"ORDER BY pc.updated_at DESC LIMIT 10",
            (user_id,),
        )
    except Exception:
        return []  # Table may not exist yet
    if not checklist_rows:
        return []
    summaries = []
    for crow in checklist_rows:
        try:
            status_rows = query(
                f"SELECT status, COUNT(*) FROM prep_items "
                f"WHERE checklist_id = {placeholder} GROUP BY status",
                (crow[0],),
            )
        except Exception:
            status_rows = []
        total = completed = missing = 0
        for srow in (status_rows or []):
            item_status, count = srow[0], srow[1]
            total += count
            if item_status in ("submitted", "verified", "waived", "n_a"):
                completed += count
            if item_status == "required":
                missing += count
        summaries.append({
            "permit_number": crow[1],
            "total_items": total,
            "completed_items": completed,
            "missing_required": missing,
        })
    return summaries