"""Sentiment analyzer for Korean financial news."""
import re
import json
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, asdict, field
from pathlib import Path
import hashlib
class NewsPreprocessor:
"""Simple text preprocessor for sentiment analysis."""
def clean_text(self, text: str) -> str:
"""Clean text for analysis."""
if not text:
return ""
# Remove extra whitespace
cleaned = re.sub(r'\s+', ' ', text.strip())
# Remove HTML tags if any
cleaned = re.sub(r'<[^>]+>', '', cleaned)
return cleaned
def tokenize(self, text: str) -> List[str]:
"""Simple tokenization for Korean text."""
if not text:
return []
        # Simple word splitting; a real implementation would use a proper
        # Korean morphological analyzer
words = re.findall(r'[가-힣]+|[a-zA-Z]+|\d+', text)
return [word for word in words if len(word) > 1]
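# Illustrative example (the input headline is invented): tokenize() keeps runs
# of Hangul, Latin letters, or digits and drops single-character tokens, so
#   NewsPreprocessor().tokenize("삼성전자 주가 5% 상승")
# yields ["삼성전자", "주가", "상승"] -- "5" is filtered out as a single
# character and "%" never matches the token pattern.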
class SentimentError(Exception):
"""Sentiment analysis specific error."""
pass
@dataclass
class SentimentResult:
    """Sentiment analysis result."""
    sentiment: str  # "positive", "negative", "neutral", "mixed"
    score: float  # 0.0 (most negative) to 1.0 (most positive)
    confidence: float  # 0.0 to 1.0
    keywords: List[str] = field(default_factory=list)
    entities: Dict[str, List[str]] = field(default_factory=dict)
    language: str = "korean"
    sarcasm_detected: bool = False
    context_adjusted: bool = False
    weighted_score: Optional[float] = None
    explanation: Dict[str, Any] = field(default_factory=dict)
class SentimentAnalyzer:
"""Advanced sentiment analyzer for Korean financial news."""
def __init__(self):
"""Initialize sentiment analyzer."""
self.preprocessor = NewsPreprocessor()
self.logger = logging.getLogger("sentiment_analyzer")
# Sentiment thresholds
self.thresholds = {
"positive": 0.6,
"negative": 0.4,
"neutral": 0.5
}
# Cache for analyzed texts
self.cache = {}
self.cache_ttl = 3600 # 1 hour
# Custom sentiment rules
self.custom_rules = {}
# Load sentiment lexicons
self._load_sentiment_lexicons()
# Statistics
self.stats = {
"total_analyzed": 0,
"cache_hits": 0,
"positive_count": 0,
"negative_count": 0,
"neutral_count": 0
}
def _load_sentiment_lexicons(self):
"""Load sentiment lexicons for Korean financial terms."""
# Positive financial terms
self.positive_terms = {
# Stock market positive terms
"상승": 0.8, "급등": 0.9, "강세": 0.8, "호재": 0.8, "상한가": 0.95,
"신고가": 0.9, "돌파": 0.7, "매수": 0.6, "추천": 0.7, "목표가상향": 0.8,
"실적개선": 0.8, "성장": 0.7, "이익": 0.6, "수익": 0.6, "흑자": 0.8,
"배당": 0.6, "투자": 0.5, "기대": 0.6, "긍정": 0.7, "호전": 0.7,
# General positive terms
"좋다": 0.6, "훌륭": 0.8, "최고": 0.9, "우수": 0.7, "뛰어나다": 0.8,
"성공": 0.8, "발전": 0.7, "개선": 0.7, "향상": 0.7, "증가": 0.6,
# Market sentiment terms
"낙관": 0.7, "희망": 0.6, "기쁨": 0.7, "만족": 0.6, "안심": 0.6
}
# Negative financial terms
self.negative_terms = {
# Stock market negative terms
"하락": -0.8, "급락": -0.9, "약세": -0.8, "악재": -0.8, "하한가": -0.95,
"신저가": -0.9, "붕괴": -0.9, "매도": -0.6, "투매": -0.8, "목표가하향": -0.8,
"실적악화": -0.8, "손실": -0.7, "적자": -0.8, "부채": -0.6, "위기": -0.8,
"파산": -0.9, "부실": -0.8, "리스크": -0.6, "우려": -0.7, "경고": -0.7,
# General negative terms
"나쁘다": -0.6, "최악": -0.9, "형편없다": -0.8, "실망": -0.7, "걱정": -0.6,
"실패": -0.8, "후퇴": -0.7, "악화": -0.7, "감소": -0.6, "하락": -0.6,
# Market sentiment terms
"비관": -0.7, "절망": -0.8, "불안": -0.7, "공포": -0.8, "패닉": -0.9
}
# Neutral terms
self.neutral_terms = {
"보합": 0.0, "횡보": 0.0, "유지": 0.0, "관망": 0.0, "중립": 0.0,
"현상유지": 0.0, "동결": 0.0, "평가절하": 0.0, "평가절상": 0.0
}
# Financial entities patterns
self.entity_patterns = {
"companies": re.compile(r"[가-힣]+(?:전자|화학|제약|건설|금융|보험|증권|은행|카드|통신|에너지|식품|유통|항공|조선|철강)|삼성전자|LG전자"),
"stock_codes": re.compile(r"\(\d{6}\)|\d{6}"),
"indices": re.compile(r"코스피|코스닥|다우|나스닥|S&P|닛케이"),
"currencies": re.compile(r"원|달러|엔|유로|위안")
}
    async def analyze(self, text: Optional[str]) -> SentimentResult:
"""Analyze sentiment of given text.
Args:
text: Text to analyze
Returns:
SentimentResult object
"""
        if text is None:
            return SentimentResult("neutral", 0.5, 1.0)
        if not isinstance(text, str):
            raise SentimentError("Text must be a string")
        if not text.strip():
            return SentimentResult("neutral", 0.5, 1.0)
# Check cache first
cache_key = self._get_cache_key(text)
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if datetime.now().timestamp() - timestamp < self.cache_ttl:
self.stats["cache_hits"] += 1
return cached_result
try:
# Preprocess text
cleaned_text = self.preprocessor.clean_text(text)
# Detect language
language = self._detect_language(cleaned_text)
# Extract keywords and entities
keywords = await self.extract_keywords(cleaned_text)
entities = await self.detect_financial_entities(cleaned_text)
# Calculate base sentiment score
base_score = self._calculate_base_sentiment(cleaned_text)
# Apply contextual adjustments
adjusted_score = self._apply_contextual_adjustments(
base_score, cleaned_text, entities
)
# Detect sarcasm
sarcasm_detected = self._detect_sarcasm(cleaned_text)
if sarcasm_detected:
adjusted_score = 1.0 - adjusted_score # Invert sentiment
# Calculate confidence
confidence = self._calculate_confidence(cleaned_text, adjusted_score)
# Determine sentiment label
sentiment_label = self._score_to_label(adjusted_score)
# Create result
result = SentimentResult(
sentiment=sentiment_label,
score=adjusted_score,
confidence=confidence,
keywords=[kw[0] for kw in keywords[:10]], # Top 10 keywords
entities=entities,
language=language,
sarcasm_detected=sarcasm_detected
)
# Cache result
self.cache[cache_key] = (result, datetime.now().timestamp())
# Update statistics
self.stats["total_analyzed"] += 1
self.stats[f"{sentiment_label}_count"] += 1
return result
        except Exception as e:
            self.logger.error(f"Error analyzing sentiment: {e}")
            raise SentimentError(f"Sentiment analysis failed: {e}") from e
async def analyze_news(self, news_item: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze sentiment of a complete news article.
Args:
news_item: News item with title and content
Returns:
Dictionary with detailed sentiment analysis
"""
title = news_item.get("title", "")
content = news_item.get("content", "")
# Analyze title and content separately
title_result = await self.analyze(title)
content_result = await self.analyze(content)
# Calculate overall sentiment (weighted average)
title_weight = 0.3
content_weight = 0.7
overall_score = (
title_result.score * title_weight +
content_result.score * content_weight
)
overall_sentiment = self._score_to_label(overall_score)
# Combine keywords and entities
all_keywords = list(set(title_result.keywords + content_result.keywords))
all_entities = {}
for key in set(title_result.entities.keys()) | set(content_result.entities.keys()):
all_entities[key] = list(set(
title_result.entities.get(key, []) +
content_result.entities.get(key, [])
))
return {
"title_sentiment": asdict(title_result),
"content_sentiment": asdict(content_result),
"overall_sentiment": overall_sentiment,
"overall_score": overall_score,
"keywords": all_keywords,
"entities": all_entities
}
async def analyze_batch(self, texts: List[str]) -> List[SentimentResult]:
"""Analyze sentiment for multiple texts.
Args:
texts: List of texts to analyze
Returns:
List of SentimentResult objects
"""
# Process in batches to avoid overwhelming the system
batch_size = 10
results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
# Process batch concurrently
batch_tasks = [self.analyze(text) for text in batch]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# Handle exceptions
for result in batch_results:
if isinstance(result, Exception):
self.logger.error(f"Batch analysis error: {result}")
results.append(SentimentResult("neutral", 0.5, 0.0))
else:
results.append(result)
return results
async def extract_keywords(self, text: str) -> List[Tuple[str, float]]:
"""Extract keywords with importance scores.
Args:
text: Text to extract keywords from
Returns:
List of (keyword, score) tuples
"""
        # Frequency-based scoring with boosts for sentiment and financial
        # terms (no IDF component; a real implementation might use TF-IDF)
words = self.preprocessor.tokenize(text)
word_scores = {}
for word in words:
if len(word) > 1: # Filter single characters
# Calculate importance based on various factors
score = 1.0
# Boost score for sentiment words
if word in self.positive_terms:
score += abs(self.positive_terms[word])
elif word in self.negative_terms:
score += abs(self.negative_terms[word])
# Boost score for financial terms
if self._is_financial_term(word):
score += 0.5
# Accumulate scores
word_scores[word] = word_scores.get(word, 0) + score
# Sort by score and return top keywords
sorted_keywords = sorted(word_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_keywords[:20] # Return top 20
async def detect_financial_entities(self, text: str) -> Dict[str, List[str]]:
"""Detect financial entities in text.
Args:
text: Text to analyze
Returns:
Dictionary of entity types and their values
"""
entities = {
"companies": [],
"stock_codes": [],
"indices": [],
"currencies": []
}
for entity_type, pattern in self.entity_patterns.items():
matches = pattern.findall(text)
entities[entity_type] = list(set(matches)) # Remove duplicates
return entities
def _calculate_word_sentiment(self, word: str) -> float:
"""Calculate sentiment score for a single word.
Args:
word: Word to analyze
Returns:
Sentiment score (-1.0 to 1.0)
"""
if word in self.positive_terms:
return self.positive_terms[word]
elif word in self.negative_terms:
return self.negative_terms[word]
elif word in self.neutral_terms:
return self.neutral_terms[word]
else:
return 0.0
def _calculate_base_sentiment(self, text: str) -> float:
"""Calculate base sentiment score for text.
Args:
text: Text to analyze
Returns:
Sentiment score (0.0 to 1.0)
"""
words = self.preprocessor.tokenize(text)
if not words:
return 0.5
sentiment_scores = []
for word in words:
score = self._calculate_word_sentiment(word)
if score != 0.0:
sentiment_scores.append(score)
if not sentiment_scores:
return 0.5
        # Average the scores of the sentiment-bearing words
        avg_sentiment = sum(sentiment_scores) / len(sentiment_scores)
# Normalize to 0-1 scale
normalized_score = (avg_sentiment + 1.0) / 2.0
return max(0.0, min(1.0, normalized_score))
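    # Worked example (hypothetical input): for a text containing "급등" (+0.9)
    # and "우려" (-0.7), the mean lexicon score is (0.9 - 0.7) / 2 = 0.1,
    # which normalizes to (0.1 + 1.0) / 2 = 0.55 -- mildly positive on the
    # 0-1 scale used throughout this module.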
def _apply_contextual_adjustments(self, base_score: float, text: str,
entities: Dict[str, List[str]]) -> float:
"""Apply contextual adjustments to sentiment score.
Args:
base_score: Base sentiment score
text: Original text
entities: Detected entities
Returns:
Adjusted sentiment score
"""
adjusted_score = base_score
# Amplify sentiment if multiple companies mentioned
if len(entities.get("companies", [])) > 1:
if base_score > 0.5:
adjusted_score = min(1.0, base_score * 1.1)
else:
adjusted_score = max(0.0, base_score * 0.9)
        # Check for negation patterns; when one appears, damp the adjusted
        # score toward neutral, since negation weakens or flips polarity
        negation_patterns = ["않", "안", "못", "없", "아니"]
        for pattern in negation_patterns:
            if pattern in text:
                if adjusted_score > 0.5:
                    adjusted_score = 0.5 + (adjusted_score - 0.5) * 0.7
                else:
                    adjusted_score = 0.5 - (0.5 - adjusted_score) * 0.7
                break
return adjusted_score
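    # Worked example (hypothetical): a score of 0.8 in a text containing the
    # negation marker "않" is damped toward neutral:
    # 0.5 + (0.8 - 0.5) * 0.7 = 0.71.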
def _detect_language(self, text: str) -> str:
"""Detect language of text.
Args:
text: Text to analyze
Returns:
Language code
"""
# Simple Korean detection
korean_chars = re.findall(r'[가-힣]', text)
if len(korean_chars) > len(text) * 0.3:
return "korean"
else:
return "unknown"
def _detect_sarcasm(self, text: str) -> bool:
"""Detect potential sarcasm in text.
Args:
text: Text to analyze
Returns:
True if sarcasm detected
"""
# Simple sarcasm detection patterns
sarcasm_patterns = [
r"아주\s+좋네요.*(?:하락|손실|적자)",
r"정말\s+훌륭.*(?:망|실패|부실)",
r"최고.*(?:최악|형편없)"
]
for pattern in sarcasm_patterns:
if re.search(pattern, text):
return True
return False
def _calculate_confidence(self, text: str, score: float) -> float:
"""Calculate confidence score for sentiment analysis.
Args:
text: Original text
score: Sentiment score
Returns:
Confidence score (0.0 to 1.0)
"""
# Base confidence
confidence = 0.5
# Higher confidence for extreme scores
distance_from_neutral = abs(score - 0.5)
confidence += distance_from_neutral
# Higher confidence for longer texts
text_length_factor = min(len(text) / 100, 0.3)
confidence += text_length_factor
# Lower confidence for mixed signals
words = self.preprocessor.tokenize(text)
positive_count = sum(1 for word in words if self._calculate_word_sentiment(word) > 0)
negative_count = sum(1 for word in words if self._calculate_word_sentiment(word) < 0)
if positive_count > 0 and negative_count > 0:
mixed_factor = abs(positive_count - negative_count) / (positive_count + negative_count)
confidence *= mixed_factor
return max(0.0, min(1.0, confidence))
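    # Worked example (hypothetical): a 60-character text scoring 0.8 with
    # three positive and one negative lexicon hits gives
    # 0.5 + |0.8 - 0.5| + min(60 / 100, 0.3) = 1.1, then a mixed-signal
    # factor of |3 - 1| / (3 + 1) = 0.5, so confidence = 0.55.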
def _score_to_label(self, score: float) -> str:
"""Convert sentiment score to label.
Args:
score: Sentiment score (0.0 to 1.0)
Returns:
Sentiment label
"""
if score >= self.thresholds["positive"]:
return "positive"
elif score <= self.thresholds["negative"]:
return "negative"
else:
return "neutral"
def _is_financial_term(self, word: str) -> bool:
"""Check if word is a financial term.
Args:
word: Word to check
Returns:
True if financial term
"""
financial_keywords = [
"주가", "주식", "증권", "투자", "펀드", "채권", "금리", "환율",
"배당", "수익", "손실", "이익", "매출", "실적", "영업", "순이익"
]
return word in financial_keywords
def _get_cache_key(self, text: str) -> str:
"""Generate cache key for text.
Args:
text: Text to generate key for
Returns:
Cache key string
"""
return hashlib.md5(text.encode('utf-8')).hexdigest()
async def analyze_with_context(self, text: str, context: Dict[str, Any]) -> SentimentResult:
"""Analyze sentiment with additional context.
Args:
text: Text to analyze
context: Additional context information
Returns:
SentimentResult with context adjustments
"""
base_result = await self.analyze(text)
# Apply context adjustments
adjusted_score = base_result.score
if context.get("previous_sentiment") == "positive" and base_result.sentiment == "negative":
# Amplify negative sentiment if following positive
adjusted_score = max(0.0, adjusted_score * 0.8)
result = SentimentResult(
sentiment=self._score_to_label(adjusted_score),
score=adjusted_score,
confidence=base_result.confidence,
keywords=base_result.keywords,
entities=base_result.entities,
context_adjusted=True
)
return result
async def analyze_time_weighted(self, news_items: List[Dict[str, Any]]) -> SentimentResult:
"""Analyze sentiment with time weighting.
Args:
news_items: List of news items with timestamps and weights
Returns:
Time-weighted sentiment result
"""
weighted_scores = []
total_weight = 0
for item in news_items:
result = await self.analyze(item["content"])
weight = item.get("weight", 1.0)
weighted_scores.append(result.score * weight)
total_weight += weight
if total_weight == 0:
return SentimentResult("neutral", 0.5, 0.0)
weighted_average = sum(weighted_scores) / total_weight
return SentimentResult(
sentiment=self._score_to_label(weighted_average),
score=weighted_average,
confidence=0.8,
weighted_score=weighted_average
)
async def analyze_trend(self, historical_sentiments: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze sentiment trends over time.
Args:
historical_sentiments: Historical sentiment data
Returns:
Trend analysis results
"""
if len(historical_sentiments) < 2:
return {"direction": "stable", "strength": 0.0, "volatility": 0.0}
scores = [item["score"] for item in historical_sentiments]
# Calculate trend direction
recent_avg = sum(scores[-3:]) / min(3, len(scores))
early_avg = sum(scores[:3]) / min(3, len(scores))
if recent_avg > early_avg + 0.1:
direction = "improving"
elif recent_avg < early_avg - 0.1:
direction = "declining"
else:
direction = "stable"
# Calculate strength and volatility
strength = abs(recent_avg - early_avg)
volatility = sum(abs(scores[i] - scores[i-1]) for i in range(1, len(scores))) / (len(scores) - 1)
return {
"direction": direction,
"strength": strength,
"volatility": volatility
}
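    # Worked example (hypothetical scores [0.3, 0.4, 0.6, 0.8]):
    # recent_avg = (0.4 + 0.6 + 0.8) / 3 = 0.6 and
    # early_avg = (0.3 + 0.4 + 0.6) / 3 ~= 0.433, so direction = "improving",
    # strength ~= 0.167, and volatility = (0.1 + 0.2 + 0.2) / 3 ~= 0.167.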
async def aggregate_sentiments(self, sentiments: List[SentimentResult]) -> SentimentResult:
"""Aggregate multiple sentiment results.
Args:
sentiments: List of sentiment results
Returns:
Aggregated sentiment result
"""
if not sentiments:
return SentimentResult("neutral", 0.5, 0.0)
# Calculate weighted average
total_score = sum(s.score * s.confidence for s in sentiments)
total_confidence = sum(s.confidence for s in sentiments)
if total_confidence == 0:
return SentimentResult("neutral", 0.5, 0.0)
avg_score = total_score / total_confidence
avg_confidence = total_confidence / len(sentiments)
        # Check for mixed sentiments; include "mixed" in the counts so
        # re-aggregating previously aggregated results cannot raise a KeyError
        sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0, "mixed": 0}
        for s in sentiments:
            sentiment_counts[s.sentiment] += 1
# If significantly mixed, label as mixed
max_count = max(sentiment_counts.values())
if max_count < len(sentiments) * 0.6:
sentiment_label = "mixed"
else:
sentiment_label = self._score_to_label(avg_score)
return SentimentResult(
sentiment=sentiment_label,
score=avg_score,
confidence=avg_confidence
)
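    # Worked example (hypothetical): results (0.8, conf 0.9), (0.7, conf 0.8)
    # and (0.3, conf 0.6) give a confidence-weighted score of
    # (0.72 + 0.56 + 0.18) / 2.3 ~= 0.635 and an average confidence of
    # 2.3 / 3 ~= 0.767; two of three inputs are positive (>= 60%), so the
    # label comes from the score: "positive".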
async def update_sentiment(self, previous_result: SentimentResult,
new_text: str) -> SentimentResult:
"""Update sentiment with new information.
Args:
previous_result: Previous sentiment result
new_text: New text to incorporate
Returns:
Updated sentiment result
"""
new_result = await self.analyze(new_text)
# Weighted combination (give more weight to recent information)
previous_weight = 0.3
new_weight = 0.7
updated_score = (previous_result.score * previous_weight +
new_result.score * new_weight)
return SentimentResult(
sentiment=self._score_to_label(updated_score),
score=updated_score,
confidence=(previous_result.confidence + new_result.confidence) / 2,
keywords=list(set(previous_result.keywords + new_result.keywords))
)
async def analyze_with_explanation(self, text: str) -> SentimentResult:
"""Analyze sentiment with detailed explanation.
Args:
text: Text to analyze
Returns:
SentimentResult with explanation
"""
result = await self.analyze(text)
# Generate explanation
words = self.preprocessor.tokenize(text)
positive_factors = []
negative_factors = []
for word in words:
score = self._calculate_word_sentiment(word)
if score > 0:
positive_factors.append((word, score))
elif score < 0:
negative_factors.append((word, abs(score)))
explanation = {
"positive_factors": sorted(positive_factors, key=lambda x: x[1], reverse=True)[:5],
"negative_factors": sorted(negative_factors, key=lambda x: x[1], reverse=True)[:5],
"key_phrases": result.keywords[:3]
}
result.explanation = explanation
return result
def add_custom_rule(self, term: str, sentiment: str, score: float):
"""Add custom sentiment rule.
Args:
term: Term to add rule for
sentiment: Sentiment label
score: Sentiment score
"""
self.custom_rules[term] = {"sentiment": sentiment, "score": score}
def apply_custom_rules(self, text: str) -> Dict[str, Any]:
"""Apply custom sentiment rules to text.
Args:
text: Text to analyze
Returns:
Custom rule results
"""
results = {"score": 0.5, "rules_applied": []}
for term, rule in self.custom_rules.items():
if term in text:
results["score"] = rule["score"]
results["rules_applied"].append(term)
return results
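    # Usage sketch (hypothetical rule): after
    #   analyzer.add_custom_rule("무상증자", "positive", 0.9)
    # calling analyzer.apply_custom_rules("무상증자 결정 공시") returns
    # {"score": 0.9, "rules_applied": ["무상증자"]}. When several rules match,
    # the last one registered wins because each match overwrites "score".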
async def analyze_comparative(self, text: str) -> Dict[str, Any]:
"""Analyze comparative sentiment.
Args:
text: Text with comparisons
Returns:
Comparative analysis results
"""
entities = await self.detect_financial_entities(text)
companies = entities.get("companies", [])
# Simple comparative analysis
comparisons = []
if len(companies) >= 2:
# Look for comparative words
if "보다" in text or "대비" in text:
comparisons.append({
"entity1": companies[0],
"entity2": companies[1],
"relation": "better" if "좋" in text else "worse"
})
return {
"entities": entities,
"comparisons": comparisons
}
async def save_result(self, result: SentimentResult, file_path: Path):
"""Save sentiment result to file.
Args:
result: Sentiment result to save
file_path: Path to save file
"""
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(asdict(result), f, ensure_ascii=False, indent=2)
async def load_result(self, file_path: Path) -> SentimentResult:
"""Load sentiment result from file.
Args:
file_path: Path to load file from
Returns:
Loaded sentiment result
"""
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return SentimentResult(**data)
async def analyze_aspects(self, text: str) -> Dict[str, str]:
"""Analyze sentiment for different aspects.
Args:
text: Text to analyze
Returns:
Aspect-based sentiment analysis
"""
aspects = {
"실적": "neutral",
"전망": "neutral",
"경영진": "neutral",
"제품": "neutral",
"재무": "neutral"
}
# Simple aspect-based analysis
for aspect in aspects.keys():
if aspect in text:
                # Analyze a window of text around the aspect term
                aspect_idx = text.find(aspect)
                context = text[max(0, aspect_idx - 20):aspect_idx + len(aspect) + 20]
context_result = await self.analyze(context)
aspects[aspect] = context_result.sentiment
return aspects
def get_thresholds(self) -> Dict[str, float]:
"""Get current sentiment thresholds.
Returns:
Dictionary of thresholds
"""
return self.thresholds.copy()
    def set_thresholds(self, positive: Optional[float] = None, negative: Optional[float] = None):
"""Set sentiment thresholds.
Args:
positive: Positive threshold
negative: Negative threshold
"""
if positive is not None:
self.thresholds["positive"] = positive
if negative is not None:
self.thresholds["negative"] = negative
async def analyze_with_external_api(self, text: str) -> SentimentResult:
"""Analyze sentiment using external API.
Args:
text: Text to analyze
Returns:
Sentiment result from external API
"""
        # Mock implementation: a real deployment would call an external
        # sentiment API here; a fixed result is returned for testing
return SentimentResult(
sentiment="positive",
score=0.85,
confidence=0.9
)
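# Minimal usage sketch (illustrative; the sample headlines are invented).
# Runs a single-text analysis plus a title/content article analysis end to end.
if __name__ == "__main__":
    async def _demo():
        analyzer = SentimentAnalyzer()
        # Single text: "급등" (+0.9) should push this clearly positive
        result = await analyzer.analyze("삼성전자 주가 급등, 실적개선 기대")
        print(result.sentiment, f"{result.score:.2f}", result.keywords)
        # Full article: the title is weighted 0.3 and the content 0.7
        news = {
            "title": "코스피 상승 마감",
            "content": "외국인 매수에 힘입어 코스피가 강세를 보였다.",
        }
        report = await analyzer.analyze_news(news)
        print(report["overall_sentiment"], f"{report['overall_score']:.2f}")
    asyncio.run(_demo())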