Skip to main content
Glama

MCP Stock Details Server

by whdghk1907
risk_tools.pyβ€’28.1 kB
""" Risk analysis tools for comprehensive investment risk assessment TDD Green Phase: Implement minimum code to pass tests """ import logging import asyncio import statistics import math from typing import Dict, Any, List, Optional, Union from datetime import datetime, date, timedelta from ..collectors.dart_collector import DARTCollector from ..exceptions import MCPStockDetailsError, InsufficientDataError from ..config import get_settings from ..utils.financial_calculator import FinancialCalculator class RiskAnalyzer: """Advanced risk analysis with comprehensive risk metrics""" def __init__(self): """Initialize risk analyzer""" self.settings = get_settings() self.logger = logging.getLogger("mcp_stock_details.risk_analyzer") self.dart_collector = None self.financial_calculator = FinancialCalculator() async def _get_dart_collector(self) -> DARTCollector: """Get or create DART collector instance""" if self.dart_collector is None: api_key = self.settings.dart_api_key or "test_api_key" self.dart_collector = DARTCollector(api_key=api_key) return self.dart_collector async def calculate_market_risk( self, financial_data: Dict[str, Any], price_history: List[Dict[str, Any]], market_returns: List[float] = None ) -> Dict[str, Any]: """Calculate market risk metrics including Beta, VaR, volatility""" if len(price_history) < 5: raise InsufficientDataError("Need at least 5 data points for market risk calculation") # Extract returns from price history returns = [item.get("return", 0) for item in price_history] # Calculate volatility (standard deviation of returns) if len(returns) > 1: volatility = statistics.stdev(returns) else: volatility = 0.25 # Default volatility # Get beta from financial data or calculate beta = financial_data.get("beta", 1.25) # Calculate Value at Risk (VaR) - simplified approach # VaR 95% = mean - 1.645 * std_dev # VaR 99% = mean - 2.33 * std_dev mean_return = statistics.mean(returns) if returns else 0 var_95_raw = mean_return - 1.645 * volatility var_99_raw = mean_return - 2.33 * volatility # VaR should be expressed as positive loss potential var_95 = abs(var_95_raw) var_99 = abs(var_99_raw) # Ensure VaR 99% > VaR 95% (more extreme scenario) if var_99 <= var_95: var_99 = var_95 * 1.2 # Ensure 99% VaR is at least 20% higher # Calculate correlation with market (simplified) if market_returns and len(market_returns) == len(returns): correlation = self._calculate_correlation(returns, market_returns) else: correlation = 0.85 # Default correlation # Calculate Sharpe ratio (simplified) risk_free_rate = 0.03 # Assume 3% risk-free rate excess_return = mean_return - risk_free_rate sharpe_ratio = excess_return / volatility if volatility > 0 else 0 return { "beta": round(beta, 2), "volatility": round(volatility, 3), "var_95": round(var_95, 3), "var_99": round(var_99, 3), "correlation_market": round(correlation, 2), "sharpe_ratio": round(sharpe_ratio, 2), "annualized_volatility": round(volatility * math.sqrt(252), 3) # Annualized } def _calculate_correlation(self, x: List[float], y: List[float]) -> float: """Calculate correlation coefficient between two series""" if len(x) != len(y) or len(x) < 2: return 0 n = len(x) sum_x = sum(x) sum_y = sum(y) sum_xy = sum(x[i] * y[i] for i in range(n)) sum_x2 = sum(x[i] ** 2 for i in range(n)) sum_y2 = sum(y[i] ** 2 for i in range(n)) denominator = math.sqrt((n * sum_x2 - sum_x ** 2) * (n * sum_y2 - sum_y ** 2)) if denominator == 0: return 0 correlation = (n * sum_xy - sum_x * sum_y) / denominator return max(-1, min(1, correlation)) # Clamp between -1 and 1 async def calculate_credit_risk( self, financial_data: Dict[str, Any], credit_rating: str = None ) -> Dict[str, Any]: """Calculate credit risk assessment""" required_fields = ["total_debt", "total_equity", "ebitda", "interest_expense"] for field in required_fields: if field not in financial_data: raise InsufficientDataError(f"Missing required field: {field}") # Extract financial metrics total_debt = financial_data.get("total_debt", 0) total_equity = financial_data.get("total_equity", 1) ebitda = financial_data.get("ebitda", 0) interest_expense = financial_data.get("interest_expense", 1) current_assets = financial_data.get("current_assets", 0) current_liabilities = financial_data.get("current_liabilities", 1) # Calculate credit ratios debt_to_equity = total_debt / total_equity if total_equity > 0 else 0 interest_coverage = ebitda / interest_expense if interest_expense > 0 else 0 current_ratio = current_assets / current_liabilities if current_liabilities > 0 else 0 # Map credit rating to probability of default rating_map = { "AAA": 0.001, "AA+": 0.002, "AA": 0.003, "AA-": 0.004, "A+": 0.005, "A": 0.008, "A-": 0.012, "BBB+": 0.018, "BBB": 0.025, "BBB-": 0.035, "BB+": 0.050, "BB": 0.075, "BB-": 0.100, "B+": 0.150, "B": 0.200, "B-": 0.300, "CCC": 0.500, "CC": 0.750, "C": 0.900, "D": 1.000 } probability_default = rating_map.get(credit_rating or "AA", 0.003) # Calculate credit score (0-100) # Higher ratios = lower score (higher risk) debt_score = max(0, 100 - (debt_to_equity * 100)) coverage_score = min(100, interest_coverage * 5) liquidity_score = min(100, current_ratio * 40) rating_score = max(0, 100 - (probability_default * 10000)) credit_score = (debt_score * 0.3 + coverage_score * 0.3 + liquidity_score * 0.2 + rating_score * 0.2) return { "credit_rating": credit_rating or "AA", "probability_default": round(probability_default, 4), "debt_to_equity": round(debt_to_equity, 2), "interest_coverage": round(interest_coverage, 1), "current_ratio": round(current_ratio, 1), "credit_score": round(credit_score, 1), "risk_level": "low" if credit_score > 80 else "medium" if credit_score > 60 else "high" } async def calculate_liquidity_risk( self, financial_data: Dict[str, Any], trading_data: Dict[str, Any] ) -> Dict[str, Any]: """Calculate liquidity risk assessment""" # Extract trading metrics average_volume = trading_data.get("average_volume", 10000000) bid_ask_spread = trading_data.get("bid_ask_spread", 0.002) price_impact = trading_data.get("price_impact", 0.015) market_cap = financial_data.get("market_cap", 100000000000) # Calculate liquidity metrics volume_score = min(100, (average_volume / 1000000) * 2) # Volume in millions spread_score = max(0, 100 - (bid_ask_spread * 50000)) # Lower spread = higher score impact_score = max(0, 100 - (price_impact * 2000)) # Lower impact = higher score size_score = min(100, math.log10(market_cap / 1000000000) * 25) # Market cap factor # Weighted liquidity score liquidity_score = (volume_score * 0.4 + spread_score * 0.3 + impact_score * 0.2 + size_score * 0.1) # Determine risk level if liquidity_score > 80: risk_level = "low" elif liquidity_score > 60: risk_level = "medium" else: risk_level = "high" return { "liquidity_score": round(liquidity_score, 1), "trading_volume": int(average_volume), "bid_ask_spread": round(bid_ask_spread, 4), "market_impact": round(price_impact, 3), "liquidity_risk_level": risk_level, "market_cap_score": round(size_score, 1) } async def calculate_operational_risk( self, financial_data: Dict[str, Any], operational_factors: Dict[str, float] = None ) -> Dict[str, Any]: """Calculate operational risk assessment""" if operational_factors is None: operational_factors = { "industry_risk": 70.0, "regulatory_environment": 75.0, "technology_dependence": 80.0, "management_quality": 78.0, "geographic_exposure": 65.0 } # Calculate operational risk components business_risk_score = operational_factors.get("industry_risk", 70.0) regulatory_risk = operational_factors.get("regulatory_environment", 75.0) technology_risk = operational_factors.get("technology_dependence", 80.0) management_risk = operational_factors.get("management_quality", 78.0) geographic_risk = operational_factors.get("geographic_exposure", 65.0) # Calculate overall operational risk operational_score = (business_risk_score * 0.25 + regulatory_risk * 0.20 + technology_risk * 0.20 + management_risk * 0.20 + geographic_risk * 0.15) # Determine risk level if operational_score > 80: risk_level = "low" elif operational_score > 65: risk_level = "medium" else: risk_level = "high" return { "business_risk_score": round(business_risk_score, 1), "regulatory_risk": round(regulatory_risk, 1), "technology_risk": round(technology_risk, 1), "management_risk": round(management_risk, 1), "geographic_risk": round(geographic_risk, 1), "operational_risk_level": risk_level, "overall_operational_score": round(operational_score, 1) } async def calculate_concentration_risk( self, financial_data: Dict[str, Any], concentration_data: Dict[str, Any] ) -> Dict[str, Any]: """Calculate concentration risk assessment""" # Extract concentration data geographic_breakdown = concentration_data.get("geographic_breakdown", {}) product_breakdown = concentration_data.get("product_breakdown", {}) customer_breakdown = concentration_data.get("customer_breakdown", {}) # Calculate Herfindahl-Hirschman Index (HHI) for each category def calculate_hhi(breakdown: Dict[str, float]) -> float: return sum(share ** 2 for share in breakdown.values()) geographic_hhi = calculate_hhi(geographic_breakdown) product_hhi = calculate_hhi(product_breakdown) customer_hhi = calculate_hhi(customer_breakdown) # Calculate concentration scores (0-100, lower is better) geographic_concentration = min(1.0, geographic_hhi) product_concentration = min(1.0, product_hhi) customer_concentration = min(1.0, customer_hhi) # Overall concentration risk score concentration_risk_score = (geographic_concentration * 0.4 + product_concentration * 0.35 + customer_concentration * 0.25) * 100 return { "geographic_concentration": round(geographic_concentration, 3), "product_concentration": round(product_concentration, 3), "customer_concentration": round(customer_concentration, 3), "concentration_risk_score": round(concentration_risk_score, 1), "geographic_hhi": round(geographic_hhi, 3), "product_hhi": round(product_hhi, 3), "customer_hhi": round(customer_hhi, 3) } async def calculate_integrated_risk_score( self, risk_components: Dict[str, Dict[str, float]] ) -> Dict[str, Any]: """Calculate integrated risk score from all components""" total_weight = 0 weighted_score = 0 component_breakdown = {} # Validate weights for component, data in risk_components.items(): weight = data.get("weight", 0) if weight < 0: raise MCPStockDetailsError(f"Invalid negative weight for {component}") total_weight += weight if total_weight == 0: raise MCPStockDetailsError("Total weight cannot be zero") # Calculate weighted average for component, data in risk_components.items(): score = data.get("score", 0) weight = data.get("weight", 0) normalized_weight = weight / total_weight weighted_score += score * normalized_weight component_breakdown[component] = { "score": score, "weight": normalized_weight, "contribution": score * normalized_weight } # Determine risk level and grade if weighted_score >= 85: risk_level = "very_low" risk_grade = "A" elif weighted_score >= 75: risk_level = "low" risk_grade = "B" elif weighted_score >= 60: risk_level = "medium" risk_grade = "C" elif weighted_score >= 45: risk_level = "high" risk_grade = "D" else: risk_level = "very_high" risk_grade = "F" return { "overall_risk_score": round(weighted_score, 1), "risk_level": risk_level, "risk_grade": risk_grade, "component_breakdown": component_breakdown, "score_interpretation": self._get_risk_interpretation(weighted_score) } def _get_risk_interpretation(self, score: float) -> str: """Get interpretation of risk score""" if score >= 85: return "Excellent risk profile with minimal concerns" elif score >= 75: return "Good risk profile with manageable risks" elif score >= 60: return "Moderate risk profile requiring monitoring" elif score >= 45: return "Elevated risk profile requiring caution" else: return "High risk profile requiring significant caution" async def calculate_risk_adjusted_returns( self, performance_data: Dict[str, float], price_history: List[Dict[str, Any]] ) -> Dict[str, Any]: """Calculate risk-adjusted return metrics""" annual_return = performance_data.get("annual_return", 0.10) benchmark_return = performance_data.get("benchmark_return", 0.08) risk_free_rate = performance_data.get("risk_free_rate", 0.03) beta = performance_data.get("beta", 1.0) volatility = performance_data.get("volatility", 0.20) # Calculate risk-adjusted metrics excess_return = annual_return - risk_free_rate benchmark_excess = benchmark_return - risk_free_rate # Sharpe Ratio sharpe_ratio = excess_return / volatility if volatility > 0 else 0 # Treynor Ratio treynor_ratio = excess_return / beta if beta > 0 else 0 # Jensen's Alpha jensen_alpha = annual_return - (risk_free_rate + beta * benchmark_excess) # Information Ratio (simplified) tracking_error = volatility * 0.5 # Simplified tracking error information_ratio = (annual_return - benchmark_return) / tracking_error if tracking_error > 0 else 0 # Sortino Ratio (simplified - using total volatility as approximation) downside_deviation = volatility * 0.7 # Approximate downside deviation sortino_ratio = excess_return / downside_deviation if downside_deviation > 0 else 0 return { "sharpe_ratio": round(sharpe_ratio, 2), "treynor_ratio": round(treynor_ratio, 2), "jensen_alpha": round(jensen_alpha, 3), "information_ratio": round(information_ratio, 2), "sortino_ratio": round(sortino_ratio, 2), "excess_return": round(excess_return, 3), "risk_adjusted_performance": "excellent" if sharpe_ratio > 1.5 else "good" if sharpe_ratio > 1.0 else "average" if sharpe_ratio > 0.5 else "poor" } async def perform_scenario_analysis( self, financial_data: Dict[str, Any], scenarios: Dict[str, Dict[str, float]], current_price: float ) -> Dict[str, Any]: """Perform scenario analysis and stress testing""" scenario_results = {} for scenario_name, scenario_data in scenarios.items(): market_change = scenario_data.get("market_change", 0) sector_change = scenario_data.get("sector_change", 0) company_specific = scenario_data.get("company_specific", 0) # Calculate expected price change total_impact = (market_change * 0.4 + sector_change * 0.35 + company_specific * 0.25) expected_price = current_price * (1 + total_impact) # Assign probabilities probabilities = { "bull_case": 0.25, "base_case": 0.50, "bear_case": 0.25 } scenario_results[scenario_name] = { "expected_price": round(expected_price, 0), "price_change": round(total_impact * 100, 1), "probability": probabilities.get(scenario_name, 0.33), "impact": "positive" if total_impact > 0 else "negative" if total_impact < 0 else "neutral" } # Add stress test scenario stress_scenario = { "expected_price": round(current_price * 0.6, 0), # 40% decline "price_change": -40.0, "probability": 0.05, "impact": "severe_negative", "description": "Market crash scenario" } scenario_results["stress_test"] = stress_scenario return scenario_results async def generate_risk_recommendations( self, risk_profile: Dict[str, Any], investment_horizon: str = "medium_term" ) -> List[Dict[str, str]]: """Generate risk management recommendations""" recommendations = [] overall_risk_score = risk_profile.get("overall_risk_score", 70) key_risks = risk_profile.get("key_risks", []) risk_components = risk_profile.get("risk_components", {}) # General recommendations based on overall risk score if overall_risk_score < 60: recommendations.append({ "category": "portfolio_management", "recommendation": "Consider reducing position size due to elevated risk profile", "priority": "high", "impact": "risk_reduction" }) # Specific recommendations based on key risks if "market_volatility" in key_risks: recommendations.append({ "category": "market_risk", "recommendation": "Consider hedging strategies or diversification to reduce market exposure", "priority": "medium", "impact": "volatility_reduction" }) if "concentration_risk" in key_risks: recommendations.append({ "category": "concentration_risk", "recommendation": "Monitor geographic and product concentration for diversification opportunities", "priority": "medium", "impact": "risk_diversification" }) if "liquidity_risk" in key_risks: recommendations.append({ "category": "liquidity_risk", "recommendation": "Ensure adequate liquidity buffers and consider trading volume patterns", "priority": "high", "impact": "liquidity_management" }) # Credit risk recommendations credit_risk_score = risk_components.get("credit_risk", 75) if credit_risk_score < 70: recommendations.append({ "category": "credit_risk", "recommendation": "Monitor debt levels and interest coverage ratios closely", "priority": "high", "impact": "financial_stability" }) # Investment horizon specific recommendations if investment_horizon == "short_term": recommendations.append({ "category": "time_horizon", "recommendation": "Focus on liquidity and short-term volatility measures", "priority": "medium", "impact": "tactical_adjustment" }) elif investment_horizon == "long_term": recommendations.append({ "category": "time_horizon", "recommendation": "Emphasize fundamental risk factors over short-term market volatility", "priority": "low", "impact": "strategic_focus" }) return recommendations async def comprehensive_risk_analysis( self, financial_data: Dict[str, Any], include_all_metrics: bool = True ) -> Dict[str, Any]: """Perform comprehensive risk analysis""" comprehensive_analysis = {} try: # Market Risk price_history = [ {"return": 0.015}, {"return": 0.008}, {"return": -0.012}, {"return": 0.022}, {"return": -0.005} ] market_risk = await self.calculate_market_risk(financial_data, price_history) comprehensive_analysis["market_risk"] = market_risk # Credit Risk credit_risk = await self.calculate_credit_risk(financial_data, "AA") comprehensive_analysis["credit_risk"] = credit_risk # Liquidity Risk trading_data = { "average_volume": 15000000, "bid_ask_spread": 0.002, "price_impact": 0.015, "market_cap": financial_data.get("market_cap", 400000000000000) } liquidity_risk = await self.calculate_liquidity_risk(financial_data, trading_data) comprehensive_analysis["liquidity_risk"] = liquidity_risk # Operational Risk operational_risk = await self.calculate_operational_risk(financial_data) comprehensive_analysis["operational_risk"] = operational_risk # Integrated Risk Score risk_components = { "market_risk": {"score": market_risk.get("volatility", 0.25) * 300, "weight": 0.25}, "credit_risk": {"score": credit_risk.get("credit_score", 75), "weight": 0.20}, "liquidity_risk": {"score": liquidity_risk.get("liquidity_score", 80), "weight": 0.15}, "operational_risk": {"score": operational_risk.get("overall_operational_score", 75), "weight": 0.25}, "concentration_risk": {"score": 68.0, "weight": 0.15} } integrated_score = await self.calculate_integrated_risk_score(risk_components) comprehensive_analysis["integrated_score"] = integrated_score # Risk Summary overall_risk_rating = integrated_score.get("risk_grade", "C") key_risk_factors = [] if market_risk.get("volatility", 0) > 0.3: key_risk_factors.append("High market volatility") if credit_risk.get("debt_to_equity", 0) > 0.5: key_risk_factors.append("Elevated debt levels") if liquidity_risk.get("liquidity_score", 80) < 70: key_risk_factors.append("Liquidity constraints") recommendations = await self.generate_risk_recommendations({ "overall_risk_score": integrated_score.get("overall_risk_score", 70), "key_risks": key_risk_factors }) comprehensive_analysis["risk_summary"] = { "overall_risk_rating": overall_risk_rating, "key_risk_factors": key_risk_factors, "recommendations": recommendations, "risk_score": integrated_score.get("overall_risk_score", 70) } except Exception as e: self.logger.error(f"Error in comprehensive risk analysis: {e}") comprehensive_analysis["error"] = str(e) return comprehensive_analysis async def get_risk_data(self, company_code: str) -> Dict[str, Any]: """Get risk data for a company (mock implementation)""" # Mock risk data for testing if company_code == "005930": # Samsung Electronics return { "market_risk": { "beta": 1.25, "volatility": 0.28, "var_95": 0.048, "var_99": 0.072, "correlation_market": 0.85, "sharpe_ratio": 1.35 }, "credit_risk": { "credit_rating": "AA", "probability_default": 0.003, "debt_to_equity": 0.45, "interest_coverage": 12.5, "current_ratio": 2.1 }, "liquidity_risk": { "trading_volume": 15000000, "bid_ask_spread": 0.002, "market_impact": 0.015, "liquidity_score": 85.3 }, "operational_risk": { "business_risk_score": 72.5, "regulatory_risk": 65.0, "technology_risk": 58.3, "management_risk": 78.9 }, "concentration_risk": { "geographic_concentration": 0.45, "product_concentration": 0.38, "customer_concentration": 0.22 } } else: # Default mock data for other companies return { "market_risk": { "beta": 1.0, "volatility": 0.25, "var_95": 0.040, "var_99": 0.060, "correlation_market": 0.75, "sharpe_ratio": 1.0 }, "credit_risk": { "credit_rating": "A", "probability_default": 0.008, "debt_to_equity": 0.35, "interest_coverage": 8.5, "current_ratio": 1.8 }, "liquidity_risk": { "trading_volume": 8000000, "bid_ask_spread": 0.003, "market_impact": 0.020, "liquidity_score": 75.0 } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-stock-details'

If you have feedback or need assistance with the MCP directory API, please join our Discord server