Skip to main content
Glama

PTP MCP Server

by aneeshkp
MIT License
1
ptp_log_parser.py18.4 kB
#!/usr/bin/env python3 """ PTP Log Parser - Parses linuxptp daemon logs and extracts structured information """ import json import logging import re import subprocess from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class LogEntry: """Structured log entry""" timestamp: datetime component: str level: str message: str parsed_data: Dict[str, Any] class PTPLogParser: """Parser for linuxptp daemon logs""" def __init__(self): self.namespace = "openshift-ptp" self.daemon_name = "linuxptp-daemon" self.container_name = "linuxptp-daemon-container" # Regex patterns for different log components self.patterns = { "timestamp": r"(\d{2}:\d{2}:\d{2}\.\d{6})", "ptp4l": r"ptp4l\[(\d+\.\d+)\]:\s*\[([^\]]+)\]\s*(.+)", "phc2sys": r"phc2sys\[(\d+\.\d+)\]:\s*\[([^\]]+)\]\s*(.+)", "ts2phc": r"ts2phc\[(\d+\.\d+)\]:\s*\[([^\]]+)\]\s*(.+)", "dpll": r"dpll\[(\d+)\]:\[([^\]]+)\]\s*(.+)", "gnss": r"gnss\[(\d+)\]:\[([^\]]+)\]\s*(.+)", "gm": r"GM\[(\d+)\]:\[([^\]]+)\]\s*(.+)", "go_log": r"I(\d{4})\s+(\d{2}:\d{2}:\d{2}\.\d{6})\s+(\d+)\s+([^:]+):(\d+)\]\s*(.+)" } async def get_ptp_logs(self, namespace: str = None, lines: int = 1000, since: str = None) -> List[LogEntry]: """Get PTP logs from OpenShift cluster""" if namespace is None: namespace = self.namespace try: # Build oc command cmd = [ "oc", "logs", f"ds/{self.daemon_name}", "-c", self.container_name, "-n", namespace, "--tail", str(lines) ] if since: cmd.extend(["--since", since]) result = subprocess.run( cmd, capture_output=True, text=True, timeout=60 ) if result.returncode != 0: raise Exception(f"Failed to get PTP logs: {result.stderr}") # Parse log lines log_lines = result.stdout.strip().split('\n') return [self._parse_log_line(line) for line in log_lines if line.strip()] except subprocess.TimeoutExpired: raise Exception("Timeout getting PTP logs") except Exception as e: logger.error(f"Error getting PTP logs: {str(e)}") raise def _parse_log_line(self, line: str) -> LogEntry: """Parse individual log line into structured format""" # Try to parse timestamp first timestamp_match = re.search(self.patterns["timestamp"], line) timestamp = None if timestamp_match: try: timestamp = datetime.strptime(timestamp_match.group(1), "%H:%M:%S.%f") # Use current date if timestamp doesn't include date timestamp = timestamp.replace(year=datetime.now().year, month=datetime.now().month, day=datetime.now().day) except ValueError: pass # Try different log formats parsed_data = {} component = "unknown" level = "info" message = line # Try Go-style logs first go_match = re.match(self.patterns["go_log"], line) if go_match: level = "info" timestamp_str = go_match.group(2) try: timestamp = datetime.strptime(timestamp_str, "%H:%M:%S.%f") timestamp = timestamp.replace(year=datetime.now().year, month=datetime.now().month, day=datetime.now().day) except ValueError: pass component = go_match.group(4) message = go_match.group(6) parsed_data = self._parse_go_log_message(message, component) else: # Try PTP component logs for comp_name, pattern in self.patterns.items(): if comp_name in ["timestamp", "go_log"]: continue match = re.match(pattern, line) if match: component = comp_name message = match.group(3) parsed_data = self._parse_component_message(message, component) break return LogEntry( timestamp=timestamp or datetime.now(), component=component, level=level, message=message, parsed_data=parsed_data ) def _parse_go_log_message(self, message: str, component: str) -> Dict[str, Any]: """Parse Go-style log messages""" parsed = {} # Parse DPLL messages if "dpll" in component.lower(): parsed.update(self._parse_dpll_message(message)) elif "gnss" in component.lower(): parsed.update(self._parse_gnss_message(message)) elif "stats" in component.lower(): parsed.update(self._parse_stats_message(message)) elif "event" in component.lower(): parsed.update(self._parse_event_message(message)) return parsed def _parse_dpll_message(self, message: str) -> Dict[str, Any]: """Parse DPLL-related messages""" parsed = {} # Extract offset information offset_match = re.search(r"offset to (-?\d+) ns", message) if offset_match: parsed["offset_ns"] = int(offset_match.group(1)) # Extract clock ID clock_match = re.search(r"clock id (\d+)", message) if clock_match: parsed["clock_id"] = clock_match.group(1) # Extract interface iface_match = re.search(r"iface (\S+)", message) if iface_match: parsed["interface"] = iface_match.group(1) # Extract DPLL state state_match = re.search(r"state is ([^(]+)", message) if state_match: parsed["dpll_state"] = state_match.group(1) # Extract decision information decision_match = re.search(r"decision: Status (\d+), Offset (-?\d+), In spec (\w+), Source GNSS lost (\w+), On holdover (\w+)", message) if decision_match: parsed["status"] = int(decision_match.group(1)) parsed["offset"] = int(decision_match.group(2)) parsed["in_spec"] = decision_match.group(3) == "true" parsed["source_lost"] = decision_match.group(4) == "true" parsed["on_holdover"] = decision_match.group(5) == "true" return parsed def _parse_gnss_message(self, message: str) -> Dict[str, Any]: """Parse GNSS-related messages""" parsed = {} # Extract GNSS status status_match = re.search(r"gnss_status (\d+)", message) if status_match: parsed["gnss_status"] = int(status_match.group(1)) # Extract offset offset_match = re.search(r"offset (\d+)", message) if offset_match: parsed["offset"] = int(offset_match.group(1)) return parsed def _parse_stats_message(self, message: str) -> Dict[str, Any]: """Parse stats-related messages""" parsed = {} # Extract state update state_match = re.search(r"state updated for (\w+) =(\w+)", message) if state_match: parsed["component"] = state_match.group(1) parsed["state"] = state_match.group(2) return parsed def _parse_event_message(self, message: str) -> Dict[str, Any]: """Parse event-related messages""" parsed = {} # Extract state information state_match = re.search(r"dpll State (\w+), gnss State (\w+), tsphc state (\w+), gm state (\w+)", message) if state_match: parsed["dpll_state"] = state_match.group(1) parsed["gnss_state"] = state_match.group(2) parsed["ts2phc_state"] = state_match.group(3) parsed["gm_state"] = state_match.group(4) return parsed def _parse_component_message(self, message: str, component: str) -> Dict[str, Any]: """Parse PTP component messages (ptp4l, phc2sys, ts2phc)""" parsed = {} if component == "phc2sys": parsed.update(self._parse_phc2sys_message(message)) elif component == "ts2phc": parsed.update(self._parse_ts2phc_message(message)) elif component == "ptp4l": parsed.update(self._parse_ptp4l_message(message)) elif component == "dpll": parsed.update(self._parse_dpll_component_message(message)) elif component == "gnss": parsed.update(self._parse_gnss_component_message(message)) elif component == "gm": parsed.update(self._parse_gm_message(message)) return parsed def _parse_phc2sys_message(self, message: str) -> Dict[str, Any]: """Parse phc2sys messages""" parsed = {} # Extract offset, frequency, and delay match = re.search(r"CLOCK_REALTIME phc offset\s+(-?\d+)\s+(\w+)\s+freq\s+(-?\d+)\s+delay\s+(\d+)", message) if match: parsed["offset"] = int(match.group(1)) parsed["state"] = match.group(2) parsed["frequency"] = int(match.group(3)) parsed["delay"] = int(match.group(4)) return parsed def _parse_ts2phc_message(self, message: str) -> Dict[str, Any]: """Parse ts2phc messages""" parsed = {} # Extract offset and frequency match = re.search(r"offset\s+(-?\d+)\s+(\w+)\s+freq\s+([+-]\d+)", message) if match: parsed["offset"] = int(match.group(1)) parsed["state"] = match.group(2) parsed["frequency"] = int(match.group(3)) # Extract NMEA information nmea_match = re.search(r"nmea delay: (\d+) ns", message) if nmea_match: parsed["nmea_delay_ns"] = int(nmea_match.group(1)) # Extract NMEA sentences nmea_sentence_match = re.search(r"nmea sentence: ([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)", message) if nmea_sentence_match: parsed["nmea_type"] = nmea_sentence_match.group(1) parsed["nmea_time"] = nmea_sentence_match.group(2) parsed["nmea_lat"] = nmea_sentence_match.group(3) parsed["nmea_lat_dir"] = nmea_sentence_match.group(4) parsed["nmea_lon"] = nmea_sentence_match.group(5) parsed["nmea_lon_dir"] = nmea_sentence_match.group(6) return parsed def _parse_ptp4l_message(self, message: str) -> Dict[str, Any]: """Parse ptp4l messages""" parsed = {} # Extract BMCA information bmca_match = re.search(r"selected (\w+) clock", message) if bmca_match: parsed["selected_clock"] = bmca_match.group(1) # Extract port state port_match = re.search(r"port (\d+): ([\w\s]+)", message) if port_match: parsed["port"] = int(port_match.group(1)) parsed["port_state"] = port_match.group(2).strip() return parsed def _parse_dpll_component_message(self, message: str) -> Dict[str, Any]: """Parse DPLL component messages""" parsed = {} # Extract frequency status, offset, phase status, PPS status match = re.search(r"(\w+) frequency_status (\d+) offset (-?\d+) phase_status (\d+) pps_status (\d+) (\w+)", message) if match: parsed["interface"] = match.group(1) parsed["frequency_status"] = int(match.group(2)) parsed["offset"] = int(match.group(3)) parsed["phase_status"] = int(match.group(4)) parsed["pps_status"] = int(match.group(5)) parsed["state"] = match.group(6) return parsed def _parse_gnss_component_message(self, message: str) -> Dict[str, Any]: """Parse GNSS component messages""" parsed = {} # Extract GNSS status and offset match = re.search(r"(\w+) gnss_status (\d+) offset (\d+) (\w+)", message) if match: parsed["interface"] = match.group(1) parsed["gnss_status"] = int(match.group(2)) parsed["offset"] = int(match.group(3)) parsed["state"] = match.group(4) return parsed def _parse_gm_message(self, message: str) -> Dict[str, Any]: """Parse GM (Grandmaster) messages""" parsed = {} # Extract GM status match = re.search(r"(\w+) T-GM-STATUS (\w+)", message) if match: parsed["interface"] = match.group(1) parsed["gm_status"] = match.group(2) return parsed def search_logs(self, logs: List[LogEntry], query: str, time_range: str = None, log_level: str = None) -> List[LogEntry]: """Search logs for specific patterns""" filtered_logs = logs # Apply time range filter if time_range: cutoff_time = self._get_cutoff_time(time_range) filtered_logs = [log for log in filtered_logs if log.timestamp >= cutoff_time] # Apply log level filter if log_level: filtered_logs = [log for log in filtered_logs if log.level == log_level] # Apply query filter if query: query_lower = query.lower() filtered_logs = [ log for log in filtered_logs if query_lower in log.message.lower() or any(query_lower in str(v).lower() for v in log.parsed_data.values()) ] return filtered_logs def _get_cutoff_time(self, time_range: str) -> datetime: """Get cutoff time based on time range string""" now = datetime.now() if time_range == "last_hour": return now - timedelta(hours=1) elif time_range == "last_day": return now - timedelta(days=1) elif time_range == "last_week": return now - timedelta(weeks=1) elif time_range.startswith("last_"): # Parse custom time ranges like "last_30m", "last_2h" import re match = re.match(r"last_(\d+)([mhd])", time_range) if match: amount = int(match.group(1)) unit = match.group(2) if unit == "m": return now - timedelta(minutes=amount) elif unit == "h": return now - timedelta(hours=amount) elif unit == "d": return now - timedelta(days=amount) # Default to last hour return now - timedelta(hours=1) def extract_grandmaster_info(self, logs: List[LogEntry]) -> Dict[str, Any]: """Extract grandmaster information from logs""" gm_info = { "status": "unknown", "interface": None, "last_seen": None, "offset": None, "frequency": None } # Look for GM status messages gm_logs = [log for log in logs if log.component == "gm"] if gm_logs: latest_gm = max(gm_logs, key=lambda x: x.timestamp) gm_info["status"] = latest_gm.parsed_data.get("gm_status", "unknown") gm_info["interface"] = latest_gm.parsed_data.get("interface") gm_info["last_seen"] = latest_gm.timestamp # Look for phc2sys offset information phc2sys_logs = [log for log in logs if log.component == "phc2sys"] if phc2sys_logs: latest_phc2sys = max(phc2sys_logs, key=lambda x: x.timestamp) gm_info["offset"] = latest_phc2sys.parsed_data.get("offset") gm_info["frequency"] = latest_phc2sys.parsed_data.get("frequency") return gm_info def extract_sync_status(self, logs: List[LogEntry]) -> Dict[str, Any]: """Extract synchronization status from logs""" sync_status = { "dpll_locked": False, "gnss_available": False, "offset_in_range": False, "last_offset": None, "last_update": None } # Look for DPLL decision messages dpll_logs = [log for log in logs if "dpll" in log.component.lower() and "decision" in log.message] if dpll_logs: latest_dpll = max(dpll_logs, key=lambda x: x.timestamp) parsed = latest_dpll.parsed_data sync_status["dpll_locked"] = parsed.get("status", 0) == 3 sync_status["offset_in_range"] = parsed.get("in_spec", False) sync_status["last_offset"] = parsed.get("offset") sync_status["last_update"] = latest_dpll.timestamp # Look for GNSS status gnss_logs = [log for log in logs if "gnss" in log.component.lower()] if gnss_logs: latest_gnss = max(gnss_logs, key=lambda x: x.timestamp) sync_status["gnss_available"] = latest_gnss.parsed_data.get("gnss_status", 0) > 0 return sync_status def extract_clock_hierarchy(self, logs: List[LogEntry]) -> Dict[str, Any]: """Extract clock hierarchy information from logs""" hierarchy = { "grandmaster": None, "boundary_clocks": [], "ordinary_clocks": [], "transparent_clocks": [] } # Look for ptp4l BMCA messages ptp4l_logs = [log for log in logs if log.component == "ptp4l" and "selected" in log.message] for log in ptp4l_logs: if "grandmaster" in log.parsed_data.get("selected_clock", "").lower(): hierarchy["grandmaster"] = { "status": "active", "last_seen": log.timestamp } return hierarchy

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aneeshkp/ptp-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server