valuation_tools.pyβ’19.6 kB
"""
Valuation analysis tools for comprehensive company valuation
TDD Green Phase: Implement minimum code to pass tests
"""
import logging
import asyncio
import statistics
from typing import Dict, Any, List, Optional, Union
from datetime import datetime, date, timedelta
import math
from ..collectors.dart_collector import DARTCollector
from ..exceptions import MCPStockDetailsError, InsufficientDataError
from ..config import get_settings
from ..utils.financial_calculator import FinancialCalculator
class ValuationAnalyzer:
"""Advanced valuation analysis with comprehensive metrics"""
def __init__(self):
"""Initialize valuation analyzer"""
self.settings = get_settings()
self.logger = logging.getLogger("mcp_stock_details.valuation_analyzer")
self.dart_collector = None
self.financial_calculator = FinancialCalculator()
async def _get_dart_collector(self) -> DARTCollector:
"""Get or create DART collector instance"""
if self.dart_collector is None:
api_key = self.settings.dart_api_key or "test_api_key"
self.dart_collector = DARTCollector(api_key=api_key)
return self.dart_collector
async def calculate_price_multiples(self, financial_data: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate price multiples (PER, PBR, PSR)"""
# Validate input data
required_fields = ["current_price", "shares_outstanding", "financial_metrics"]
for field in required_fields:
if field not in financial_data:
raise InsufficientDataError(f"Missing required field: {field}")
metrics = financial_data["financial_metrics"]
current_price = financial_data["current_price"]
shares_outstanding = financial_data["shares_outstanding"]
# Validate positive values
if current_price <= 0 or shares_outstanding <= 0:
raise MCPStockDetailsError("Price and shares must be positive values")
if metrics.get("revenue", 0) < 0:
raise MCPStockDetailsError("Revenue cannot be negative")
# Calculate market cap
market_cap = current_price * shares_outstanding
# Calculate price multiples
multiples = {
"market_cap": market_cap,
"current_price": current_price,
"shares_outstanding": shares_outstanding
}
# PER (Price-to-Earnings Ratio)
eps = metrics.get("eps", 0)
if eps > 0:
multiples["per"] = current_price / eps
else:
multiples["per"] = None
# PBR (Price-to-Book Ratio)
book_value = metrics.get("book_value", 0)
if book_value > 0:
book_value_per_share = book_value / shares_outstanding
multiples["pbr"] = current_price / book_value_per_share
multiples["book_value_per_share"] = book_value_per_share
else:
multiples["pbr"] = None
multiples["book_value_per_share"] = 0
# PSR (Price-to-Sales Ratio)
revenue = metrics.get("revenue", 0)
if revenue > 0:
revenue_per_share = revenue / shares_outstanding
multiples["psr"] = current_price / revenue_per_share
else:
multiples["psr"] = None
return multiples
async def calculate_ev_multiples(self, financial_data: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate enterprise value multiples"""
metrics = financial_data["financial_metrics"]
market_cap = financial_data.get("market_cap")
if not market_cap:
market_cap = financial_data["current_price"] * financial_data["shares_outstanding"]
# Calculate net debt
total_debt = metrics.get("total_debt", 0)
cash = metrics.get("cash", 0)
net_debt = total_debt - cash
# Calculate enterprise value
enterprise_value = market_cap + net_debt
ev_multiples = {
"enterprise_value": enterprise_value,
"net_debt": net_debt,
"market_cap": market_cap
}
# EV/EBITDA
ebitda = metrics.get("ebitda", 0)
if ebitda > 0:
ev_multiples["ev_ebitda"] = enterprise_value / ebitda
else:
ev_multiples["ev_ebitda"] = None
# EV/Sales
revenue = metrics.get("revenue", 0)
if revenue > 0:
ev_multiples["ev_sales"] = enterprise_value / revenue
else:
ev_multiples["ev_sales"] = None
# EV/FCF (Free Cash Flow)
fcf = metrics.get("free_cash_flow", 0)
if fcf > 0:
ev_multiples["ev_fcf"] = enterprise_value / fcf
else:
ev_multiples["ev_fcf"] = None
return ev_multiples
async def calculate_historical_bands(
self,
company_code: str,
period: str = "3Y",
metrics: List[str] = None
) -> Dict[str, Any]:
"""Calculate historical valuation bands"""
if metrics is None:
metrics = ["per", "pbr"]
# For testing, generate mock historical data
bands = {}
for metric in metrics:
# Generate mock historical values
if metric == "per":
historical_values = [25.5, 28.3, 22.1, 30.2, 26.8, 24.7, 29.1, 27.5]
elif metric == "pbr":
historical_values = [1.2, 1.4, 1.1, 1.6, 1.3, 1.2, 1.5, 1.3]
else:
historical_values = [10.5, 12.3, 9.1, 14.2, 11.8, 10.7, 13.1, 12.5]
# Calculate statistical measures
bands[metric] = {
"current": historical_values[-1], # Latest value
"mean": statistics.mean(historical_values),
"median": statistics.median(historical_values),
"std_dev": statistics.stdev(historical_values),
"percentile_25": statistics.quantiles(historical_values, n=4)[0],
"percentile_75": statistics.quantiles(historical_values, n=4)[2],
"min": min(historical_values),
"max": max(historical_values),
"period": period,
"data_points": len(historical_values)
}
return bands
async def compare_with_peers(
self,
company_code: str,
industry_code: str = None,
metrics: List[str] = None
) -> Dict[str, Any]:
"""Compare valuation metrics with industry peers"""
if metrics is None:
metrics = ["per", "pbr", "psr", "ev_ebitda"]
# Mock company metrics (Samsung Electronics)
company_metrics = {
"per": 28.4,
"pbr": 1.4,
"psr": 1.68,
"ev_ebitda": 14.7
}
# Mock peer group metrics
peer_metrics = {
"mean": {
"per": 22.1,
"pbr": 1.6,
"psr": 2.3,
"ev_ebitda": 12.5
},
"median": {
"per": 21.5,
"pbr": 1.5,
"psr": 2.1,
"ev_ebitda": 11.8
},
"peer_count": 15
}
# Mock industry metrics
industry_metrics = {
"mean": {
"per": 25.3,
"pbr": 1.8,
"psr": 2.6,
"ev_ebitda": 13.2
}
}
# Calculate percentile rankings
percentile_ranking = {}
for metric in metrics:
company_value = company_metrics.get(metric, 0)
peer_mean = peer_metrics["mean"].get(metric, 0)
# Simple percentile calculation (mock)
if company_value > peer_mean:
percentile_ranking[metric] = 75 # Above average
else:
percentile_ranking[metric] = 25 # Below average
return {
"company_metrics": company_metrics,
"peer_metrics": peer_metrics,
"industry_metrics": industry_metrics,
"percentile_ranking": percentile_ranking,
"industry_code": industry_code or "26211"
}
async def analyze_dividend_metrics(self, financial_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze dividend yield and payout metrics"""
metrics = financial_data["financial_metrics"]
current_price = financial_data["current_price"]
dividend_per_share = metrics.get("dividend_per_share", 0)
net_income = metrics.get("net_income", 0)
free_cash_flow = metrics.get("free_cash_flow", 0)
shares_outstanding = financial_data["shares_outstanding"]
dividend_analysis = {
"dividend_per_share": dividend_per_share,
"current_price": current_price
}
# Dividend yield
if current_price > 0 and dividend_per_share > 0:
dividend_analysis["dividend_yield"] = (dividend_per_share / current_price) * 100
else:
dividend_analysis["dividend_yield"] = 0
# Payout ratio (dividends / net income)
if net_income > 0:
total_dividends = dividend_per_share * shares_outstanding
dividend_analysis["payout_ratio"] = (total_dividends / net_income) * 100
else:
dividend_analysis["payout_ratio"] = 0
# Dividend coverage (earnings coverage)
if dividend_per_share > 0:
eps = metrics.get("eps", 0)
if eps > 0:
dividend_analysis["dividend_coverage"] = eps / dividend_per_share
else:
dividend_analysis["dividend_coverage"] = 0
else:
dividend_analysis["dividend_coverage"] = float('inf')
# Free cash flow payout ratio
if free_cash_flow > 0:
total_dividends = dividend_per_share * shares_outstanding
dividend_analysis["free_cash_flow_payout"] = (total_dividends / free_cash_flow) * 100
else:
dividend_analysis["free_cash_flow_payout"] = 0
return dividend_analysis
async def estimate_fair_value(
self,
financial_data: Dict[str, Any],
methods: List[str] = None
) -> Dict[str, Any]:
"""Estimate fair value using multiple valuation methods"""
if methods is None:
methods = ["dcf", "multiple", "dividend_discount"]
current_price = financial_data["current_price"]
metrics = financial_data["financial_metrics"]
fair_value_estimates = {}
# DCF valuation (simplified)
if "dcf" in methods:
fcf = metrics.get("free_cash_flow", 0)
if fcf > 0:
# Simple DCF with 10% discount rate and 3% growth
growth_rate = 0.03
discount_rate = 0.10
terminal_multiple = 15
# Project 5 years of cash flows
projected_fcf = []
for year in range(1, 6):
future_fcf = fcf * ((1 + growth_rate) ** year)
pv_fcf = future_fcf / ((1 + discount_rate) ** year)
projected_fcf.append(pv_fcf)
# Terminal value
terminal_fcf = fcf * ((1 + growth_rate) ** 5)
terminal_value = terminal_fcf * terminal_multiple
pv_terminal = terminal_value / ((1 + discount_rate) ** 5)
total_value = sum(projected_fcf) + pv_terminal
shares_outstanding = financial_data["shares_outstanding"]
fair_value_estimates["dcf_value"] = total_value / shares_outstanding
else:
fair_value_estimates["dcf_value"] = None
# Multiple-based valuation
if "multiple" in methods:
# Use industry average PER of 22
industry_per = 22.0
eps = metrics.get("eps", 0)
if eps > 0:
fair_value_estimates["multiple_value"] = eps * industry_per
else:
fair_value_estimates["multiple_value"] = None
# Dividend discount model
if "dividend_discount" in methods:
dividend_per_share = metrics.get("dividend_per_share", 0)
if dividend_per_share > 0:
# Gordon growth model
dividend_growth = 0.05 # 5% growth
required_return = 0.12 # 12% required return
if required_return > dividend_growth:
next_dividend = dividend_per_share * (1 + dividend_growth)
fair_value_estimates["dividend_discount_value"] = next_dividend / (required_return - dividend_growth)
else:
fair_value_estimates["dividend_discount_value"] = None
else:
fair_value_estimates["dividend_discount_value"] = None
# Calculate weighted average
valid_estimates = [v for v in fair_value_estimates.values() if v is not None and v > 0]
if valid_estimates:
fair_value_estimates["weighted_fair_value"] = statistics.mean(valid_estimates)
# Calculate valuation range
min_estimate = min(valid_estimates)
max_estimate = max(valid_estimates)
fair_value_estimates["valuation_range"] = {
"low": min_estimate,
"high": max_estimate,
"mid": (min_estimate + max_estimate) / 2
}
# Investment recommendation
weighted_value = fair_value_estimates["weighted_fair_value"]
upside_downside = ((weighted_value - current_price) / current_price) * 100
fair_value_estimates["upside_downside"] = upside_downside
if upside_downside > 20:
fair_value_estimates["recommendation"] = "BUY"
elif upside_downside > 5:
fair_value_estimates["recommendation"] = "HOLD"
else:
fair_value_estimates["recommendation"] = "SELL"
else:
fair_value_estimates["weighted_fair_value"] = None
fair_value_estimates["recommendation"] = "INSUFFICIENT_DATA"
fair_value_estimates["upside_downside"] = 0
return fair_value_estimates
async def generate_valuation_summary(
self,
financial_data: Dict[str, Any],
include_peer_comparison: bool = True,
include_historical_analysis: bool = True
) -> Dict[str, Any]:
"""Generate comprehensive valuation summary"""
# Calculate all valuation metrics
price_multiples = await self.calculate_price_multiples(financial_data)
ev_multiples = await self.calculate_ev_multiples(financial_data)
dividend_analysis = await self.analyze_dividend_metrics(financial_data)
fair_value_analysis = await self.estimate_fair_value(financial_data)
summary = {
"price_multiples": price_multiples,
"ev_multiples": ev_multiples,
"dividend_analysis": dividend_analysis,
"fair_value_analysis": fair_value_analysis
}
# Add peer comparison if requested
if include_peer_comparison:
company_code = financial_data.get("company_code", "005930")
peer_comparison = await self.compare_with_peers(company_code)
summary["peer_comparison"] = peer_comparison
# Add historical analysis if requested
if include_historical_analysis:
company_code = financial_data.get("company_code", "005930")
historical_bands = await self.calculate_historical_bands(company_code)
summary["historical_analysis"] = historical_bands
# Generate overall assessment
current_price = financial_data["current_price"]
fair_value = fair_value_analysis.get("weighted_fair_value")
if fair_value and fair_value > 0:
deviation = ((current_price - fair_value) / fair_value) * 100
if abs(deviation) < 10:
assessment = "FAIRLY_VALUED"
elif deviation > 10:
assessment = "OVERVALUED"
else:
assessment = "UNDERVALUED"
else:
assessment = "INSUFFICIENT_DATA"
summary["valuation_assessment"] = assessment
# Generate key insights
insights = []
if price_multiples.get("per"):
if price_multiples["per"] > 30:
insights.append("High P/E ratio suggests growth expectations")
elif price_multiples["per"] < 15:
insights.append("Low P/E ratio may indicate value opportunity")
if dividend_analysis.get("dividend_yield", 0) > 3:
insights.append("Attractive dividend yield for income investors")
summary["key_insights"] = insights
# Risk factors
risk_factors = []
if ev_multiples.get("net_debt", 0) > price_multiples.get("market_cap", 0) * 0.5:
risk_factors.append("High debt levels relative to market cap")
summary["risk_factors"] = risk_factors
return summary
async def get_market_data(self, company_code: str) -> Dict[str, Any]:
"""Get market data for valuation calculations (mock implementation)"""
# Mock market data for testing
if company_code == "005930": # Samsung Electronics
return {
"current_price": 73000,
"market_cap": 435_000_000_000_000,
"shares_outstanding": 5_969_782_550,
"52_week_high": 80000,
"52_week_low": 65000,
"trading_volume": 15_000_000,
"financial_data": {
"2023": {
"revenue": 258_774_000_000_000,
"net_profit": 15_349_000_000_000,
"book_value": 319_684_000_000_000,
"ebitda": 28_500_000_000_000,
"enterprise_value": 420_000_000_000_000,
"free_cash_flow": 29_163_000_000_000,
"total_debt": 85_000_000_000_000,
"cash_and_equivalents": 25_000_000_000_000
}
}
}
else:
# Default mock data for other companies
return {
"current_price": 50000,
"market_cap": 100_000_000_000_000,
"shares_outstanding": 2_000_000_000,
"financial_data": {
"2023": {
"revenue": 50_000_000_000_000,
"net_profit": 3_000_000_000_000,
"book_value": 30_000_000_000_000,
"ebitda": 8_000_000_000_000,
"free_cash_flow": 5_000_000_000_000,
"total_debt": 15_000_000_000_000,
"cash_and_equivalents": 5_000_000_000_000
}
}
}