"""Market impact analyzer for financial news."""
import json
import math
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import statistics
class MarketImpactError(Exception):
"""Market impact analysis specific error."""
pass
@dataclass
class ImpactResult:
"""Market impact analysis result."""
stock_code: str
impact_score: float # 0.0 to 1.0
direction: str # "positive", "negative", "neutral"
confidence: float # 0.0 to 1.0
predicted_change: float = 0.0 # Expected price change percentage
time_horizon: int = 24 # Hours
affected_price_range: Optional[Tuple[float, float]] = None
volume_impact: float = 1.0 # Volume multiplier
volatility_impact: float = 0.0 # Additional volatility
sector_spillover: float = 0.0 # Sector-wide impact
risk_factors: Optional[List[str]] = None
def __post_init__(self):
if self.risk_factors is None:
self.risk_factors = []
class MarketImpactAnalyzer:
"""Advanced market impact analyzer for financial news."""
def __init__(self):
"""Initialize market impact analyzer."""
self.logger = logging.getLogger("market_impact_analyzer")
# Impact tracking
self.active_tracking = {}
# Model parameters
self.impact_weights = {
"sentiment": 0.4,
"importance": 0.3,
"timing": 0.2,
"market_conditions": 0.1
}
# Market cap thresholds for adjustment
self.market_cap_thresholds = {
"large_cap": 10000000000000, # 10조원
"mid_cap": 1000000000000, # 1조원
"small_cap": 100000000000 # 1000억원
}
# Sector mappings
self.sector_mappings = {
"전자": ["삼성전자", "LG전자", "SK하이닉스"],
"금융": ["KB금융", "신한금융", "하나금융"],
"화학": ["LG화학", "삼성SDI", "SK이노베이션"],
"자동차": ["현대차", "기아", "현대모비스"]
}
# Event type impact multipliers
self.event_multipliers = {
"earnings": 1.2,
"management": 1.1,
"product": 0.9,
"corporate_action": 1.5,
"regulatory": 1.3,
"market": 0.8
}
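# NOTE: defined for per-event-type scaling but not yet applied in analyze_impact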
# Statistics
self.stats = {
"total_analyzed": 0,
"high_impact_events": 0,
"prediction_accuracy": 0.0
}
async def analyze_impact(self, news_data: Dict[str, Any],
market_data: Dict[str, Any]) -> ImpactResult:
"""Analyze market impact of news.
Args:
news_data: News data with sentiment analysis
market_data: Current market data
Returns:
ImpactResult object
"""
if not news_data:
raise MarketImpactError("News data is required")
try:
# Extract relevant information
sentiment_data = news_data.get("sentiment_analysis", {})
entities = news_data.get("entities", {})
companies = entities.get("companies", [])
if not companies:
return ImpactResult("", 0.0, "neutral", 0.0)
# Focus on first company for detailed analysis
primary_company = companies[0]
stock_code = self._get_stock_code(primary_company, entities)
# Calculate various impact components
sentiment_impact = await self.calculate_sentiment_impact(sentiment_data)
importance_score = await self.calculate_news_importance(news_data)
time_decay = self.calculate_time_decay(
news_data.get("published_at", datetime.now(timezone.utc))
)
# Get market context
stock_market_data = market_data.get(stock_code, {})
market_cap = stock_market_data.get("market_cap", 1000000000000)
volatility = stock_market_data.get("volatility", 0.25)
# Calculate base impact score
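# Weighted blend of sentiment, importance, and time decay; the market_conditions
# weight defined in impact_weights is not applied in this sum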
base_impact = (
sentiment_impact * self.impact_weights["sentiment"] +
importance_score * self.impact_weights["importance"] +
time_decay * self.impact_weights["timing"]
)
# Apply market cap adjustment
market_cap_adj = self.calculate_market_cap_adjustment(market_cap)
adjusted_impact = base_impact * market_cap_adj
# Determine direction
direction = self._determine_direction(sentiment_data.get("sentiment", "neutral"))
# Calculate confidence
confidence = self._calculate_confidence(
sentiment_data, importance_score, stock_market_data
)
# Predict price change
predicted_change = self._predict_price_change(
adjusted_impact, direction, volatility
)
# Calculate additional impacts
volume_impact = await self._calculate_volume_impact(
adjusted_impact, stock_market_data
)
volatility_impact = await self.calculate_volatility_impact(
importance_score, {"volatility": volatility}
)
result = ImpactResult(
stock_code=stock_code,
impact_score=min(1.0, max(0.0, adjusted_impact)),
direction=direction,
confidence=confidence,
predicted_change=predicted_change,
volume_impact=volume_impact,
volatility_impact=volatility_impact
)
self.stats["total_analyzed"] += 1
if adjusted_impact > 0.7:
self.stats["high_impact_events"] += 1
return result
except Exception as e:
self.logger.error(f"Error analyzing market impact: {e}")
return ImpactResult("", 0.0, "neutral", 0.0)
async def predict_price_movement(self, news_data: Dict[str, Any],
stock_data: Dict[str, Any]) -> Dict[str, Any]:
"""Predict price movement based on news.
Args:
news_data: News data with sentiment analysis
stock_data: Stock-specific market data
Returns:
Price movement prediction
"""
sentiment = news_data.get("sentiment_analysis", {})
current_price = stock_data.get("current_price", 50000)
volatility = stock_data.get("volatility", 0.25)
# Calculate expected change percentage
sentiment_score = sentiment.get("score", 0.5)
confidence = sentiment.get("confidence", 0.5)
# Convert sentiment to price change expectation
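# The score is read as a 0-1 polarity with 0.5 neutral, so the expected move
# tops out at +/-5% before the confidence and volatility adjustments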
if sentiment.get("sentiment") == "positive":
base_change = (sentiment_score - 0.5) * 2 * 0.05 # Max 5% change
elif sentiment.get("sentiment") == "negative":
base_change = -(0.5 - sentiment_score) * 2 * 0.05
else:
base_change = 0.0
# Adjust for confidence and volatility
adjusted_change = base_change * confidence * (1 + volatility)
# Calculate price target
price_target = current_price * (1 + adjusted_change)
return {
"predicted_change_percent": adjusted_change * 100,
"price_target": price_target,
"confidence": confidence,
"time_horizon": 24 # 24 hours
}
async def calculate_sentiment_impact(self, sentiment_data: Dict[str, Any]) -> float:
"""Calculate impact score from sentiment data.
Args:
sentiment_data: Sentiment analysis results
Returns:
Impact score (0.0 to 1.0)
"""
if not sentiment_data:
return 0.5
sentiment = sentiment_data.get("sentiment", "neutral")
score = sentiment_data.get("score", 0.5)
confidence = sentiment_data.get("confidence", 0.5)
# Base impact from sentiment score
if sentiment == "positive":
base_impact = score
elif sentiment == "negative":
base_impact = 1.0 - score
else:
base_impact = 0.5
# Weight by confidence
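# (low confidence pulls the result toward the neutral value 0.5)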
weighted_impact = base_impact * confidence + 0.5 * (1 - confidence)
return min(1.0, max(0.0, weighted_impact))
async def analyze_volume_impact(self, news_data: Dict[str, Any],
market_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze expected volume impact.
Args:
news_data: News data
market_data: Market data
Returns:
Volume impact analysis
"""
sentiment = news_data.get("sentiment_analysis", {})
importance = await self.calculate_news_importance(news_data)
# Calculate volume multiplier based on sentiment strength and importance
sentiment_strength = abs(sentiment.get("score", 0.5) - 0.5) * 2
volume_multiplier = 1.0 + (sentiment_strength * importance * 2.0)
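# Both factors lie in [0, 1], so the multiplier is capped at 3.0 (up to a +200% expected volume change)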
# Trading interest level
if volume_multiplier > 2.0:
interest_level = "very_high"
elif volume_multiplier > 1.5:
interest_level = "high"
elif volume_multiplier > 1.2:
interest_level = "moderate"
else:
interest_level = "low"
return {
"expected_volume_change": (volume_multiplier - 1.0) * 100,
"volume_multiplier": volume_multiplier,
"trading_interest": interest_level
}
async def calculate_volatility_impact(self, news_importance: float,
market_conditions: Dict[str, Any]) -> float:
"""Calculate volatility impact.
Args:
news_importance: Importance score of news
market_conditions: Current market conditions
Returns:
Additional volatility factor
"""
base_volatility = market_conditions.get("volatility", 0.25)
# News importance increases volatility
volatility_increase = news_importance * 0.1  # Up to 0.10 of additional volatility
# Market sentiment can amplify or dampen
market_sentiment = market_conditions.get("market_sentiment", "neutral")
sentiment_multiplier = 1.2 if market_sentiment == "volatile" else 1.0
additional_volatility = volatility_increase * sentiment_multiplier
return additional_volatility
async def analyze_sector_impact(self, news_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze sector-wide impact.
Args:
news_data: News data
Returns:
Sector impact analysis
"""
entities = news_data.get("entities", {})
companies = entities.get("companies", [])
sentiment = news_data.get("sentiment_analysis", {})
affected_companies = []
sector_impact_score = 0.0
# Find sector for mentioned companies
for company in companies:
for sector, sector_companies in self.sector_mappings.items():
if company in sector_companies:
affected_companies.extend(sector_companies)
# Calculate sector impact
sentiment_impact = await self.calculate_sentiment_impact(sentiment)
sector_impact_score = max(sector_impact_score, sentiment_impact * 0.7)
break
# Remove duplicates
affected_companies = list(set(affected_companies))
spillover_effects = []
if sector_impact_score > 0.6:
spillover_effects.append("high_correlation_expected")
if len(affected_companies) > 3:
spillover_effects.append("broad_sector_impact")
return {
"affected_companies": affected_companies,
"sector_impact_score": sector_impact_score,
"spillover_effects": spillover_effects
}
def calculate_time_decay(self, published_time: datetime) -> float:
"""Calculate time decay factor.
Args:
published_time: When news was published
Returns:
Time decay factor (0.0 to 1.0)
"""
now = datetime.now(timezone.utc)
if published_time.tzinfo is None:
published_time = published_time.replace(tzinfo=timezone.utc)
time_diff = (now - published_time).total_seconds() / 3600 # Hours
# Exponential decay with a 6-hour time constant (drops to about 37% after 6 hours)
decay_factor = math.exp(-time_diff / 6.0)
return max(0.1, min(1.0, decay_factor))
def calculate_market_cap_adjustment(self, market_cap: float) -> float:
"""Calculate market cap adjustment factor.
Args:
market_cap: Market capitalization
Returns:
Adjustment factor
"""
if market_cap >= self.market_cap_thresholds["large_cap"]:
return 0.8 # Large caps are less volatile
elif market_cap >= self.market_cap_thresholds["mid_cap"]:
return 1.0 # Mid caps baseline
else:
return 1.3 # Small caps more sensitive
async def calculate_news_importance(self, news_data: Dict[str, Any]) -> float:
"""Calculate importance score of news.
Args:
news_data: News data
Returns:
Importance score (0.0 to 1.0)
"""
title = news_data.get("title", "")
sentiment = news_data.get("sentiment_analysis", {})
entities = news_data.get("entities", {})
importance = 0.5 # Base importance
# High importance keywords
high_importance_keywords = [
"CEO", "실적", "인수", "합병", "파산", "상장", "폐지",
"배당", "분할", "감자", "증자", "공모", "IPO"
]
for keyword in high_importance_keywords:
if keyword in title:
importance += 0.1
# Confidence from sentiment analysis
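# (low confidence pulls importance toward the neutral 0.5, mirroring calculate_sentiment_impact)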
confidence = sentiment.get("confidence", 0.5)
importance = importance * confidence + 0.5 * (1 - confidence)
# Multiple companies mentioned
companies = entities.get("companies", [])
if len(companies) > 1:
importance += 0.1
return min(1.0, max(0.0, importance))
async def calculate_sentiment_price_correlation(self,
historical_data: List[Dict[str, Any]]) -> float:
"""Calculate correlation between sentiment and price movements.
Args:
historical_data: Historical sentiment and price data
Returns:
Correlation coefficient (-1.0 to 1.0)
"""
if len(historical_data) < 2:
return 0.0
sentiments = [item["news_sentiment"] for item in historical_data]
price_changes = [item["price_change"] for item in historical_data]
# Calculate Pearson correlation
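# statistics.correlation requires Python 3.10+; StatisticsError covers constant inputs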
try:
correlation = statistics.correlation(sentiments, price_changes)
return correlation
except statistics.StatisticsError:
return 0.0
async def analyze_multi_stock_impact(self, news_data: Dict[str, Any]) -> List[ImpactResult]:
"""Analyze impact on multiple stocks.
Args:
news_data: News data
Returns:
List of impact results for different stocks
"""
entities = news_data.get("entities", {})
companies = entities.get("companies", [])
results = []
for company in companies:
# Create individual analysis for each company
company_news = news_data.copy()
company_news["entities"] = {"companies": [company]}
# Use dummy market data for testing
dummy_market_data = {
self._get_stock_code(company, entities): {
"current_price": 50000,
"market_cap": 5000000000000,
"volatility": 0.25
}
}
result = await self.analyze_impact(company_news, dummy_market_data)
results.append(result)
return results
async def predict_impact_duration(self, news_type: str, sentiment_strength: float,
market_conditions: Dict[str, Any]) -> Dict[str, Any]:
"""Predict duration of impact.
Args:
news_type: Type of news event
sentiment_strength: Strength of sentiment
market_conditions: Current market conditions
Returns:
Duration predictions
"""
base_duration = {
"earnings": {"short": 4, "medium": 2, "long": 1},
"management": {"short": 8, "medium": 5, "long": 2},
"product": {"short": 2, "medium": 1, "long": 0},
"corporate_action": {"short": 12, "medium": 7, "long": 4}
}
default_duration = {"short": 6, "medium": 3, "long": 1}
duration = base_duration.get(news_type, default_duration)
# Adjust for sentiment strength
strength_multiplier = 0.5 + sentiment_strength
return {
"short_term_hours": int(duration["short"] * strength_multiplier),
"medium_term_days": int(duration["medium"] * strength_multiplier),
"long_term_weeks": int(duration["long"] * strength_multiplier)
}
async def assess_impact_risk(self, impact_prediction: Dict[str, Any]) -> Dict[str, Any]:
"""Assess risk of predicted impact.
Args:
impact_prediction: Impact prediction results
Returns:
Risk assessment
"""
predicted_change = abs(impact_prediction.get("predicted_change_percent", 0))
confidence = impact_prediction.get("confidence", 0.5)
# Risk level based on predicted change and confidence
if predicted_change > 10 and confidence > 0.8:
risk_level = "very_high"
elif predicted_change > 5 and confidence > 0.6:
risk_level = "high"
elif predicted_change > 2 and confidence > 0.4:
risk_level = "medium"
else:
risk_level = "low"
# Calculate potential loss (simplified)
potential_loss = predicted_change * (1 - confidence)
return {
"risk_level": risk_level,
"potential_loss": potential_loss,
"probability_distribution": {
"best_case": predicted_change * confidence,
"worst_case": predicted_change * (2 - confidence),
"most_likely": predicted_change
}
}
async def detect_market_regime(self, market_data: Dict[str, Any]) -> str:
"""Detect current market regime.
Args:
market_data: Market data with trends
Returns:
Market regime type
"""
volatilities = market_data.get("recent_volatility", [0.2])
returns = market_data.get("recent_returns", [0.0])
volume_trend = market_data.get("volume_trend", "stable")
avg_volatility = statistics.mean(volatilities) if volatilities else 0.2
avg_return = statistics.mean(returns) if returns else 0.0
# Simple regime classification
if avg_volatility > 0.3:
return "volatile"
elif avg_return > 0.02:
return "bull"
elif avg_return < -0.02:
return "bear"
else:
return "stable"
async def classify_news_event(self, title: str) -> str:
"""Classify news event type.
Args:
title: News title
Returns:
Event classification
"""
title_lower = title.lower()
if any(word in title_lower for word in ["실적", "earnings", "분기"]):
return "earnings"
elif any(word in title_lower for word in ["ceo", "대표", "임원", "교체"]):
return "management"
elif any(word in title_lower for word in ["제품", "출시", "신제품", "product"]):
return "product"
elif any(word in title_lower for word in ["인수", "합병", "m&a", "분할"]):
return "corporate_action"
elif any(word in title_lower for word in ["규제", "정부", "법안"]):
return "regulatory"
elif any(word in title_lower for word in ["시장", "지수", "market"]):
return "market"
else:
return "other"
async def calculate_sentiment_momentum(self, sentiment_history: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Calculate sentiment momentum.
Args:
sentiment_history: Historical sentiment data
Returns:
Momentum analysis
"""
if len(sentiment_history) < 2:
return {"momentum_score": 0.0, "trend_direction": "stable", "acceleration": 0.0}
sentiments = [item["sentiment"] for item in sentiment_history]
# Calculate momentum as rate of change
recent_change = sentiments[-1] - sentiments[-2] if len(sentiments) >= 2 else 0
# Determine trend direction
if recent_change > 0.1:
trend_direction = "improving"
elif recent_change < -0.1:
trend_direction = "declining"
else:
trend_direction = "stable"
# Calculate acceleration (second derivative)
if len(sentiments) >= 3:
acceleration = (sentiments[-1] - sentiments[-2]) - (sentiments[-2] - sentiments[-3])
else:
acceleration = 0.0
return {
"momentum_score": recent_change,
"trend_direction": trend_direction,
"acceleration": acceleration
}
async def analyze_microstructure_impact(self, news_impact: Dict[str, Any],
order_book_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze microstructure impact.
Args:
news_impact: News impact data
order_book_data: Order book data
Returns:
Microstructure impact analysis
"""
sentiment = news_impact.get("sentiment", "neutral")
importance = news_impact.get("importance", 0.5)
spread = order_book_data.get("bid_ask_spread", 0.001)
depth = order_book_data.get("market_depth", 1000000)
imbalance = order_book_data.get("order_imbalance", 0.0)
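# NOTE: spread, depth, and imbalance are read for context; the simplified
# estimates below depend only on news importance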
# Calculate spread impact
spread_impact = importance * 0.5 # News increases spread
# Calculate liquidity impact
liquidity_impact = -importance * 0.3 # News may reduce liquidity
# Price efficiency
efficiency_score = 0.8 - importance * 0.2 # High impact news may reduce efficiency
return {
"spread_impact": spread_impact,
"liquidity_impact": liquidity_impact,
"price_efficiency": efficiency_score
}
async def analyze_cross_asset_impact(self, news_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze cross-asset impact.
Args:
news_data: News data
Returns:
Cross-asset impact analysis
"""
keywords = news_data.get("entities", {}).get("keywords", [])
sentiment = news_data.get("sentiment_analysis", {}).get("sentiment", "neutral")
# Initialize impacts
impacts = {
"equity_impact": 0.0,
"bond_impact": 0.0,
"currency_impact": 0.0,
"commodity_impact": 0.0
}
# Check for monetary policy news
if any(keyword in ["금리", "통화정책", "중앙은행"] for keyword in keywords):
if sentiment == "positive": # Rate hike expectations
impacts["bond_impact"] = -0.3 # Negative for bonds
impacts["currency_impact"] = 0.2 # Positive for currency
else:
impacts["bond_impact"] = 0.2
impacts["currency_impact"] = -0.2
# Check for economic news
if any(keyword in ["GDP", "경제성장", "인플레이션"] for keyword in keywords):
sentiment_score = 0.3 if sentiment == "positive" else -0.3
impacts["equity_impact"] = sentiment_score
impacts["bond_impact"] = -sentiment_score * 0.5
return impacts
async def backtest_predictions(self, historical_predictions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Backtest impact predictions.
Args:
historical_predictions: Historical prediction data
Returns:
Backtesting results
"""
if not historical_predictions:
return {"accuracy_metrics": {"mae": 0, "rmse": 0, "hit_rate": 0}}
errors = []
hits = 0
for pred in historical_predictions:
predicted = pred.get("predicted_impact", 0)
actual = pred.get("actual_impact", 0)
error = abs(predicted - actual)
errors.append(error)
# Hit if direction is correct
if (predicted > 0 and actual > 0) or (predicted < 0 and actual < 0):
hits += 1
mae = statistics.mean(errors) if errors else 0
rmse = math.sqrt(statistics.mean([e**2 for e in errors])) if errors else 0
hit_rate = hits / len(historical_predictions) if historical_predictions else 0
return {
"accuracy_metrics": {
"mae": mae,
"rmse": rmse,
"hit_rate": hit_rate
}
}
async def start_impact_tracking(self, news_id: str, initial_prediction: Dict[str, Any]):
"""Start tracking impact of news.
Args:
news_id: Unique news identifier
initial_prediction: Initial impact prediction
"""
self.active_tracking[news_id] = {
"start_time": datetime.now(timezone.utc),
"initial_prediction": initial_prediction,
"updates": []
}
async def update_impact_tracking(self, news_id: str,
actual_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update impact tracking with actual data.
Args:
news_id: News identifier
actual_data: Actual market data
Returns:
Tracking update results
"""
if news_id not in self.active_tracking:
return {"error": "News not being tracked"}
tracking_data = self.active_tracking[news_id]
predicted = tracking_data["initial_prediction"]["predicted_change"]
actual = actual_data.get("price_change", 0)
accuracy = max(0.0, 1.0 - abs(predicted - actual) / max(abs(predicted), 0.01))  # Clamped so large misses do not yield negative accuracy
tracking_data["updates"].append({
"timestamp": datetime.now(timezone.utc),
"actual_data": actual_data,
"accuracy": accuracy
})
return {
"prediction_accuracy": accuracy,
"remaining_impact": max(0, predicted - actual)
}
async def aggregate_news_impacts(self, news_items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Aggregate multiple news impacts.
Args:
news_items: List of news items with impacts
Returns:
Aggregated impact analysis
"""
if not news_items:
return {"combined_sentiment": 0.5, "weighted_impact": 0.0}
total_weight = 0
weighted_sentiment = 0
weighted_impact = 0
for item in news_items:
sentiment = item.get("sentiment_analysis", {}).get("score", 0.5)
importance = item.get("importance", 0.5)
total_weight += importance
weighted_sentiment += sentiment * importance
weighted_impact += importance * importance # Quadratic for high-importance items
if total_weight == 0:
return {"combined_sentiment": 0.5, "weighted_impact": 0.0}
combined_sentiment = weighted_sentiment / total_weight
final_impact = weighted_impact / total_weight
# Calculate confidence interval (simplified)
confidence_interval = {
"lower": final_impact * 0.7,
"upper": final_impact * 1.3
}
return {
"combined_sentiment": combined_sentiment,
"weighted_impact": final_impact,
"confidence_interval": confidence_interval
}
async def apply_seasonal_adjustment(self, base_impact: float, date: datetime) -> float:
"""Apply seasonal adjustments.
Args:
base_impact: Base impact score
date: Date of analysis
Returns:
Seasonally adjusted impact
"""
# Year-end effect
if date.month == 12 and date.day > 20:
return base_impact * 0.8 # Reduced impact during holidays
# Earnings season effect
if date.month in [1, 4, 7, 10]: # Quarterly earnings months
return base_impact * 1.1 # Amplified impact
return base_impact
async def generate_visualization_data(self, impact_result: ImpactResult) -> Dict[str, Any]:
"""Generate data for impact visualization.
Args:
impact_result: Impact analysis result
Returns:
Visualization data
"""
return {
"chart_data": {
"impact_score": impact_result.impact_score,
"confidence": impact_result.confidence,
"predicted_change": impact_result.predicted_change,
"time_series": [] # Would contain historical data
},
"risk_metrics": {
"volatility_impact": impact_result.volatility_impact,
"volume_impact": impact_result.volume_impact,
"risk_factors": impact_result.risk_factors
},
"comparable_events": [] # Would contain similar historical events
}
async def generate_impact_alerts(self, news_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate impact-based alerts.
Args:
news_data: News data
Returns:
List of alerts
"""
alerts = []
sentiment = news_data.get("sentiment_analysis", {})
importance = news_data.get("importance", 0.5)
companies = news_data.get("entities", {}).get("companies", [])
# High impact alert
if importance > 0.8 and abs(sentiment.get("score", 0.5) - 0.5) > 0.3:
alerts.append({
"severity": "high",
"message": f"High impact news detected for {', '.join(companies)}",
"stock_codes": [self._get_stock_code(company, {}) for company in companies]
})
# Sector alert
if len(companies) > 2:
alerts.append({
"severity": "medium",
"message": f"Sector-wide impact expected for {len(companies)} companies",
"stock_codes": [self._get_stock_code(company, {}) for company in companies]
})
return alerts
async def calibrate_model(self, calibration_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Calibrate impact model.
Args:
calibration_data: Calibration data with predictions and actuals
Returns:
Calibration metrics
"""
if not calibration_data:
return {"bias": 0, "variance": 0, "calibration_score": 0}
predictions = [item["predicted"] for item in calibration_data]
actuals = [item["actual"] for item in calibration_data]
# Calculate bias
bias = statistics.mean([p - a for p, a in zip(predictions, actuals)])
# Calculate variance
errors = [p - a for p, a in zip(predictions, actuals)]
variance = statistics.variance(errors) if len(errors) > 1 else 0
# Simple calibration score
mae = statistics.mean([abs(e) for e in errors])
calibration_score = max(0, 1 - mae)
return {
"bias": bias,
"variance": variance,
"calibration_score": calibration_score
}
async def calculate_confidence_intervals(self, base_prediction: float,
uncertainty_factors: Dict[str, float]) -> Dict[str, Any]:
"""Calculate confidence intervals.
Args:
base_prediction: Base prediction value
uncertainty_factors: Factors affecting uncertainty
Returns:
Confidence intervals
"""
# Calculate overall uncertainty
sentiment_conf = uncertainty_factors.get("sentiment_confidence", 0.8)
market_vol = uncertainty_factors.get("market_volatility", 0.25)
news_reliability = uncertainty_factors.get("news_reliability", 0.9)
# Combined uncertainty
uncertainty = (1 - sentiment_conf) * 0.4 + market_vol * 0.4 + (1 - news_reliability) * 0.2
# Calculate bounds (simplified normal approximation)
margin = uncertainty * abs(base_prediction) * 1.96 # 95% confidence
return {
"lower_bound": base_prediction - margin,
"upper_bound": base_prediction + margin,
"confidence_level": 0.95
}
async def analyze_batch_impact(self, news_batch: List[Dict[str, Any]]) -> List[ImpactResult]:
"""Analyze impact for batch of news.
Args:
news_batch: List of news items
Returns:
List of impact results
"""
results = []
# Process in smaller batches for efficiency
batch_size = 10
for i in range(0, len(news_batch), batch_size):
batch = news_batch[i:i + batch_size]
# Process batch concurrently
batch_tasks = []
for news_item in batch:
dummy_market_data = {"": {"current_price": 50000, "market_cap": 1000000000000}}
task = self.analyze_impact(news_item, dummy_market_data)
batch_tasks.append(task)
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
for result in batch_results:
if isinstance(result, Exception):
self.logger.error(f"Batch processing error: {result}")
results.append(ImpactResult("", 0.0, "neutral", 0.0))
else:
results.append(result)
return results
async def save_impact_result(self, result: ImpactResult, file_path: Path):
"""Save impact result to file.
Args:
result: Impact result to save
file_path: Path to save file
"""
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(asdict(result), f, ensure_ascii=False, indent=2, default=str)
async def load_impact_result(self, file_path: Path) -> ImpactResult:
"""Load impact result from file.
Args:
file_path: Path to load file from
Returns:
Loaded impact result
"""
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Handle tuple conversion for affected_price_range
if data.get("affected_price_range"):
data["affected_price_range"] = tuple(data["affected_price_range"])
return ImpactResult(**data)
def _get_stock_code(self, company_name: str, entities: Dict[str, Any]) -> str:
"""Get stock code for company.
Args:
company_name: Company name
entities: Entities data
Returns:
Stock code or empty string
"""
# Simple mapping for testing
stock_codes = {
"삼성전자": "005930",
"LG전자": "066570",
"KB금융": "105560",
"신한금융": "055550"
}
return stock_codes.get(company_name, "")
def _determine_direction(self, sentiment: str) -> str:
"""Determine impact direction from sentiment.
Args:
sentiment: Sentiment label
Returns:
Direction label
"""
if sentiment == "positive":
return "positive"
elif sentiment == "negative":
return "negative"
else:
return "neutral"
def _calculate_confidence(self, sentiment_data: Dict[str, Any],
importance_score: float,
stock_data: Dict[str, Any]) -> float:
"""Calculate overall confidence score.
Args:
sentiment_data: Sentiment analysis data
importance_score: News importance score
stock_data: Stock market data
Returns:
Confidence score
"""
sentiment_conf = sentiment_data.get("confidence", 0.5)
# Higher confidence for important news
importance_boost = importance_score * 0.2
# Lower confidence for highly volatile stocks
volatility = stock_data.get("volatility", 0.25)
volatility_penalty = volatility * 0.3
confidence = sentiment_conf + importance_boost - volatility_penalty
return min(1.0, max(0.1, confidence))
def _predict_price_change(self, impact_score: float, direction: str,
volatility: float) -> float:
"""Predict price change percentage.
Args:
impact_score: Impact score
direction: Impact direction
volatility: Stock volatility
Returns:
Predicted price change percentage
"""
# Base change from impact score
base_change = impact_score * 0.1  # Max 10% change before the volatility adjustment
# Apply direction
if direction == "negative":
base_change = -base_change
elif direction == "neutral":
base_change = 0
# Adjust for volatility
volatility_adjustment = 1 + volatility
predicted_change = base_change * volatility_adjustment
return predicted_change
async def _calculate_volume_impact(self, impact_score: float,
stock_data: Dict[str, Any]) -> float:
"""Calculate volume impact multiplier.
Args:
impact_score: Impact score
stock_data: Stock data
Returns:
Volume multiplier
"""
base_volume = stock_data.get("volume", 1000000)  # Read for context; not used by the simple multiplier below
# Volume increases with impact
volume_multiplier = 1.0 + impact_score * 2.0
return volume_multiplier
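

# Minimal usage sketch (illustrative only): drives analyze_impact with hand-built
# sample news and market data. The dictionary keys match what this module reads;
# the sample values and the demo itself are assumptions, not part of the analyzer.
if __name__ == "__main__":
    async def _demo() -> None:
        analyzer = MarketImpactAnalyzer()
        sample_news = {
            "title": "삼성전자 분기 실적 발표",
            "published_at": datetime.now(timezone.utc),
            "sentiment_analysis": {"sentiment": "positive", "score": 0.8, "confidence": 0.7},
            "entities": {"companies": ["삼성전자"], "keywords": ["실적"]},
        }
        sample_market = {
            "005930": {"current_price": 70000, "market_cap": 400000000000000, "volatility": 0.2}
        }
        result = await analyzer.analyze_impact(sample_news, sample_market)
        print(json.dumps(asdict(result), ensure_ascii=False, indent=2, default=str))

    asyncio.run(_demo())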