"""
XML Parser for Performance Monitoring XML files
"""
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any
from pathlib import Path
from lxml import etree
logger = logging.getLogger(__name__)


class XMLParser:
    """Parser for PM (Performance Monitoring) XML files.

    Usage: construct with a file path, call :meth:`parse`, then call the
    ``get_*`` accessors. Every accessor is best-effort: on any failure
    (including being called before a successful :meth:`parse`) it logs the
    problem and returns an empty container instead of raising.

    All element lookups use the ``pm`` prefix, bound in ``self.namespace``.
    """

    def __init__(self, xml_file_path: str):
        """Remember the file location; nothing is read until :meth:`parse`."""
        self.xml_file_path = Path(xml_file_path)
        self.tree: Optional[etree.ElementTree] = None
        self.root: Optional[etree.Element] = None
        self.namespace = {"pm": "http://schemas.examplenet.com/pm/3.0"}

    def parse(self) -> bool:
        """Parse the XML file and populate ``self.tree`` / ``self.root``.

        Returns:
            True on success; False if the file could not be read or parsed
            (the error is logged).
        """
        # NOTE(review): if these files can come from untrusted sources,
        # consider passing a hardened etree.XMLParser (e.g. with entity
        # resolution disabled) -- confirm against the lxml version in use.
        try:
            self.tree = etree.parse(str(self.xml_file_path))
            self.root = self.tree.getroot()
            return True
        except Exception as e:
            logger.error("Failed to parse XML file %s: %s", self.xml_file_path, e)
            return False

    def get_file_header(self) -> Dict[str, Any]:
        """Extract file header information.

        Returns:
            A dict of header fields (missing fields are None), or an empty
            dict if there is no ``pm:FileHeader`` element or extraction fails.
        """
        root = self._loaded_root("file header")
        if root is None:
            return {}
        try:
            file_header = root.find(".//pm:FileHeader", self.namespace)
            if file_header is None:
                return {}
            return {
                "vendor": self._get_text(file_header, "pm:Vendor"),
                "equipment_type": self._get_text(file_header, "pm:EquipmentType"),
                "software_version": self._get_text(file_header, "pm:SoftwareVersion"),
                "node_id": self._get_text(file_header, "pm:NodeId"),
                "generation_time": self._parse_datetime(
                    self._get_text(file_header, "pm:GenerationTime")
                ),
                "reporting_period": self._get_text(file_header, "pm:ReportingPeriod"),
                "file_format_version": self._get_text(file_header, "pm:FileFormatVersion"),
                "collection_frequency": self._get_text(file_header, "pm:CollectionFrequency"),
            }
        except Exception as e:
            logger.error("Failed to extract file header: %s", e)
            return {}

    def get_network_element(self) -> Dict[str, Any]:
        """Extract network element information.

        Returns:
            A dict describing the network element (missing fields are None,
            including the location fields when ``pm:Location`` is absent), or
            an empty dict if there is no ``pm:NetworkElement`` element or
            extraction fails.
        """
        root = self._loaded_root("network element")
        if root is None:
            return {}
        try:
            ne = root.find(".//pm:NetworkElement", self.namespace)
            if ne is None:
                return {}
            # May be None; _get_text then yields None for each location field.
            location = ne.find("pm:Location", self.namespace)
            return {
                "ne_name": self._get_text(ne, "pm:NeName"),
                "ne_type": self._get_text(ne, "pm:NeType"),
                "site": self._get_text(location, "pm:Site"),
                "region": self._get_text(location, "pm:Region"),
                "country": self._get_text(location, "pm:Country"),
                "management_ip": self._get_text(ne, "pm:ManagementIP"),
            }
        except Exception as e:
            logger.error("Failed to extract network element: %s", e)
            return {}

    def get_measurement_intervals(self) -> List[Dict[str, Any]]:
        """Extract all measurement intervals with their data.

        Returns:
            One dict per ``pm:MeasurementInterval`` element, in document
            order. If extraction fails part-way, the intervals collected so
            far are returned.
        """
        intervals: List[Dict[str, Any]] = []
        root = self._loaded_root("measurement intervals")
        if root is None:
            return intervals
        try:
            for interval_elem in root.findall(".//pm:MeasurementInterval", self.namespace):
                intervals.append(
                    {
                        "start_time": self._parse_datetime(interval_elem.get("startTime")),
                        "end_time": self._parse_datetime(interval_elem.get("endTime")),
                        "interfaces": self._extract_interfaces(interval_elem),
                        "ip_counters": self._extract_counters(interval_elem, "pm:IPGroup"),
                        "tcp_counters": self._extract_counters(interval_elem, "pm:TCPGroup"),
                        "system_counters": self._extract_counters(
                            interval_elem, "pm:SystemGroup"
                        ),
                        "bgp_data": self._extract_bgp_data(interval_elem),
                    }
                )
        except Exception as e:
            logger.error("Failed to extract measurement intervals: %s", e)
        return intervals

    def _extract_interfaces(self, interval_elem: etree.Element) -> List[Dict[str, Any]]:
        """Extract per-interface data from the interval's ``pm:InterfaceGroup``.

        Returns an empty list when the group is absent.
        """
        interfaces: List[Dict[str, Any]] = []
        try:
            interface_group = interval_elem.find("pm:InterfaceGroup", self.namespace)
            if interface_group is not None:
                for interface_elem in interface_group.findall("pm:Interface", self.namespace):
                    interfaces.append(
                        {
                            "name": interface_elem.get("name"),
                            "if_index": self._parse_int(interface_elem.get("ifIndex")),
                            "if_type": interface_elem.get("ifType"),
                            "counters": self._read_counters(interface_elem),
                        }
                    )
        except Exception as e:
            logger.error("Failed to extract interfaces: %s", e)
        return interfaces

    def _extract_counters(
        self, interval_elem: etree.Element, group_name: str
    ) -> List[Dict[str, Any]]:
        """Extract counters from a direct child group of the interval.

        Args:
            interval_elem: The ``pm:MeasurementInterval`` element.
            group_name: Prefixed tag of the group, e.g. ``"pm:IPGroup"``.

        Returns:
            The group's counters, or an empty list when the group is absent.
        """
        try:
            group = interval_elem.find(group_name, self.namespace)
            if group is not None:
                return self._read_counters(group)
        except Exception as e:
            logger.error("Failed to extract counters from %s: %s", group_name, e)
        return []

    def _extract_bgp_data(self, interval_elem: etree.Element) -> List[Dict[str, Any]]:
        """Extract BGP peer data from the interval's ``pm:BGPGroup``.

        Returns an empty list when the group is absent.
        """
        bgp_data: List[Dict[str, Any]] = []
        try:
            bgp_group = interval_elem.find("pm:BGPGroup", self.namespace)
            if bgp_group is not None:
                for peer_elem in bgp_group.findall("pm:Peer", self.namespace):
                    bgp_data.append(
                        {
                            "address": peer_elem.get("address"),
                            "as_number": self._parse_int(peer_elem.get("asNumber")),
                            "counters": self._read_counters(peer_elem),
                        }
                    )
        except Exception as e:
            logger.error("Failed to extract BGP data: %s", e)
        return bgp_data

    def get_threshold_alerts(self) -> List[Dict[str, Any]]:
        """Extract threshold alerts.

        Returns:
            One dict per ``pm:Alert`` under the first ``pm:ThresholdAlerts``
            element; an empty list if that element is absent. If extraction
            fails part-way, the alerts collected so far are returned.
        """
        alerts: List[Dict[str, Any]] = []
        root = self._loaded_root("threshold alerts")
        if root is None:
            return alerts
        try:
            alerts_elem = root.find(".//pm:ThresholdAlerts", self.namespace)
            if alerts_elem is not None:
                for alert_elem in alerts_elem.findall("pm:Alert", self.namespace):
                    alerts.append(
                        {
                            "severity": alert_elem.get("severity"),
                            "timestamp": self._parse_datetime(alert_elem.get("timestamp")),
                            "parameter": self._get_text(alert_elem, "pm:Parameter"),
                            "value": self._parse_numeric(
                                self._get_text(alert_elem, "pm:Value")
                            ),
                            "threshold": self._parse_numeric(
                                self._get_text(alert_elem, "pm:Threshold")
                            ),
                            "description": self._get_text(alert_elem, "pm:Description"),
                        }
                    )
        except Exception as e:
            logger.error("Failed to extract threshold alerts: %s", e)
        return alerts

    def get_data_quality(self) -> List[Dict[str, Any]]:
        """Extract data quality indicators.

        Returns:
            One name/value/unit dict per ``pm:Indicator`` under the first
            ``pm:DataQuality`` element; an empty list if that element is
            absent or extraction fails.
        """
        root = self._loaded_root("data quality")
        if root is None:
            return []
        try:
            dq_elem = root.find(".//pm:DataQuality", self.namespace)
            if dq_elem is not None:
                # Indicators carry the same name/value/unit attributes as counters.
                return self._read_counters(dq_elem, "pm:Indicator")
        except Exception as e:
            logger.error("Failed to extract data quality: %s", e)
        return []

    def _loaded_root(self, what: str) -> Optional[etree.Element]:
        """Return the parsed root element, or log and return None.

        Gives a clear log message when an accessor is used before a
        successful parse(), instead of an AttributeError on None.
        """
        if self.root is None:
            logger.error(
                "Failed to extract %s: no document loaded (call parse() first)", what
            )
        return self.root

    def _read_counters(
        self, parent: etree.Element, tag: str = "pm:Counter"
    ) -> List[Dict[str, Any]]:
        """Build name/value/unit dicts for each direct ``tag`` child of ``parent``."""
        return [
            {
                "name": item.get("name"),
                "value": self._parse_numeric(item.get("value")),
                "unit": item.get("unit"),
            }
            for item in parent.findall(tag, self.namespace)
        ]

    def _get_text(self, parent: Optional[etree.Element], xpath: str) -> Optional[str]:
        """Return the text of the first element matching ``xpath`` under ``parent``.

        Returns None when ``parent`` is None, when nothing matches, or when
        the matched element has no (or empty) text.
        """
        if parent is None:
            return None
        elem = parent.find(xpath, self.namespace)
        if elem is None or not elem.text:
            return None
        return elem.text

    def _parse_datetime(self, dt_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO 8601 datetime string.

        Surrounding whitespace is ignored and a trailing ``Z`` is treated as
        UTC. Returns None for empty input or (after logging a warning) for
        input that cannot be parsed.
        """
        if not dt_str:
            return None
        try:
            text = dt_str.strip()
            # Rewrite the 'Z' suffix as an explicit offset, which
            # datetime.fromisoformat accepts on every Python 3 version.
            if text.endswith("Z"):
                text = text[:-1] + "+00:00"
            return datetime.fromisoformat(text)
        except Exception as e:
            logger.warning("Failed to parse datetime %s: %s", dt_str, e)
            return None

    def _parse_int(self, value: Optional[str]) -> Optional[int]:
        """Parse an integer; return None for empty or non-integer input."""
        if not value:
            return None
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

    def _parse_numeric(self, value: Optional[str]) -> Optional[float]:
        """Parse a number as float; return None for empty or non-numeric input."""
        if not value:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None