"""Tool: permit_lookup — Look up permits by number, address, or parcel and show related permits."""
import logging
from datetime import date, timedelta
from src.db import get_connection, BACKEND, circuit_breaker
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# SQL helpers
# ---------------------------------------------------------------------------
# SQL parameter placeholder for the active backend: "%s" when BACKEND is
# "postgres", "?" otherwise (DuckDB). It is interpolated into the query text
# only; the helpers in this module pass the actual values separately as
# query parameters.
_PH = "%s" if BACKEND == "postgres" else "?"
def _exec(conn, sql, params=None):
    """Run *sql* on *conn* and return every result row.

    Args:
        conn: Open database connection for the active backend.
        sql: Query text using the backend's placeholder style.
        params: Optional sequence of bind values; a falsy value is sent as
            an empty list.

    Returns:
        Whatever ``fetchall()`` yields for the backend.
    """
    bound = params or []
    if BACKEND != "postgres":
        # Non-Postgres backend: the connection executes directly.
        return conn.execute(sql, bound).fetchall()
    # Postgres: go through a cursor, closed by the context manager.
    with conn.cursor() as cursor:
        cursor.execute(sql, bound)
        return cursor.fetchall()
def _exec_one(conn, sql, params=None):
    """Run *sql* via ``_exec`` and return only the first row (None if no rows)."""
    result = _exec(conn, sql, params)
    if not result:
        return None
    return result[0]
# ---------------------------------------------------------------------------
# Column mappings (permits table column order for SELECT *)
# ---------------------------------------------------------------------------
# Column names paired *positionally* with rows returned by
# ``SELECT * FROM permits`` (see _row_to_dict). The order here therefore has
# to mirror the table's column order.
PERMIT_COLS = [
"permit_number", "permit_type", "permit_type_definition", "status",
"status_date", "description", "filed_date", "issued_date", "approved_date",
"completed_date", "estimated_cost", "revised_cost", "existing_use",
"proposed_use", "existing_units", "proposed_units", "street_number",
"street_name", "street_suffix", "zipcode", "neighborhood",
"supervisor_district", "block", "lot", "adu", "data_as_of",
]
def _row_to_dict(row, cols=PERMIT_COLS) -> dict:
    """Pair each value in *row* with the column name at the same position.

    Pairing stops at the shorter of the two sequences, so surplus values or
    surplus column names are dropped.
    """
    return dict(zip(cols, row))
# ---------------------------------------------------------------------------
# Lookup functions
# ---------------------------------------------------------------------------
def _lookup_by_number(conn, permit_number: str) -> list[dict]:
    """Return the permits whose permit_number equals *permit_number* exactly.

    Each row is converted to a dict keyed by ``PERMIT_COLS``.
    """
    matches = _exec(
        conn,
        f"SELECT * FROM permits WHERE permit_number = {_PH}",
        [permit_number],
    )
    return list(map(_row_to_dict, matches))
def _strip_suffix(name: str) -> tuple[str, str | None]:
"""Split a street name into (base_name, suffix) if a known suffix is present.
Examples:
"16th Ave" → ("16th", "Ave")
"Robin Hood Dr" → ("Robin Hood", "Dr")
"Market" → ("Market", None)
"""
import re
m = re.match(
r'^(.+?)\s+(St(?:reet)?|Ave(?:nue)?|Blvd|Boulevard|Rd|Road|Dr(?:ive)?'
r'|Way|Ct|Court|Ln|Lane|Pl(?:ace)?|Ter(?:race)?)\.?\s*$',
name,
re.IGNORECASE,
)
if m:
return m.group(1).strip(), m.group(2).strip()
return name, None
def _lookup_by_address(conn, street_number: str, street_name: str) -> list[dict]:
    """Find permits at *street_number* + *street_name*, newest filing first.

    The street name may be stored in several shapes:
      1. street_name="Market",    street_suffix="St"  (suffix in its own column)
      2. street_name="Market St", street_suffix=NULL  (suffix merged into name)
      3. street_name="16th",      street_suffix="Ave" (numbered streets)

    Two passes are made, and the first one that returns rows wins:
      * Pass 1 compares street_name by plain equality against both the
        suffix-stripped name and the name as given (the original notes say
        this form can use the street index).
      * Pass 2 compares case-insensitively, and also tries the
        "name + suffix" concatenation and a whitespace-free form of the name.

    Returns:
        Up to 50 permit dicts keyed by ``PERMIT_COLS``.
    """
    base_name, _suffix = _strip_suffix(street_name)

    # Pass 1: exact-equality match.
    exact_sql = f"""
        SELECT * FROM permits
        WHERE street_number = {_PH}
          AND street_name IN ({_PH}, {_PH})
        ORDER BY filed_date DESC
        LIMIT 50
    """
    exact_hits = _exec(conn, exact_sql, [street_number, base_name, street_name])
    if exact_hits:
        return [_row_to_dict(hit) for hit in exact_hits]

    # Pass 2: case-insensitive and spelling-variant match.
    compact_name = base_name.replace(' ', '')
    loose_sql = f"""
        SELECT * FROM permits
        WHERE street_number = {_PH}
          AND (
            UPPER(street_name) = UPPER({_PH})
            OR UPPER(street_name) = UPPER({_PH})
            OR UPPER(COALESCE(street_name, '') || ' ' || COALESCE(street_suffix, '')) = UPPER({_PH})
            OR REPLACE(UPPER(COALESCE(street_name, '')), ' ', '') = UPPER({_PH})
          )
        ORDER BY filed_date DESC
        LIMIT 50
    """
    loose_params = [street_number, base_name, street_name, street_name, compact_name]
    return [_row_to_dict(hit) for hit in _exec(conn, loose_sql, loose_params)]
def _suggest_street_names(conn, street_number: str, street_name: str) -> list[tuple[str, int]]:
    """Suggest street names at *street_number* that contain the search term.

    The suffix-stripped search term is matched as a case-insensitive
    substring of street_name. Rows with an empty street name are skipped.

    Returns:
        Up to 10 ``(street_name, permit_count)`` tuples, most permits first.
    """
    base_name = _strip_suffix(street_name)[0]
    sql = f"""
        SELECT street_name, COUNT(*) as cnt
        FROM permits
        WHERE street_number = {_PH}
          AND UPPER(street_name) LIKE UPPER({_PH})
        GROUP BY street_name
        ORDER BY cnt DESC
        LIMIT 10
    """
    suggestions = []
    for candidate in _exec(conn, sql, [street_number, f"%{base_name}%"]):
        if not candidate[0]:
            continue
        suggestions.append((candidate[0], candidate[1]))
    return suggestions
