"""News summarization system for generating concise and informative summaries."""
import re
import asyncio
import logging
import json
import hashlib
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field, asdict
from enum import Enum
from collections import defaultdict, Counter
import heapq
class SummaryType(Enum):
"""Types of summaries."""
EXTRACTIVE = "extractive"
ABSTRACTIVE = "abstractive"
HYBRID = "hybrid"
BULLET = "bullet"
FINANCIAL = "financial"
class SummaryLength(Enum):
"""Summary length options."""
SHORT = "short" # ~50 words
MEDIUM = "medium" # ~100 words
LONG = "long" # ~200 words
CUSTOM = "custom" # User-defined
class SummaryError(Exception):
"""Summary generation specific error."""
pass
@dataclass
class KeyInsights:
"""Key insights extracted from news."""
main_points: List[str]
financial_highlights: List[Dict[str, Any]]
market_implications: List[str]
sentiment_overview: str
key_entities: Dict[str, List[str]] = field(default_factory=dict)
temporal_info: List[str] = field(default_factory=list)
@dataclass
class TopicCluster:
"""Cluster of related news by topic."""
topic_id: str
topic_keywords: List[str]
news_items: List[Dict[str, Any]]
cluster_summary: str
coherence_score: float = 0.0
representative_article: Optional[str] = None
@dataclass
class ExtractiveSummary:
"""Extractive summary details."""
selected_sentences: List[str]
sentence_scores: List[float]
extraction_method: str = "textrank"
coverage_score: float = 0.0
@dataclass
class AbstractiveSummary:
"""Abstractive summary details."""
generated_text: str
source_alignment: List[int]
generation_method: str = "template"
fluency_score: float = 0.0
@dataclass
class SummaryResult:
"""Result of news summarization."""
news_id: str
summary: str
summary_type: str
confidence: float
word_count: int
key_points: List[str] = field(default_factory=list)
extractive_details: Optional[ExtractiveSummary] = None
abstractive_details: Optional[AbstractiveSummary] = None
metadata: Dict[str, Any] = field(default_factory=dict)
readability_score: float = 0.0
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
class NewsSummarizer:
"""Advanced news summarization system."""
def __init__(self):
"""Initialize news summarizer."""
self.logger = logging.getLogger("news_summarizer")
# Caching
self.summary_cache = {}
self.cache_ttl = 3600 # 1 hour
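        # summary_cache maps cache key -> (SummaryResult, epoch-seconds timestamp);
        # entries older than cache_ttl are ignored on lookup and recomputed.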
# Custom rules
self.custom_rules = {}
# Korean stopwords
self.stopwords = self._load_korean_stopwords()
# Financial terms dictionary
self.financial_terms = self._load_financial_terms()
# Statistics
self.stats = {
"total_summarized": 0,
"cache_hits": 0,
"avg_compression_ratio": 0.0
}
# Configuration
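        # Note: min_sentence_length is compared against character counts in _split_sentences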
self.config = {
"min_sentence_length": 10,
"max_sentence_length": 150,
"sentence_similarity_threshold": 0.3,
"keyword_extraction_count": 10
}
def _load_korean_stopwords(self) -> Set[str]:
"""Load Korean stopwords."""
return {
"이", "가", "은", "는", "을", "를", "에", "의", "와", "과", "도",
"로", "으로", "만", "라고", "하고", "다고", "에서", "부터", "까지",
"에게", "보다", "에서", "같이", "처럼", "만큼", "때문에", "그래서",
"그러나", "하지만", "그런데", "그리고", "또한", "또", "즉", "따라서"
}
def _load_financial_terms(self) -> Dict[str, float]:
"""Load financial terms with importance weights."""
return {
"매출": 0.9, "영업이익": 0.9, "순이익": 0.9, "성장률": 0.8,
"수익": 0.8, "손실": 0.8, "투자": 0.7, "주가": 0.8,
"배당": 0.7, "부채": 0.7, "자산": 0.7, "현금흐름": 0.8,
"시장점유율": 0.8, "실적": 0.9, "전망": 0.7, "예상": 0.6,
"분기": 0.6, "반도체": 0.7, "메모리": 0.7, "매출액": 0.9
}
async def summarize(self, news_article: Dict[str, Any],
summary_type: SummaryType = SummaryType.EXTRACTIVE,
length: SummaryLength = SummaryLength.MEDIUM,
optimize_readability: bool = False) -> SummaryResult:
"""Generate summary for a news article."""
if news_article is None:
raise SummaryError("News article cannot be None")
try:
news_id = news_article.get("id", "unknown")
content = news_article.get("content", "")
title = news_article.get("title", "")
if not content:
return SummaryResult(
news_id=news_id,
summary="",
summary_type=summary_type.value,
confidence=0.0,
word_count=0
)
# Check cache
cache_key = self._get_cache_key(news_id, summary_type, length)
if cache_key in self.summary_cache:
cached_result, timestamp = self.summary_cache[cache_key]
if datetime.now().timestamp() - timestamp < self.cache_ttl:
self.stats["cache_hits"] += 1
return cached_result
# Generate summary based on type
if summary_type == SummaryType.EXTRACTIVE:
summary_text, details = await self._generate_extractive_summary(
content, title, length
)
result = SummaryResult(
news_id=news_id,
summary=summary_text,
summary_type=summary_type.value,
confidence=0.85,
word_count=len(summary_text.split()),
extractive_details=details
)
elif summary_type == SummaryType.ABSTRACTIVE:
summary_text, details = await self._generate_abstractive_summary(
content, title, length
)
result = SummaryResult(
news_id=news_id,
summary=summary_text,
summary_type=summary_type.value,
confidence=0.75,
word_count=len(summary_text.split()),
abstractive_details=details
)
else:
                # HYBRID, BULLET, and FINANCIAL types currently fall back to extractive
summary_text, details = await self._generate_extractive_summary(
content, title, length
)
result = SummaryResult(
news_id=news_id,
summary=summary_text,
summary_type=SummaryType.EXTRACTIVE.value,
confidence=0.85,
word_count=len(summary_text.split()),
extractive_details=details
)
# Extract key points
result.key_points = await self._extract_key_points(content)
# Optimize readability if requested
if optimize_readability:
result.readability_score = await self.calculate_readability_score(
result.summary
)
# Cache result
self.summary_cache[cache_key] = (result, datetime.now().timestamp())
# Update statistics
self.stats["total_summarized"] += 1
return result
except Exception as e:
self.logger.error(f"Error generating summary: {e}")
raise SummaryError(f"Summary generation failed: {e}")
async def _generate_extractive_summary(self, content: str, title: str,
length: SummaryLength) -> Tuple[str, ExtractiveSummary]:
"""Generate extractive summary using sentence ranking."""
# Split into sentences
sentences = self._split_sentences(content)
if not sentences:
return "", ExtractiveSummary([], [], "none", 0.0)
# Calculate sentence scores
sentence_scores = await self._calculate_sentence_scores(sentences, title)
# Determine number of sentences based on length
num_sentences = self._get_sentence_count(length, len(sentences))
# Select top sentences
top_indices = heapq.nlargest(
num_sentences,
range(len(sentences)),
key=lambda i: sentence_scores[i]
)
# Sort by original order for coherence
top_indices.sort()
# Extract selected sentences
selected_sentences = [sentences[i] for i in top_indices]
selected_scores = [sentence_scores[i] for i in top_indices]
# Join sentences
summary_text = " ".join(selected_sentences)
# Calculate coverage
coverage_score = sum(selected_scores) / sum(sentence_scores) if sentence_scores else 0.0
details = ExtractiveSummary(
selected_sentences=selected_sentences,
sentence_scores=selected_scores,
extraction_method="textrank",
coverage_score=coverage_score
)
return summary_text, details
async def _generate_abstractive_summary(self, content: str, title: str,
length: SummaryLength) -> Tuple[str, AbstractiveSummary]:
"""Generate abstractive summary using template-based approach."""
# Extract key information
key_info = await self._extract_key_information(content, title)
# Generate summary based on template
template = self._get_summary_template(key_info)
summary_text = self._fill_template(template, key_info)
# Adjust length
summary_text = self._adjust_summary_length(summary_text, length)
details = AbstractiveSummary(
generated_text=summary_text,
source_alignment=[], # Would track which parts align with source
generation_method="template",
fluency_score=0.8
)
return summary_text, details
def _split_sentences(self, text: str) -> List[str]:
"""Split text into sentences."""
        # Simple punctuation-based splitting on ., !, or ? followed by whitespace
sentences = re.split(r'[.!?]\s+', text)
# Filter and clean sentences
cleaned_sentences = []
for sent in sentences:
sent = sent.strip()
if len(sent) >= self.config["min_sentence_length"]:
cleaned_sentences.append(sent)
return cleaned_sentences
async def _calculate_sentence_scores(self, sentences: List[str], title: str) -> List[float]:
"""Calculate importance scores for sentences."""
scores = []
# Extract keywords from title
title_keywords = set(self._extract_words(title))
for i, sentence in enumerate(sentences):
score = 0.0
# Position score (earlier sentences often more important)
position_score = 1.0 - (i / len(sentences)) * 0.5
score += position_score * 0.2
# Title similarity score
sentence_words = set(self._extract_words(sentence))
title_overlap = len(title_keywords.intersection(sentence_words))
title_score = title_overlap / max(len(title_keywords), 1)
score += title_score * 0.3
# Financial term score
financial_score = 0.0
for word in sentence_words:
if word in self.financial_terms:
financial_score += self.financial_terms[word]
score += min(financial_score / 5.0, 1.0) * 0.3
# Length score (prefer medium-length sentences)
word_count = len(sentence.split())
if 10 <= word_count <= 30:
length_score = 1.0
else:
length_score = 0.5
score += length_score * 0.1
# Numeric content score
numbers = re.findall(r'\d+', sentence)
if numbers:
score += 0.1
scores.append(score)
return scores
def _extract_words(self, text: str) -> List[str]:
"""Extract meaningful words from text."""
# Simple word extraction for Korean text
words = re.findall(r'[가-힣]+|[a-zA-Z]+', text.lower())
# Filter stopwords
filtered_words = [w for w in words if w not in self.stopwords and len(w) > 1]
return filtered_words
def _get_sentence_count(self, length: SummaryLength, total_sentences: int) -> int:
"""Determine number of sentences for summary based on length."""
if length == SummaryLength.SHORT:
return min(2, total_sentences)
elif length == SummaryLength.MEDIUM:
return min(4, total_sentences)
elif length == SummaryLength.LONG:
return min(6, total_sentences)
else:
return min(3, total_sentences) # Default
async def _extract_key_information(self, content: str, title: str) -> Dict[str, Any]:
"""Extract key information for abstractive summary."""
info = {
"subject": "",
"action": "",
"result": "",
"numbers": [],
"entities": []
}
# Extract subject (usually company name)
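        # Matches Korean company-style names ending in common corporate suffixes
        # (전자, 화학, 제약, 건설, 금융)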
entities = re.findall(r'[가-힣]+(?:전자|화학|제약|건설|금융)', content)
if entities:
info["subject"] = entities[0]
# Extract numbers with context
number_patterns = re.findall(r'(\d+(?:조|억|만|%))[가-힣\s]*([가-힣]+)', content)
info["numbers"] = number_patterns[:3] # Top 3 numbers
# Extract action words
action_words = ["발표", "증가", "감소", "상승", "하락", "기록", "달성"]
for word in action_words:
if word in content:
info["action"] = word
break
return info
def _get_summary_template(self, key_info: Dict[str, Any]) -> str:
"""Get appropriate summary template."""
if key_info["subject"] and key_info["action"]:
return "{subject}가 {action}했습니다. {details}"
else:
return "{main_content}"
def _fill_template(self, template: str, key_info: Dict[str, Any]) -> str:
"""Fill template with extracted information."""
details = ""
if key_info["numbers"]:
number_info = key_info["numbers"][0]
details = f"{number_info[1]}이 {number_info[0]}를 기록했습니다."
filled = template.format(
subject=key_info.get("subject", "회사"),
action=key_info.get("action", "발표"),
details=details,
main_content="주요 내용이 발표되었습니다."
)
return filled
def _adjust_summary_length(self, summary: str, length: SummaryLength) -> str:
"""Adjust summary to target length."""
words = summary.split()
if length == SummaryLength.SHORT:
target_words = 50
elif length == SummaryLength.MEDIUM:
target_words = 100
elif length == SummaryLength.LONG:
target_words = 200
else:
return summary
if len(words) > target_words:
return " ".join(words[:target_words]) + "..."
return summary
async def _extract_key_points(self, content: str) -> List[str]:
"""Extract key points from content."""
sentences = self._split_sentences(content)
key_points = []
# Look for sentences with key patterns
key_patterns = [
r'주요.*는',
r'핵심.*는',
r'결과.*는',
r'따르면',
r'발표.*했다',
r'밝혔다',
r'매출.*증가',
r'실적.*발표',
r'부문.*회복'
]
for sentence in sentences[:10]: # Check first 10 sentences
for pattern in key_patterns:
if re.search(pattern, sentence):
key_points.append(sentence.strip())
break
# If no pattern matches, use first few sentences
if not key_points and sentences:
key_points = sentences[:3]
return key_points[:5] # Return top 5 key points
async def extract_key_insights(self, news_article: Dict[str, Any]) -> KeyInsights:
"""Extract key insights from news article."""
content = news_article.get("content", "")
# Extract main points
main_points = await self._extract_key_points(content)
# Extract financial highlights
financial_highlights = self._extract_financial_highlights(content)
# Extract market implications
market_implications = self._extract_market_implications(content)
# Determine sentiment overview
sentiment_overview = "neutral" # Would integrate with sentiment analyzer
# Extract entities
entities = news_article.get("entities", {})
return KeyInsights(
main_points=main_points,
financial_highlights=financial_highlights,
market_implications=market_implications,
sentiment_overview=sentiment_overview,
key_entities=entities
)
def _extract_financial_highlights(self, content: str) -> List[Dict[str, Any]]:
"""Extract financial highlights from content."""
highlights = []
# Pattern for financial metrics
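        # e.g. "매출은 3조원" -> ("매출", "3조원")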
metric_pattern = r'(매출|영업이익|순이익|수익)[은는이가]?\s*(\d+(?:조|억|만)원)'
matches = re.findall(metric_pattern, content)
for metric, value in matches:
highlights.append({
"metric": metric,
"value": value,
"context": "reported"
})
# Pattern for percentage changes
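        # e.g. "15% 증가" -> ("15%", "증가")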
change_pattern = r'(\d+(?:\.\d+)?%)\s*(증가|감소|상승|하락)'
changes = re.findall(change_pattern, content)
for percentage, direction in changes:
highlights.append({
"metric": "change",
"value": percentage,
"direction": direction
})
return highlights[:5] # Top 5 highlights
def _extract_market_implications(self, content: str) -> List[str]:
"""Extract market implications from content."""
implications = []
# Look for forward-looking statements
future_patterns = [
r'전망[은는이가].*?다\.',
r'예상[은는이가].*?다\.',
r'계획[은는이가].*?다\.',
r'예정[이].*?다\.'
]
for pattern in future_patterns:
matches = re.findall(pattern, content)
implications.extend(matches)
return implications[:3] # Top 3 implications
async def generate_financial_summary(self, news_article: Dict[str, Any]) -> Dict[str, Any]:
"""Generate financial-specific summary."""
content = news_article.get("content", "")
# Extract financial metrics
financial_metrics = self._extract_financial_highlights(content)
# Performance analysis
performance_analysis = self._analyze_financial_performance(financial_metrics)
# Outlook extraction
outlook = self._extract_outlook(content)
return {
"financial_metrics": {m["metric"]: m["value"] for m in financial_metrics},
"performance_analysis": performance_analysis,
"outlook": outlook
}
def _analyze_financial_performance(self, metrics: List[Dict[str, Any]]) -> str:
"""Analyze financial performance based on metrics."""
positive_indicators = 0
negative_indicators = 0
for metric in metrics:
if metric.get("direction") in ["증가", "상승"]:
positive_indicators += 1
elif metric.get("direction") in ["감소", "하락"]:
negative_indicators += 1
if positive_indicators > negative_indicators:
return "positive"
elif negative_indicators > positive_indicators:
return "negative"
else:
return "mixed"
def _extract_outlook(self, content: str) -> str:
"""Extract outlook information."""
outlook_keywords = ["전망", "예상", "기대", "우려"]
for keyword in outlook_keywords:
if keyword in content:
# Extract sentence containing outlook
sentences = self._split_sentences(content)
for sent in sentences:
if keyword in sent:
return sent
return "No specific outlook mentioned"
async def generate_bullet_summary(self, news_article: Dict[str, Any],
max_points: int = 5) -> List[str]:
"""Generate bullet point summary."""
content = news_article.get("content", "")
# Get top sentences
sentences = self._split_sentences(content)
if not sentences:
return []
# Score sentences
scores = await self._calculate_sentence_scores(sentences, news_article.get("title", ""))
# Get top sentences
top_indices = heapq.nlargest(
min(max_points, len(sentences)),
range(len(sentences)),
key=lambda i: scores[i]
)
# Create bullet points
bullet_points = []
for idx in sorted(top_indices):
bullet = f"• {sentences[idx]}"
bullet_points.append(bullet)
return bullet_points
async def cluster_news_by_topic(self, news_batch: List[Dict[str, Any]]) -> List[TopicCluster]:
"""Cluster news articles by topic."""
if not news_batch:
return []
# Simple clustering based on category and keywords
clusters = defaultdict(list)
for article in news_batch:
# Use category as primary clustering key
category = article.get("category", "general")
clusters[category].append(article)
# Convert to TopicCluster objects
topic_clusters = []
for topic_id, articles in clusters.items():
# Extract common keywords
all_words = []
for article in articles:
words = self._extract_words(article.get("content", ""))
all_words.extend(words)
# Get most common keywords
word_counts = Counter(all_words)
topic_keywords = [word for word, _ in word_counts.most_common(5)]
# Generate cluster summary
cluster_summary = f"{topic_id} 관련 {len(articles)}개 기사"
cluster = TopicCluster(
topic_id=topic_id,
topic_keywords=topic_keywords,
news_items=articles,
cluster_summary=cluster_summary,
coherence_score=0.8,
representative_article=articles[0].get("id") if articles else None
)
topic_clusters.append(cluster)
return topic_clusters
async def summarize_multiple_documents(self, news_batch: List[Dict[str, Any]],
                                           focus_topic: Optional[str] = None) -> Dict[str, Any]:
"""Summarize multiple documents with optional topic focus."""
if not news_batch:
return {"combined_summary": "", "key_themes": [], "consensus_points": []}
# Extract all content
all_content = []
for article in news_batch:
content = article.get("content", "")
if focus_topic and focus_topic in content:
all_content.append(content)
elif not focus_topic:
all_content.append(content)
if not all_content:
return {"combined_summary": "", "key_themes": [], "consensus_points": []}
# Extract key themes
all_words = []
for content in all_content:
words = self._extract_words(content)
all_words.extend(words)
word_counts = Counter(all_words)
key_themes = [word for word, count in word_counts.most_common(10) if count > 1]
# Find consensus points (sentences that appear similar across documents)
all_sentences = []
for content in all_content:
sentences = self._split_sentences(content)
all_sentences.extend(sentences)
# Simple consensus: sentences with common important words
consensus_points = []
important_words = set(key_themes[:5])
for sentence in all_sentences:
sentence_words = set(self._extract_words(sentence))
if len(sentence_words.intersection(important_words)) >= 2:
if sentence not in consensus_points:
consensus_points.append(sentence)
# Generate combined summary
combined_summary = f"{focus_topic or '주제'}에 대한 {len(news_batch)}개 기사 요약: "
combined_summary += " ".join(consensus_points[:3])
return {
"combined_summary": combined_summary,
"key_themes": key_themes,
"consensus_points": consensus_points[:5]
}
async def generate_timeline_summary(self, timeline_news: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate timeline-based summary."""
if not timeline_news:
return {"chronological_summary": "", "development_arc": "", "current_status": ""}
# Sort by timestamp
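        # Articles missing "published_at" fall back to the current time as their sort key,
        # so they effectively sort last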
sorted_news = sorted(timeline_news, key=lambda x: x.get("published_at", datetime.now(timezone.utc)))
# Extract key events
events = []
for article in sorted_news:
timestamp = article.get("published_at", datetime.now(timezone.utc))
title = article.get("title", "")
events.append(f"{timestamp.strftime('%Y-%m-%d %H:%M')} - {title}")
# Create chronological summary
chronological_summary = " → ".join(events)
# Analyze development arc
if len(sorted_news) >= 2:
first_content = sorted_news[0].get("content", "")
last_content = sorted_news[-1].get("content", "")
# Simple arc detection
if "발표" in first_content and "반응" in last_content:
development_arc = "announcement_to_reaction"
elif "문제" in first_content and "해결" in last_content:
development_arc = "problem_to_solution"
else:
development_arc = "ongoing_development"
else:
development_arc = "single_event"
# Current status (from most recent article)
current_status = sorted_news[-1].get("title", "최신 상황")
return {
"chronological_summary": chronological_summary,
"development_arc": development_arc,
"current_status": current_status
}
async def summarize_with_sentiment(self, news_article: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary with sentiment context."""
# Generate basic summary
summary_result = await self.summarize(news_article)
# Mock sentiment analysis (would integrate with real sentiment analyzer)
sentiment_context = {
"sentiment": "positive",
"confidence": 0.8,
"key_sentiment_words": ["호조", "상승", "개선"]
}
return {
"summary": summary_result.summary,
"sentiment_context": sentiment_context,
"sentiment_adjusted_summary": f"{summary_result.summary} (긍정적 톤)"
}
async def summarize_by_entity(self, news_article: Dict[str, Any],
focus_entity: str) -> Dict[str, Any]:
"""Generate entity-focused summary."""
content = news_article.get("content", "")
# Find all mentions of entity
entity_mentions = []
sentences = self._split_sentences(content)
for i, sentence in enumerate(sentences):
if focus_entity in sentence:
entity_mentions.append((i, sentence))
# Create entity-focused summary
if entity_mentions:
entity_sentences = [sent for _, sent in entity_mentions[:3]]
entity_summary = " ".join(entity_sentences)
else:
entity_summary = f"{focus_entity}에 대한 직접적인 언급이 없습니다."
# Extract entity context
entity_context = []
for idx, _ in entity_mentions:
# Get surrounding sentences
if idx > 0:
entity_context.append(sentences[idx-1])
if idx < len(sentences) - 1:
entity_context.append(sentences[idx+1])
return {
"entity_summary": entity_summary,
"entity_mentions": [focus_entity] * len(entity_mentions),
"entity_context": entity_context[:3]
}
async def calculate_sentence_importance(self, sentences: List[str],
news_article: Dict[str, Any]) -> List[float]:
"""Calculate importance scores for sentences."""
title = news_article.get("title", "")
# Use existing scoring method
scores = await self._calculate_sentence_scores(sentences, title)
return scores
async def extract_keywords(self, text: str, max_keywords: int = 10) -> List[Tuple[str, float]]:
"""Extract keywords with scores."""
words = self._extract_words(text)
# Calculate word frequencies
word_counts = Counter(words)
# Calculate TF-IDF-like scores
total_words = len(words)
keyword_scores = []
for word, count in word_counts.items():
# Term frequency
tf = count / total_words
# Boost financial terms
if word in self.financial_terms:
score = tf * self.financial_terms[word] * 2
else:
score = tf
keyword_scores.append((word, score))
# Sort by score and return top keywords
keyword_scores.sort(key=lambda x: x[1], reverse=True)
return keyword_scores[:max_keywords]
async def calculate_readability_score(self, text: str) -> float:
"""Calculate readability score for text."""
if not text:
return 0.0
sentences = self._split_sentences(text)
if not sentences:
return 0.0
# Simple readability metrics
avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)
# Ideal sentence length is around 15-20 words
if 15 <= avg_sentence_length <= 20:
length_score = 1.0
elif avg_sentence_length < 10 or avg_sentence_length > 30:
length_score = 0.5
else:
length_score = 0.7
# Check for complex words (long words)
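        # Words longer than 10 characters count as "complex"; once they make up
        # 20% or more of the text, the complexity score bottoms out at 0.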
all_words = text.split()
long_words = [w for w in all_words if len(w) > 10]
complexity_ratio = len(long_words) / len(all_words) if all_words else 0
complexity_score = max(0, 1 - complexity_ratio * 5)
# Combined score
readability_score = (length_score + complexity_score) / 2
return readability_score
async def detect_language(self, text: str) -> str:
"""Detect language of text."""
# Simple language detection
korean_chars = len(re.findall(r'[가-힣]', text))
english_chars = len(re.findall(r'[a-zA-Z]', text))
total_chars = len(text)
if total_chars == 0:
return "unknown"
korean_ratio = korean_chars / total_chars
english_ratio = english_chars / total_chars
if korean_ratio > 0.3:
return "korean"
elif english_ratio > 0.5:
return "english"
else:
return "unknown"
async def assess_summary_quality(self, original_text: str, summary_text: str) -> Dict[str, float]:
"""Assess quality of generated summary."""
# Coherence score (based on sentence flow)
summary_sentences = self._split_sentences(summary_text)
if len(summary_sentences) < 2:
coherence_score = 1.0
else:
# Check for connecting words
connecting_words = ["그러나", "또한", "따라서", "하지만", "그리고"]
connections = sum(1 for sent in summary_sentences if any(w in sent for w in connecting_words))
coherence_score = min(connections / (len(summary_sentences) - 1), 1.0)
# Coverage score (key terms from original in summary)
original_keywords = set(self._extract_words(original_text)[:20])
summary_keywords = set(self._extract_words(summary_text))
coverage_score = len(original_keywords.intersection(summary_keywords)) / max(len(original_keywords), 1)
# Conciseness score (compression ratio)
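        # Summaries under half the original length score 1 - compression_ratio;
        # longer summaries are pinned to a flat 0.5.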
compression_ratio = len(summary_text.split()) / max(len(original_text.split()), 1)
conciseness_score = 1.0 - compression_ratio if compression_ratio < 0.5 else 0.5
# Faithfulness score (no new information added)
new_words = summary_keywords - set(self._extract_words(original_text))
faithfulness_score = max(0, 1.0 - len(new_words) / max(len(summary_keywords), 1))
return {
"coherence_score": coherence_score,
"coverage_score": coverage_score,
"conciseness_score": conciseness_score,
"faithfulness_score": faithfulness_score
}
async def refine_summary(self, original_text: str, initial_summary: str,
refinement_focus: str = "clarity") -> str:
"""Refine summary based on focus area."""
if refinement_focus == "clarity":
# Remove complex sentences
sentences = self._split_sentences(initial_summary)
refined_sentences = []
for sent in sentences:
# Simplify long sentences
if len(sent.split()) > 25:
                    # Split long sentences at commas (ASCII and fullwidth)
                    parts = re.split(r'[,，]', sent)
refined_sentences.extend([p.strip() for p in parts if p.strip()])
else:
refined_sentences.append(sent)
refined = " ".join(refined_sentences)
# Add clarity improvements
refined = refined.replace("크게", "상당히")
refined = refined.replace("큰 폭의", "대폭")
return refined
else:
# Default: return with minor adjustments
            modified = re.sub(r'\s+', ' ', initial_summary).strip()
# Ensure it's different from original
modified = modified.replace(".", " - 수정됨.")
return modified
async def generate_templated_summary(self, news_article: Dict[str, Any],
template_type: str) -> Dict[str, Any]:
"""Generate summary using specific template."""
content = news_article.get("content", "")
if template_type == "earnings_report":
template_fields = {
"company": self._extract_company_name(content),
"period": self._extract_time_period(content),
"revenue": self._extract_revenue(content),
"profit": self._extract_profit(content),
"outlook": self._extract_outlook(content)
}
structured_summary = (
f"{template_fields['company']}의 {template_fields['period']} 실적: "
f"매출 {template_fields['revenue']}, 이익 {template_fields['profit']}. "
f"전망: {template_fields['outlook']}"
)
elif template_type == "market_update":
template_fields = {
"market": "주식시장",
"trend": self._extract_market_trend(content),
"key_movers": self._extract_key_movers(content)
}
structured_summary = (
f"{template_fields['market']} {template_fields['trend']} 동향. "
f"주요 변동: {template_fields['key_movers']}"
)
else: # company_news
template_fields = {
"company": self._extract_company_name(content),
"event": self._extract_main_event(content),
"impact": "시장 영향 분석 중"
}
structured_summary = (
f"{template_fields['company']}: {template_fields['event']}. "
f"{template_fields['impact']}"
)
return {
"structured_summary": structured_summary,
"template_fields": template_fields
}
def _extract_company_name(self, text: str) -> str:
"""Extract company name from text."""
companies = re.findall(r'[가-힣]+(?:전자|화학|제약|건설|금융)', text)
return companies[0] if companies else "회사"
def _extract_time_period(self, text: str) -> str:
"""Extract time period from text."""
periods = re.findall(r'\d+분기|\d+년\s*\d+월|\d+년', text)
return periods[0] if periods else "해당 기간"
def _extract_revenue(self, text: str) -> str:
"""Extract revenue information."""
revenue_pattern = r'매출[은이]?\s*(\d+(?:조|억|만)원)'
match = re.search(revenue_pattern, text)
return match.group(1) if match else "미공개"
def _extract_profit(self, text: str) -> str:
"""Extract profit information."""
profit_pattern = r'(?:영업이익|순이익)[은이]?\s*(\d+(?:조|억|만)원)'
match = re.search(profit_pattern, text)
return match.group(1) if match else "미공개"
def _extract_market_trend(self, text: str) -> str:
"""Extract market trend."""
if "상승" in text:
return "상승"
elif "하락" in text:
return "하락"
else:
return "보합"
def _extract_key_movers(self, text: str) -> str:
"""Extract key market movers."""
companies = re.findall(r'[가-힣]+(?:전자|화학|제약|건설|금융)', text)
return ", ".join(companies[:3]) if companies else "주요 종목"
def _extract_main_event(self, text: str) -> str:
"""Extract main event from text."""
event_keywords = ["발표", "출시", "인수", "합병", "투자", "제휴"]
for keyword in event_keywords:
if keyword in text:
# Find sentence with keyword
sentences = self._split_sentences(text)
for sent in sentences:
if keyword in sent:
return sent[:50] + "..."
return "주요 사건"
async def generate_comparative_summary(self, articles: List[Dict[str, Any]],
comparison_aspects: List[str]) -> Dict[str, Any]:
"""Generate comparative summary between articles."""
if len(articles) < 2:
return {"comparison_summary": "비교할 기사가 부족합니다.", "similarities": [], "differences": []}
# Extract content for each article
contents = [article.get("content", "") for article in articles]
# Find similarities
similarities = []
common_words = set(self._extract_words(contents[0]))
for content in contents[1:]:
common_words = common_words.intersection(set(self._extract_words(content)))
if common_words:
similarities.append(f"공통 키워드: {', '.join(list(common_words)[:5])}")
# Find differences
differences = []
for i, aspect in enumerate(comparison_aspects):
aspect_info = []
for j, content in enumerate(contents):
if aspect in content:
# Extract sentence with aspect
sentences = self._split_sentences(content)
for sent in sentences:
if aspect in sent:
aspect_info.append(f"기사{j+1}: {sent[:50]}...")
break
if len(aspect_info) > 1:
differences.append(f"{aspect} 관련: " + " vs ".join(aspect_info))
comparison_summary = f"{len(articles)}개 기사 비교 분석 결과"
return {
"comparison_summary": comparison_summary,
"similarities": similarities,
"differences": differences
}
async def create_streaming_summary(self, news_stream: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create summary for streaming news updates."""
if not news_stream:
return {"live_summary": "", "update_history": [], "confidence_trend": []}
# Sort by timestamp
sorted_stream = sorted(news_stream, key=lambda x: x.get("timestamp", datetime.now(timezone.utc)))
# Build progressive summary
update_history = []
confidence_trend = []
current_summary = ""
for update in sorted_stream:
content = update.get("content", "")
# Update summary with new information
if current_summary:
current_summary += f" {content}"
else:
current_summary = content
# Track update
update_history.append({
"timestamp": update.get("timestamp"),
"content": content[:50] + "..."
})
# Mock confidence calculation
confidence = 0.5 + (len(update_history) * 0.1)
confidence_trend.append(min(confidence, 1.0))
# Generate final live summary
sentences = self._split_sentences(current_summary)
live_summary = " ".join(sentences[:3]) if sentences else current_summary
return {
"live_summary": live_summary,
"update_history": update_history,
"confidence_trend": confidence_trend
}
def set_custom_rules(self, rules: Dict[str, Any]):
"""Set custom summarization rules."""
self.custom_rules.update(rules)
async def generate_personalized_summary(self, news_article: Dict[str, Any],
user_profile: Dict[str, Any]) -> Dict[str, Any]:
"""Generate personalized summary based on user profile."""
# Extract user preferences
interests = user_profile.get("interests", [])
expertise_level = user_profile.get("expertise_level", "general")
length_pref = user_profile.get("length_preference", "medium")
# Map preferences to summary parameters
if length_pref == "short":
length = SummaryLength.SHORT
elif length_pref == "long":
length = SummaryLength.LONG
else:
length = SummaryLength.MEDIUM
# Generate base summary
summary_result = await self.summarize(news_article, length=length)
# Calculate relevance score
content = news_article.get("content", "")
relevance_score = 0.0
for interest in interests:
if interest in content:
relevance_score += 0.2
relevance_score = min(relevance_score, 1.0)
# Customize based on expertise
if expertise_level == "expert":
# Include more technical details
customized_summary = summary_result.summary
else:
# Simplify technical terms
customized_summary = summary_result.summary.replace("영업이익", "회사 수익")
return {
"personalized_summary": customized_summary,
"relevance_score": relevance_score,
"customization_applied": {
"length": length_pref,
"expertise_adjustment": expertise_level == "expert"
}
}
async def summarize_with_fact_check(self, news_article: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary with fact-checking integration."""
# Generate base summary
summary_result = await self.summarize(news_article)
# Mock fact-checking results
fact_check_results = {
"checked_claims": 3,
"verified": 2,
"unverified": 1
}
# Extract claims from summary
claims = self._split_sentences(summary_result.summary)
verified_claims = claims[:2] if len(claims) >= 2 else claims
questionable_claims = claims[2:3] if len(claims) >= 3 else []
return {
"summary": summary_result.summary,
"fact_check_results": fact_check_results,
"verified_claims": verified_claims,
"questionable_claims": questionable_claims
}
async def summarize_multimedia_content(self, multimedia_article: Dict[str, Any]) -> Dict[str, Any]:
"""Summarize article with multimedia content."""
# Text summary
text_summary_result = await self.summarize(multimedia_article)
# Extract media information
media_highlights = []
images = multimedia_article.get("images", [])
for img in images:
media_highlights.append(f"이미지: {img.get('caption', '이미지')}")
videos = multimedia_article.get("videos", [])
for vid in videos:
media_highlights.append(f"비디오: {vid.get('title', '비디오')}")
# Create integrated narrative
integrated_narrative = text_summary_result.summary
if media_highlights:
integrated_narrative += f" (관련 미디어: {', '.join(media_highlights[:2])})"
return {
"text_summary": text_summary_result.summary,
"media_highlights": media_highlights,
"integrated_narrative": integrated_narrative
}
async def summarize_batch(self, news_batch: List[Dict[str, Any]]) -> List[SummaryResult]:
"""Summarize batch of news articles efficiently."""
# Process in parallel batches
batch_size = 10
results = []
for i in range(0, len(news_batch), batch_size):
batch = news_batch[i:i + batch_size]
# Process batch concurrently
batch_tasks = [self.summarize(article) for article in batch]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# Handle exceptions
for result in batch_results:
if isinstance(result, Exception):
self.logger.error(f"Batch summarization error: {result}")
# Create error result
results.append(SummaryResult(
news_id="error",
summary="",
summary_type=SummaryType.EXTRACTIVE.value,
confidence=0.0,
word_count=0
))
else:
results.append(result)
return results
async def export_summary(self, summary_result: SummaryResult, format: str = "json") -> str:
"""Export summary in specified format."""
if format == "json":
# Convert to dict and handle datetime serialization
result_dict = asdict(summary_result)
result_dict["timestamp"] = result_dict["timestamp"].isoformat() if result_dict["timestamp"] else None
return json.dumps(result_dict, ensure_ascii=False, indent=2)
elif format == "text":
return f"{summary_result.summary}\n\n(신뢰도: {summary_result.confidence:.2f})"
elif format == "markdown":
md_text = f"## 요약\n\n{summary_result.summary}\n\n"
if summary_result.key_points:
md_text += "### 주요 포인트\n\n"
for point in summary_result.key_points:
md_text += f"- {point}\n"
return md_text
else:
return str(summary_result)
async def validate_summary(self, original_article: Dict[str, Any],
summary_result: SummaryResult) -> Dict[str, Any]:
"""Validate summary for consistency and accuracy."""
validation_errors = []
# Check if summary is not empty
if not summary_result.summary:
validation_errors.append("Summary is empty")
# Check length constraints
if summary_result.word_count > 500:
validation_errors.append("Summary too long")
# Check entity consistency
original_entities = set(re.findall(r'[가-힣]+(?:전자|화학|제약|건설|금융)',
original_article.get("content", "")))
summary_entities = set(re.findall(r'[가-힣]+(?:전자|화학|제약|건설|금융)',
summary_result.summary))
# Entities in summary should be subset of original
new_entities = summary_entities - original_entities
if new_entities:
validation_errors.append(f"New entities introduced: {new_entities}")
# Calculate consistency score
consistency_score = 1.0 - (len(validation_errors) * 0.2)
consistency_score = max(0.0, consistency_score)
return {
"is_valid": len(validation_errors) == 0,
"validation_errors": validation_errors,
"consistency_score": consistency_score
}
async def generate_trend_aware_summary(self, news_article: Dict[str, Any],
trending_topics: List[str]) -> Dict[str, Any]:
"""Generate summary emphasizing trending topics."""
content = news_article.get("content", "")
# Check which trends are mentioned
highlighted_trends = []
for trend in trending_topics:
if trend in content:
highlighted_trends.append(trend)
# Generate summary with trend focus
if highlighted_trends:
# Prioritize sentences with trending topics
sentences = self._split_sentences(content)
trend_sentences = []
for sent in sentences:
if any(trend in sent for trend in highlighted_trends):
trend_sentences.append(sent)
# Create trend-focused summary
trend_aligned_summary = " ".join(trend_sentences[:3])
else:
# Fall back to regular summary
summary_result = await self.summarize(news_article)
trend_aligned_summary = summary_result.summary
# Calculate trend relevance
trend_relevance_score = len(highlighted_trends) / max(len(trending_topics), 1)
return {
"trend_aligned_summary": trend_aligned_summary,
"trend_relevance_score": trend_relevance_score,
"highlighted_trends": highlighted_trends
}
async def summarize_with_cross_references(self, main_article: Dict[str, Any],
related_articles: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Summarize with cross-references to related articles."""
# Main article summary
main_summary_result = await self.summarize(main_article)
# Extract contextual insights from related articles
contextual_insights = []
cross_references = []
for related in related_articles:
# Extract key point from related article
related_content = related.get("content", "")
if related_content:
sentences = self._split_sentences(related_content)
if sentences:
contextual_insights.append(sentences[0])
cross_references.append({
"article_id": related.get("id"),
"reference": sentences[0][:50] + "..."
})
return {
"main_summary": main_summary_result.summary,
"contextual_insights": contextual_insights[:3],
"cross_references": cross_references[:3]
}
def _get_cache_key(self, news_id: str, summary_type: SummaryType,
length: SummaryLength) -> str:
"""Generate cache key for summary."""
key_string = f"{news_id}_{summary_type.value}_{length.value}"
return hashlib.md5(key_string.encode()).hexdigest()
async def generate_integrated_analysis_summary(self, news_article: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary integrating multiple analysis types."""
# Text summary
summary_result = await self.summarize(news_article)
# Mock sentiment analysis
sentiment_analysis = {
"sentiment": "positive",
"score": 0.75
}
# Mock market impact
market_impact = {
"impact_level": "medium",
"affected_sectors": ["technology", "semiconductors"]
}
# Combined insights
combined_insights = (
f"{summary_result.summary} "
f"시장 정서는 {sentiment_analysis['sentiment']}이며, "
f"{market_impact['impact_level']} 수준의 시장 영향이 예상됩니다."
)
return {
"text_summary": summary_result.summary,
"sentiment_analysis": sentiment_analysis,
"market_impact": market_impact,
"combined_insights": combined_insights
}
async def get_summary_analytics(self, summary_result: SummaryResult) -> Dict[str, float]:
"""Get analytics for generated summary."""
# Compression ratio
original_length = 500 # Mock original length
summary_length = summary_result.word_count
compression_ratio = summary_length / original_length if original_length > 0 else 0
# Information density (keywords per word)
keywords = len([w for w in summary_result.summary.split()
if w in self.financial_terms])
information_density = keywords / max(summary_result.word_count, 1)
# Key terms preserved (mock)
key_terms_preserved = 0.8
# Processing time (mock)
processing_time = 0.5
return {
"compression_ratio": compression_ratio,
"information_density": information_density,
"key_terms_preserved": key_terms_preserved,
"processing_time": processing_time
}
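# Minimal usage sketch (illustrative only): the sample article below is made up;
# the "id" / "title" / "content" keys mirror what summarize() reads above.
if __name__ == "__main__":
    async def _demo() -> None:
        summarizer = NewsSummarizer()
        article = {
            "id": "demo-001",
            "title": "삼성전자 분기 실적 발표",
            "content": (
                "삼성전자가 1분기 매출 70조원을 기록했다고 발표했다. "
                "영업이익은 6조원으로 전년 대비 10% 증가했다. "
                "반도체 부문 회복에 힘입어 실적 개선이 이어질 전망이다."
            ),
        }
        result = await summarizer.summarize(
            article,
            summary_type=SummaryType.EXTRACTIVE,
            length=SummaryLength.SHORT,
        )
        print(result.summary)
        print(f"confidence={result.confidence:.2f}, words={result.word_count}")
    asyncio.run(_demo())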