Skip to main content
Glama
ApoorvBrooklyn

PM Counter Monitoring MCP Server

xml_parser.py (9.04 kB)
""" XML parser for PM Counter XML files """ from lxml import etree from datetime import datetime from typing import Dict, List, Any import hashlib import os class XMLParser: """Parse PM Counter XML files""" def __init__(self, xml_file_path: str): self.xml_file_path = xml_file_path self.tree = None self.root = None def parse(self) -> Dict[str, Any]: """Parse XML file and return structured data""" self.tree = etree.parse(self.xml_file_path) self.root = self.tree.getroot() # Remove namespace for easier parsing for elem in self.root.iter(): tag = elem.tag if isinstance(tag, str) and tag.startswith('{'): elem.tag = tag.split('}')[1] result = { 'file_header': self._parse_file_header(), 'network_element': self._parse_network_element(), 'measurement_intervals': self._parse_measurement_intervals(), 'threshold_alerts': self._parse_threshold_alerts(), 'data_quality': self._parse_data_quality(), 'file_footer': self._parse_file_footer(), 'file_info': self._get_file_info() } return result def _parse_file_header(self) -> Dict[str, Any]: """Parse file header section""" header = {} header_elem = self.root.find('FileHeader') if header_elem is not None: for child in header_elem: header[child.tag.lower()] = child.text return header def _parse_network_element(self) -> Dict[str, Any]: """Parse network element section""" ne = {} ne_elem = self.root.find('NetworkElement') if ne_elem is not None: ne['ne_name'] = ne_elem.findtext('NeName', '') ne['ne_type'] = ne_elem.findtext('NeType', '') location = ne_elem.find('Location') if location is not None: ne['site'] = location.findtext('Site', '') ne['region'] = location.findtext('Region', '') ne['country'] = location.findtext('Country', '') ne['management_ip'] = ne_elem.findtext('ManagementIP', '') return ne def _parse_measurement_intervals(self) -> List[Dict[str, Any]]: """Parse all measurement intervals""" intervals = [] pm_data = self.root.find('PMData') if pm_data is not None: for interval_elem in pm_data.findall('MeasurementInterval'): 
interval = { 'start_time': datetime.fromisoformat(interval_elem.get('startTime').replace('Z', '+00:00')), 'end_time': datetime.fromisoformat(interval_elem.get('endTime').replace('Z', '+00:00')), 'interface_group': self._parse_interface_group(interval_elem), 'ip_group': self._parse_ip_group(interval_elem), 'tcp_group': self._parse_tcp_group(interval_elem), 'system_group': self._parse_system_group(interval_elem), 'bgp_group': self._parse_bgp_group(interval_elem) } intervals.append(interval) return intervals def _parse_interface_group(self, interval_elem) -> List[Dict[str, Any]]: """Parse interface group counters""" interfaces = [] interface_group = interval_elem.find('InterfaceGroup') if interface_group is not None: for interface_elem in interface_group.findall('Interface'): interface_name = interface_elem.get('name') if_index = interface_elem.get('ifIndex') if_type = interface_elem.get('ifType') for counter in interface_elem.findall('Counter'): interfaces.append({ 'interface_name': interface_name, 'if_index': if_index, 'if_type': if_type, 'counter_name': counter.get('name'), 'value': float(counter.get('value')), 'unit': counter.get('unit') }) return interfaces def _parse_ip_group(self, interval_elem) -> List[Dict[str, Any]]: """Parse IP group counters""" counters = [] ip_group = interval_elem.find('IPGroup') if ip_group is not None: for counter in ip_group.findall('Counter'): counters.append({ 'counter_name': counter.get('name'), 'value': float(counter.get('value')), 'unit': counter.get('unit') }) return counters def _parse_tcp_group(self, interval_elem) -> List[Dict[str, Any]]: """Parse TCP group counters""" counters = [] tcp_group = interval_elem.find('TCPGroup') if tcp_group is not None: for counter in tcp_group.findall('Counter'): counters.append({ 'counter_name': counter.get('name'), 'value': float(counter.get('value')), 'unit': counter.get('unit') }) return counters def _parse_system_group(self, interval_elem) -> List[Dict[str, Any]]: """Parse system group 
counters""" counters = [] system_group = interval_elem.find('SystemGroup') if system_group is not None: for counter in system_group.findall('Counter'): counters.append({ 'counter_name': counter.get('name'), 'value': float(counter.get('value')), 'unit': counter.get('unit') }) return counters def _parse_bgp_group(self, interval_elem) -> List[Dict[str, Any]]: """Parse BGP group counters""" counters = [] bgp_group = interval_elem.find('BGPGroup') if bgp_group is not None: for peer in bgp_group.findall('Peer'): peer_address = peer.get('address') as_number = peer.get('asNumber') for counter in peer.findall('Counter'): counters.append({ 'peer_address': peer_address, 'as_number': as_number, 'counter_name': counter.get('name'), 'value': float(counter.get('value')), 'unit': counter.get('unit') }) return counters def _parse_threshold_alerts(self) -> List[Dict[str, Any]]: """Parse threshold alerts""" alerts = [] alerts_elem = self.root.find('ThresholdAlerts') if alerts_elem is not None: for alert in alerts_elem.findall('Alert'): alerts.append({ 'severity': alert.get('severity'), 'timestamp': datetime.fromisoformat(alert.findtext('timestamp', '').replace('Z', '+00:00')) if alert.findtext('timestamp') else None, 'parameter': alert.findtext('Parameter', ''), 'value': float(alert.findtext('Value', '0')), 'threshold': float(alert.findtext('Threshold', '0')), 'description': alert.findtext('Description', '') }) return alerts def _parse_data_quality(self) -> Dict[str, Any]: """Parse data quality indicators""" quality = {} quality_elem = self.root.find('DataQuality') if quality_elem is not None: for indicator in quality_elem.findall('Indicator'): quality[indicator.get('name')] = { 'value': float(indicator.get('value')), 'unit': indicator.get('unit') } return quality def _parse_file_footer(self) -> Dict[str, Any]: """Parse file footer""" footer = {} footer_elem = self.root.find('FileFooter') if footer_elem is not None: footer['record_count'] = int(footer_elem.findtext('RecordCount', 
'0')) footer['intervals'] = int(footer_elem.findtext('Intervals', '0')) footer['checksum'] = footer_elem.findtext('Checksum', '') end_time = footer_elem.findtext('EndTime', '') if end_time: footer['end_time'] = datetime.fromisoformat(end_time.replace('Z', '+00:00')) return footer def _get_file_info(self) -> Dict[str, Any]: """Get file metadata""" file_size = os.path.getsize(self.xml_file_path) filename = os.path.basename(self.xml_file_path) # Calculate checksum with open(self.xml_file_path, 'rb') as f: file_hash = hashlib.md5(f.read()).hexdigest() return { 'filename': filename, 'file_size': file_size, 'checksum': file_hash }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ApoorvBrooklyn/Networking-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.