def _find_historical_lots(conn, block: str, lot: str) -> list[str]:
    """Collect every lot number on *block* that shares *lot*'s street address.

    Rationale (from the original notes): condo conversions and lot
    splits/merges give a parcel new lot numbers while its street address
    stays the same, so the address links old and new lots.

    Steps:
      1. Read one (street_number, street_name) pair recorded for block/lot.
      2. List the distinct lots on the same block at that address, first by
         exact street-name equality, then case-insensitively if that finds
         nothing.

    Returns:
        Lot numbers in no particular order; *lot* itself is always included.
        ``[lot]`` is returned when no address is on record.
    """
    address = _exec_one(conn, f"""
        SELECT DISTINCT street_number, street_name
        FROM permits
        WHERE block = {_PH} AND lot = {_PH}
          AND street_number IS NOT NULL
          AND street_name IS NOT NULL
        LIMIT 1
    """, [block, lot])
    if not address:
        return [lot]
    street_number, street_name = address[0], address[1]

    # Exact comparison first; the case-insensitive form is only a fallback.
    name_predicates = (
        f"street_name = {_PH}",
        f"UPPER(street_name) = UPPER({_PH})",
    )
    lot_rows = []
    for name_predicate in name_predicates:
        lot_rows = _exec(conn, f"""
            SELECT DISTINCT lot
            FROM permits
            WHERE block = {_PH}
              AND street_number = {_PH}
              AND {name_predicate}
              AND lot IS NOT NULL
        """, [block, street_number, street_name])
        if lot_rows:
            break

    known_lots = list({found[0] for found in lot_rows if found[0]})
    if lot not in known_lots:
        # The caller's lot is part of the answer even if the address search missed it.
        known_lots.append(lot)
    return known_lots
def _lookup_by_block_lot(conn, block: str, lot: str) -> list[dict]:
    """Return permits filed under *block*/*lot*, including historical lots.

    ``_find_historical_lots`` supplies the lot numbers to search, so permits
    filed under earlier lot numbers of the same parcel are included.

    Returns:
        Up to 50 permit dicts keyed by ``PERMIT_COLS``, newest filing first.
    """
    lots = _find_historical_lots(conn, block, lot)

    # One lot keeps the plain equality form; several lots use IN (...).
    if len(lots) == 1:
        lot_filter = f"lot = {_PH}"
    else:
        lot_filter = f"lot IN ({', '.join([_PH] * len(lots))})"

    sql = f"""
        SELECT * FROM permits
        WHERE block = {_PH} AND {lot_filter}
        ORDER BY filed_date DESC
        LIMIT 50
    """
    found = _exec(conn, sql, [block, *lots])
    return [_row_to_dict(permit) for permit in found]
# ---------------------------------------------------------------------------
# Enrichment functions
# ---------------------------------------------------------------------------
def _get_contacts(conn, permit_number: str) -> list[dict]:
    """Return the contacts on a permit, joined with their entity record.

    Rows are ordered by role: applicant, contractor, architect, engineer,
    then everything else. Entity fields are None when the contact has no
    matching row in ``entities`` (LEFT JOIN).

    Returns:
        Dicts with keys role, name, firm_name, entity_id, canonical_name,
        canonical_firm and permit_count.
    """
    fields = ("role", "name", "firm_name", "entity_id",
              "canonical_name", "canonical_firm", "permit_count")
    sql = f"""
        SELECT c.role, c.name, c.firm_name, c.entity_id,
               e.canonical_name, e.canonical_firm, e.permit_count
        FROM contacts c
        LEFT JOIN entities e ON c.entity_id = e.entity_id
        WHERE c.permit_number = {_PH}
        ORDER BY
            CASE LOWER(COALESCE(c.role, ''))
                WHEN 'applicant' THEN 1
                WHEN 'contractor' THEN 2
                WHEN 'architect' THEN 3
                WHEN 'engineer' THEN 4
                ELSE 5
            END
    """
    return [dict(zip(fields, contact)) for contact in _exec(conn, sql, [permit_number])]
def _get_inspections(conn, permit_number: str) -> list[dict]:
    """Return inspections whose reference_number is *permit_number*, latest first.

    Returns:
        Dicts with keys scheduled_date, inspector, result and description
        (the latter taken from the inspection_description column).
    """
    fields = ("scheduled_date", "inspector", "result", "description")
    sql = f"""
        SELECT scheduled_date, inspector, result, inspection_description
        FROM inspections
        WHERE reference_number = {_PH}
        ORDER BY scheduled_date DESC
    """
    return [dict(zip(fields, visit)) for visit in _exec(conn, sql, [permit_number])]
def _get_timeline(conn, permit_number: str) -> dict | None:
    """Return the stored timeline stats for a permit, or None if there are none.

    Returns:
        ``{"days_to_issuance": ..., "days_to_completion": ...}`` read from
        the ``timeline_stats`` table.
    """
    stats = _exec_one(conn, f"""
        SELECT days_to_issuance, days_to_completion
        FROM timeline_stats
        WHERE permit_number = {_PH}
        LIMIT 1
    """, [permit_number])
    if not stats:
        return None
    issuance_days, completion_days = stats[0], stats[1]
    return {"days_to_issuance": issuance_days, "days_to_completion": completion_days}
def _get_related_location(conn, block: str, lot: str, exclude: str) -> list[dict]:
    """Return other permits on the same parcel, historical lots included.

    Args:
        conn: Open database connection.
        block: Parcel block.
        lot: Parcel lot; expanded through ``_find_historical_lots``.
        exclude: Permit number to leave out (the permit being viewed).

    Returns:
        Up to 25 dicts with keys permit_number, type, status, filed_date,
        cost and description, newest filing first.
    """
    lots = _find_historical_lots(conn, block, lot)

    # One lot keeps the plain equality form; several lots use IN (...).
    if len(lots) == 1:
        lot_filter = f"lot = {_PH}"
    else:
        lot_filter = f"lot IN ({', '.join([_PH] * len(lots))})"

    sql = f"""
        SELECT permit_number, permit_type_definition, status,
               filed_date, estimated_cost, description
        FROM permits
        WHERE block = {_PH} AND {lot_filter}
          AND permit_number != {_PH}
        ORDER BY filed_date DESC
        LIMIT 25
    """
    fields = ("permit_number", "type", "status", "filed_date", "cost", "description")
    neighbours = _exec(conn, sql, [block, *lots, exclude])
    return [dict(zip(fields, neighbour)) for neighbour in neighbours]
