# sentiment_analyzer.py
"""감정 분석기"""
import asyncio
import re
import time
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError
class SentimentAnalyzer:
"""감정 분석기 클래스"""
def __init__(self, config: Dict[str, Any]):
"""
Args:
config: 분석기 설정 딕셔너리
"""
self.config = config
self.model_name = config.get("model_name", "klue/bert-base")
self.max_length = config.get("max_length", 512)
self.batch_size = config.get("batch_size", 16)
self.confidence_threshold = config.get("confidence_threshold", 0.7)
self.sentiment_labels = config.get("sentiment_labels", ["negative", "neutral", "positive"])
self.language = config.get("language", "ko")
        # Model state
self.is_initialized = False
self.model = None
self.tokenizer = None
        # Cache and performance metrics
self.cache = {}
self.performance_metrics = {
"total_texts_processed": 0,
"total_processing_time": 0.0,
"cache_hits": 0,
"cache_misses": 0
}
async def initialize_model(self) -> bool:
"""모델 초기화"""
try:
if self.is_initialized:
return True
            # Simulate model initialization
            await asyncio.sleep(0.1)  # simulate loading latency
self.model = {
"name": self.model_name,
"language": self.language,
"labels": self.sentiment_labels,
"initialized_at": datetime.now().isoformat()
}
self.tokenizer = {
"max_length": self.max_length,
"vocab_size": 32000 # 시뮬레이션
}
self.is_initialized = True
return True
except Exception as e:
raise PredictionError(f"Model initialization failed: {str(e)}")
async def analyze_text(self, text: str) -> Dict[str, Any]:
"""단일 텍스트 감정 분석"""
if not self.is_initialized:
raise ModelNotTrainedError("Model must be initialized before analysis")
if not text or not text.strip():
return {
"sentiment": "neutral",
"confidence": 0.5,
"scores": {"negative": 0.2, "neutral": 0.6, "positive": 0.2},
"processing_time": 0.001
}
try:
start_time = time.time()
            # Check cache
text_hash = hashlib.md5(text.encode()).hexdigest()
if text_hash in self.cache:
self.performance_metrics["cache_hits"] += 1
return self.cache[text_hash]
            # Preprocess the text
            cleaned_text = self._clean_text(text)
            processed_input = self._prepare_input(cleaned_text)  # would feed a real model
            # Simulate sentiment analysis
            sentiment_scores = self._simulate_sentiment_analysis(cleaned_text)
            # Build the result
dominant_sentiment = max(sentiment_scores, key=sentiment_scores.get)
confidence = sentiment_scores[dominant_sentiment]
processing_time = time.time() - start_time
result = {
"sentiment": dominant_sentiment,
"confidence": confidence,
"scores": sentiment_scores,
"processing_time": processing_time
}
            # Store in cache
self.cache[text_hash] = result
self.performance_metrics["cache_misses"] += 1
self.performance_metrics["total_texts_processed"] += 1
self.performance_metrics["total_processing_time"] += processing_time
return result
except Exception as e:
raise PredictionError(f"Text analysis failed: {str(e)}")
async def analyze_batch(self, texts: List[str]) -> List[Dict[str, Any]]:
"""배치 텍스트 감정 분석"""
if not self.is_initialized:
raise ModelNotTrainedError("Model must be initialized before analysis")
results = []
        # Process in chunks of batch_size
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i + self.batch_size]
batch_results = await asyncio.gather(
*[self.analyze_text(text) for text in batch]
)
results.extend(batch_results)
return results
async def analyze_news(self, news_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""뉴스 감정 분석"""
if not self.is_initialized:
raise ModelNotTrainedError("Model must be initialized before analysis")
results = []
for i, news in enumerate(news_data):
try:
                # Analyze title and content
title = news.get("title", "")
content = news.get("content", "")
title_result = await self.analyze_text(title) if title else None
content_result = await self.analyze_text(content) if content else None
                # Combined analysis (weighted average of title and content)
if title_result and content_result:
combined_scores = {}
for label in self.sentiment_labels:
                        # Weights: title 70%, content 30%
combined_scores[label] = (
title_result["scores"][label] * 0.7 +
content_result["scores"][label] * 0.3
)
overall_sentiment = max(combined_scores, key=combined_scores.get)
overall_confidence = combined_scores[overall_sentiment]
                else:
                    # Only title or only content was available
                    single_result = title_result or content_result
                    if single_result is None:
                        # Neither field had text; fall back to a neutral result
                        single_result = await self.analyze_text("")
                    combined_scores = single_result["scores"]
                    overall_sentiment = single_result["sentiment"]
                    overall_confidence = single_result["confidence"]
result = {
"news_id": f"news_{i}_{int(time.time())}",
"sentiment": overall_sentiment,
"confidence": overall_confidence,
"scores": combined_scores,
"symbols": news.get("symbols", []),
"timestamp": news.get("timestamp"),
"source": news.get("source"),
"title_sentiment": title_result,
"content_sentiment": content_result
}
results.append(result)
except Exception as e:
                # Fall back to neutral when an individual news item fails
results.append({
"news_id": f"news_{i}_error",
"sentiment": "neutral",
"confidence": 0.5,
"scores": {"negative": 0.3, "neutral": 0.4, "positive": 0.3},
"symbols": news.get("symbols", []),
"timestamp": news.get("timestamp"),
"source": news.get("source"),
"error": str(e)
})
return results
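    # Worked example of the 70/30 title/content weighting above (illustrative
    # numbers, not from real data): if the title scores positive=0.8 and the
    # content scores positive=0.4, the combined positive score is
    # 0.8 * 0.7 + 0.4 * 0.3 = 0.68; the same weighting is applied to the
    # neutral and negative scores before the maximum is taken.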
async def aggregate_sentiment_by_symbol(self, news_results: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""심볼별 감정 집계"""
symbol_data = {}
for result in news_results:
symbols = result.get("symbols", [])
sentiment_score = self._sentiment_to_score(result["sentiment"])
confidence = result["confidence"]
for symbol in symbols:
if symbol not in symbol_data:
symbol_data[symbol] = {
"scores": [],
"confidences": [],
"sentiments": [],
"news_count": 0
}
symbol_data[symbol]["scores"].append(sentiment_score)
symbol_data[symbol]["confidences"].append(confidence)
symbol_data[symbol]["sentiments"].append(result["sentiment"])
symbol_data[symbol]["news_count"] += 1
        # Compute aggregated results
aggregated = {}
for symbol, data in symbol_data.items():
if data["news_count"] == 0:
continue
avg_score = sum(data["scores"]) / len(data["scores"])
avg_confidence = sum(data["confidences"]) / len(data["confidences"])
            # Sentiment distribution
sentiment_counts = {}
for sentiment in data["sentiments"]:
sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1
sentiment_distribution = {
k: v / data["news_count"] for k, v in sentiment_counts.items()
}
            # Determine overall sentiment
if avg_score > 0.1:
overall_sentiment = "positive"
elif avg_score < -0.1:
overall_sentiment = "negative"
else:
overall_sentiment = "neutral"
            # Trend analysis (simple implementation)
recent_scores = data["scores"][-5:] if len(data["scores"]) >= 5 else data["scores"]
if len(recent_scores) >= 2:
trend = "improving" if recent_scores[-1] > recent_scores[0] else "declining"
else:
trend = "stable"
aggregated[symbol] = {
"overall_sentiment": overall_sentiment,
"sentiment_score": avg_score,
"news_count": data["news_count"],
"confidence_avg": avg_confidence,
"sentiment_distribution": sentiment_distribution,
"trend_analysis": {"trend": trend, "recent_change": recent_scores[-1] - recent_scores[0] if len(recent_scores) >= 2 else 0}
}
return aggregated
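    # Worked example of the aggregation above (hypothetical inputs): three news
    # items for a symbol such as "AAPL" labeled positive, neutral, positive map
    # to scores 1.0, 0.0, 1.0, so avg_score = 2.0 / 3 ≈ 0.67 > 0.1 and the
    # overall sentiment is "positive"; the sentiment_distribution would be
    # {"positive": 2/3, "neutral": 1/3}.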
async def get_sentiment_time_series(self, news_results: List[Dict[str, Any]], interval: str = '1H') -> Dict[str, Dict[str, Any]]:
"""시계열 감정 분석"""
time_data = {}
for result in news_results:
timestamp = result.get("timestamp")
if not timestamp:
continue
try:
                # Group by time bucket (simple implementation)
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
if interval == '1H':
time_key = dt.strftime('%Y-%m-%d %H:00:00')
elif interval == '1D':
time_key = dt.strftime('%Y-%m-%d')
else:
time_key = timestamp
if time_key not in time_data:
time_data[time_key] = {
"scores": [],
"sentiments": [],
"confidences": []
}
sentiment_score = self._sentiment_to_score(result["sentiment"])
time_data[time_key]["scores"].append(sentiment_score)
time_data[time_key]["sentiments"].append(result["sentiment"])
time_data[time_key]["confidences"].append(result["confidence"])
except Exception:
continue
        # Compute per-bucket time-series statistics
time_series = {}
for time_key, data in time_data.items():
if not data["scores"]:
continue
avg_score = sum(data["scores"]) / len(data["scores"])
            # Volatility (standard deviation)
if len(data["scores"]) > 1:
variance = sum((x - avg_score) ** 2 for x in data["scores"]) / len(data["scores"])
volatility = variance ** 0.5
else:
volatility = 0
            # Dominant sentiment
sentiment_counts = {}
for sentiment in data["sentiments"]:
sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1
dominant_sentiment = max(sentiment_counts, key=sentiment_counts.get)
time_series[time_key] = {
"sentiment_score": avg_score,
"news_count": len(data["scores"]),
"dominant_sentiment": dominant_sentiment,
"volatility": volatility
}
return time_series
async def analyze_sentiment_impact(self, news_results: List[Dict[str, Any]], price_data: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
"""감정 영향도 분석"""
correlations = {}
impact_scores = {}
significant_events = []
for symbol, prices in price_data.items():
            # Filter news related to this symbol
symbol_news = [news for news in news_results if symbol in news.get("symbols", [])]
if not symbol_news:
continue
            # Match per-interval sentiment scores with price changes
sentiment_scores = []
price_changes = []
price_times = sorted(prices.keys())
for i in range(1, len(price_times)):
current_time = price_times[i]
prev_time = price_times[i-1]
                # Price change rate
price_change = (prices[current_time] - prices[prev_time]) / prices[prev_time]
                # Sentiment scores within this time window
relevant_news = [
news for news in symbol_news
if prev_time <= news.get("timestamp", "") <= current_time
]
if relevant_news:
avg_sentiment = sum(self._sentiment_to_score(news["sentiment"]) for news in relevant_news) / len(relevant_news)
sentiment_scores.append(avg_sentiment)
price_changes.append(price_change)
            # Correlation
if len(sentiment_scores) >= 2:
correlation = self._calculate_correlation(sentiment_scores, price_changes)
correlations[symbol] = correlation
                # Impact score (correlation strength)
impact_scores[symbol] = abs(correlation)
                # Detect significant events
for i, (sentiment, price_change) in enumerate(zip(sentiment_scores, price_changes)):
                    if abs(sentiment) > 0.5 and abs(price_change) > 0.02:  # strong sentiment + large price move
significant_events.append({
"symbol": symbol,
"time_index": i,
"sentiment_score": sentiment,
"price_change": price_change,
"impact_magnitude": abs(sentiment * price_change)
})
return {
"correlation_analysis": {
"sentiment_price_correlation": sum(correlations.values()) / len(correlations) if correlations else 0
},
"impact_scores": impact_scores,
"significant_events": significant_events,
"lag_analysis": {"optimal_lag": 0, "lag_correlation": 0} # 간단한 구현
}
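    # Note on the price_data shape assumed above (inferred from this code, not a
    # documented contract): each symbol maps to {timestamp_string: price}, e.g.
    # {"AAPL": {"2024-01-01T09:00:00": 100.0, "2024-01-01T10:00:00": 103.0}},
    # where timestamps sort correctly as ISO-8601 strings. "AAPL" is only an
    # illustrative symbol.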
async def analyze_sentiment_trends(self, news_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""감정 트렌드 분석"""
if not news_results:
return {"trend_direction": "stable", "trend_strength": 0, "trend_acceleration": 0, "turning_points": []}
        # Sort by time
sorted_news = sorted(news_results, key=lambda x: x.get("timestamp", ""))
        # Sentiment score time series
sentiment_scores = [self._sentiment_to_score(news["sentiment"]) for news in sorted_news]
if len(sentiment_scores) < 2:
return {"trend_direction": "stable", "trend_strength": 0, "trend_acceleration": 0, "turning_points": []}
        # Trend direction from the slope of a linear fit
        n = len(sentiment_scores)
        x = list(range(n))
        # Simple linear regression
        slope = self._calculate_slope(x, sentiment_scores)
if slope > 0.01:
trend_direction = "up"
elif slope < -0.01:
trend_direction = "down"
else:
trend_direction = "stable"
        # Trend strength
        trend_strength = min(abs(slope) * 10, 1.0)  # normalized to the 0-1 range
        # Acceleration (second-derivative approximation)
if len(sentiment_scores) >= 3:
recent_slope = self._calculate_slope(x[-3:], sentiment_scores[-3:])
earlier_slope = self._calculate_slope(x[:3], sentiment_scores[:3])
trend_acceleration = recent_slope - earlier_slope
else:
trend_acceleration = 0
return {
"trend_direction": trend_direction,
"trend_strength": trend_strength,
"trend_acceleration": trend_acceleration,
"turning_points": [] # 간단한 구현에서는 빈 리스트
}
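    # Worked example for the trend logic above (illustrative numbers): sentiment
    # scores [-1.0, 0.0, 1.0] over x = [0, 1, 2] give a least-squares slope of
    # 1.0 > 0.01, so trend_direction is "up" and trend_strength saturates at 1.0.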
    async def filter_by_confidence(self, results: List[Dict[str, Any]], threshold: Optional[float] = None) -> List[Dict[str, Any]]:
        """Filter results by confidence threshold."""
if threshold is None:
threshold = self.confidence_threshold
return [result for result in results if result.get("confidence", 0) >= threshold]
def get_performance_metrics(self) -> Dict[str, Any]:
"""성능 메트릭 조회"""
total_requests = self.performance_metrics["cache_hits"] + self.performance_metrics["cache_misses"]
cache_hit_rate = (self.performance_metrics["cache_hits"] / total_requests) if total_requests > 0 else 0
avg_processing_time = (
self.performance_metrics["total_processing_time"] /
self.performance_metrics["total_texts_processed"]
) if self.performance_metrics["total_texts_processed"] > 0 else 0
return {
"total_texts_processed": self.performance_metrics["total_texts_processed"],
"average_processing_time": avg_processing_time,
"cache_hit_rate": cache_hit_rate,
"cache_size": len(self.cache)
}
def _clean_text(self, text: str) -> str:
"""텍스트 정제"""
# 특수문자 제거
cleaned = re.sub(r'[^\w\s가-힣]', ' ', text)
# 연속 공백 제거
cleaned = re.sub(r'\s+', ' ', cleaned)
# 앞뒤 공백 제거
cleaned = cleaned.strip()
# 최대 길이 제한
if len(cleaned) > self.max_length:
cleaned = cleaned[:self.max_length]
return cleaned
def _tokenize(self, text: str) -> List[str]:
"""토큰화"""
# 간단한 공백 기반 토큰화
tokens = text.split()
return tokens
def _prepare_input(self, text: str) -> Dict[str, Any]:
"""입력 데이터 준비"""
tokens = self._tokenize(text)
# 토큰 ID 시뮬레이션
input_ids = [hash(token) % 30000 for token in tokens]
attention_mask = [1] * len(input_ids)
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"tokens": tokens
}
def _simulate_sentiment_analysis(self, text: str) -> Dict[str, float]:
"""감정 분석 시뮬레이션"""
# 키워드 기반 감정 점수 (실제로는 모델 추론)
positive_words = ["상승", "성장", "개선", "호조", "증가", "좋", "긍정"]
negative_words = ["하락", "감소", "악화", "부정", "우려", "위험", "나쁨"]
text_lower = text.lower()
positive_count = sum(1 for word in positive_words if word in text)
negative_count = sum(1 for word in negative_words if word in text)
        # Base scores
        base_scores = {"negative": 0.3, "neutral": 0.4, "positive": 0.3}
        # Adjust according to keyword counts
if positive_count > negative_count:
base_scores["positive"] += 0.3
base_scores["neutral"] -= 0.15
base_scores["negative"] -= 0.15
elif negative_count > positive_count:
base_scores["negative"] += 0.3
base_scores["neutral"] -= 0.15
base_scores["positive"] -= 0.15
        # Normalize so scores sum to 1
total = sum(base_scores.values())
return {k: v / total for k, v in base_scores.items()}
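    # Worked example for _simulate_sentiment_analysis (illustrative only): a text
    # containing one positive keyword ("상승") and no negative keywords starts from
    # {"negative": 0.3, "neutral": 0.4, "positive": 0.3}, is adjusted to
    # {"negative": 0.15, "neutral": 0.25, "positive": 0.60}, and already sums to
    # 1.0, so normalization leaves it unchanged.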
def _sentiment_to_score(self, sentiment: str) -> float:
"""감정 라벨을 점수로 변환"""
score_map = {
"positive": 1.0,
"neutral": 0.0,
"negative": -1.0
}
return score_map.get(sentiment, 0.0)
def _calculate_correlation(self, x: List[float], y: List[float]) -> float:
"""상관관계 계산"""
if len(x) != len(y) or len(x) < 2:
return 0.0
n = len(x)
        # Means
        mean_x = sum(x) / n
        mean_y = sum(y) / n
        # Pearson correlation coefficient
numerator = sum((x[i] - mean_x) * (y[i] - mean_y) for i in range(n))
sum_sq_x = sum((x[i] - mean_x) ** 2 for i in range(n))
sum_sq_y = sum((y[i] - mean_y) ** 2 for i in range(n))
denominator = (sum_sq_x * sum_sq_y) ** 0.5
if denominator == 0:
return 0.0
return numerator / denominator
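    # Sanity-check example for _calculate_correlation (not from the original code):
    # x = [1, 2, 3], y = [2, 4, 6] is a perfect positive linear relationship, so the
    # method returns 1.0; y = [6, 4, 2] would return -1.0, and a constant y returns
    # 0.0 because the denominator is zero.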
def _calculate_slope(self, x: List[float], y: List[float]) -> float:
"""기울기 계산"""
if len(x) != len(y) or len(x) < 2:
return 0.0
n = len(x)
mean_x = sum(x) / n
mean_y = sum(y) / n
numerator = sum((x[i] - mean_x) * (y[i] - mean_y) for i in range(n))
denominator = sum((x[i] - mean_x) ** 2 for i in range(n))
if denominator == 0:
return 0.0
return numerator / denominator
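
# Minimal usage sketch (assumes this module is importable as-is and that
# src.exceptions provides the exception classes imported above; the config
# values and sample sentences below are illustrative, not required settings).
if __name__ == "__main__":
    async def _demo() -> None:
        analyzer = SentimentAnalyzer({
            "model_name": "klue/bert-base",
            "confidence_threshold": 0.6,
        })
        await analyzer.initialize_model()
        single = await analyzer.analyze_text("실적 개선으로 주가가 상승했다")
        print("single:", single["sentiment"], round(single["confidence"], 2))
        batch = await analyzer.analyze_batch(["시장 우려가 커지고 있다", "성장 전망이 좋다"])
        print("batch:", [r["sentiment"] for r in batch])
        print("metrics:", analyzer.get_performance_metrics())

    asyncio.run(_demo())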