Skip to main content
Glama
evt_parser.py16 kB
"""Parser for Windows Event Logs.""" import datetime import platform from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Union from uuid import UUID from ..core.models import AnalysisResult, LogRecord, LogSource from .base import BaseParser # Only import Windows-specific modules on Windows if platform.system() == "Windows": import win32evtlog import win32evtlogutil import win32api import win32con from win32con import EVENTLOG_BACKWARDS_READ, EVENTLOG_SEQUENTIAL_READ import pywintypes import xml.etree.ElementTree as ET else: # Mock objects for non-Windows platforms win32evtlog = None win32evtlogutil = None win32api = None win32con = None pywintypes = None ET = None EVENTLOG_BACKWARDS_READ = None EVENTLOG_SEQUENTIAL_READ = None class EventLogParser(BaseParser): """Parser for Windows Event Logs.""" def __init__(self, config: Optional[Dict[str, Any]] = None): """Initialize the Windows Event Log parser. Args: config: Optional configuration dictionary """ super().__init__(config) if platform.system() != "Windows": raise RuntimeError("Windows Event Log parser is only available on Windows") def parse_file(self, source: LogSource, file_path: Path) -> Iterator[LogRecord]: """Parse a Windows Event Log file. Args: source: The log source file_path: Path to the event log file Yields: LogRecord: Parsed log records """ # Windows Event Logs are typically accessed by name, not file path # This method would need special handling for .evt/.evtx files raise NotImplementedError( "Direct file parsing not implemented for Windows Event Logs" ) def parse_content(self, source: LogSource, content: str) -> Iterator[LogRecord]: """Parse Windows Event Log content. Args: source: The log source content: The log content (not used for Windows Event Logs) Yields: LogRecord: Parsed log records """ # Windows Event Logs are binary and accessed via API, not text content raise NotImplementedError( "Content parsing not applicable for Windows Event Logs" ) def parse( self, path: str, filters: Optional[Dict[str, Any]] = None, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, limit: int = 100, offset: int = 0, ) -> List[LogRecord]: """Parse Windows Event Logs. Args: path: Event log name (e.g., "System", "Application") filters: Optional filters start_time: Start time filter end_time: End time filter limit: Maximum number of records offset: Number of records to skip Returns: List of parsed log records """ if platform.system() != "Windows": return [] records = [] # Handle both standard logs (System, Application) and custom logs # Custom logs like "Microsoft-Service Fabric/Admin" need the newer API if "/" in path or "\\" in path: # This is a custom Application and Services log - use newer EvtQuery API records = self._parse_custom_event_log( path, filters, start_time, end_time, limit, offset ) else: # Standard log - use legacy API try: hand = win32evtlog.OpenEventLog(None, path) flags = EVENTLOG_BACKWARDS_READ | EVENTLOG_SEQUENTIAL_READ # Continue reading until we have enough records events_read = 0 while len(records) < limit: events = win32evtlog.ReadEventLog(hand, flags, 0) if not events: break # No more events to read for event in events: if events_read >= offset: record = self._parse_event(event, path) if self._matches_filters( record, filters, start_time, end_time ): records.append(record) if len(records) >= limit: break events_read += 1 win32evtlog.CloseEventLog(hand) except Exception as e: # Log error or handle appropriately print(f"Error parsing Windows Event Log '{path}': {str(e)}") pass return records def analyze( self, logs: List[LogRecord], analysis_type: str = "summary" ) -> AnalysisResult: """Analyze Windows Event Logs. Args: logs: List of log records to analyze analysis_type: Type of analysis to perform Returns: Analysis result """ if analysis_type == "summary": return self._summary_analysis(logs) elif analysis_type == "pattern": return self._pattern_analysis(logs) elif analysis_type == "anomaly": return self._anomaly_analysis(logs) else: raise ValueError(f"Unknown analysis type: {analysis_type}") def _parse_custom_event_log( self, path: str, filters: Optional[Dict[str, Any]], start_time: Optional[datetime.datetime], end_time: Optional[datetime.datetime], limit: int, offset: int, ) -> List[LogRecord]: """Parse custom Application and Services logs using the newer Windows Event Log API.""" records = [] try: # Build query for the custom log query_flags = ( win32evtlog.EvtQueryChannelPath | win32evtlog.EvtQueryReverseDirection ) # Build XPath query if we have filters xpath_query = "*" if filters or start_time or end_time: conditions = [] if start_time: # Convert to Windows file time format start_ms = int(start_time.timestamp() * 1000) conditions.append(f"TimeCreated[@SystemTime >= '{start_ms}']") if end_time: end_ms = int(end_time.timestamp() * 1000) conditions.append(f"TimeCreated[@SystemTime <= '{end_ms}']") if filters and "EventID" in filters: conditions.append(f"EventID={filters['EventID']}") if conditions: xpath_query = f"*[System[{' and '.join(conditions)}]]" # Query the event log query_handle = win32evtlog.EvtQuery(path, query_flags, xpath_query) # Read events events_read = 0 while len(records) < limit: # Get batch of events events = win32evtlog.EvtNext(query_handle, 10) # Read 10 at a time if not events: break for event in events: if events_read >= offset: # Render event as XML to extract data xml_content = win32evtlog.EvtRender( event, win32evtlog.EvtRenderEventXml ) record = self._parse_event_xml(xml_content, path) if self._matches_filters(record, filters, start_time, end_time): records.append(record) if len(records) >= limit: break events_read += 1 win32evtlog.EvtClose(event) win32evtlog.EvtClose(query_handle) except Exception as e: print(f"Error parsing custom event log '{path}': {str(e)}") # Fall back to empty list if the API is not available or fails pass return records def _parse_event_xml(self, xml_content: str, log_name: str) -> LogRecord: """Parse event data from XML format.""" try: root = ET.fromstring(xml_content) # Extract system data system = root.find(".//System") event_id = ( int(system.find("EventID").text) if system.find("EventID") is not None else 0 ) # Handle unsigned conversion event_id = event_id & 0xFFFFFFFF provider = system.find("Provider") provider_name = ( provider.get("Name", "Unknown") if provider is not None else "Unknown" ) computer = system.find("Computer") computer_name = computer.text if computer is not None else "Unknown" time_created = system.find("TimeCreated") if time_created is not None: system_time = time_created.get("SystemTime", "") # Parse ISO format timestamp try: timestamp = datetime.datetime.fromisoformat( system_time.replace("Z", "+00:00") ) except: timestamp = datetime.datetime.now() else: timestamp = datetime.datetime.now() level = system.find("Level") event_type = ( int(level.text) if level is not None else 4 ) # Default to Information # Map levels to event types (1=Error, 2=Warning, 4=Information) level_map = { 1: 2, 2: 3, 3: 4, 4: 4, 5: 4, } # Critical=1, Error=2, Warning=3, Info=4, Verbose=5 event_type = level_map.get(event_type, 4) # Extract event data event_data = {} data_elem = root.find(".//EventData") if data_elem is not None: for data in data_elem: name = data.get("Name", "") if name: event_data[name] = data.text or "" # Try to get rendered message message = "" rendering = root.find(".//RenderingInfo/Message") if rendering is not None: message = rendering.text or "" else: # Build message from event data if event_data: message = "; ".join(f"{k}: {v}" for k, v in event_data.items()) return LogRecord( source_id=UUID("00000000-0000-0000-0000-000000000000"), timestamp=timestamp, data={ "EventID": event_id, "EventType": event_type, "EventCategory": 0, # Not available in new API "SourceName": provider_name, "ComputerName": computer_name, "Message": message, "EventData": event_data, "LogName": log_name, }, ) except Exception as e: print(f"Error parsing event XML: {str(e)}") # Return a basic record on error return LogRecord( source_id=UUID("00000000-0000-0000-0000-000000000000"), timestamp=datetime.datetime.now(), data={ "EventID": 0, "EventType": 4, "EventCategory": 0, "SourceName": "Unknown", "ComputerName": "Unknown", "Message": f"Error parsing event: {str(e)}", "LogName": log_name, }, ) def _parse_event(self, event, log_name: str = None) -> LogRecord: """Parse a single Windows event.""" try: message = win32evtlogutil.SafeFormatMessage(event, log_name) except: message = "(Unable to format message)" return LogRecord( source_id=UUID("00000000-0000-0000-0000-000000000000"), # Placeholder timestamp=event.TimeGenerated, data={ "EventID": event.EventID & 0xFFFFFFFF, # Convert to unsigned "EventType": event.EventType, "EventCategory": event.EventCategory, "SourceName": event.SourceName, "ComputerName": event.ComputerName, "Message": message, }, ) def _matches_filters( self, record: LogRecord, filters: Optional[Dict[str, Any]], start_time: Optional[datetime.datetime], end_time: Optional[datetime.datetime], ) -> bool: """Check if a record matches the given filters.""" if start_time and record.timestamp and record.timestamp < start_time: return False if end_time and record.timestamp and record.timestamp > end_time: return False if filters: for key, value in filters.items(): if key in record.data and record.data[key] != value: return False return True def _summary_analysis(self, logs: List[LogRecord]) -> AnalysisResult: """Perform summary analysis.""" event_types = {} sources = {} for log in logs: event_type = log.data.get("EventType", "Unknown") event_types[event_type] = event_types.get(event_type, 0) + 1 source = log.data.get("SourceName", "Unknown") sources[source] = sources.get(source, 0) + 1 return AnalysisResult( analysis_type="summary", summary={ "total_events": len(logs), "event_types": event_types, "sources": sources, }, ) def _pattern_analysis(self, logs: List[LogRecord]) -> AnalysisResult: """Perform pattern analysis.""" # Simplified pattern analysis patterns = [] # Group by EventID event_groups = {} for log in logs: event_id = log.data.get("EventID", "Unknown") if event_id not in event_groups: event_groups[event_id] = [] event_groups[event_id].append(log) for event_id, events in event_groups.items(): if len(events) > 1: patterns.append( { "pattern": f"EventID {event_id}", "count": len(events), "frequency": len(events) / len(logs), } ) return AnalysisResult( analysis_type="pattern", summary={"total_patterns": len(patterns)}, patterns=patterns, ) def _anomaly_analysis(self, logs: List[LogRecord]) -> AnalysisResult: """Perform anomaly analysis.""" # Simplified anomaly detection anomalies = [] # Look for error events for log in logs: if log.data.get("EventType") == 1: # Error anomalies.append( { "type": "error_event", "event_id": log.data.get("EventID"), "source": log.data.get("SourceName"), "message": log.data.get("Message", "")[:100], } ) return AnalysisResult( analysis_type="anomaly", summary={"total_anomalies": len(anomalies)}, anomalies=anomalies, ) # For backward compatibility EvtParser = EventLogParser

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sedwardstx/demomcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server