def _split_permit_numbers(raw) -> list:
    """Normalize a ``relationships.permit_numbers`` value into a list.

    Accepts an array value (list/tuple) or a comma-separated string. String
    entries are stripped of surrounding whitespace and empty entries are
    dropped; a falsy value yields an empty list.
    """
    if not raw:
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    return [token.strip() for token in str(raw).split(",") if token.strip()]


def _get_related_team(conn, permit_number: str) -> list[dict]:
    """Return permits that share team members with *permit_number*.

    Fast path (``relationships`` table):
      1. collect the entity_ids attached to this permit via ``contacts``;
      2. load ``relationships`` rows that touch any of those entities;
      3. look up names for the entities on the other side of those rows;
      4. fetch up to 25 of the permits listed on those rows, newest first.

    If any step of the fast path raises, the error is logged and the
    function falls back to a self-join over ``contacts`` (with a DuckDB
    variant that uses GROUP BY instead of ``DISTINCT ON``).

    Returns:
        Dicts with keys permit_number, type, status, filed_date, cost,
        description, shared_entity and shared_role. On the fast path
        shared_role is always None because no role is read there.
    """
    import time as _time

    t0 = _time.monotonic()

    try:
        # Step 1: entities attached to this permit.
        entity_rows = _exec(conn, f"""
            SELECT DISTINCT entity_id FROM contacts
            WHERE permit_number = {_PH} AND entity_id IS NOT NULL
        """, [permit_number])
        entity_ids = [r[0] for r in entity_rows]
        if not entity_ids:
            logger.debug("_get_related_team: no entities for permit %s", permit_number)
            return []

        # Step 2: relationships touching those entities, in either direction.
        placeholders = ", ".join([_PH] * len(entity_ids))
        rel_rows = _exec(conn, f"""
            SELECT entity_id_a, entity_id_b, shared_permits, permit_numbers, neighborhoods
            FROM relationships
            WHERE entity_id_a IN ({placeholders}) OR entity_id_b IN ({placeholders})
        """, entity_ids + entity_ids)
        if not rel_rows:
            elapsed = _time.monotonic() - t0
            logger.debug("_get_related_team (relationships): 0 results in %.2fs", elapsed)
            return []

        # Entities on the far side of a relationship (not on this permit).
        entity_id_set = set(entity_ids)
        connected_ids = set()
        for r in rel_rows:
            connected_ids.update(eid for eid in (r[0], r[1]) if eid not in entity_id_set)
        if not connected_ids:
            return []

        # Step 3: names of the connected entities.
        conn_ids_list = list(connected_ids)
        conn_placeholders = ", ".join([_PH] * len(conn_ids_list))
        ent_rows = _exec(conn, f"""
            SELECT entity_id, canonical_name, canonical_firm, permit_count
            FROM entities WHERE entity_id IN ({conn_placeholders})
        """, conn_ids_list)
        entity_names = {r[0]: r[1] for r in ent_rows}

        # Step 4: parse every relationship's permit list exactly once and
        # remember, per permit, the first relationship (in row order) that
        # mentions it, so each result can be attributed without re-parsing.
        permit_to_entity = {}
        for r in rel_rows:
            other_id = r[1] if r[0] in entity_id_set else r[0]
            for pnum in _split_permit_numbers(r[3]):
                permit_to_entity.setdefault(pnum, other_id)
        permit_to_entity.pop(permit_number, None)
        permit_to_entity.pop("", None)
        if not permit_to_entity:
            return []

        # Cap the IN list at 50. Sorting first makes the cap deterministic
        # (slicing an unordered set picks a different subset per process).
        pnum_list = sorted(permit_to_entity, key=str)[:50]
        pnum_placeholders = ", ".join([_PH] * len(pnum_list))
        permit_rows = _exec(conn, f"""
            SELECT permit_number, permit_type_definition, status,
                   filed_date, estimated_cost, description
            FROM permits
            WHERE permit_number IN ({pnum_placeholders})
            ORDER BY filed_date DESC
            LIMIT 25
        """, pnum_list)

        results = []
        for pr in permit_rows:
            shared_name = None
            if pr[0] in permit_to_entity:
                shared_name = entity_names.get(permit_to_entity[pr[0]])
            results.append({
                "permit_number": pr[0], "type": pr[1], "status": pr[2],
                "filed_date": pr[3], "cost": pr[4], "description": pr[5],
                "shared_entity": shared_name, "shared_role": None,
            })

        elapsed = _time.monotonic() - t0
        logger.debug("_get_related_team (relationships): %d results in %.2fs", len(results), elapsed)
        return results

    except Exception as e:
        # Relationships table might not exist (DuckDB dev) — fall back to self-join
        if "relationships" in str(e).lower() or "Catalog Error" in str(e) or "does not exist" in str(e).lower():
            logger.warning("_get_related_team: relationships table not available, falling back to self-join: %s", e)
        else:
            logger.warning("_get_related_team: relationships query failed, falling back: %s", e)

    # NOTE(review): on Postgres without autocommit, a failed statement above
    # leaves the transaction aborted and the fallback query would fail too —
    # confirm how get_connection() configures the connection.

    # Fallback: self-join across contacts to find permits sharing an entity.
    if BACKEND == "duckdb":
        sql = f"""
            SELECT p.permit_number, p.permit_type_definition, p.status,
                   p.filed_date, p.estimated_cost, p.description,
                   e.canonical_name, c2.role
            FROM contacts c1
            JOIN contacts c2 ON c1.entity_id = c2.entity_id
                AND c2.permit_number != {_PH}
            JOIN permits p ON c2.permit_number = p.permit_number
            JOIN entities e ON c1.entity_id = e.entity_id
            WHERE c1.permit_number = {_PH}
              AND c1.entity_id IS NOT NULL
            GROUP BY p.permit_number, p.permit_type_definition, p.status,
                     p.filed_date, p.estimated_cost, p.description,
                     e.canonical_name, c2.role
            ORDER BY p.filed_date DESC
            LIMIT 25
        """
    else:
        sql = f"""
            SELECT DISTINCT ON (p.permit_number)
                   p.permit_number, p.permit_type_definition, p.status,
                   p.filed_date, p.estimated_cost, p.description,
                   e.canonical_name, c2.role
            FROM contacts c1
            JOIN contacts c2 ON c1.entity_id = c2.entity_id
                AND c2.permit_number != {_PH}
            JOIN permits p ON c2.permit_number = p.permit_number
            JOIN entities e ON c1.entity_id = e.entity_id
            WHERE c1.permit_number = {_PH}
              AND c1.entity_id IS NOT NULL
            ORDER BY p.permit_number, p.filed_date DESC
            LIMIT 25
        """
    rows = _exec(conn, sql, [permit_number, permit_number])
    elapsed = _time.monotonic() - t0
    logger.debug("_get_related_team (self-join fallback): %d results in %.2fs", len(rows), elapsed)
    cols = ["permit_number", "type", "status", "filed_date", "cost",
            "description", "shared_entity", "shared_role"]
    return [dict(zip(cols, r)) for r in rows]
