"""
Database writer for inserting parsed XML data into PostgreSQL
"""
import logging
from typing import Dict, List, Optional, Any
import psycopg2
from psycopg2.extras import execute_values
from psycopg2 import sql
logger = logging.getLogger(__name__)


class DBWriter:
    """Database writer for PM data.

    Wraps a single psycopg2 connection opened with autocommit disabled.
    Write methods commit on success and roll back on failure; read methods
    roll back on failure so a failed statement does not leave the connection
    in an aborted transaction.
    """

    # Fallback used when the config row is missing or cannot be read.
    DEFAULT_FETCH_INTERVAL_MINUTES = 5

    def __init__(self, host: str, database: str, user: str, password: str, port: int = 5432):
        """Store connection parameters; no connection is opened here."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.conn: Optional[psycopg2.extensions.connection] = None

    def connect(self) -> bool:
        """Establish database connection.

        Returns:
            True when the connection was opened, False when it failed (the
            failure is logged, not raised).
        """
        try:
            self.conn = psycopg2.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port,
            )
            self.conn.autocommit = False
            logger.info("Connected to database %s", self.database)
            return True
        except Exception as e:
            logger.error("Failed to connect to database: %s", e)
            return False

    def disconnect(self):
        """Close database connection and forget the handle.

        Safe to call when not connected or more than once.
        """
        if self.conn is None:
            return
        try:
            self.conn.close()
        finally:
            # Drop the reference even if close() raised, so later calls see
            # "not connected" instead of a closed connection object.
            self.conn = None
        logger.info("Disconnected from database")

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction without ever raising."""
        if self.conn is None:
            return
        try:
            self.conn.rollback()
        except Exception as rollback_error:
            # A failing rollback must not hide the error that triggered it.
            logger.warning("Rollback failed: %s", rollback_error)

    def get_processed_checksums(self) -> set:
        """Get set of all processed file checksums.

        Returns:
            The checksums stored in pm_files, or an empty set when the query
            fails (the failure is logged).
        """
        checksums = set()
        try:
            with self.conn.cursor() as cur:
                cur.execute("SELECT checksum FROM pm_files")
                checksums = {row[0] for row in cur.fetchall()}
        except Exception as e:
            self._rollback_quietly()
            logger.error("Failed to get processed checksums: %s", e)
        return checksums

    def file_exists(self, checksum: str) -> bool:
        """Check if file with given checksum already exists.

        Returns:
            True when a pm_files row has this checksum. False when none does
            or when the lookup fails (the failure is logged), so False alone
            does not prove the file is absent.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute("SELECT id FROM pm_files WHERE checksum = %s", (checksum,))
                return cur.fetchone() is not None
        except Exception as e:
            self._rollback_quietly()
            logger.error("Failed to check file existence: %s", e)
            return False

    def insert_file_data(self, filename: str, checksum: str, file_header: Dict[str, Any],
                         network_element: Dict[str, Any], intervals: List[Dict[str, Any]],
                         alerts: List[Dict[str, Any]],
                         quality_indicators: List[Dict[str, Any]]) -> Optional[int]:
        """Insert complete file data into database as one transaction.

        Args:
            filename: Name stored in pm_files.
            checksum: Checksum stored in pm_files.
            file_header: Header fields; missing keys are stored as NULL.
            network_element: Network element fields; skipped when empty.
            intervals: Measurement intervals, each requiring "start_time" and
                "end_time" and optionally carrying "interfaces",
                "ip_counters", "tcp_counters", "system_counters", "bgp_data".
            alerts: Threshold alert rows; skipped when empty.
            quality_indicators: Data quality rows; skipped when empty.

        Returns:
            The id of the new pm_files row.

        Raises:
            ConnectionError: If no connection is open.
            Exception: Any error raised while inserting; the transaction is
                rolled back before the error is re-raised.
        """
        if not self.conn:
            raise ConnectionError("Not connected to database")
        try:
            with self.conn.cursor() as cur:
                file_id = self._insert_file_record(cur, filename, checksum, file_header)
                if network_element:
                    self._insert_network_element(cur, file_id, network_element)
                for interval in intervals:
                    self._insert_interval(cur, file_id, interval)
                self._insert_alerts(cur, file_id, alerts)
                self._insert_counter_rows(
                    cur,
                    "INSERT INTO data_quality (file_id, indicator_name, value, unit) VALUES %s",
                    file_id,
                    quality_indicators,
                )
                self.conn.commit()
                logger.info("Successfully inserted file %s (ID: %s)", filename, file_id)
                return file_id
        except Exception as e:
            self._rollback_quietly()
            logger.error("Failed to insert file data: %s", e)
            raise

    @staticmethod
    def _insert_file_record(cur, filename: str, checksum: str,
                            file_header: Dict[str, Any]) -> int:
        """Insert the pm_files row and return its generated id."""
        cur.execute("""
            INSERT INTO pm_files (filename, checksum, generation_time, node_id, vendor,
                                  equipment_type, software_version, reporting_period,
                                  file_format_version, collection_frequency, processed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            RETURNING id
        """, (
            filename,
            checksum,
            file_header.get("generation_time"),
            file_header.get("node_id"),
            file_header.get("vendor"),
            file_header.get("equipment_type"),
            file_header.get("software_version"),
            file_header.get("reporting_period"),
            file_header.get("file_format_version"),
            file_header.get("collection_frequency"),
        ))
        return cur.fetchone()[0]

    @staticmethod
    def _insert_network_element(cur, file_id: int, network_element: Dict[str, Any]) -> None:
        """Insert the network_elements row belonging to a file."""
        cur.execute("""
            INSERT INTO network_elements
                (file_id, ne_name, ne_type, site, region, country, management_ip)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, (
            file_id,
            network_element.get("ne_name"),
            network_element.get("ne_type"),
            network_element.get("site"),
            network_element.get("region"),
            network_element.get("country"),
            network_element.get("management_ip"),
        ))

    @staticmethod
    def _insert_counter_rows(cur, statement: str, parent_id: int,
                             counters: Optional[List[Dict[str, Any]]]) -> None:
        """Bulk-insert (parent_id, name, value, unit) rows; no-op when empty."""
        if not counters:
            return
        rows = [
            (parent_id, c["name"], c.get("value"), c.get("unit"))
            for c in counters
        ]
        execute_values(cur, statement, rows)

    def _insert_interval(self, cur, file_id: int, interval: Dict[str, Any]) -> None:
        """Insert one measurement interval with its interfaces, counters and BGP peers."""
        cur.execute("""
            INSERT INTO measurement_intervals (file_id, start_time, end_time)
            VALUES (%s, %s, %s)
            RETURNING id
        """, (
            file_id,
            interval["start_time"],
            interval["end_time"],
        ))
        interval_id = cur.fetchone()[0]

        # "or []" also covers a key that is present but set to None.
        for interface in interval.get("interfaces") or []:
            cur.execute("""
                INSERT INTO interfaces (interval_id, name, if_index, if_type)
                VALUES (%s, %s, %s, %s)
                RETURNING id
            """, (
                interval_id,
                interface["name"],
                interface.get("if_index"),
                interface.get("if_type"),
            ))
            interface_id = cur.fetchone()[0]
            self._insert_counter_rows(
                cur,
                "INSERT INTO interface_counters (interface_id, counter_name, value, unit) VALUES %s",
                interface_id,
                interface.get("counters"),
            )

        # Interval-level counter groups share one row shape and differ only
        # in the target table.
        interval_counter_statements = (
            ("ip_counters",
             "INSERT INTO ip_counters (interval_id, counter_name, value, unit) VALUES %s"),
            ("tcp_counters",
             "INSERT INTO tcp_counters (interval_id, counter_name, value, unit) VALUES %s"),
            ("system_counters",
             "INSERT INTO system_counters (interval_id, counter_name, value, unit) VALUES %s"),
        )
        for key, statement in interval_counter_statements:
            self._insert_counter_rows(cur, statement, interval_id, interval.get(key))

        for bgp_peer in interval.get("bgp_data") or []:
            cur.execute("""
                INSERT INTO bgp_peers (interval_id, address, as_number)
                VALUES (%s, %s, %s)
                RETURNING id
            """, (
                interval_id,
                bgp_peer["address"],
                bgp_peer.get("as_number"),
            ))
            peer_id = cur.fetchone()[0]
            self._insert_counter_rows(
                cur,
                "INSERT INTO bgp_counters (peer_id, counter_name, value, unit) VALUES %s",
                peer_id,
                bgp_peer.get("counters"),
            )

    @staticmethod
    def _insert_alerts(cur, file_id: int, alerts: Optional[List[Dict[str, Any]]]) -> None:
        """Bulk-insert threshold alerts for a file; no-op when empty."""
        if not alerts:
            return
        alert_rows = [
            (file_id, a.get("severity"), a.get("timestamp"), a.get("parameter"),
             a.get("value"), a.get("threshold"), a.get("description"))
            for a in alerts
        ]
        execute_values(
            cur,
            """INSERT INTO threshold_alerts
               (file_id, severity, timestamp, parameter, value, threshold, description)
               VALUES %s""",
            alert_rows,
        )

    def get_fetch_interval(self) -> int:
        """Get current fetch interval in minutes.

        Returns:
            The integer stored under config key 'fetch_interval_minutes', or
            the default of 5 when the row is missing or the lookup fails.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute("SELECT value FROM config WHERE key = 'fetch_interval_minutes'")
                result = cur.fetchone()
                if result:
                    return int(result[0])
                return self.DEFAULT_FETCH_INTERVAL_MINUTES
        except Exception as e:
            self._rollback_quietly()
            logger.error("Failed to get fetch interval: %s", e)
            return self.DEFAULT_FETCH_INTERVAL_MINUTES

    def update_fetch_interval(self, minutes: int) -> bool:
        """Update fetch interval in minutes.

        Returns:
            True when the config row was updated and committed. False when
            the update failed, no connection is open, or no row with key
            'fetch_interval_minutes' exists to update.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute("""
                    UPDATE config SET value = %s, updated_at = CURRENT_TIMESTAMP
                    WHERE key = 'fetch_interval_minutes'
                """, (str(minutes),))
                updated_rows = cur.rowcount
            if updated_rows == 0:
                # Nothing matched: do not report success for a no-op.
                self._rollback_quietly()
                logger.warning(
                    "Config key 'fetch_interval_minutes' not found; fetch interval not updated"
                )
                return False
            self.conn.commit()
            logger.info("Updated fetch interval to %s minutes", minutes)
            return True
        except Exception as e:
            self._rollback_quietly()
            logger.error("Failed to update fetch interval: %s", e)
            return False

    def __enter__(self):
        """Context manager entry.

        Calls connect() and returns self regardless of its result; a failed
        connection is logged by connect() and leaves ``conn`` as None.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the connection; exceptions propagate."""
        self.disconnect()