# state_confidence.py (7.77 kB)
"""PostgreSQL state confidence query module."""
import logging
from typing import Dict, Optional
import psycopg2
from psycopg2.extras import RealDictCursor
logger = logging.getLogger(__name__)
class StateConfidenceClient:
    """Client for querying state confidence scores from PostgreSQL.

    A single connection is opened lazily on first use and reused for as long
    as the driver reports it open. Lookups are best effort: connection and
    query errors raised by psycopg2 are logged and reported as "no data"
    (``None`` / empty dict) instead of being propagated.

    The client can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    def __init__(
        self,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
        sslmode: str = "prefer",
    ):
        """Store connection settings; no connection is opened yet.

        Args:
            host: PostgreSQL host
            port: PostgreSQL port
            database: Database name
            user: Database user
            password: Database password
            sslmode: SSL mode (default: prefer)
        """
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.sslmode = sslmode
        self._conn: "Optional[psycopg2.extensions.connection]" = None

    def _get_connection(self) -> "Optional[psycopg2.extensions.connection]":
        """Return the cached connection, opening a new one when needed.

        A new connection is attempted once per call when there is no cached
        connection or the cached one is closed; there is no retry loop.

        Returns:
            Database connection or None if connection fails
        """
        if self._conn is not None and not self._conn.closed:
            return self._conn

        try:
            conn = psycopg2.connect(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password,
                sslmode=self.sslmode,
            )
            # This client only issues SELECTs. Autocommit keeps the session
            # from sitting "idle in transaction" between lookups and, more
            # importantly, prevents a failed statement from leaving the
            # cached connection in an aborted transaction that would make
            # every later query fail.
            conn.autocommit = True
        except psycopg2.Error as e:
            logger.warning("Failed to connect to PostgreSQL: %s", e)
            self._conn = None
            return None

        self._conn = conn
        logger.debug("PostgreSQL connection established")
        return conn

    @staticmethod
    def _optional_float(value) -> Optional[float]:
        """Convert *value* to float, passing ``None`` (SQL NULL) through.

        The check is ``is not None`` on purpose: a stored value of 0 is a
        real score and must not be reported as missing.
        """
        return None if value is None else float(value)

    @staticmethod
    def _row_to_result(row) -> Dict:
        """Convert one ``certainty_scores`` row mapping into the result dict.

        Numeric columns are converted to float, timestamps to ISO-8601
        strings; NULLs in the optional columns become ``None``. The
        ``certainty_score`` column is converted unconditionally, so a NULL
        there raises ``TypeError``.
        """
        to_float = StateConfidenceClient._optional_float
        last_verified = row["last_verified"]
        updated_at = row["updated_at"]
        return {
            "certainty_score": float(row["certainty_score"]),
            "data_age_seconds": row["data_age_seconds"],
            "source_reliability": to_float(row["source_reliability"]),
            "verification_status": to_float(row["verification_status"]),
            "state_type_bonus": to_float(row["state_type_bonus"]),
            "certainty_factors": row["certainty_factors"],
            "verification_sources": row["verification_sources"],
            "last_verified": (
                last_verified.isoformat() if last_verified is not None else None
            ),
            "updated_at": updated_at.isoformat() if updated_at is not None else None,
        }

    def get_certainty_score(self, state_id: str, state_type: str) -> Optional[Dict]:
        """Get certainty score for a state object.

        When several rows match, the one with the most recent ``updated_at``
        is returned.

        Args:
            state_id: State identifier (hostname, IP address, VLAN ID, etc.)
            state_type: Type of state (device, vm, ip_address, vlan, etc.)

        Returns:
            Dictionary with certainty score and metadata, or None if not
            found, the database is unavailable, or the query fails
        """
        conn = self._get_connection()
        if not conn:
            logger.debug("PostgreSQL connection unavailable, skipping certainty score lookup")
            return None

        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        certainty_score,
                        data_age_seconds,
                        source_reliability,
                        verification_status,
                        state_type_bonus,
                        certainty_factors,
                        verification_sources,
                        last_verified,
                        updated_at
                    FROM state.certainty_scores
                    WHERE state_id = %s AND state_type = %s
                    ORDER BY updated_at DESC
                    LIMIT 1
                    """,
                    (state_id, state_type),
                )
                row = cur.fetchone()
        except psycopg2.Error as e:
            logger.warning("Error querying certainty scores: %s", e)
            return None

        return self._row_to_result(row) if row else None

    def get_certainty_scores_batch(
        self, state_ids: list[tuple[str, str]]
    ) -> Dict[tuple[str, str], Dict]:
        """Get certainty scores for multiple state objects.

        As with the single lookup, when several rows exist for the same
        (state_id, state_type) the most recently updated one is used.

        Args:
            state_ids: List of tuples (state_id, state_type)

        Returns:
            Dictionary mapping (state_id, state_type) to certainty score
            data. Keys without a matching row are absent. Empty when no keys
            are given, the database is unavailable, or the query fails.
        """
        # Normalise to tuples: psycopg2 adapts a tuple to a SQL row value
        # "(a, b)", which is what the row-wise IN below needs, whereas a
        # list would be adapted to an ARRAY.
        pairs = [tuple(pair) for pair in (state_ids or ())]
        if not pairs:
            # "IN ()" is a syntax error in PostgreSQL; nothing to look up.
            return {}

        conn = self._get_connection()
        if not conn:
            logger.debug("PostgreSQL connection unavailable, skipping batch certainty lookup")
            return {}

        # Only "%s" markers are interpolated into the SQL text; the values
        # themselves are passed as bound parameters.
        placeholders = ",".join(["%s"] * len(pairs))
        query = f"""
            SELECT DISTINCT ON (state_id, state_type)
                state_id,
                state_type,
                certainty_score,
                data_age_seconds,
                source_reliability,
                verification_status,
                state_type_bonus,
                certainty_factors,
                verification_sources,
                last_verified,
                updated_at
            FROM state.certainty_scores
            WHERE (state_id, state_type) IN ({placeholders})
            ORDER BY state_id, state_type, updated_at DESC
        """

        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, pairs)
                rows = cur.fetchall()
        except psycopg2.Error as e:
            logger.warning("Error querying batch certainty scores: %s", e)
            return {}

        return {
            (row["state_id"], row["state_type"]): self._row_to_result(row)
            for row in rows
        }

    def close(self) -> None:
        """Close the database connection if one is open."""
        if self._conn is not None and not self._conn.closed:
            self._conn.close()
            logger.debug("PostgreSQL connection closed")
        self._conn = None

    def __enter__(self) -> "StateConfidenceClient":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: close the connection."""
        self.close()