
MCP Market Statistics Server

by whdghk1907
sentiment_analyzer.py (23 kB)
"""감정 분석기""" import asyncio import re import time import hashlib from datetime import datetime, timedelta from typing import Dict, List, Any, Optional from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError class SentimentAnalyzer: """감정 분석기 클래스""" def __init__(self, config: Dict[str, Any]): """ Args: config: 분석기 설정 딕셔너리 """ self.config = config self.model_name = config.get("model_name", "klue/bert-base") self.max_length = config.get("max_length", 512) self.batch_size = config.get("batch_size", 16) self.confidence_threshold = config.get("confidence_threshold", 0.7) self.sentiment_labels = config.get("sentiment_labels", ["negative", "neutral", "positive"]) self.language = config.get("language", "ko") # 모델 상태 self.is_initialized = False self.model = None self.tokenizer = None # 캐시 및 성능 메트릭 self.cache = {} self.performance_metrics = { "total_texts_processed": 0, "total_processing_time": 0.0, "cache_hits": 0, "cache_misses": 0 } async def initialize_model(self) -> bool: """모델 초기화""" try: if self.is_initialized: return True # 모델 초기화 시뮬레이션 await asyncio.sleep(0.1) # 로딩 시뮬레이션 self.model = { "name": self.model_name, "language": self.language, "labels": self.sentiment_labels, "initialized_at": datetime.now().isoformat() } self.tokenizer = { "max_length": self.max_length, "vocab_size": 32000 # 시뮬레이션 } self.is_initialized = True return True except Exception as e: raise PredictionError(f"Model initialization failed: {str(e)}") async def analyze_text(self, text: str) -> Dict[str, Any]: """단일 텍스트 감정 분석""" if not self.is_initialized: raise ModelNotTrainedError("Model must be initialized before analysis") if not text or not text.strip(): return { "sentiment": "neutral", "confidence": 0.5, "scores": {"negative": 0.2, "neutral": 0.6, "positive": 0.2}, "processing_time": 0.001 } try: start_time = time.time() # 캐시 확인 text_hash = hashlib.md5(text.encode()).hexdigest() if text_hash in self.cache: self.performance_metrics["cache_hits"] += 1 return self.cache[text_hash] # 텍스트 전처리 cleaned_text = self._clean_text(text) processed_input = self._prepare_input(cleaned_text) # 감정 분석 시뮬레이션 sentiment_scores = self._simulate_sentiment_analysis(cleaned_text) # 결과 생성 dominant_sentiment = max(sentiment_scores, key=sentiment_scores.get) confidence = sentiment_scores[dominant_sentiment] processing_time = time.time() - start_time result = { "sentiment": dominant_sentiment, "confidence": confidence, "scores": sentiment_scores, "processing_time": processing_time } # 캐시 저장 self.cache[text_hash] = result self.performance_metrics["cache_misses"] += 1 self.performance_metrics["total_texts_processed"] += 1 self.performance_metrics["total_processing_time"] += processing_time return result except Exception as e: raise PredictionError(f"Text analysis failed: {str(e)}") async def analyze_batch(self, texts: List[str]) -> List[Dict[str, Any]]: """배치 텍스트 감정 분석""" if not self.is_initialized: raise ModelNotTrainedError("Model must be initialized before analysis") results = [] # 배치 크기로 분할하여 처리 for i in range(0, len(texts), self.batch_size): batch = texts[i:i + self.batch_size] batch_results = await asyncio.gather( *[self.analyze_text(text) for text in batch] ) results.extend(batch_results) return results async def analyze_news(self, news_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """뉴스 감정 분석""" if not self.is_initialized: raise ModelNotTrainedError("Model must be initialized before analysis") results = [] for i, news in enumerate(news_data): try: # 제목과 내용 분석 title = news.get("title", "") content = 
news.get("content", "") title_result = await self.analyze_text(title) if title else None content_result = await self.analyze_text(content) if content else None # 종합 분석 (제목과 내용 가중 평균) if title_result and content_result: combined_scores = {} for label in self.sentiment_labels: # 제목 70%, 내용 30% 가중치 combined_scores[label] = ( title_result["scores"][label] * 0.7 + content_result["scores"][label] * 0.3 ) overall_sentiment = max(combined_scores, key=combined_scores.get) overall_confidence = combined_scores[overall_sentiment] else: # 하나만 있는 경우 single_result = title_result or content_result combined_scores = single_result["scores"] overall_sentiment = single_result["sentiment"] overall_confidence = single_result["confidence"] result = { "news_id": f"news_{i}_{int(time.time())}", "sentiment": overall_sentiment, "confidence": overall_confidence, "scores": combined_scores, "symbols": news.get("symbols", []), "timestamp": news.get("timestamp"), "source": news.get("source"), "title_sentiment": title_result, "content_sentiment": content_result } results.append(result) except Exception as e: # 개별 뉴스 분석 실패 시 중립으로 처리 results.append({ "news_id": f"news_{i}_error", "sentiment": "neutral", "confidence": 0.5, "scores": {"negative": 0.3, "neutral": 0.4, "positive": 0.3}, "symbols": news.get("symbols", []), "timestamp": news.get("timestamp"), "source": news.get("source"), "error": str(e) }) return results async def aggregate_sentiment_by_symbol(self, news_results: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: """심볼별 감정 집계""" symbol_data = {} for result in news_results: symbols = result.get("symbols", []) sentiment_score = self._sentiment_to_score(result["sentiment"]) confidence = result["confidence"] for symbol in symbols: if symbol not in symbol_data: symbol_data[symbol] = { "scores": [], "confidences": [], "sentiments": [], "news_count": 0 } symbol_data[symbol]["scores"].append(sentiment_score) symbol_data[symbol]["confidences"].append(confidence) symbol_data[symbol]["sentiments"].append(result["sentiment"]) symbol_data[symbol]["news_count"] += 1 # 집계 결과 계산 aggregated = {} for symbol, data in symbol_data.items(): if data["news_count"] == 0: continue avg_score = sum(data["scores"]) / len(data["scores"]) avg_confidence = sum(data["confidences"]) / len(data["confidences"]) # 감정 분포 계산 sentiment_counts = {} for sentiment in data["sentiments"]: sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1 sentiment_distribution = { k: v / data["news_count"] for k, v in sentiment_counts.items() } # 전체 감정 결정 if avg_score > 0.1: overall_sentiment = "positive" elif avg_score < -0.1: overall_sentiment = "negative" else: overall_sentiment = "neutral" # 트렌드 분석 (간단한 구현) recent_scores = data["scores"][-5:] if len(data["scores"]) >= 5 else data["scores"] if len(recent_scores) >= 2: trend = "improving" if recent_scores[-1] > recent_scores[0] else "declining" else: trend = "stable" aggregated[symbol] = { "overall_sentiment": overall_sentiment, "sentiment_score": avg_score, "news_count": data["news_count"], "confidence_avg": avg_confidence, "sentiment_distribution": sentiment_distribution, "trend_analysis": {"trend": trend, "recent_change": recent_scores[-1] - recent_scores[0] if len(recent_scores) >= 2 else 0} } return aggregated async def get_sentiment_time_series(self, news_results: List[Dict[str, Any]], interval: str = '1H') -> Dict[str, Dict[str, Any]]: """시계열 감정 분석""" time_data = {} for result in news_results: timestamp = result.get("timestamp") if not timestamp: continue try: # 시간대별로 그룹핑 (간단한 구현) dt = 
datetime.fromisoformat(timestamp.replace('Z', '+00:00')) if interval == '1H': time_key = dt.strftime('%Y-%m-%d %H:00:00') elif interval == '1D': time_key = dt.strftime('%Y-%m-%d') else: time_key = timestamp if time_key not in time_data: time_data[time_key] = { "scores": [], "sentiments": [], "confidences": [] } sentiment_score = self._sentiment_to_score(result["sentiment"]) time_data[time_key]["scores"].append(sentiment_score) time_data[time_key]["sentiments"].append(result["sentiment"]) time_data[time_key]["confidences"].append(result["confidence"]) except Exception: continue # 시계열 데이터 계산 time_series = {} for time_key, data in time_data.items(): if not data["scores"]: continue avg_score = sum(data["scores"]) / len(data["scores"]) # 변동성 계산 if len(data["scores"]) > 1: variance = sum((x - avg_score) ** 2 for x in data["scores"]) / len(data["scores"]) volatility = variance ** 0.5 else: volatility = 0 # 지배적 감정 sentiment_counts = {} for sentiment in data["sentiments"]: sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1 dominant_sentiment = max(sentiment_counts, key=sentiment_counts.get) time_series[time_key] = { "sentiment_score": avg_score, "news_count": len(data["scores"]), "dominant_sentiment": dominant_sentiment, "volatility": volatility } return time_series async def analyze_sentiment_impact(self, news_results: List[Dict[str, Any]], price_data: Dict[str, Dict[str, float]]) -> Dict[str, Any]: """감정 영향도 분석""" correlations = {} impact_scores = {} significant_events = [] for symbol, prices in price_data.items(): # 해당 심볼 관련 뉴스 필터링 symbol_news = [news for news in news_results if symbol in news.get("symbols", [])] if not symbol_news: continue # 시간별 감정 점수와 가격 변화 매칭 sentiment_scores = [] price_changes = [] price_times = sorted(prices.keys()) for i in range(1, len(price_times)): current_time = price_times[i] prev_time = price_times[i-1] # 가격 변화율 price_change = (prices[current_time] - prices[prev_time]) / prices[prev_time] # 해당 시간대 감정 점수 relevant_news = [ news for news in symbol_news if prev_time <= news.get("timestamp", "") <= current_time ] if relevant_news: avg_sentiment = sum(self._sentiment_to_score(news["sentiment"]) for news in relevant_news) / len(relevant_news) sentiment_scores.append(avg_sentiment) price_changes.append(price_change) # 상관관계 계산 if len(sentiment_scores) >= 2: correlation = self._calculate_correlation(sentiment_scores, price_changes) correlations[symbol] = correlation # 영향도 점수 (상관관계 강도) impact_scores[symbol] = abs(correlation) # 유의미한 이벤트 탐지 for i, (sentiment, price_change) in enumerate(zip(sentiment_scores, price_changes)): if abs(sentiment) > 0.5 and abs(price_change) > 0.02: # 강한 감정 + 큰 가격 변화 significant_events.append({ "symbol": symbol, "time_index": i, "sentiment_score": sentiment, "price_change": price_change, "impact_magnitude": abs(sentiment * price_change) }) return { "correlation_analysis": { "sentiment_price_correlation": sum(correlations.values()) / len(correlations) if correlations else 0 }, "impact_scores": impact_scores, "significant_events": significant_events, "lag_analysis": {"optimal_lag": 0, "lag_correlation": 0} # 간단한 구현 } async def analyze_sentiment_trends(self, news_results: List[Dict[str, Any]]) -> Dict[str, Any]: """감정 트렌드 분석""" if not news_results: return {"trend_direction": "stable", "trend_strength": 0, "trend_acceleration": 0, "turning_points": []} # 시간순 정렬 sorted_news = sorted(news_results, key=lambda x: x.get("timestamp", "")) # 감정 점수 시계열 sentiment_scores = [self._sentiment_to_score(news["sentiment"]) for news in sorted_news] if 
len(sentiment_scores) < 2: return {"trend_direction": "stable", "trend_strength": 0, "trend_acceleration": 0, "turning_points": []} # 트렌드 방향 (선형 회귀 기울기) n = len(sentiment_scores) x = list(range(n)) # 간단한 선형 회귀 slope = self._calculate_slope(x, sentiment_scores) if slope > 0.01: trend_direction = "up" elif slope < -0.01: trend_direction = "down" else: trend_direction = "stable" # 트렌드 강도 trend_strength = min(abs(slope) * 10, 1.0) # 0-1 범위로 정규화 # 가속도 (2차 도함수 근사) if len(sentiment_scores) >= 3: recent_slope = self._calculate_slope(x[-3:], sentiment_scores[-3:]) earlier_slope = self._calculate_slope(x[:3], sentiment_scores[:3]) trend_acceleration = recent_slope - earlier_slope else: trend_acceleration = 0 return { "trend_direction": trend_direction, "trend_strength": trend_strength, "trend_acceleration": trend_acceleration, "turning_points": [] # 간단한 구현에서는 빈 리스트 } async def filter_by_confidence(self, results: List[Dict[str, Any]], threshold: float = None) -> List[Dict[str, Any]]: """신뢰도 필터링""" if threshold is None: threshold = self.confidence_threshold return [result for result in results if result.get("confidence", 0) >= threshold] def get_performance_metrics(self) -> Dict[str, Any]: """성능 메트릭 조회""" total_requests = self.performance_metrics["cache_hits"] + self.performance_metrics["cache_misses"] cache_hit_rate = (self.performance_metrics["cache_hits"] / total_requests) if total_requests > 0 else 0 avg_processing_time = ( self.performance_metrics["total_processing_time"] / self.performance_metrics["total_texts_processed"] ) if self.performance_metrics["total_texts_processed"] > 0 else 0 return { "total_texts_processed": self.performance_metrics["total_texts_processed"], "average_processing_time": avg_processing_time, "cache_hit_rate": cache_hit_rate, "cache_size": len(self.cache) } def _clean_text(self, text: str) -> str: """텍스트 정제""" # 특수문자 제거 cleaned = re.sub(r'[^\w\s가-힣]', ' ', text) # 연속 공백 제거 cleaned = re.sub(r'\s+', ' ', cleaned) # 앞뒤 공백 제거 cleaned = cleaned.strip() # 최대 길이 제한 if len(cleaned) > self.max_length: cleaned = cleaned[:self.max_length] return cleaned def _tokenize(self, text: str) -> List[str]: """토큰화""" # 간단한 공백 기반 토큰화 tokens = text.split() return tokens def _prepare_input(self, text: str) -> Dict[str, Any]: """입력 데이터 준비""" tokens = self._tokenize(text) # 토큰 ID 시뮬레이션 input_ids = [hash(token) % 30000 for token in tokens] attention_mask = [1] * len(input_ids) return { "input_ids": input_ids, "attention_mask": attention_mask, "tokens": tokens } def _simulate_sentiment_analysis(self, text: str) -> Dict[str, float]: """감정 분석 시뮬레이션""" # 키워드 기반 감정 점수 (실제로는 모델 추론) positive_words = ["상승", "성장", "개선", "호조", "증가", "좋", "긍정"] negative_words = ["하락", "감소", "악화", "부정", "우려", "위험", "나쁨"] text_lower = text.lower() positive_count = sum(1 for word in positive_words if word in text) negative_count = sum(1 for word in negative_words if word in text) # 기본 점수 base_scores = {"negative": 0.3, "neutral": 0.4, "positive": 0.3} # 키워드에 따른 조정 if positive_count > negative_count: base_scores["positive"] += 0.3 base_scores["neutral"] -= 0.15 base_scores["negative"] -= 0.15 elif negative_count > positive_count: base_scores["negative"] += 0.3 base_scores["neutral"] -= 0.15 base_scores["positive"] -= 0.15 # 정규화 total = sum(base_scores.values()) return {k: v / total for k, v in base_scores.items()} def _sentiment_to_score(self, sentiment: str) -> float: """감정 라벨을 점수로 변환""" score_map = { "positive": 1.0, "neutral": 0.0, "negative": -1.0 } return score_map.get(sentiment, 0.0) def _calculate_correlation(self, x: 
List[float], y: List[float]) -> float: """상관관계 계산""" if len(x) != len(y) or len(x) < 2: return 0.0 n = len(x) # 평균 계산 mean_x = sum(x) / n mean_y = sum(y) / n # 상관계수 계산 numerator = sum((x[i] - mean_x) * (y[i] - mean_y) for i in range(n)) sum_sq_x = sum((x[i] - mean_x) ** 2 for i in range(n)) sum_sq_y = sum((y[i] - mean_y) ** 2 for i in range(n)) denominator = (sum_sq_x * sum_sq_y) ** 0.5 if denominator == 0: return 0.0 return numerator / denominator def _calculate_slope(self, x: List[float], y: List[float]) -> float: """기울기 계산""" if len(x) != len(y) or len(x) < 2: return 0.0 n = len(x) mean_x = sum(x) / n mean_y = sum(y) / n numerator = sum((x[i] - mean_x) * (y[i] - mean_y) for i in range(n)) denominator = sum((x[i] - mean_x) ** 2 for i in range(n)) if denominator == 0: return 0.0 return numerator / denominator
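
For reference, a minimal usage sketch of the class above, exercising the initialize → analyze_news → aggregate pipeline end to end. The config values, the sample news item, and the symbol "005930" are illustrative assumptions, not part of the module:

# Minimal usage sketch; config values and the sample news item are assumptions.
import asyncio

async def main():
    analyzer = SentimentAnalyzer({"language": "ko", "confidence_threshold": 0.7})
    await analyzer.initialize_model()  # required before any analyze_* call

    news = [{
        "title": "반도체 수출 증가로 성장 기대",  # "growth expected on rising chip exports"
        "content": "상승 흐름이 이어지고 있다",   # "the upward trend continues"
        "symbols": ["005930"],                     # example symbol
        "timestamp": "2024-01-02T09:00:00Z",
        "source": "example",
    }]

    news_results = await analyzer.analyze_news(news)
    by_symbol = await analyzer.aggregate_sentiment_by_symbol(news_results)
    print(by_symbol)                            # per-symbol aggregate sentiment
    print(analyzer.get_performance_metrics())   # cache hit rate, timings

asyncio.run(main())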