def _get_planning_records(conn, block: str | None, lot: str | None) -> list[dict]:
"""Get planning entitlements for this parcel (from planning_records table)."""
if not block or not lot:
return []
try:
rows = _exec(conn, f"""
SELECT record_id, record_type, record_status, description,
assigned_planner, open_date, is_project
FROM planning_records
WHERE block = {_PH} AND lot = {_PH}
ORDER BY open_date DESC
LIMIT 10
""", [block, lot])
return [
{
"record_id": r[0],
"record_type": r[1],
"record_status": r[2],
"description": r[3],
"assigned_planner": r[4],
"open_date": r[5],
"is_project": r[6],
}
for r in rows
]
except Exception:
logger.debug("planning_records query failed", exc_info=True)
return []
def _get_boiler_permits(conn, block: str | None, lot: str | None) -> list[dict]:
"""Get boiler permits for this parcel (from boiler_permits table)."""
if not block or not lot:
return []
try:
rows = _exec(conn, f"""
SELECT permit_number, boiler_type, status, expiration_date, description
FROM boiler_permits
WHERE block = {_PH} AND lot = {_PH}
ORDER BY expiration_date DESC NULLS LAST
LIMIT 10
""", [block, lot])
return [
{
"permit_number": r[0],
"boiler_type": r[1],
"status": r[2],
"expiration_date": r[3],
"description": r[4],
}
for r in rows
]
except Exception:
logger.debug("boiler_permits query failed", exc_info=True)
return []
def _get_dev_pipeline(conn, block: str | None, lot: str | None) -> list[dict]:
"""Get development pipeline entries for this parcel (from development_pipeline table)."""
if not block or not lot:
return []
try:
# development_pipeline stores block_lot in various formats — try both
block_lot_dash = f"{block}-{lot}"
block_lot_slash = f"{block}/{lot}"
block_lot_plain = f"{block}{lot}"
rows = _exec(conn, f"""
SELECT record_id, name_address, current_status, proposed_units,
net_pipeline_units, zoning_district
FROM development_pipeline
WHERE block_lot IN ({_PH}, {_PH}, {_PH})
ORDER BY record_id DESC
LIMIT 10
""", [block_lot_dash, block_lot_slash, block_lot_plain])
return [
{
"record_id": r[0],
"name_address": r[1],
"current_status": r[2],
"proposed_units": r[3],
"net_pipeline_units": r[4],
"zoning_district": r[5],
}
for r in rows
]
except Exception:
logger.debug("development_pipeline query failed (table may not exist yet)", exc_info=True)
return []
def _get_addenda(conn, permit_number: str) -> list[dict]:
    """Return the addenda routing steps for a permit, by addenda number then step.

    Rows are matched on ``application_number``. Some columns are renamed in
    the result: plan_checked_by -> reviewer, review_results -> result,
    hold_description -> notes.

    Returns:
        Dicts with keys addenda_number, step, station, reviewer, result,
        finish_date, notes, department, arrive and start_date.
    """
    fields = ("addenda_number", "step", "station", "reviewer",
              "result", "finish_date", "notes", "department",
              "arrive", "start_date")
    sql = f"""
        SELECT addenda_number, step, station, plan_checked_by,
               review_results, finish_date, hold_description, department,
               arrive, start_date
        FROM addenda
        WHERE application_number = {_PH}
        ORDER BY addenda_number, step
    """
    return [dict(zip(fields, step)) for step in _exec(conn, sql, [permit_number])]
