"""
Smart utility functions for AI-powered incident analysis.
This module provides text similarity, pattern matching, and intelligent analysis
functions for implementing smart incident management features.
"""
import re
import logging
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
# Check ML library availability
import importlib.util
try:
ML_AVAILABLE = (
importlib.util.find_spec("sklearn.feature_extraction.text") is not None and
importlib.util.find_spec("sklearn.metrics.pairwise") is not None
)
except (ImportError, ModuleNotFoundError):
ML_AVAILABLE = False
logger = logging.getLogger(__name__)
@dataclass
class IncidentSimilarity:
"""Represents similarity between two incidents."""
incident_id: str
title: str
similarity_score: float
matched_services: List[str]
matched_keywords: List[str]
resolution_summary: str = ""
resolution_time_hours: Optional[float] = None
class TextSimilarityAnalyzer:
"""Analyzes text similarity between incidents using TF-IDF and cosine similarity."""
def __init__(self):
if not ML_AVAILABLE:
logger.warning("scikit-learn not available. Text similarity will use basic keyword matching.")
self.vectorizer = None
self.incident_vectors = None
self.incident_metadata = {}
def preprocess_text(self, text: Optional[str]) -> str:
"""Clean and normalize text for analysis."""
if not text:
return ""
# Convert to lowercase
text = text.lower()
# Remove special characters but keep spaces and important symbols
text = re.sub(r'[^\w\s\-\.]', ' ', text)
# Replace multiple spaces with single space
text = re.sub(r'\s+', ' ', text)
# Remove common stopwords manually (basic set)
stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were'}
words = text.split()
text = ' '.join([word for word in words if word not in stopwords and len(word) > 1])
return text.strip()
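    # Illustrative behavior (not an exhaustive spec):
    #   preprocess_text("The payment-api returned 503 errors!")
    #   -> "payment-api returned 503 errors"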
def extract_services(self, text: str) -> List[str]:
"""Extract service names from incident text."""
services = []
# Common service patterns
service_patterns = [
r'\b(\w+)-(?:service|api|app|server|db)\b', # service-api, auth-service
r'\b(\w+)(?:service|api|app|server|db)\b', # paymentapi, authservice
r'\b(\w+)\.(?:service|api|app|com)\b', # auth.service, api.com
r'\b(\w+)\s+(?:api|service|app|server|db)\b', # payment api, auth service
]
# Known service names (exact matches)
known_services = [
'elasticsearch', 'elastic', 'kibana', 'redis', 'postgres', 'mysql',
'mongodb', 'kafka', 'rabbitmq', 'nginx', 'apache', 'docker', 'kubernetes'
]
text_lower = text.lower()
# Extract pattern-based services
for pattern in service_patterns:
matches = re.findall(pattern, text_lower)
services.extend(matches)
# Extract known services (with word boundaries to avoid false positives)
for service in known_services:
if re.search(r'\b' + re.escape(service) + r'\b', text_lower):
services.append(service)
# Remove duplicates while preserving order
return list(dict.fromkeys(services))
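    # Illustrative: extract_services("Redis timeouts hitting payment-api")
    # -> ['payment', 'redis'] (pattern hit for "payment-api", exact hit for "redis")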
def extract_error_patterns(self, text: str) -> List[str]:
"""Extract common error patterns from incident text."""
patterns = []
# HTTP status codes
http_codes = re.findall(r'\b[45]\d\d\b', text)
patterns.extend([f"http-{code}" for code in http_codes])
# Database errors
if re.search(r'\b(?:connection|timeout|database|db)\b', text.lower()):
patterns.append("database-error")
# Memory/resource errors
if re.search(r'\b(?:memory|cpu|disk|resource)\b', text.lower()):
patterns.append("resource-error")
# Network errors
if re.search(r'\b(?:network|dns|connection|unreachable)\b', text.lower()):
patterns.append("network-error")
return patterns
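    # Illustrative: extract_error_patterns("503 from upstream, connection refused")
    # -> ['http-503', 'database-error', 'network-error']
    # ("connection" trips both the database and network heuristics)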
def calculate_similarity(self, incidents: List[Dict], target_incident: Dict) -> List[IncidentSimilarity]:
"""Calculate similarity scores between target incident and historical incidents."""
if not incidents:
return []
target_text = self._combine_incident_text(target_incident)
target_services = self.extract_services(target_text)
target_errors = self.extract_error_patterns(target_text)
similarities = []
        if ML_AVAILABLE and len(incidents) > 1:
            # TF-IDF needs more than one document to produce useful term weights
            similarities = self._calculate_tfidf_similarity(incidents, target_incident, target_text, target_services, target_errors)
        else:
            # Fall back to keyword overlap when sklearn is missing or the corpus is tiny
            similarities = self._calculate_keyword_similarity(incidents, target_incident, target_text, target_services, target_errors)
# Sort by similarity score descending
return sorted(similarities, key=lambda x: x.similarity_score, reverse=True)
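    # Sketch of intended usage (hypothetical incident payloads):
    #   analyzer = TextSimilarityAnalyzer()
    #   ranked = analyzer.calculate_similarity(history, {"attributes": {"title": "redis down"}})
    #   ranked[0]  # most similar historical incident; scores fall in [0, 1]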
def _combine_incident_text(self, incident: Dict) -> str:
"""Combine incident title, description, and other text fields."""
text_parts = []
# Get text from incident attributes (preferred)
attributes = incident.get('attributes', {})
title = attributes.get('title', '')
summary = attributes.get('summary', '')
description = attributes.get('description', '')
# Fallback to root level if attributes are empty
if not title:
title = incident.get('title', '')
if not summary:
summary = incident.get('summary', '')
if not description:
description = incident.get('description', '')
# Add non-empty parts, avoiding duplication
for part in [title, summary, description]:
if part and part not in text_parts:
text_parts.append(part)
combined = ' '.join(text_parts)
return self.preprocess_text(combined)
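    # Illustrative: for {"attributes": {"title": "DB down", "summary": "DB down"}}
    # the duplicate summary is skipped, so only "db down" reaches preprocessing.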
def _calculate_tfidf_similarity(self, incidents: List[Dict], target_incident: Dict,
target_text: str, target_services: List[str],
target_errors: List[str]) -> List[IncidentSimilarity]:
"""Use TF-IDF and cosine similarity for advanced text matching."""
if not ML_AVAILABLE:
return []
# Import here to avoid issues with conditional imports
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Prepare texts
incident_texts = [self._combine_incident_text(inc) for inc in incidents]
all_texts = incident_texts + [target_text]
# Vectorize
vectorizer = TfidfVectorizer(max_features=1000, ngram_range=(1, 2))
tfidf_matrix = vectorizer.fit_transform(all_texts)
# Calculate similarities
target_vector = tfidf_matrix[-1]
similarities = cosine_similarity(target_vector, tfidf_matrix[:-1]).flatten()
results = []
for i, incident in enumerate(incidents):
if similarities[i] > 0.1: # Only include reasonable matches
incident_services = self.extract_services(incident_texts[i])
incident_errors = self.extract_error_patterns(incident_texts[i])
# Bonus for matching services and error patterns
service_bonus = len(set(target_services) & set(incident_services)) * 0.1
error_bonus = len(set(target_errors) & set(incident_errors)) * 0.15
# Exact match bonus for identical preprocessed text
exact_match_bonus = 0.0
if target_text and incident_texts[i] and target_text.strip() == incident_texts[i].strip():
exact_match_bonus = 0.3 # Strong bonus for exact matches
# Partial matching bonus using fuzzy keyword similarity
partial_bonus = self._calculate_partial_similarity_bonus(target_text, incident_texts[i])
final_score = min(1.0, similarities[i] + service_bonus + error_bonus + exact_match_bonus + partial_bonus)
results.append(IncidentSimilarity(
incident_id=str(incident.get('id', '')),
title=incident.get('attributes', {}).get('title', 'Unknown'),
similarity_score=final_score,
matched_services=list(set(target_services) & set(incident_services)),
matched_keywords=self._extract_common_keywords(target_text, incident_texts[i]),
resolution_summary=incident.get('attributes', {}).get('summary', ''),
resolution_time_hours=self._calculate_resolution_time(incident)
))
return results
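    # Scoring recipe (as implemented above): final = min(1.0, cosine
    # + 0.1 per shared service + 0.15 per shared error pattern
    # + 0.3 exact-text bonus + partial bonus (capped at 0.15)).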
def _calculate_keyword_similarity(self, incidents: List[Dict], target_incident: Dict,
target_text: str, target_services: List[str],
target_errors: List[str]) -> List[IncidentSimilarity]:
"""Fallback keyword-based similarity when ML libraries not available."""
target_words = set(target_text.split())
results = []
for incident in incidents:
incident_text = self._combine_incident_text(incident)
incident_words = set(incident_text.split())
incident_services = self.extract_services(incident_text)
incident_errors = self.extract_error_patterns(incident_text)
# Calculate Jaccard similarity
if len(target_words | incident_words) > 0:
word_similarity = len(target_words & incident_words) / len(target_words | incident_words)
else:
word_similarity = 0
# Service and error pattern bonuses
service_bonus = len(set(target_services) & set(incident_services)) * 0.2
error_bonus = len(set(target_errors) & set(incident_errors)) * 0.25
# Exact match bonus for identical preprocessed text
exact_match_bonus = 0.0
if target_text and incident_text and target_text.strip() == incident_text.strip():
exact_match_bonus = 0.4 # Strong bonus for exact matches in keyword mode
# Partial matching bonus using fuzzy keyword similarity
partial_bonus = self._calculate_partial_similarity_bonus(target_text, incident_text)
final_score = min(1.0, word_similarity + service_bonus + error_bonus + exact_match_bonus + partial_bonus)
if final_score > 0.15: # Only include reasonable matches
results.append(IncidentSimilarity(
incident_id=str(incident.get('id', '')),
title=incident.get('attributes', {}).get('title', 'Unknown'),
similarity_score=final_score,
matched_services=list(set(target_services) & set(incident_services)),
matched_keywords=list(target_words & incident_words)[:5], # Top 5 matches
resolution_summary=incident.get('attributes', {}).get('summary', ''),
resolution_time_hours=self._calculate_resolution_time(incident)
))
return results
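    # Worked example (illustrative): a target "redis connection timeout" and an
    # incident "redis timeout errors" share {"redis", "timeout"}; the union has
    # four distinct words, so Jaccard word_similarity = 2 / 4 = 0.5 before bonuses.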
def _extract_common_keywords(self, text1: str, text2: str) -> List[str]:
"""Extract common meaningful keywords between two texts with fuzzy matching."""
words1 = set(text1.split())
words2 = set(text2.split())
# Exact matches
exact_common = words1 & words2
# Fuzzy matches for partial similarity
fuzzy_common = []
for word1 in words1:
if len(word1) > 3: # Only check longer words
for word2 in words2:
if len(word2) > 3 and word1 != word2:
# Check if words share significant substring (fuzzy matching)
if self._words_similar(word1, word2):
fuzzy_common.append(f"{word1}~{word2}")
# Combine exact and fuzzy matches
all_matches = list(exact_common) + fuzzy_common
meaningful = [word for word in all_matches if len(word.split('~')[0]) > 2]
        return meaningful[:8]  # Cap at eight combined exact and fuzzy matches
def _words_similar(self, word1: str, word2: str) -> bool:
"""Check if two words are similar enough to be considered related."""
# Handle common variations
variations = {
'elastic': ['elasticsearch', 'elk'],
'payment': ['payments', 'pay', 'billing'],
'database': ['db', 'postgres', 'mysql', 'mongo'],
'timeout': ['timeouts', 'timed-out', 'timing-out'],
'service': ['services', 'svc', 'api', 'app'],
'error': ['errors', 'err', 'failure', 'failed', 'failing'],
'down': ['outage', 'offline', 'unavailable']
}
# Check if words are variations of each other
for base, variants in variations.items():
if (word1 == base and word2 in variants) or (word2 == base and word1 in variants):
return True
if word1 in variants and word2 in variants:
return True
# Check substring similarity (at least 70% overlap for longer words)
if len(word1) >= 5 and len(word2) >= 5:
shorter = min(word1, word2, key=len)
longer = max(word1, word2, key=len)
if shorter in longer and len(shorter) / len(longer) >= 0.7:
return True
# Check if one word starts with the other (for prefixed services)
if len(word1) >= 4 and len(word2) >= 4:
if word1.startswith(word2) or word2.startswith(word1):
return True
return False
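    # Illustrative: _words_similar("payments", "payment") is True (listed variation,
    # and the "payment"/"payments" overlap is 7/8 >= 0.7); _words_similar("redis",
    # "nginx") is False (no variation, substring, or prefix relationship).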
def _calculate_partial_similarity_bonus(self, text1: str, text2: str) -> float:
"""Calculate bonus for partial/fuzzy keyword matches."""
if not text1 or not text2:
return 0.0
words1 = set(text1.split())
words2 = set(text2.split())
fuzzy_matches = 0
# Count meaningful words that could be compared
meaningful_words1 = [w for w in words1 if len(w) > 3]
meaningful_words2 = [w for w in words2 if len(w) > 3]
if not meaningful_words1 or not meaningful_words2:
return 0.0
# Count fuzzy matches
for word1 in meaningful_words1:
for word2 in meaningful_words2:
if word1 != word2 and self._words_similar(word1, word2):
fuzzy_matches += 1
break # Only count each target word once
# Calculate bonus based on fuzzy match ratio
if fuzzy_matches > 0:
# Use the smaller meaningful word set as denominator for conservative bonus
total_possible_matches = min(len(meaningful_words1), len(meaningful_words2))
bonus_ratio = fuzzy_matches / total_possible_matches
return min(0.15, bonus_ratio * 0.3) # Max 0.15 bonus for partial matches
return 0.0
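    # Worked example (illustrative): "payment timeout" vs. "payments failing" yields
    # one fuzzy pair (payment~payments) out of min(2, 2) candidate words, so the
    # bonus is min(0.15, (1/2) * 0.3) = 0.15.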
def _calculate_resolution_time(self, incident: Dict) -> Optional[float]:
"""Calculate resolution time in hours if timestamps are available."""
try:
attributes = incident.get('attributes', {})
created_at = attributes.get('created_at')
            resolved_at = attributes.get('resolved_at') or attributes.get('updated_at')  # updated_at is a rough proxy when resolved_at is missing
if created_at and resolved_at:
# Try to parse ISO format timestamps
created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
resolved = datetime.fromisoformat(resolved_at.replace('Z', '+00:00'))
diff = resolved - created
return diff.total_seconds() / 3600 # Convert to hours
except Exception:
pass
return None
class SolutionExtractor:
"""Extract and format solution information from resolved incidents."""
def extract_solutions(self, similar_incidents: List[IncidentSimilarity]) -> Dict[str, Any]:
"""Extract actionable solutions from similar resolved incidents."""
if not similar_incidents:
return {
"solutions": [],
"common_patterns": [],
"average_resolution_time": None,
"total_similar_incidents": 0
}
solutions = []
resolution_times = []
all_keywords = []
for incident in similar_incidents[:5]: # Top 5 most similar
solution_info = {
"incident_id": incident.incident_id,
"title": incident.title,
"similarity": round(incident.similarity_score, 3),
"matched_services": incident.matched_services,
"resolution_summary": incident.resolution_summary or "No resolution summary available",
"resolution_time_hours": incident.resolution_time_hours
}
# Extract potential solution steps from resolution summary
solution_steps = self._extract_action_items(incident.resolution_summary)
if solution_steps:
solution_info["suggested_actions"] = solution_steps
solutions.append(solution_info)
            # Explicit None check so a legitimate 0.0-hour resolution still counts
            if incident.resolution_time_hours is not None:
                resolution_times.append(incident.resolution_time_hours)
all_keywords.extend(incident.matched_keywords)
# Calculate average resolution time
avg_resolution = sum(resolution_times) / len(resolution_times) if resolution_times else None
# Find common patterns
common_patterns = self._identify_common_patterns(all_keywords, similar_incidents)
return {
"solutions": solutions,
"common_patterns": common_patterns,
"average_resolution_time": round(avg_resolution, 2) if avg_resolution else None,
"total_similar_incidents": len(similar_incidents)
}
def _extract_action_items(self, resolution_text: str) -> List[str]:
"""Extract potential action items from resolution text."""
if not resolution_text:
return []
actions = []
text_lower = resolution_text.lower()
        # Pair each action regex with the verb used when phrasing the suggestion;
        # this keeps the pattern and its output wording in one place
        action_patterns = [
            (r'restart(?:ed)?\s+(\w+(?:\s+\w+)*)', 'restart'),
            (r'clear(?:ed)?\s+(\w+(?:\s+\w+)*)', 'clear'),
            (r'update(?:d)?\s+(\w+(?:\s+\w+)*)', 'update'),
            (r'fix(?:ed)?\s+(\w+(?:\s+\w+)*)', 'fix'),
            (r'roll(?:ed)?\s+back\s+(\w+(?:\s+\w+)*)', 'rollback'),
            (r'scale(?:d)?\s+(\w+(?:\s+\w+)*)', 'scale'),
            (r'deploy(?:ed)?\s+(\w+(?:\s+\w+)*)', 'deploy'),
        ]
        for pattern, verb in action_patterns:
            for match in re.findall(pattern, text_lower):
                actions.append(f"{verb} {match}".strip())
# Look for explicit steps
if 'step' in text_lower or 'action' in text_lower:
sentences = resolution_text.split('.')
for sentence in sentences:
if any(word in sentence.lower() for word in ['step', 'action', 'fix', 'solution']):
actions.append(sentence.strip())
return actions[:5] # Limit to top 5 actions
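    # Illustrative: _extract_action_items("Restarted redis. Rolled back config.")
    # -> ['restart redis', 'rollback config'] (capture groups are greedy, so long
    # sentences can produce long action phrases)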
def _identify_common_patterns(self, keywords: List[str], incidents: List[IncidentSimilarity]) -> List[str]:
"""Identify common patterns across similar incidents."""
patterns = []
# Service patterns
all_services = []
for incident in incidents:
all_services.extend(incident.matched_services)
if all_services:
common_services = [service for service in set(all_services) if all_services.count(service) >= 2]
if common_services:
patterns.append(f"Common services affected: {', '.join(common_services)}")
# Keyword patterns
if keywords:
keyword_counts = {}
for keyword in keywords:
keyword_counts[keyword] = keyword_counts.get(keyword, 0) + 1
frequent_keywords = [k for k, v in keyword_counts.items() if v >= 2 and len(k) > 3]
if frequent_keywords:
patterns.append(f"Common keywords: {', '.join(frequent_keywords[:3])}")
# Resolution time patterns
resolution_times = [inc.resolution_time_hours for inc in incidents if inc.resolution_time_hours is not None]
if resolution_times:
avg_time = sum(resolution_times) / len(resolution_times)
if avg_time < 1:
patterns.append("These incidents typically resolve quickly (< 1 hour)")
elif avg_time > 4:
patterns.append("These incidents typically take longer to resolve (> 4 hours)")
else:
patterns.append(f"These incidents typically resolve in {avg_time:.1f} hours")
return patterns
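# --- Illustrative smoke test ---------------------------------------------------
# A minimal sketch of how the two classes compose, using hypothetical incident
# payloads (the attribute shape mirrors the dicts this module already reads).
if __name__ == "__main__":
    history = [
        {
            "id": 101,
            "attributes": {
                "title": "Redis connection timeouts on payment-api",
                "summary": "Restarted redis and scaled payment-api pods.",
                "created_at": "2024-01-01T10:00:00Z",
                "resolved_at": "2024-01-01T11:30:00Z",
            },
        },
        {
            "id": 102,
            "attributes": {
                "title": "Nginx 502 errors after deploy",
                "summary": "Rolled back the release.",
            },
        },
    ]
    target = {"attributes": {"title": "payment-api timing out against redis"}}
    analyzer = TextSimilarityAnalyzer()
    ranked = analyzer.calculate_similarity(history, target)
    for match in ranked:
        print(f"{match.incident_id}: {match.title} (score={match.similarity_score:.2f})")
    report = SolutionExtractor().extract_solutions(ranked)
    print(f"solutions={len(report['solutions'])}, "
          f"avg_resolution_h={report['average_resolution_time']}")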