"""
Data storage layer for saving parsed XML data to database
"""
from sqlalchemy.orm import Session
from database import (
FileRecord, NetworkElement, MeasurementInterval,
InterfaceCounter, IPCounter, TCPCounter, SystemCounter,
BGPCounter, ThresholdAlert
)
from datetime import datetime
from typing import Dict, Any, List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataStorage:
    """Handle data storage operations."""

    def __init__(self, db: Session):
        self.db = db

    def save_file_data(self, parsed_data: Dict[str, Any]) -> int:
        """Save complete parsed XML data to the database.

        Returns the id of the newly created FileRecord.
        """
        try:
            # Save file record
            file_info = parsed_data['file_info']
            file_record = FileRecord(
                filename=file_info['filename'],
                downloaded_at=datetime.utcnow(),
                file_size=file_info['file_size'],
                checksum=file_info['checksum']
            )
            self.db.add(file_record)
            self.db.flush()
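            # flush() sends the INSERT so file_record.id is available to the
            # rows created below, without committing the transaction yet.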
            # Get the existing network element or create a new one
            ne_data = parsed_data['network_element']
            network_element = self.db.query(NetworkElement).filter(
                NetworkElement.ne_name == ne_data['ne_name']
            ).first()
            if not network_element:
                network_element = NetworkElement(
                    ne_name=ne_data['ne_name'],
                    ne_type=ne_data.get('ne_type', ''),
                    site=ne_data.get('site', ''),
                    region=ne_data.get('region', ''),
                    country=ne_data.get('country', ''),
                    management_ip=ne_data.get('management_ip', '')
                )
                self.db.add(network_element)
                self.db.flush()
            # Save measurement intervals
            for interval_data in parsed_data['measurement_intervals']:
                interval = MeasurementInterval(
                    file_record_id=file_record.id,
                    network_element_id=network_element.id,
                    start_time=interval_data['start_time'],
                    end_time=interval_data['end_time']
                )
                self.db.add(interval)
                self.db.flush()
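                # flush() populates interval.id so the counter rows below can
                # reference it as their foreign key.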
                # Save interface counters
                for if_counter in interval_data['interface_group']:
                    counter = InterfaceCounter(
                        interval_id=interval.id,
                        interface_name=if_counter['interface_name'],
                        if_index=if_counter.get('if_index', ''),
                        if_type=if_counter.get('if_type', ''),
                        counter_name=if_counter['counter_name'],
                        value=if_counter['value'],
                        unit=if_counter['unit']
                    )
                    self.db.add(counter)

                # Save IP counters
                for ip_counter in interval_data['ip_group']:
                    counter = IPCounter(
                        interval_id=interval.id,
                        counter_name=ip_counter['counter_name'],
                        value=ip_counter['value'],
                        unit=ip_counter['unit']
                    )
                    self.db.add(counter)

                # Save TCP counters
                for tcp_counter in interval_data['tcp_group']:
                    counter = TCPCounter(
                        interval_id=interval.id,
                        counter_name=tcp_counter['counter_name'],
                        value=tcp_counter['value'],
                        unit=tcp_counter['unit']
                    )
                    self.db.add(counter)

                # Save system counters
                for sys_counter in interval_data['system_group']:
                    counter = SystemCounter(
                        interval_id=interval.id,
                        counter_name=sys_counter['counter_name'],
                        value=sys_counter['value'],
                        unit=sys_counter['unit']
                    )
                    self.db.add(counter)

                # Save BGP counters
                for bgp_counter in interval_data['bgp_group']:
                    counter = BGPCounter(
                        interval_id=interval.id,
                        peer_address=bgp_counter.get('peer_address', ''),
                        as_number=bgp_counter.get('as_number', ''),
                        counter_name=bgp_counter['counter_name'],
                        value=bgp_counter['value'],
                        unit=bgp_counter['unit']
                    )
                    self.db.add(counter)
            # Save threshold alerts
            for alert_data in parsed_data['threshold_alerts']:
                alert = ThresholdAlert(
                    file_record_id=file_record.id,
                    severity=alert_data.get('severity', ''),
                    timestamp=alert_data.get('timestamp'),
                    parameter=alert_data.get('parameter', ''),
                    value=alert_data.get('value', 0),
                    threshold=alert_data.get('threshold', 0),
                    description=alert_data.get('description', '')
                )
                self.db.add(alert)

            # Mark file as processed
            file_record.processed_at = datetime.utcnow()
            self.db.commit()
            logger.info(f"Successfully saved data from {file_info['filename']}")
            return file_record.id
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to save data: {e}")
            raise

    def file_already_processed(self, filename: str) -> bool:
        """Check whether a file has already been processed."""
        file_record = self.db.query(FileRecord).filter(
            FileRecord.filename == filename
        ).first()
        return file_record is not None and file_record.processed_at is not None
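

if __name__ == "__main__":
    # Minimal usage sketch, not part of the storage layer itself.  It assumes
    # that the local ``database`` module also exposes a ``SessionLocal``
    # session factory and that the upstream parser produces the dictionary
    # shape documented in ``save_file_data``; both are assumptions here, not
    # guarantees of this module.
    from database import SessionLocal  # assumed session factory

    sample = {
        "file_info": {"filename": "example.xml", "file_size": 0, "checksum": ""},
        "network_element": {"ne_name": "example-ne"},
        "measurement_intervals": [
            {
                "start_time": datetime.utcnow(),
                "end_time": datetime.utcnow(),
                "interface_group": [],
                "ip_group": [],
                "tcp_group": [],
                "system_group": [],
                "bgp_group": [],
            }
        ],
        "threshold_alerts": [],
    }

    db = SessionLocal()
    try:
        storage = DataStorage(db)
        if not storage.file_already_processed(sample["file_info"]["filename"]):
            storage.save_file_data(sample)
    finally:
        db.close()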