# ---------------------------------------------------------------------------
# Recent activity summary (last 30 days)
# ---------------------------------------------------------------------------
def _get_recent_addenda_activity(conn, permit_numbers: list[str], days: int = 30) -> list[dict]:
"""Get recently completed plan review routing steps across multiple permits.
Returns list of dicts with: permit_number, station, reviewer, result,
finish_date, addenda_number. Sorted by finish_date DESC.
"""
if not permit_numbers:
return []
cutoff = date.today() - timedelta(days=days)
cutoff_str = cutoff.isoformat()
# Build IN clause
placeholders = ", ".join([_PH] * len(permit_numbers))
sql = f"""
SELECT application_number, station, plan_checked_by,
review_results, finish_date, addenda_number
FROM addenda
WHERE application_number IN ({placeholders})
AND finish_date IS NOT NULL
AND CAST(finish_date AS TEXT) >= {_PH}
ORDER BY finish_date DESC
LIMIT 20
"""
try:
rows = _exec(conn, sql, permit_numbers + [cutoff_str])
except Exception:
logger.debug("Recent addenda activity query failed", exc_info=True)
return []
cols = ["permit_number", "station", "reviewer", "result",
"finish_date", "addenda_number"]
return [{cols[i]: r[i] for i in range(len(cols))} for r in rows]
def _summarize_recent_activity(permits: list[dict], days: int = 30,
conn=None) -> str | None:
"""Build a markdown activity-summary block from an already-fetched permits list.
Returns None when there is nothing noteworthy in the window, so the caller
can skip the section entirely rather than show an empty card.
If *conn* is provided, also queries addenda for recent plan review activity.
"""
cutoff = date.today() - timedelta(days=days)
cutoff_str = cutoff.isoformat() # "YYYY-MM-DD" — same format as DB column
new_permits: list[dict] = []
recently_issued: list[dict] = []
recently_completed: list[dict] = []
for p in permits:
fd = (p.get("filed_date") or "")[:10]
issued = (p.get("issued_date") or "")[:10]
completed = (p.get("completed_date") or "")[:10]
if fd >= cutoff_str:
new_permits.append(p)
if issued >= cutoff_str:
recently_issued.append(p)
if completed >= cutoff_str:
recently_completed.append(p)
# ── Plan review activity (addenda) ──
recent_addenda: list[dict] = []
if conn:
pnums = [p.get("permit_number") for p in permits if p.get("permit_number")]
recent_addenda = _get_recent_addenda_activity(conn, pnums, days)
# Nothing interesting → skip section
if not new_permits and not recently_issued and not recently_completed and not recent_addenda:
return None
lines = [f"## 🕐 Last {days} Days at This Address\n"]
# ── Plan review activity (most actionable — show first) ──
if recent_addenda:
# Group by result type for a cleaner summary
approved = [a for a in recent_addenda
if a.get("result") and "approv" in (a["result"] or "").lower()]
comments = [a for a in recent_addenda
if a.get("result") and "comment" in (a["result"] or "").lower()]
other = [a for a in recent_addenda
if a not in approved and a not in comments]
total = len(recent_addenda)
noun = "review" if total == 1 else "reviews"
lines.append(f"**🗂️ {total} plan {noun} completed**")
# Show approved first (green signal)
for a in approved[:4]:
pn = a.get("permit_number", "")
station = a.get("station") or "—"
reviewer = a.get("reviewer") or "—"
fd = str(a.get("finish_date") or "")[:10]
rev = a.get("addenda_number")
rev_str = f" · rev {rev}" if rev is not None else ""
lines.append(f"- ✅ **{station}** approved by {reviewer} · {fd}{rev_str}")
# Show comments (needs response)
for a in comments[:3]:
pn = a.get("permit_number", "")
station = a.get("station") or "—"
reviewer = a.get("reviewer") or "—"
fd = str(a.get("finish_date") or "")[:10]
lines.append(f"- 💬 **{station}** issued comments by {reviewer} · {fd}")
# Show other completions
for a in other[:2]:
station = a.get("station") or "—"
result = a.get("result") or "completed"
fd = str(a.get("finish_date") or "")[:10]
lines.append(f"- {station} — {result} · {fd}")
shown = min(len(approved), 4) + min(len(comments), 3) + min(len(other), 2)
if total > shown:
lines.append(f"- *…and {total - shown} more*")
lines.append("")
# ── New filings ──
if new_permits:
count = len(new_permits)
noun = "permit" if count == 1 else "permits"
lines.append(f"**🆕 {count} new {noun} filed**")
for p in new_permits[:3]:
pn = p.get("permit_number", "")
pn_link = f"[{pn}](https://dbiweb02.sfgov.org/dbipts/default.aspx?page=Permit&PermitNumber={pn})"
pt = (p.get("permit_type_definition") or "Building Permit")[:40]
fd = (p.get("filed_date") or "")[:10]
cost = p.get("revised_cost") or p.get("estimated_cost") or 0
cost_str = f" · ${cost:,.0f}" if cost else ""
lines.append(f"- {pn_link} — {pt} · filed {fd}{cost_str}")
if count > 3:
lines.append(f"- *…and {count - 3} more*")
lines.append("")
# ── Issued ──
if recently_issued:
count = len(recently_issued)
noun = "permit" if count == 1 else "permits"
lines.append(f"**✅ {count} {noun} issued**")
for p in recently_issued[:3]:
pn = p.get("permit_number", "")
pn_link = f"[{pn}](https://dbiweb02.sfgov.org/dbipts/default.aspx?page=Permit&PermitNumber={pn})"
pt = (p.get("permit_type_definition") or "")[:40]
issued = (p.get("issued_date") or "")[:10]
lines.append(f"- {pn_link} — {pt} · issued {issued}")
if count > 3:
lines.append(f"- *…and {count - 3} more*")
lines.append("")
# ── Completed ──
if recently_completed:
count = len(recently_completed)
noun = "permit" if count == 1 else "permits"
lines.append(f"**🏁 {count} {noun} completed**")
for p in recently_completed[:3]:
pn = p.get("permit_number", "")
pn_link = f"[{pn}](https://dbiweb02.sfgov.org/dbipts/default.aspx?page=Permit&PermitNumber={pn})"
pt = (p.get("permit_type_definition") or "")[:40]
completed = (p.get("completed_date") or "")[:10]
lines.append(f"- {pn_link} — {pt} · completed {completed}")
if count > 3:
lines.append(f"- *…and {count - 3} more*")
lines.append("")
lines.append("---\n")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Markdown formatters
# ---------------------------------------------------------------------------
def _format_addenda(addenda: list[dict]) -> str:
"""Format addenda routing as markdown table."""
if not addenda:
return "*No plan review routing data available for this permit.*"
# Summary stats
total_steps = len(addenda)
completed = sum(1 for a in addenda if a.get("result"))
pending = total_steps - completed
stations = set(a.get("station") for a in addenda if a.get("station"))
lines = [
f"**{total_steps} routing steps** across **{len(stations)} stations** "
f"({completed} completed, {pending} pending)\n",
"| Station | Rev | Reviewer | Result | Finish Date | Notes |",
"|---------|-----|----------|--------|-------------|-------|",
]
for a in addenda[:50]: # Cap at 50 rows
station = a.get("station") or "---"
rev = a.get("addenda_number")
rev_str = str(rev) if rev is not None else "---"
reviewer = a.get("reviewer") or "---"
result = a.get("result") or "---"
finish = a.get("finish_date") or "---"
if finish and len(finish) > 10:
finish = finish[:10]
notes = a.get("notes") or "---"
if len(notes) > 80:
notes = notes[:80] + "..."
lines.append(f"| {station} | {rev_str} | {reviewer} | {result} | {finish} | {notes} |")
if len(addenda) > 50:
lines.append(f"\n*Showing 50 of {len(addenda)} routing steps.*")
return "\n".join(lines)
def _format_planning_records(records: list[dict]) -> str:
"""Format planning entitlements as markdown."""
if not records:
return ""
lines = [
"| Record ID | Type | Status | Open Date | Planner | Description |",
"|-----------|------|--------|-----------|---------|-------------|",
]
for r in records[:10]:
rid = r.get("record_id") or "—"
rtype = (r.get("record_type") or "—")[:30]
status = r.get("record_status") or "—"
opened = str(r.get("open_date") or "—")[:10]
planner = r.get("assigned_planner") or "—"
desc = (r.get("description") or "—")[:60]
if len(r.get("description") or "") > 60:
desc += "..."
lines.append(f"| {rid} | {rtype} | {status} | {opened} | {planner} | {desc} |")
return "\n".join(lines)
def _format_boiler_permits(boilers: list[dict]) -> str:
"""Format boiler permits as markdown."""
if not boilers:
return ""
lines = [
"| Permit # | Type | Status | Expiration | Description |",
"|----------|------|--------|------------|-------------|",
]
for b in boilers[:10]:
pnum = b.get("permit_number") or "—"
btype = (b.get("boiler_type") or "—")[:30]
status = b.get("status") or "—"
exp = str(b.get("expiration_date") or "—")[:10]
desc = (b.get("description") or "—")[:60]
if len(b.get("description") or "") > 60:
desc += "..."
lines.append(f"| {pnum} | {btype} | {status} | {exp} | {desc} |")
return "\n".join(lines)
def _format_dev_pipeline(pipeline: list[dict]) -> str:
"""Format development pipeline entries as markdown."""
if not pipeline:
return ""
lines = [
"| Record ID | Address / Name | Status | Proposed Units | Net Units | Zoning |",
"|-----------|---------------|--------|---------------|-----------|--------|",
]
for p in pipeline[:10]:
rid = p.get("record_id") or "—"
name = (p.get("name_address") or "—")[:40]
status = (p.get("current_status") or "—")[:30]
proposed = p.get("proposed_units") or "—"
net = p.get("net_pipeline_units") or "—"
zoning = p.get("zoning_district") or "—"
lines.append(f"| {rid} | {name} | {status} | {proposed} | {net} | {zoning} |")
return "\n".join(lines)
def _format_permit_detail(p: dict) -> str:
    """Render one permit as a block of markdown "**Label:** value" lines.

    Args:
        p: Permit dict keyed by ``PERMIT_COLS``. ``permit_number`` is
            required; other keys may be absent or None (NULL columns arrive
            as keys with a None value).

    Returns:
        Newline-joined markdown covering number/links, type, status,
        description (first 200 characters), dates, costs, address,
        neighborhood, parcel and use — each only when data is present.

    Raises:
        KeyError: if ``permit_number`` is missing from *p*.
    """
    lines = []
    pn = p['permit_number']
    pn_url = f"https://dbiweb02.sfgov.org/dbipts/default.aspx?page=Permit&PermitNumber={pn}"
    from src.report_links import ReportLinks
    details_url = ReportLinks.dbi_permit_details(pn)
    lines.append(f"**Permit Number:** [{pn}]({pn_url}) | [DBI Permit Details]({details_url})")
    lines.append(f"**Type:** {p.get('permit_type_definition') or p.get('permit_type') or 'Unknown'}")

    status_line = f"**Status:** {p.get('status') or 'Unknown'}"
    if p.get("status_date"):
        status_line += f" (as of {p['status_date']})"
    lines.append(status_line)

    description = p.get("description")
    if description:
        desc = description[:200]
        if len(description) > 200:
            desc += "..."
        lines.append(f"**Description:** {desc}")

    # Dates
    date_labels = (
        ("Filed", "filed_date"),
        ("Issued", "issued_date"),
        ("Approved", "approved_date"),
        ("Completed", "completed_date"),
    )
    dates = [f"{label}: {p[key]}" for label, key in date_labels if p.get(key)]
    if dates:
        lines.append(f"**Dates:** {' | '.join(dates)}")

    # Cost
    cost_parts = []
    if p.get("estimated_cost"):
        cost_parts.append(f"${p['estimated_cost']:,.0f} estimated")
    if p.get("revised_cost") and p["revised_cost"] != p.get("estimated_cost"):
        cost_parts.append(f"${p['revised_cost']:,.0f} revised")
    if cost_parts:
        lines.append(f"**Cost:** {' / '.join(cost_parts)}")

    # Address
    addr_parts = [p.get("street_number", ""), p.get("street_name", ""),
                  p.get("street_suffix", "")]
    addr = " ".join(a for a in addr_parts if a).strip()
    if addr:
        # `or ""` (not a .get default): a NULL zipcode is a present key with
        # value None and would otherwise print as "None".
        zipcode = p.get("zipcode") or ""
        lines.append(f"**Address:** {addr}, San Francisco CA {zipcode}".rstrip())
    if p.get("neighborhood"):
        lines.append(f"**Neighborhood:** {p['neighborhood']}")
    if p.get("block") and p.get("lot"):
        lines.append(f"**Parcel:** Block {p['block']}, Lot {p['lot']}")

    # Use — same None-vs-missing handling as zipcode.
    existing_use = p.get("existing_use")
    proposed_use = p.get("proposed_use")
    if existing_use or proposed_use:
        lines.append(f"**Use:** {existing_use or '—'} → {proposed_use or '—'}")

    return "\n".join(lines)
