"""
XML parser for PM Counter XML files
"""
import hashlib
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

from lxml import etree
class XMLParser:
    """Parse a PM Counter XML file into plain Python data structures.

    Typical use::

        data = XMLParser(path).parse()

    ``parse()`` returns a dict with the keys ``file_header``,
    ``network_element``, ``measurement_intervals``, ``threshold_alerts``,
    ``data_quality``, ``file_footer`` and ``file_info``.

    Missing or blank numeric values and timestamps are tolerated (they
    become ``None`` or the documented default); values that are present but
    malformed still raise ``ValueError``.
    """

    # Number of bytes read per iteration when hashing the file, so that
    # large files are never loaded into memory in one piece.
    _HASH_CHUNK_SIZE = 65536

    def __init__(self, xml_file_path: str):
        """Remember the file path; nothing is read until ``parse()``."""
        self.xml_file_path = xml_file_path
        self.tree = None
        self.root = None

    def parse(self) -> Dict[str, Any]:
        """Parse the XML file and return its content as a nested dict.

        Namespace prefixes are stripped from every element tag first, so
        the section parsers can look elements up by their local names.

        NOTE(review): the file is parsed with lxml's default parser
        settings. If files can come from untrusted sources, consider
        passing a parser configured to not resolve entities.
        """
        self.tree = etree.parse(self.xml_file_path)
        self.root = self.tree.getroot()
        self._strip_namespaces()
        return {
            'file_header': self._parse_file_header(),
            'network_element': self._parse_network_element(),
            'measurement_intervals': self._parse_measurement_intervals(),
            'threshold_alerts': self._parse_threshold_alerts(),
            'data_quality': self._parse_data_quality(),
            'file_footer': self._parse_file_footer(),
            'file_info': self._get_file_info(),
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _strip_namespaces(self) -> None:
        """Rewrite ``{namespace}local`` element tags to just ``local``."""
        for elem in self.root.iter():
            tag = elem.tag
            # Comments and processing instructions have a non-string tag
            # and must be left untouched.
            if isinstance(tag, str) and tag.startswith('{'):
                elem.tag = tag.split('}', 1)[1]

    @staticmethod
    def _parse_timestamp(text: Optional[str]) -> Optional[datetime]:
        """Convert an ISO-8601 string to ``datetime``.

        Returns ``None`` when *text* is ``None`` or blank. A trailing ``Z``
        is rewritten to ``+00:00`` because ``datetime.fromisoformat`` does
        not accept ``Z`` on older Python versions.

        Raises:
            ValueError: if *text* is non-blank but not a valid timestamp.
        """
        if text is None:
            return None
        text = text.strip()
        if not text:
            return None
        return datetime.fromisoformat(text.replace('Z', '+00:00'))

    @staticmethod
    def _to_float(text: Optional[str],
                  default: Optional[float] = None) -> Optional[float]:
        """Convert *text* to ``float``; return *default* if missing/blank.

        Raises:
            ValueError: if *text* is non-blank but not numeric.
        """
        if text is None or not text.strip():
            return default
        return float(text)

    @staticmethod
    def _to_int(text: Optional[str], default: int = 0) -> int:
        """Convert *text* to ``int``; return *default* if missing/blank.

        Raises:
            ValueError: if *text* is non-blank but not an integer.
        """
        if text is None or not text.strip():
            return default
        return int(text)

    def _parse_counters(self, parent,
                        extra: Optional[Dict[str, Any]] = None
                        ) -> List[Dict[str, Any]]:
        """Return one dict per direct ``Counter`` child of *parent*.

        Each dict holds ``counter_name``, ``value`` (float, or ``None``
        when the attribute is missing or blank) and ``unit``. The entries
        of *extra*, if given, are placed in front of those keys.
        """
        prefix = extra or {}
        return [
            {
                **prefix,
                'counter_name': counter.get('name'),
                'value': self._to_float(counter.get('value')),
                'unit': counter.get('unit'),
            }
            for counter in parent.findall('Counter')
        ]

    def _parse_flat_group(self, interval_elem,
                          group_tag: str) -> List[Dict[str, Any]]:
        """Parse the counters of a group whose counters are direct children.

        Returns an empty list when *interval_elem* has no *group_tag* child.
        """
        group = interval_elem.find(group_tag)
        if group is None:
            return []
        return self._parse_counters(group)

    # ------------------------------------------------------------------
    # Section parsers
    # ------------------------------------------------------------------

    def _parse_file_header(self) -> Dict[str, Any]:
        """Return the ``FileHeader`` children as ``{lowercased tag: text}``.

        Returns an empty dict when there is no ``FileHeader`` element.
        """
        header: Dict[str, Any] = {}
        header_elem = self.root.find('FileHeader')
        if header_elem is None:
            return header
        for child in header_elem:
            # Skip comments / processing instructions: their tag is not a
            # string, so calling .lower() on it would raise.
            if isinstance(child.tag, str):
                header[child.tag.lower()] = child.text
        return header

    def _parse_network_element(self) -> Dict[str, Any]:
        """Return the ``NetworkElement`` section as a flat dict.

        The keys ``site``, ``region`` and ``country`` are only present when
        a ``Location`` child exists. Returns an empty dict when there is no
        ``NetworkElement`` element.
        """
        ne: Dict[str, Any] = {}
        ne_elem = self.root.find('NetworkElement')
        if ne_elem is None:
            return ne
        ne['ne_name'] = ne_elem.findtext('NeName', '')
        ne['ne_type'] = ne_elem.findtext('NeType', '')
        location = ne_elem.find('Location')
        if location is not None:
            ne['site'] = location.findtext('Site', '')
            ne['region'] = location.findtext('Region', '')
            ne['country'] = location.findtext('Country', '')
        ne['management_ip'] = ne_elem.findtext('ManagementIP', '')
        return ne

    def _parse_measurement_intervals(self) -> List[Dict[str, Any]]:
        """Return one dict per ``PMData/MeasurementInterval`` element.

        ``start_time`` / ``end_time`` are ``None`` when the corresponding
        attribute is missing or blank.
        """
        pm_data = self.root.find('PMData')
        if pm_data is None:
            return []
        return [
            {
                'start_time': self._parse_timestamp(
                    interval_elem.get('startTime')),
                'end_time': self._parse_timestamp(
                    interval_elem.get('endTime')),
                'interface_group': self._parse_interface_group(interval_elem),
                'ip_group': self._parse_ip_group(interval_elem),
                'tcp_group': self._parse_tcp_group(interval_elem),
                'system_group': self._parse_system_group(interval_elem),
                'bgp_group': self._parse_bgp_group(interval_elem),
            }
            for interval_elem in pm_data.findall('MeasurementInterval')
        ]

    def _parse_interface_group(self, interval_elem) -> List[Dict[str, Any]]:
        """Return one row per counter of every ``InterfaceGroup/Interface``.

        Each row carries the owning interface's ``name``, ``ifIndex`` and
        ``ifType`` attributes next to the counter fields.
        """
        interface_group = interval_elem.find('InterfaceGroup')
        if interface_group is None:
            return []
        rows: List[Dict[str, Any]] = []
        for interface_elem in interface_group.findall('Interface'):
            rows.extend(self._parse_counters(interface_elem, {
                'interface_name': interface_elem.get('name'),
                'if_index': interface_elem.get('ifIndex'),
                'if_type': interface_elem.get('ifType'),
            }))
        return rows

    def _parse_ip_group(self, interval_elem) -> List[Dict[str, Any]]:
        """Return the counters found under ``IPGroup``."""
        return self._parse_flat_group(interval_elem, 'IPGroup')

    def _parse_tcp_group(self, interval_elem) -> List[Dict[str, Any]]:
        """Return the counters found under ``TCPGroup``."""
        return self._parse_flat_group(interval_elem, 'TCPGroup')

    def _parse_system_group(self, interval_elem) -> List[Dict[str, Any]]:
        """Return the counters found under ``SystemGroup``."""
        return self._parse_flat_group(interval_elem, 'SystemGroup')

    def _parse_bgp_group(self, interval_elem) -> List[Dict[str, Any]]:
        """Return one row per counter of every ``BGPGroup/Peer``.

        Each row carries the owning peer's ``address`` and ``asNumber``
        attributes next to the counter fields.
        """
        bgp_group = interval_elem.find('BGPGroup')
        if bgp_group is None:
            return []
        rows: List[Dict[str, Any]] = []
        for peer in bgp_group.findall('Peer'):
            rows.extend(self._parse_counters(peer, {
                'peer_address': peer.get('address'),
                'as_number': peer.get('asNumber'),
            }))
        return rows

    def _parse_threshold_alerts(self) -> List[Dict[str, Any]]:
        """Return one dict per ``ThresholdAlerts/Alert`` element.

        ``timestamp`` is ``None`` when absent or blank; ``value`` and
        ``threshold`` default to ``0.0`` when absent or blank.
        """
        alerts: List[Dict[str, Any]] = []
        alerts_elem = self.root.find('ThresholdAlerts')
        if alerts_elem is None:
            return alerts
        for alert in alerts_elem.findall('Alert'):
            # XML tags are case-sensitive. The other Alert children are
            # read in PascalCase, so accept 'Timestamp' as well as the
            # lowercase spelling that was looked up historically.
            # NOTE(review): confirm which spelling the schema defines.
            ts_text = alert.findtext('Timestamp')
            if ts_text is None:
                ts_text = alert.findtext('timestamp')
            alerts.append({
                'severity': alert.get('severity'),
                'timestamp': self._parse_timestamp(ts_text),
                'parameter': alert.findtext('Parameter', ''),
                'value': self._to_float(alert.findtext('Value'), 0.0),
                'threshold': self._to_float(alert.findtext('Threshold'), 0.0),
                'description': alert.findtext('Description', ''),
            })
        return alerts

    def _parse_data_quality(self) -> Dict[str, Any]:
        """Return ``{indicator name: {'value': float|None, 'unit': str}}``.

        Returns an empty dict when there is no ``DataQuality`` element.
        """
        quality_elem = self.root.find('DataQuality')
        if quality_elem is None:
            return {}
        return {
            indicator.get('name'): {
                'value': self._to_float(indicator.get('value')),
                'unit': indicator.get('unit'),
            }
            for indicator in quality_elem.findall('Indicator')
        }

    def _parse_file_footer(self) -> Dict[str, Any]:
        """Return the ``FileFooter`` section.

        ``record_count`` and ``intervals`` default to ``0`` when absent or
        blank. ``end_time`` is only present when ``EndTime`` has content.
        Returns an empty dict when there is no ``FileFooter`` element.
        """
        footer: Dict[str, Any] = {}
        footer_elem = self.root.find('FileFooter')
        if footer_elem is None:
            return footer
        footer['record_count'] = self._to_int(
            footer_elem.findtext('RecordCount'), 0)
        footer['intervals'] = self._to_int(
            footer_elem.findtext('Intervals'), 0)
        footer['checksum'] = footer_elem.findtext('Checksum', '')
        end_time = self._parse_timestamp(footer_elem.findtext('EndTime'))
        if end_time is not None:
            footer['end_time'] = end_time
        return footer

    def _get_file_info(self) -> Dict[str, Any]:
        """Return the file's base name, size in bytes and MD5 hex digest.

        The digest is computed over the raw file bytes, read in chunks so
        that large files are not held in memory at once.

        NOTE(review): MD5 appears to serve as a content fingerprint here,
        not as a security measure - confirm before relying on it as one.
        """
        digest = hashlib.md5()
        with open(self.xml_file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b''):
                digest.update(chunk)
        return {
            'filename': os.path.basename(self.xml_file_path),
            'file_size': os.path.getsize(self.xml_file_path),
            'checksum': digest.hexdigest(),
        }