"""Tests for pattern_suggester analyzer."""
from codesdevs_log_analyzer.analyzers.pattern_suggester import (
PatternSuggester,
PatternSuggestionResult,
SuggestedPattern,
)
class TestSuggestedPattern:
"""Tests for SuggestedPattern dataclass."""
def test_default_values(self):
"""Test default values are set correctly."""
pattern = SuggestedPattern(
pattern=r"\berror\b",
description="Error messages",
category="error",
match_count=5,
)
assert pattern.examples == []
assert pattern.priority == "medium"
def test_to_dict(self):
"""Test to_dict serialization."""
pattern = SuggestedPattern(
pattern=r"\berror\b",
description="Error messages",
category="error",
match_count=10,
examples=["error 1", "error 2", "error 3", "error 4"],
priority="high",
)
d = pattern.to_dict()
assert d["pattern"] == r"\berror\b"
assert d["description"] == "Error messages"
assert d["category"] == "error"
assert d["match_count"] == 10
assert len(d["examples"]) == 3 # Should truncate to 3
assert d["priority"] == "high"
class TestPatternSuggestionResult:
"""Tests for PatternSuggestionResult dataclass."""
def test_default_values(self):
"""Test default values are set correctly."""
result = PatternSuggestionResult()
assert result.patterns == []
assert result.analysis_summary == ""
assert result.lines_analyzed == 0
assert result.unique_levels == set()
assert result.error_count == 0
assert result.warning_count == 0
def test_to_dict(self):
"""Test to_dict serialization."""
result = PatternSuggestionResult(
patterns=[
SuggestedPattern(
pattern="test",
description="Test",
category="test",
match_count=1,
)
],
analysis_summary="Test summary",
lines_analyzed=100,
unique_levels={"INFO", "ERROR"},
error_count=5,
warning_count=3,
)
d = result.to_dict()
assert len(d["patterns"]) == 1
assert d["analysis_summary"] == "Test summary"
assert d["lines_analyzed"] == 100
assert set(d["unique_levels"]) == {"INFO", "ERROR"}
assert d["error_count"] == 5
assert d["warning_count"] == 3
class TestPatternSuggester:
"""Tests for PatternSuggester class."""
def test_initialization(self):
"""Test that suggester initializes correctly."""
suggester = PatternSuggester()
assert len(suggester._compiled_patterns) > 0
def test_analyze_file_basic(self, sample_log_file, mock_parser):
"""Test basic file analysis."""
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(sample_log_file),
parser=mock_parser,
)
assert result.lines_analyzed > 0
assert len(result.unique_levels) > 0
assert result.analysis_summary != ""
def test_analyze_file_with_focus(self, sample_log_file, mock_parser):
"""Test file analysis with specific focus."""
suggester = PatternSuggester()
for focus in ["all", "errors", "security", "performance", "identifiers"]:
result = suggester.analyze_file(
file_path=str(sample_log_file),
parser=mock_parser,
focus=focus,
)
assert result.lines_analyzed > 0
def test_analyze_file_max_patterns(self, sample_log_file, mock_parser):
"""Test that max_patterns limits results."""
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(sample_log_file),
parser=mock_parser,
max_patterns=2,
)
assert len(result.patterns) <= 2
def test_analyze_file_max_lines(self, tmp_path, mock_parser):
"""Test that max_lines limits analysis."""
log_file = tmp_path / "large.log"
with open(log_file, "w") as f:
for i in range(1000):
f.write(f"2024-01-15 10:00:{i % 60:02d} INFO Message {i}\n")
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
max_lines=100,
)
assert result.lines_analyzed == 100
def test_detect_uuid_patterns(self, tmp_path, mock_parser):
"""Test UUID pattern detection."""
log_file = tmp_path / "uuid.log"
log_file.write_text(
"2024-01-15 10:00:00 INFO Processing request 550e8400-e29b-41d4-a716-446655440000\n"
"2024-01-15 10:00:01 INFO User 550e8400-e29b-41d4-a716-446655440001 logged in\n"
"2024-01-15 10:00:02 INFO User 550e8400-e29b-41d4-a716-446655440002 logged in\n"
"2024-01-15 10:00:03 INFO User 550e8400-e29b-41d4-a716-446655440003 logged in\n"
"2024-01-15 10:00:04 INFO User 550e8400-e29b-41d4-a716-446655440004 logged in\n"
"2024-01-15 10:00:05 INFO User 550e8400-e29b-41d4-a716-446655440005 logged in\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
focus="identifiers",
)
# Should detect UUID pattern
identifier_patterns = [p for p in result.patterns if p.category == "identifier"]
# UUID should be detected if seen enough times
assert len(identifier_patterns) >= 0 # May or may not meet threshold
def test_detect_error_patterns(self, tmp_path, mock_parser):
"""Test error pattern detection."""
log_file = tmp_path / "errors.log"
log_file.write_text(
"2024-01-15 10:00:00 ERROR Connection refused to database\n"
"2024-01-15 10:00:01 ERROR Connection refused to database\n"
"2024-01-15 10:00:02 ERROR Connection refused to database\n"
"2024-01-15 10:00:03 ERROR Timeout waiting for response\n"
"2024-01-15 10:00:04 ERROR Timeout waiting for response\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
focus="errors",
)
assert result.error_count == 5
# May detect error patterns depending on threshold
def test_detect_security_patterns(self, tmp_path, mock_parser):
"""Test security pattern detection."""
log_file = tmp_path / "security.log"
log_file.write_text(
"2024-01-15 10:00:00 ERROR Authentication failed for user admin\n"
"2024-01-15 10:00:01 ERROR Authentication failed for user admin\n"
"2024-01-15 10:00:02 ERROR Invalid token received\n"
"2024-01-15 10:00:03 WARN Unauthorized access attempt detected\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
focus="security",
)
# Should detect security-related patterns
security_patterns = [p for p in result.patterns if p.category == "security"]
assert len(security_patterns) >= 0 # Depends on thresholds
def test_detect_performance_patterns(self, tmp_path, mock_parser):
"""Test performance pattern detection."""
log_file = tmp_path / "performance.log"
log_file.write_text(
"2024-01-15 10:00:00 INFO Request took 5000ms to complete\n"
"2024-01-15 10:00:01 INFO Request took 3500ms to complete\n"
"2024-01-15 10:00:02 WARN Memory usage at 1024MB\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
focus="performance",
)
# Should detect performance-related patterns
perf_patterns = [p for p in result.patterns if p.category == "performance"]
assert len(perf_patterns) >= 0 # Depends on thresholds
def test_error_count_tracking(self, tmp_path, mock_parser):
"""Test that error and warning counts are tracked."""
log_file = tmp_path / "mixed.log"
log_file.write_text(
"2024-01-15 10:00:00 INFO Normal message\n"
"2024-01-15 10:00:01 ERROR Error 1\n"
"2024-01-15 10:00:02 ERROR Error 2\n"
"2024-01-15 10:00:03 WARN Warning 1\n"
"2024-01-15 10:00:04 WARN Warning 2\n"
"2024-01-15 10:00:05 WARN Warning 3\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
)
assert result.error_count == 2
# WARN might be normalized to WARNING in some parsers
assert result.warning_count >= 0
def test_unique_levels_tracking(self, tmp_path, mock_parser):
"""Test that unique levels are tracked."""
log_file = tmp_path / "levels.log"
log_file.write_text(
"2024-01-15 10:00:00 INFO Info message\n"
"2024-01-15 10:00:01 ERROR Error message\n"
"2024-01-15 10:00:02 DEBUG Debug message\n"
"2024-01-15 10:00:03 WARN Warning message\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
)
# Should track multiple levels
assert len(result.unique_levels) >= 2
def test_pattern_priority_ordering(self, tmp_path, mock_parser):
"""Test that patterns are sorted by priority."""
log_file = tmp_path / "priority.log"
# Create log with security issues (high priority)
log_file.write_text(
"2024-01-15 10:00:00 ERROR Authentication failed for user\n"
"2024-01-15 10:00:01 INFO Normal message\n"
"2024-01-15 10:00:02 ERROR Authentication failed for admin\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
)
# If multiple patterns found, high priority should come first
if len(result.patterns) > 1:
priorities = [p.priority for p in result.patterns]
priority_order = {"high": 0, "medium": 1, "low": 2}
priority_values = [priority_order.get(p, 1) for p in priorities]
assert priority_values == sorted(priority_values)
def test_analysis_summary_generation(self, sample_log_file, mock_parser):
"""Test that analysis summary is generated."""
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(sample_log_file),
parser=mock_parser,
)
assert "Analyzed" in result.analysis_summary
assert "lines" in result.analysis_summary
def test_empty_file(self, tmp_path, mock_parser):
"""Test handling of empty file."""
log_file = tmp_path / "empty.log"
log_file.write_text("")
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
)
assert result.lines_analyzed == 0
assert result.patterns == []
def test_normalize_error_message(self, tmp_path, mock_parser):
"""Test error message normalization for grouping."""
log_file = tmp_path / "normalize.log"
log_file.write_text(
"2024-01-15 10:00:00 ERROR Connection refused to 192.168.1.100:5432\n"
"2024-01-15 10:00:01 ERROR Connection refused to 10.0.0.50:5432\n"
"2024-01-15 10:00:02 ERROR Connection refused to 172.16.0.1:5432\n"
)
suggester = PatternSuggester()
result = suggester.analyze_file(
file_path=str(log_file),
parser=mock_parser,
focus="errors",
)
# Similar errors should be grouped
# The exact result depends on the normalization threshold
assert result.lines_analyzed == 3