def _format_contacts(contacts: list[dict]) -> str:
"""Format contacts/team as markdown."""
if not contacts:
return "*No team contacts on file for this permit.*"
lines = []
for c in contacts:
role = (c.get("role") or "Unknown").title()
name = c.get("canonical_name") or c.get("name") or "Unknown"
firm = c.get("canonical_firm") or c.get("firm_name") or ""
permit_count = c.get("permit_count")
line = f"- **{role}:** {name}"
if firm:
line += f" ({firm})"
if permit_count and permit_count > 1:
line += f" — {permit_count:,} SF permits on file"
lines.append(line)
return "\n".join(lines)
def _format_inspections(inspections: list[dict]) -> str:
"""Format inspections as markdown table."""
if not inspections:
return "*No inspections recorded for this permit.*"
lines = [
"| Date | Inspector | Result | Description |",
"|------|-----------|--------|-------------|",
]
for insp in inspections[:30]: # Cap at 30 rows
date = insp.get("scheduled_date") or "—"
inspector = insp.get("inspector") or "—"
result = insp.get("result") or "—"
desc = insp.get("description") or "—"
if len(desc) > 60:
desc = desc[:60] + "..."
lines.append(f"| {date} | {inspector} | {result} | {desc} |")
if len(inspections) > 30:
lines.append(f"\n*Showing 30 of {len(inspections)} inspections.*")
return "\n".join(lines)
def _format_related(location_permits: list[dict], team_permits: list[dict],
block: str | None, lot: str | None) -> str:
"""Format related permits section."""
lines = []
# Same location
if block and lot:
lines.append(f"### Same Location (Block {block}, Lot {lot})")
if location_permits:
lines.append(f"Found **{len(location_permits)}** other permits at this parcel:\n")
lines.append("| Permit # | Type | Status | Filed | Cost |")
lines.append("|----------|------|--------|-------|------|")
for p in location_permits[:20]:
pnum = p.get("permit_number", "")
ptype = (p.get("type") or "")[:40]
status = p.get("status") or "—"
filed = p.get("filed_date") or "—"
try:
cost = f"${float(p['cost']):,.0f}" if p.get("cost") else "—"
except (ValueError, TypeError):
cost = "—"
lines.append(f"| {pnum} | {ptype} | {status} | {filed} | {cost} |")
if len(location_permits) > 20:
lines.append(f"\n*Showing 20 of {len(location_permits)}.*")
else:
lines.append("*No other permits found at this parcel.*")
# Same team
lines.append("\n### Same Team Members")
if team_permits:
lines.append(f"Found **{len(team_permits)}** permits sharing team members:\n")
lines.append("| Permit # | Type | Status | Filed | Shared Via |")
lines.append("|----------|------|--------|-------|------------|")
for p in team_permits[:20]:
pnum = p.get("permit_number", "")
ptype = (p.get("type") or "")[:35]
status = p.get("status") or "—"
filed = p.get("filed_date") or "—"
shared = f"{p.get('shared_entity', '')} ({p.get('shared_role', '')})"
lines.append(f"| {pnum} | {ptype} | {status} | {filed} | {shared} |")
if len(team_permits) > 20:
lines.append(f"\n*Showing 20 of {len(team_permits)}.*")
else:
lines.append("*No related permits found via shared team members.*")
return "\n".join(lines)
def _format_permit_list(permits: list[dict], search_desc: str) -> str:
"""Format a list of permits for address/block-lot results."""
lines = [
f"# Permit Lookup Results\n",
f"Found **{len(permits)}** permits matching {search_desc}.\n",
"| Permit # | Type | Status | Filed | Cost | Description |",
"|----------|------|--------|-------|------|-------------|",
]
for p in permits:
pnum = p.get("permit_number", "")
ptype = (p.get("permit_type_definition") or "")[:30]
status = p.get("status") or "—"
filed = p.get("filed_date") or "—"
cost = f"${p['estimated_cost']:,.0f}" if p.get("estimated_cost") else "—"
desc = (p.get("description") or "")[:50]
if len(p.get("description", "")) > 50:
desc += "..."
lines.append(f"| {pnum} | {ptype} | {status} | {filed} | {cost} | {desc} |")
lines.append(f"\n*Source: sfpermits.ai local database ({BACKEND})*")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main tool entry point
# ---------------------------------------------------------------------------
async def permit_lookup(
permit_number: str | None = None,
street_number: str | None = None,
street_name: str | None = None,
block: str | None = None,
lot: str | None = None,
) -> str:
"""Look up SF permits by number, address, or parcel. Shows full details and related permits.
Searches the local database of 1.1M+ SF building permits. Provide ONE of:
- permit_number: exact permit number (e.g., '202301015555')
- street_number + street_name: address search (e.g., '123' + 'Main')
- block + lot: SF parcel identifier (e.g., '3512' + '001')
Returns permit details, project team, inspections, and related permits.
"""
# Validate input — at least one search mode
has_permit = bool(permit_number and permit_number.strip())
has_address = bool(street_number and street_number.strip() and street_name and street_name.strip())
has_parcel = bool(block and block.strip() and lot and lot.strip())
if not has_permit and not has_address and not has_parcel:
return "Please provide a permit number, address (street number + street name), or parcel (block + lot)."
conn = get_connection()
try:
# 1. Find the permit(s)
if has_permit:
permits = _lookup_by_number(conn, permit_number.strip())
if not permits:
return f"No permit found with number **{permit_number.strip()}**."
elif has_address:
sn = street_number.strip()
sname = street_name.strip()
permits = _lookup_by_address(conn, sn, sname)
if not permits:
# Fallback: suggest similar street names so the user can pick
suggestions = _suggest_street_names(conn, sn, sname)
if suggestions:
lines = [f"No exact match for **{sn} {sname}**. Did you mean:\n"]
for name, count in suggestions:
lines.append(f"- **{name}** ({count} permits)")
return "\n".join(lines)
return f"No permits found at **{sn} {sname}**."
# Merge parcel-level permits: multi-unit buildings may have
# permits filed under different street numbers (e.g., 144 vs 146).
# Resolve the block/lot from the address results, then pull all
# permits at that parcel (including historical lots).
first_block = permits[0].get("block")
first_lot = permits[0].get("lot")
if first_block and first_lot:
parcel_permits = _lookup_by_block_lot(conn, first_block, first_lot)
# Merge: add any parcel permits not already in the address results
seen = {p["permit_number"] for p in permits}
for pp in parcel_permits:
if pp["permit_number"] not in seen:
permits.append(pp)
seen.add(pp["permit_number"])
# Re-sort by filed_date DESC after merge
permits.sort(
key=lambda p: p.get("filed_date") or "",
reverse=True,
)
else:
permits = _lookup_by_block_lot(conn, block.strip(), lot.strip())
if not permits:
return f"No permits found at **Block {block.strip()}, Lot {lot.strip()}**."
# 2. If multiple permits (address/parcel search), show list + detail for first
primary = permits[0]
pnum = primary["permit_number"]
lines = ["# Permit Lookup Results\n"]
# 2a. Recent activity banner (address/parcel searches only — not single permit lookup)
if has_address or has_parcel:
activity_md = _summarize_recent_activity(permits, conn=conn)
if activity_md:
lines.append(activity_md)
# If address/parcel returned multiple, show the list first
if len(permits) > 1:
search_desc = ""
if has_address:
search_desc = f"{street_number.strip()} {street_name.strip()}"
elif has_parcel:
search_desc = f"Block {block.strip()}, Lot {lot.strip()}"
lines.append(f"Found **{len(permits)}** permits at **{search_desc}**.\n")
lines.append("| Permit # | Type | Status | Filed | Cost |")
lines.append("|----------|------|--------|-------|------|")
for p in permits[:20]:
pn = p.get("permit_number", "")
# Hyperlink permit number to DBI tracker
pn_link = f"[{pn}](https://dbiweb02.sfgov.org/dbipts/default.aspx?page=Permit&PermitNumber={pn})" if pn else "—"
pt = (p.get("permit_type_definition") or "")[:35]
st = p.get("status") or "—"
fd = p.get("filed_date") or "—"
c = f"${p['estimated_cost']:,.0f}" if p.get("estimated_cost") else "—"
lines.append(f"| {pn_link} | {pt} | {st} | {fd} | {c} |")
if len(permits) > 20:
lines.append(f"\n*Showing 20 of {len(permits)}.*")
lines.append(f"\n---\n\n**Showing details for most recent: {pnum}**\n")
# 3. Detailed view of primary permit
lines.append("## Permit Details\n")
lines.append(_format_permit_detail(primary))
# Timeline stats
try:
timeline = _get_timeline(conn, pnum)
if timeline:
if timeline.get("days_to_issuance"):
lines.append(f"\n**Filing → Issuance:** {timeline['days_to_issuance']} days")
if timeline.get("days_to_completion"):
lines.append(f"**Issuance → Completion:** {timeline['days_to_completion']} days")
except Exception:
pass # timeline_stats table might not exist in all envs
# 4. Team
lines.append("\n## Project Team\n")
if circuit_breaker.is_open("contacts"):
contacts = []
lines.append("*Contact data temporarily unavailable (circuit breaker open).*")
else:
try:
contacts = _get_contacts(conn, pnum)
lines.append(_format_contacts(contacts))
circuit_breaker.record_success("contacts")
except Exception as e:
contacts = []
circuit_breaker.record_failure("contacts")
logger.warning("Contacts query failed for %s: %s", pnum, e)
lines.append("*Contact data temporarily unavailable.*")
# 5. Inspections
lines.append("\n## Inspection History\n")
if circuit_breaker.is_open("inspections"):
inspections = []
lines.append("*Inspection data temporarily unavailable (circuit breaker open).*")
else:
try:
inspections = _get_inspections(conn, pnum)
lines.append(_format_inspections(inspections))
circuit_breaker.record_success("inspections")
except Exception as e:
inspections = []
circuit_breaker.record_failure("inspections")
logger.warning("Inspections query failed for %s: %s", pnum, e)
lines.append("*Inspection data temporarily unavailable.*")
# 5.25 Severity Score
try:
from src.severity import PermitInput, score_permit
insp_count = len(inspections)
permit_input = PermitInput.from_dict(primary, inspection_count=insp_count)
sev = score_permit(permit_input)
lines.append(f"\n## Severity Score: {sev.score}/100 — {sev.tier}\n")
lines.append(f"**Category:** {sev.category} | **Top Driver:** "
f"{sev.top_driver.replace('_', ' ').title()}")
lines.append(f"**Explanation:** {sev.explanation}\n")
lines.append("| Dimension | Score | Weight |")
lines.append("|-----------|-------|--------|")
dim_labels = {
"inspection_activity": "Inspection Activity",
"age_staleness": "Age / Staleness",
"expiration_proximity": "Expiration Proximity",
"cost_tier": "Cost Tier",
"category_risk": "Category Risk",
}
for name, dim in sev.dimensions.items():
label = dim_labels.get(name, name)
marker = " **<<**" if name == sev.top_driver else ""
lines.append(f"| {label} | {dim['score']:.0f}/100 | {dim['weight']:.0%}{marker} |")
except Exception:
logger.debug("Severity scoring failed for %s", pnum, exc_info=True)
# 5.5 Plan Review Routing (Addenda)
lines.append("\n## Plan Review Routing\n")
if circuit_breaker.is_open("addenda"):
lines.append("*Plan review routing data temporarily unavailable (circuit breaker open).*")
else:
try:
addenda_rows = _get_addenda(conn, pnum)
lines.append(_format_addenda(addenda_rows))
circuit_breaker.record_success("addenda")
except Exception:
circuit_breaker.record_failure("addenda")
lines.append("*Plan review routing data not available.*")
# 6. Related permits
lines.append("\n## Related Permits\n")
# Related by location
p_block = primary.get("block")
p_lot = primary.get("lot")
location_permits = []
if p_block and p_lot:
try:
location_permits = _get_related_location(conn, p_block, p_lot, pnum)
except Exception as e:
logger.warning("Related location query failed for %s: %s", pnum, e)
# Related by team
team_permits = []
if circuit_breaker.is_open("related_team"):
logger.info("Skipping related team lookup — circuit breaker open")
else:
try:
team_permits = _get_related_team(conn, pnum)
circuit_breaker.record_success("related_team")
except Exception as e:
circuit_breaker.record_failure("related_team")
logger.warning("Related team query failed: %s", e)
lines.append(_format_related(location_permits, team_permits, p_block, p_lot))
# 7. Planning records (entitlements)
if not circuit_breaker.is_open("planning_records"):
planning_records = _get_planning_records(conn, p_block, p_lot)
if planning_records:
circuit_breaker.record_success("planning_records")
lines.append("\n## Planning Records (Entitlements)\n")
lines.append(_format_planning_records(planning_records))
# 8. Boiler permits
if not circuit_breaker.is_open("boiler_permits"):
boiler_permits = _get_boiler_permits(conn, p_block, p_lot)
if boiler_permits:
circuit_breaker.record_success("boiler_permits")
lines.append("\n## Boiler Permits\n")
lines.append(_format_boiler_permits(boiler_permits))
# 9. Development pipeline
dev_pipeline = _get_dev_pipeline(conn, p_block, p_lot)
if dev_pipeline:
lines.append("\n## Development Pipeline\n")
lines.append(_format_dev_pipeline(dev_pipeline))
# Source citation
lines.append(f"\n---\n*Source: sfpermits.ai local database ({BACKEND}) — 1.1M permits, 1.8M contacts, 671K inspections*")
return "\n".join(lines)
finally:
conn.close()