
Regression-JIRA Integration System

by nanyang12138
log_analyzer.py (24.1 kB)
""" Log Analyzer Module Python implementation of the analyzeFailure Perl function. Analyzes test log files to extract error signatures and relevant information. """ import re import os import gzip from typing import List, Optional, Tuple from dataclasses import dataclass from .error_patterns import ( COMPILED_IGNORE_PATTERNS, ERROR_PATTERNS, WARNING_PATTERNS ) from .utils import extract_keywords, extract_keywords_from_test_name @dataclass class ErrorSignature: """ Error signature extracted from log analysis. Corresponds to the Perl analyzeFailure return values. """ suite: str test: str signature: str # The error signature/message tool: str # Tool that failed line_number: int # Line number in log file line_offset: int # Byte offset in log file error_level: int # Error severity level (1-10) error_line: str # Full line containing the error pattern_pos: str # Pattern that matched num_lines_scanned: int # Total lines scanned error_keywords: List[str] # Extracted keywords for JIRA search def to_dict(self): """Convert to dictionary for JSON serialization""" return { 'suite': self.suite, 'test': self.test, 'signature': self.signature, 'tool': self.tool, 'line_number': self.line_number, 'error_level': self.error_level, 'error_line': self.error_line, 'pattern_pos': self.pattern_pos, 'num_lines_scanned': self.num_lines_scanned, 'error_keywords': self.error_keywords } class LogAnalyzer: """ Log file analyzer - Python implementation of Perl's analyzeFailure. This class reads test log files and identifies error patterns using the same rules as the original Perl implementation. """ def __init__(self, max_lines: Optional[int] = None, ends_only: Optional[int] = None): """ Initialize the log analyzer. Args: max_lines: Maximum number of lines to scan (None = unlimited) ends_only: Only scan first/last N bytes (None = scan all) """ self.max_lines = max_lines self.ends_only = ends_only self.history_size = 10 self.history: List[str] = [] def analyze_failure( self, log_file_path: str, suite: Optional[str] = None, test: Optional[str] = None, tool: Optional[str] = None ) -> ErrorSignature: """ Analyze a test failure log file. This is the Python equivalent of the Perl analyzeFailure subroutine. 
        Args:
            log_file_path: Path to the log file
            suite: Test suite name (auto-detected if None)
            test: Test name (auto-detected if None)
            tool: Tool name (auto-detected if None)

        Returns:
            ErrorSignature object containing analysis results
        """
        # State variables (matching Perl implementation)
        failed_level = None
        failed_signature = None
        failed_tool = None
        failed_line = None
        failed_line_no = None
        failed_line_offset = None
        failed_pat_pos = None
        first_error_found = False
        first_error = "unknown"
        first_error_line = None
        first_error_offset = None
        treat_warnings_as_errors = False
        current_tool = tool
        max_lines_scanned = 0
        skipped_to_end = False
        file_size = 0

        # Check if file exists
        if not os.path.exists(log_file_path):
            return self._create_error_signature(
                suite or "unknown", test or "unknown",
                "Log file not found", current_tool or "unknown",
                0, 0, 1, "Log file not found", "error:file_not_found", 0, []
            )

        try:
            # Handle gzipped files
            is_gzipped = log_file_path.endswith('.gz')
            if is_gzipped:
                f = gzip.open(log_file_path, 'rt', encoding='utf-8', errors='ignore')
                file_size = 0  # Can't easily get size for gzip in text mode
            else:
                f = open(log_file_path, 'r', encoding='utf-8', errors='ignore')
                # Get file size for regular files
                f.seek(0, 2)
                file_size = f.tell()
                f.seek(0)

            try:
                # Read with readline() rather than iterating the file object:
                # iterating a text-mode file disables tell()/seek(), which the
                # ends_only logic and offset capture below depend on.
                line_num = 0
                while True:
                    line = f.readline()
                    if not line:
                        break
                    line_num += 1

                    # Update history (for advanced pattern matching)
                    self.history.insert(0, line)
                    if len(self.history) > self.history_size:
                        self.history.pop()

                    max_lines_scanned += 1

                    # Check line limit
                    if self.max_lines and max_lines_scanned > self.max_lines:
                        first_error = f"unknown, did not find error in first {self.max_lines} lines"
                        break

                    # Handle ends_only (scan only file head and tail).
                    # Skip for gzipped files as they don't support seeking
                    # in text mode.
                    if self.ends_only and not skipped_to_end and not is_gzipped:
                        current_pos = f.tell()
                        if current_pos > self.ends_only:
                            skipped_to_end = True
                            new_pos = file_size - self.ends_only
                            if new_pos > current_pos:
                                f.seek(new_pos)
                                # Skip partial line
                                f.readline()
                            continue

                    # Check ignore patterns
                    if self._should_ignore(line):
                        continue

                    # Auto-detect suite/test from action line
                    if not suite or not test:
                        match = re.search(r'^# action: gc\(.*\)::(\w+)\/(\w+)\.(\S+)', line)
                        if match:
                            suite = match.group(1)
                            test = match.group(2)
                            if not tool:
                                current_tool = match.group(3)
                            continue

                    # Extract tool name from DV output
                    tool_match = self._extract_tool_from_dv(line)
                    if tool_match:
                        current_tool = tool_match
                        continue

                    # Check for "warnings as errors" flag
                    if re.search(r'cc1plus: warnings being treated as errors', line):
                        treat_warnings_as_errors = True
                        continue

                    # Apply error pattern matching
                    error_match = self._match_error_patterns(
                        line, current_tool or "unknown", treat_warnings_as_errors
                    )
                    if error_match:
                        if not first_error_found:
                            first_error_found = True
                            first_error = line.strip()
                            first_error_line = line_num
                            # Only get offset for non-gzipped files
                            if not is_gzipped:
                                first_error_offset = f.tell()

                        # Update if this is higher severity
                        if failed_level is None or error_match['level'] > failed_level:
                            failed_level = error_match['level']
                            failed_signature = line.strip()
                            failed_tool = current_tool
                            failed_line = line
                            failed_line_no = line_num
                            # Only get offset for non-gzipped files
                            if not is_gzipped:
                                failed_line_offset = f.tell()
                            failed_pat_pos = error_match['pattern_pos']
            finally:
                f.close()
        except Exception as e:
            return self._create_error_signature(
                suite or "unknown", test or "unknown",
                f"Error reading log file: {str(e)}",
                current_tool or "unknown",
                0, 0, 1, str(e), "error:read_failure", 0,
                []
            )

        # Set defaults if no error found
        if not first_error_found:
            if self.max_lines and max_lines_scanned > self.max_lines:
                first_error = f"unknown, did not find error in first {self.max_lines} lines"
            elif self.ends_only and skipped_to_end:
                first_error = f"unknown, did not find error in first/last {self.ends_only} bytes"
            failed_signature = first_error
            failed_level = 1
            failed_line = first_error
            failed_line_no = first_error_line or 0
            failed_line_offset = first_error_offset or 0
            failed_pat_pos = "unknown"

        # Extract keywords from error signature
        error_keywords = extract_keywords(failed_signature or first_error)

        # If no keywords from error, try test name
        if not error_keywords and test:
            error_keywords = extract_keywords_from_test_name(test)

        return self._create_error_signature(
            suite or "unknown", test or "unknown",
            failed_signature or first_error,
            failed_tool or current_tool or "unknown",
            failed_line_no or 0,
            failed_line_offset or 0,
            failed_level or 1,
            failed_line or first_error,
            failed_pat_pos or "unknown",
            max_lines_scanned,
            error_keywords
        )

    def _create_error_signature(
        self, suite: str, test: str, signature: str, tool: str,
        line_no: int, offset: int, level: int, line: str,
        pattern_pos: str, num_lines: int, keywords: List[str]
    ) -> ErrorSignature:
        """Helper to create ErrorSignature object"""
        return ErrorSignature(
            suite=suite,
            test=test,
            signature=signature,
            tool=tool,
            line_number=line_no,
            line_offset=offset,
            error_level=level,
            error_line=line,
            pattern_pos=pattern_pos,
            num_lines_scanned=num_lines,
            error_keywords=keywords
        )

    def _should_ignore(self, line: str) -> bool:
        """
        Check if line should be ignored based on ignore patterns.

        Args:
            line: Log line to check

        Returns:
            True if line should be ignored
        """
        # Special case: simctrl lines (unless they contain
        # "failed: caught signal <num>")
        # Perl: (/simctrl/ and (not /failed:\s+caught\s+signal\s+\d+/)) and next;
        if re.search(r'simctrl', line):
            if not re.search(r'failed:\s+caught\s+signal\s+\d+', line):
                return True

        # Check regular ignore patterns
        for pattern in COMPILED_IGNORE_PATTERNS:
            if pattern.search(line):
                return True

        return False

    def _extract_tool_from_dv(self, line: str) -> Optional[str]:
        """
        Extract tool name from DV output lines.

        Args:
            line: Log line

        Returns:
            Tool name or None
        """
        # dv: ... running tool <tool_name>
        match = re.search(r'dv: \.\.\. running tool (\S+)', line)
        if match:
            return match.group(1)

        # dv: tool <tool_name> failed!
        match = re.search(r'dv: tool (\S+) failed!', line)
        if match:
            return match.group(1)

        return None

    def _match_error_patterns(
        self,
        line: str,
        tool: str,
        treat_warnings_as_errors: bool
    ) -> Optional[dict]:
        """
        Match line against error patterns.
        Args:
            line: Log line to check
            tool: Current tool name
            treat_warnings_as_errors: Whether to treat warnings as errors

        Returns:
            Dict with 'level' and 'pattern_pos' if matched, None otherwise
        """
        # Skip empty lines
        if not line.strip():
            return None

        # Check error patterns
        for pattern_dict in ERROR_PATTERNS:
            pattern = pattern_dict['pattern']
            if pattern.search(line):
                return {
                    'level': pattern_dict['level'],
                    'pattern_pos': pattern_dict['pos']
                }

        # Check warning patterns if warnings are errors
        if treat_warnings_as_errors:
            for pattern_dict in WARNING_PATTERNS:
                pattern = pattern_dict['pattern']
                if pattern.search(line):
                    return {
                        'level': pattern_dict['level'],
                        'pattern_pos': pattern_dict['pos']
                    }

        return None

    def analyze_log_content(
        self,
        log_content: str,
        suite: Optional[str] = None,
        test: Optional[str] = None,
        tool: Optional[str] = None
    ) -> ErrorSignature:
        """
        Analyze log content directly (without file path).

        Args:
            log_content: Log file content as string
            suite: Test suite name
            test: Test name
            tool: Tool name

        Returns:
            ErrorSignature object
        """
        lines = log_content.split('\n')

        # State variables
        failed_level = None
        failed_signature = None
        failed_tool = None
        failed_line = None
        failed_line_no = None
        failed_pat_pos = None
        first_error_found = False
        first_error = "unknown"
        first_error_line = None
        treat_warnings_as_errors = False
        current_tool = tool
        max_lines_scanned = 0

        for line_num, line in enumerate(lines, 1):
            # Update history
            self.history.insert(0, line)
            if len(self.history) > self.history_size:
                self.history.pop()

            max_lines_scanned += 1

            # Check line limit
            if self.max_lines and max_lines_scanned > self.max_lines:
                first_error = f"unknown, did not find error in first {self.max_lines} lines"
                break

            # Check ignore patterns
            if self._should_ignore(line):
                continue

            # Auto-detect suite/test
            if not suite or not test:
                match = re.search(r'^# action: gc\(.*\)::(\w+)\/(\w+)\.(\S+)', line)
                if match:
                    suite = match.group(1)
                    test = match.group(2)
                    if not tool:
                        current_tool = match.group(3)
                    continue

            # Extract tool name
            tool_match = self._extract_tool_from_dv(line)
            if tool_match:
                current_tool = tool_match
                continue

            # Check for warnings as errors
            if re.search(r'cc1plus: warnings being treated as errors', line):
                treat_warnings_as_errors = True
                continue

            # Match error patterns
            error_match = self._match_error_patterns(
                line, current_tool or "unknown", treat_warnings_as_errors
            )
            if error_match:
                if not first_error_found:
                    first_error_found = True
                    first_error = line.strip()
                    first_error_line = line_num

                # Update if higher severity
                if failed_level is None or error_match['level'] > failed_level:
                    failed_level = error_match['level']
                    failed_signature = line.strip()
                    failed_tool = current_tool
                    failed_line = line
                    failed_line_no = line_num
                    failed_pat_pos = error_match['pattern_pos']

        # Set defaults if no error found
        if not first_error_found:
            if self.max_lines and max_lines_scanned > self.max_lines:
                first_error = f"unknown, did not find error in first {self.max_lines} lines"
            failed_signature = first_error
            failed_level = 1
            failed_line = first_error
            failed_line_no = first_error_line or 0
            failed_pat_pos = "unknown"

        # Extract keywords
        error_keywords = extract_keywords(failed_signature or first_error)
        if not error_keywords and test:
            error_keywords = extract_keywords_from_test_name(test)

        return self._create_error_signature(
            suite or "unknown", test or "unknown",
            failed_signature or first_error,
            failed_tool or current_tool or "unknown",
            failed_line_no or 0,
            0,  # offset not applicable for content analysis
            failed_level or 1,
            failed_line or first_error,
failed_pat_pos or "unknown", max_lines_scanned, error_keywords ) def extract_all_errors( self, log_file_path: str, max_errors: int = 10 ) -> List[Tuple[str, int, int]]: """ Extract all error lines from log file (not just the first one). Args: log_file_path: Path to log file max_errors: Maximum number of errors to extract Returns: List of tuples: (error_line, line_number, error_level) """ errors = [] if not os.path.exists(log_file_path): return errors try: # Handle gzipped files if log_file_path.endswith('.gz'): f = gzip.open(log_file_path, 'rt', encoding='utf-8', errors='ignore') else: f = open(log_file_path, 'r', encoding='utf-8', errors='ignore') try: treat_warnings_as_errors = False for line_num, line in enumerate(f, 1): # Check ignore patterns if self._should_ignore(line): continue # Check for warnings as errors if re.search(r'cc1plus: warnings being treated as errors', line): treat_warnings_as_errors = True continue # Match error patterns error_match = self._match_error_patterns( line, "unknown", treat_warnings_as_errors ) if error_match: errors.append(( line.strip(), line_num, error_match['level'] )) if len(errors) >= max_errors: break finally: f.close() except Exception: pass return errors def get_log_tail(self, log_file_path: str, num_lines: int = 100) -> str: """ Get the last N lines of a log file. Args: log_file_path: Path to log file num_lines: Number of lines to read Returns: Last N lines as a single string """ if not os.path.exists(log_file_path): return "Log file not found" try: # Handle gzipped files if log_file_path.endswith('.gz'): with gzip.open(log_file_path, 'rt', encoding='utf-8', errors='ignore') as f: lines = f.readlines() else: with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() tail_lines = lines[-num_lines:] if len(lines) > num_lines else lines return ''.join(tail_lines) except Exception as e: return f"Error reading log file: {str(e)}" def get_error_context( self, log_file_path: str, error_line_number: int, context_lines: int = 5 ) -> str: """ Get context around an error line. Args: log_file_path: Path to log file error_line_number: Line number of the error context_lines: Number of lines before and after to include Returns: Context as a string """ if not os.path.exists(log_file_path): return "Log file not found" try: # Handle gzipped files if log_file_path.endswith('.gz'): with gzip.open(log_file_path, 'rt', encoding='utf-8', errors='ignore') as f: lines = f.readlines() else: with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() start = max(0, error_line_number - context_lines - 1) end = min(len(lines), error_line_number + context_lines) context_lines_list = [] for i in range(start, end): line_num = i + 1 marker = ">>> " if line_num == error_line_number else " " context_lines_list.append(f"{marker}{line_num:5d}: {lines[i].rstrip()}") return '\n'.join(context_lines_list) except Exception as e: return f"Error reading context: {str(e)}" def quick_error_check(log_file_path: str) -> bool: """ Quick check if log file contains errors. 
    Args:
        log_file_path: Path to log file

    Returns:
        True if errors found, False otherwise
    """
    if not os.path.exists(log_file_path):
        return False

    analyzer = LogAnalyzer(max_lines=None)  # No limit, scan entire file

    try:
        # Handle gzipped files
        if log_file_path.endswith('.gz'):
            f = gzip.open(log_file_path, 'rt', encoding='utf-8', errors='ignore')
        else:
            f = open(log_file_path, 'r', encoding='utf-8', errors='ignore')

        try:
            for line in f:
                if analyzer._should_ignore(line):
                    continue
                error_match = analyzer._match_error_patterns(line, "unknown", False)
                if error_match:
                    return True
        finally:
            f.close()
    except Exception:
        pass

    return False
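
What error_patterns is expected to provide

The error_patterns module imported at the top of this file is not shown on this page. From the way _should_ignore and _match_error_patterns consume it, the tables plausibly look like the sketch below; the regexes, levels, and 'pos' labels here are illustrative stand-ins, not the project's real rules.

import re

# Compiled regexes; any line matching one of these is skipped entirely.
COMPILED_IGNORE_PATTERNS = [
    re.compile(r'Error Margin'),  # hypothetical ignore rule
]

# _match_error_patterns() reads three keys from each entry: 'pattern'
# (a compiled regex), 'level' (severity 1-10, higher wins), and 'pos'
# (an identifier naming the rule that matched).
ERROR_PATTERNS = [
    {'pattern': re.compile(r'\bERROR\b'), 'level': 5, 'pos': 'error:generic'},
    {'pattern': re.compile(r'Segmentation fault'), 'level': 9, 'pos': 'error:segfault'},
]

# Consulted only after "cc1plus: warnings being treated as errors" is seen.
WARNING_PATTERNS = [
    {'pattern': re.compile(r'\bwarning:'), 'level': 2, 'pos': 'warning:generic'},
]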
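
Example usage

A minimal sketch of how the analyzer is driven. The import path is an assumption (this page shows a single file, not the package layout), so adjust it to wherever log_analyzer.py lives in your checkout.

from regression_jira.log_analyzer import LogAnalyzer, quick_error_check  # hypothetical path

# Scan only the first/last 64 KiB of very large logs.
analyzer = LogAnalyzer(ends_only=64 * 1024)

sig = analyzer.analyze_failure("/path/to/test.log.gz")
print(sig.signature)       # highest-severity error line found
print(sig.error_level)     # severity 1-10 (1 = nothing matched)
print(sig.error_keywords)  # keywords fed to the JIRA duplicate search

# Or analyze a log already held in memory; byte offsets are not
# applicable in this mode and come back as 0.
sig2 = analyzer.analyze_log_content("dv: tool vcs failed!\n")

# Cheap yes/no scan when the full signature is not needed.
if quick_error_check("/path/to/test.log"):
    print("errors present")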
