# shareholder_tools.py - 25.6 kB
"""
Shareholder analysis tools for comprehensive ownership and governance analysis
TDD Green Phase: Implement minimum code to pass tests
"""
import logging
import asyncio
import statistics
import math
from typing import Dict, Any, List, Optional, Union
from datetime import datetime, date, timedelta
from ..collectors.dart_collector import DARTCollector
from ..exceptions import MCPStockDetailsError, InsufficientDataError
from ..config import get_settings
from ..utils.financial_calculator import FinancialCalculator
class ShareholderAnalyzer:
    """Ownership, dividend and governance analytics over shareholder records.

    Every public method is a coroutine.  Apart from
    ``comprehensive_shareholder_analysis`` (which pulls its input from
    ``get_shareholder_data``) the analysis methods only compute over the
    dictionaries passed in.  Percentages are on a 0-100 scale throughout.
    """

    # Smallest move, in percentage points, reported by
    # ``track_shareholder_changes``.
    _SIGNIFICANT_CHANGE_THRESHOLD = 0.1

    def __init__(self):
        """Load settings and set up the logger and financial calculator.

        The DART collector is not created here; ``_get_dart_collector``
        builds it on first use.
        """
        self.settings = get_settings()
        self.logger = logging.getLogger("mcp_stock_details.shareholder_analyzer")
        self.dart_collector = None
        self.financial_calculator = FinancialCalculator()

    async def _get_dart_collector(self) -> "DARTCollector":
        """Return the cached DART collector, creating it on first call."""
        if self.dart_collector is None:
            # Falls back to a placeholder key when none is configured.
            api_key = self.settings.dart_api_key or "test_api_key"
            self.dart_collector = DARTCollector(api_key=api_key)
        return self.dart_collector

    @staticmethod
    def _rank_by_stake(shareholders: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return a new list of *shareholders*, largest ``percentage`` first.

        The sort is stable, so records with equal stakes keep input order.
        """
        return sorted(
            shareholders,
            key=lambda sh: sh.get("percentage", 0),
            reverse=True,
        )

    async def analyze_major_shareholders(
        self,
        shareholder_data: Dict[str, Any],
        min_percentage: float = 0.5
    ) -> Dict[str, Any]:
        """Summarise the major shareholders and their combined weight.

        Args:
            shareholder_data: Mapping with a ``"major_shareholders"`` list of
                records carrying ``percentage`` and optionally ``type``.
            min_percentage: Records below this stake are left out of the
                returned ``major_shareholders`` list (the concentration
                metrics still consider every record).

        Returns:
            Dict with the filtered list (input order preserved), the summed
            stake and HHI of the five largest holders, and an ownership
            summary including a per-type breakdown and the largest holder.

        Raises:
            MCPStockDetailsError: If ``min_percentage`` is negative.
            InsufficientDataError: If there are no major shareholder records.
        """
        if min_percentage < 0:
            raise MCPStockDetailsError("Minimum percentage cannot be negative")
        major_shareholders = shareholder_data.get("major_shareholders")
        if not major_shareholders:
            raise InsufficientDataError("No major shareholder data available")

        filtered_shareholders = [
            sh for sh in major_shareholders
            if sh.get("percentage", 0) >= min_percentage
        ]

        # Rank explicitly so "top 5" and "largest" do not depend on the
        # order in which the caller happened to list the records.
        ranked = self._rank_by_stake(major_shareholders)
        top_5_shares = [sh.get("percentage", 0) for sh in ranked[:5]]
        top_5_concentration = sum(top_5_shares)
        # Herfindahl-Hirschman style index restricted to the top five.
        hhi = sum(share ** 2 for share in top_5_shares)

        ownership_by_type: Dict[str, float] = {}
        for sh in major_shareholders:
            sh_type = sh.get("type", "unknown")
            ownership_by_type[sh_type] = (
                ownership_by_type.get(sh_type, 0) + sh.get("percentage", 0)
            )

        return {
            "major_shareholders": filtered_shareholders,
            "top_5_concentration": top_5_concentration,
            "hhi_index": hhi,
            "ownership_summary": {
                "total_major_shareholders": len(filtered_shareholders),
                "ownership_by_type": ownership_by_type,
                "largest_shareholder": ranked[0],
            },
        }

    async def calculate_ownership_structure(
        self,
        shareholder_data: Dict[str, Any],
        market_data: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """Derive free float and ownership split from the available data.

        Args:
            shareholder_data: Mapping that may contain ``"major_shareholders"``.
            market_data: Optional mapping with ``foreign_ownership`` and
                ``institutional_ownership``; 50.0 and 30.0 are used when a
                key is absent.

        Returns:
            Dict of rounded ownership percentages plus an
            ``ownership_concentration`` label (``high``/``moderate``/``low``)
            based on the insider stake.
        """
        if market_data is None:
            market_data = {}

        foreign_ownership = market_data.get("foreign_ownership", 50.0)
        institutional_ownership = market_data.get("institutional_ownership", 30.0)
        # Individuals are treated as the residual of the other two groups.
        individual_ownership = max(0, 100 - foreign_ownership - institutional_ownership)

        major_shareholders = shareholder_data.get("major_shareholders", [])
        # Holders typed "individual" or "corporate" are counted as insiders;
        # free float is estimated as whatever they do not hold.
        insider_ownership = sum(
            sh.get("percentage", 0) for sh in major_shareholders
            if sh.get("type") in ["individual", "corporate"]
        )
        free_float = max(0, 100 - insider_ownership)

        if insider_ownership > 50:
            concentration = "high"
        elif insider_ownership > 25:
            concentration = "moderate"
        else:
            concentration = "low"

        return {
            "free_float": round(free_float, 2),
            "foreign_ownership": round(foreign_ownership, 2),
            "institutional_ownership": round(institutional_ownership, 2),
            "individual_ownership": round(individual_ownership, 2),
            "insider_ownership": round(insider_ownership, 2),
            "ownership_concentration": concentration,
        }

    async def analyze_dividend_history(
        self,
        dividend_data: List[Dict[str, Any]],
        financial_metrics: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """Measure dividend growth, consistency and sustainability.

        Args:
            dividend_data: At least two records, each with ``year`` and
                ``dividend_per_share``; ``dividend_yield``, ``payout_ratio``
                and ``total_dividend`` are optional.
            financial_metrics: Optional mapping; ``net_income`` is used to
                compute the payout ratio when positive.

        Returns:
            Dict with the compound annual growth rate, average yield,
            consistency score, payout ratio, sustainability score, number of
            records and the latest dividend per share.

        Raises:
            InsufficientDataError: If fewer than two records are supplied.
        """
        if not dividend_data or len(dividend_data) < 2:
            raise InsufficientDataError("Insufficient dividend history data")
        if financial_metrics is None:
            financial_metrics = {}

        # Most recent record first.
        history = sorted(dividend_data, key=lambda rec: rec["year"], reverse=True)
        latest, oldest = history[0], history[-1]
        dividend_amounts = [rec["dividend_per_share"] for rec in history]
        dividend_yields = [rec.get("dividend_yield", 0) for rec in history]

        # Use the real span between first and last year so that gaps in the
        # history do not inflate the growth rate; fall back to the record
        # count when the years are unusable (non-numeric or duplicated).
        try:
            years = int(latest["year"]) - int(oldest["year"])
        except (TypeError, ValueError):
            years = 0
        if years <= 0:
            years = len(dividend_amounts) - 1

        start_dividend, end_dividend = dividend_amounts[-1], dividend_amounts[0]
        # A negative ratio raised to a fractional power would be complex.
        if start_dividend > 0 and end_dividend >= 0:
            cagr = ((end_dividend / start_dividend) ** (1 / years) - 1) * 100
        else:
            cagr = 0

        avg_yield = statistics.mean(dividend_yields)

        # Consistency is 100 minus the coefficient of variation (in percent).
        mean_amount = statistics.mean(dividend_amounts)
        if mean_amount > 0:
            cv = statistics.stdev(dividend_amounts) / mean_amount
            consistency_score = max(0, 100 - (cv * 100))
        else:
            consistency_score = 0

        # Prefer a payout ratio computed from totals; otherwise use the one
        # reported on the latest record.
        net_income = financial_metrics.get("net_income", 0)
        latest_total_dividend = latest.get("total_dividend", 0)
        if net_income > 0 and latest_total_dividend > 0:
            payout_ratio = (latest_total_dividend / net_income) * 100
        else:
            payout_ratio = latest.get("payout_ratio", 0)

        sustainability_score = min(100, max(
            0,
            consistency_score * 0.4
            + (100 - min(100, payout_ratio)) * 0.4
            # Shift CAGR by 50 and clamp so it lands on a 0-100 scale.
            + min(100, max(0, cagr + 50)) * 0.2
        ))

        return {
            "dividend_growth_rate": round(cagr, 2),
            "average_yield": round(avg_yield, 2),
            "dividend_consistency": round(consistency_score, 1),
            "payout_ratio": round(payout_ratio, 1),
            "sustainability_score": round(sustainability_score, 1),
            "years_of_data": len(dividend_data),
            "latest_dividend": latest["dividend_per_share"],
        }

    async def calculate_governance_metrics(
        self,
        governance_data: Dict[str, Any],
        company_metrics: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Score board independence, diversity and committee independence.

        Args:
            governance_data: Mapping with optional ``board_composition`` and
                ``committee_composition`` sub-mappings.  Missing board fields
                default to 11 directors, 5 independent, 2 female, 1 foreign.
            company_metrics: Accepted for interface compatibility; not used.

        Returns:
            Dict of rounded ratios, the weighted overall score and a letter
            grade from ``A`` to ``F``.
        """
        if company_metrics is None:
            company_metrics = {}

        board_comp = governance_data.get("board_composition", {})
        committee_comp = governance_data.get("committee_composition", {})

        total_directors = board_comp.get("total_directors", 11)
        independent_directors = board_comp.get("independent_directors", 5)
        female_directors = board_comp.get("female_directors", 2)
        foreign_directors = board_comp.get("foreign_directors", 1)

        if total_directors > 0:
            board_independence_ratio = (independent_directors / total_directors) * 100
            diversity_score = ((female_directors + foreign_directors) / total_directors) * 100
        else:
            board_independence_ratio = 0
            diversity_score = 0

        # A committee with no positive "total" gets a fixed fallback score
        # (100 for audit, 75 for compensation).
        audit_committee = committee_comp.get("audit_committee", {})
        audit_total = audit_committee.get("total", 0)
        if audit_total > 0:
            audit_independence = (audit_committee.get("independent", 3) / audit_total) * 100
        else:
            audit_independence = 100

        comp_committee = committee_comp.get("compensation_committee", {})
        comp_total = comp_committee.get("total", 0)
        if comp_total > 0:
            comp_independence = (comp_committee.get("independent", 3) / comp_total) * 100
        else:
            comp_independence = 75

        committee_effectiveness = (audit_independence + comp_independence) / 2

        overall_score = (
            board_independence_ratio * 0.35
            + diversity_score * 0.25
            + committee_effectiveness * 0.40
        )

        if overall_score >= 85:
            grade = "A"
        elif overall_score >= 75:
            grade = "B"
        elif overall_score >= 60:
            grade = "C"
        elif overall_score >= 45:
            grade = "D"
        else:
            grade = "F"

        return {
            "board_independence_ratio": round(board_independence_ratio, 1),
            "board_diversity_score": round(diversity_score, 1),
            "committee_effectiveness": round(committee_effectiveness, 1),
            "overall_governance_score": round(overall_score, 1),
            "governance_grade": grade,
            "audit_committee_independence": round(audit_independence, 1),
            "compensation_committee_independence": round(comp_independence, 1),
        }

    async def analyze_shareholder_concentration(
        self,
        shareholder_data: Dict[str, Any],
        analysis_depth: int = 10
    ) -> Dict[str, Any]:
        """Compute HHI and top-N concentration for the largest holders.

        Args:
            shareholder_data: Mapping with a ``"major_shareholders"`` list.
            analysis_depth: Number of largest holders to analyse.  All
                returned figures, including the top-5 and top-10 sums, are
                limited to this many holders.

        Returns:
            Dict with the HHI, the top-1/5/10 sums, a concentration label and
            the number of holders actually analysed.

        Raises:
            MCPStockDetailsError: If ``analysis_depth`` is not positive.
            InsufficientDataError: If there are no major shareholder records.
        """
        if analysis_depth <= 0:
            raise MCPStockDetailsError("Analysis depth must be positive")
        major_shareholders = shareholder_data.get("major_shareholders")
        if not major_shareholders:
            raise InsufficientDataError("No shareholder data available for concentration analysis")

        # Rank first so the slice really contains the largest holders.
        top_shareholders = self._rank_by_stake(major_shareholders)[:analysis_depth]
        percentages = [sh.get("percentage", 0) for sh in top_shareholders]

        hhi = sum(p ** 2 for p in percentages)

        if hhi > 2500:
            concentration_level = "very_high"
        elif hhi > 1500:
            concentration_level = "high"
        elif hhi > 1000:
            concentration_level = "moderate"
        else:
            concentration_level = "low"

        return {
            "hhi_index": round(hhi, 2),
            "top_1_concentration": round(percentages[0], 2),
            "top_5_concentration": round(sum(percentages[:5]), 2),
            "top_10_concentration": round(sum(percentages[:10]), 2),
            "concentration_level": concentration_level,
            "analyzed_shareholders": len(top_shareholders),
        }

    async def track_shareholder_changes(
        self,
        historical_data: List[Dict[str, Any]],
        current_data: List[Dict[str, Any]],
        period: str = "1Y"
    ) -> Dict[str, Any]:
        """Compare two shareholder snapshots and report what moved.

        Args:
            historical_data: Earlier records, each with ``name`` and
                ``percentage``.
            current_data: Later records with the same keys.
            period: Label echoed back in the result.

        Returns:
            Dict listing holders whose stake moved by at least 0.1 points,
            new entrants, exits, aggregate increases/decreases and counts.
        """
        historical_map = {sh["name"]: sh["percentage"] for sh in historical_data}
        current_map = {sh["name"]: sh["percentage"] for sh in current_data}

        significant_changes = []
        new_entrants = []
        for name, current_pct in current_map.items():
            if name not in historical_map:
                new_entrants.append({"name": name, "current_percentage": current_pct})
                continue
            historical_pct = historical_map[name]
            # Round before comparing: the raw float difference of e.g.
            # 10.1 - 10.0 is 0.0999..., which would miss the threshold.
            change = round(current_pct - historical_pct, 2)
            if abs(change) >= self._SIGNIFICANT_CHANGE_THRESHOLD:
                significant_changes.append({
                    "name": name,
                    "historical_percentage": historical_pct,
                    "current_percentage": current_pct,
                    "change": change,
                    "change_type": "increase" if change > 0 else "decrease",
                })

        exits = [
            {"name": name, "historical_percentage": historical_pct}
            for name, historical_pct in historical_map.items()
            if name not in current_map
        ]

        total_increases = sum(ch["change"] for ch in significant_changes if ch["change"] > 0)
        total_decreases = sum(abs(ch["change"]) for ch in significant_changes if ch["change"] < 0)

        return {
            "period": period,
            "significant_changes": significant_changes,
            "new_entrants": new_entrants,
            "exits": exits,
            "net_changes": {
                "total_increases": round(total_increases, 2),
                "total_decreases": round(total_decreases, 2),
                "net_change": round(total_increases - total_decreases, 2),
            },
            "change_summary": {
                "major_changes": len(significant_changes),
                "new_entries": len(new_entrants),
                "exits": len(exits),
            },
        }

    async def analyze_voting_power(
        self,
        shareholder_data: Dict[str, Any],
        voting_agreements: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, Any]:
        """Assess individual and coalition voting power.

        Args:
            shareholder_data: Mapping that may contain ``"major_shareholders"``
                records with ``name`` and ``percentage``.
            voting_agreements: Optional list of agreements, each with
                ``parties`` (shareholder names) and ``type``.

        Returns:
            Dict with per-holder voting power, per-agreement coalition power,
            a control assessment based on the largest stake, and fixed
            shareholder-rights thresholds.
        """
        if voting_agreements is None:
            voting_agreements = []
        major_shareholders = shareholder_data.get("major_shareholders", [])

        individual_voting_power = [
            {
                "name": sh["name"],
                "voting_percentage": sh.get("percentage", 0),
                # Squared stake used as a simple power index.
                # NOTE(review): the original comment called this a Shapley
                # value approximation; it is just percentage ** 2.
                "voting_power_index": sh.get("percentage", 0) ** 2,
            }
            for sh in major_shareholders
        ]

        coalition_analysis = []
        for agreement in voting_agreements:
            parties = agreement.get("parties", [])
            coalition_power = sum(
                sh.get("percentage", 0) for sh in major_shareholders
                if sh["name"] in parties
            )
            if coalition_power > 50:
                control_potential = "high"
            elif coalition_power > 25:
                control_potential = "moderate"
            else:
                control_potential = "low"
            coalition_analysis.append({
                "coalition_type": agreement.get("type", "unknown"),
                "parties": parties,
                "combined_voting_power": round(coalition_power, 2),
                "control_potential": control_potential,
            })

        # Largest stake by value, independent of list order.
        largest_shareholder_pct = max(
            (sh.get("percentage", 0) for sh in major_shareholders),
            default=0,
        )

        if largest_shareholder_pct > 50:
            contestability = "low"
        elif largest_shareholder_pct > 25:
            contestability = "moderate"
        else:
            contestability = "high"

        if largest_shareholder_pct < 30:
            minority_protection = "strong"
        elif largest_shareholder_pct < 50:
            minority_protection = "moderate"
        else:
            minority_protection = "weak"

        control_assessment = {
            "single_party_control": largest_shareholder_pct > 50,
            "coalition_control_possible": any(
                c["combined_voting_power"] > 50 for c in coalition_analysis
            ),
            "control_type": "concentrated" if largest_shareholder_pct > 30 else "dispersed",
            "contestability": contestability,
        }

        return {
            "individual_voting_power": individual_voting_power,
            "coalition_analysis": coalition_analysis,
            "control_assessment": control_assessment,
            "shareholder_rights": {
                "minority_protection": minority_protection,
                "board_nomination_threshold": 3.0,  # Typical threshold
                "special_resolution_threshold": 67.0,
            },
        }

    async def generate_shareholder_insights(
        self,
        shareholder_data: Dict[str, Any],
        market_context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Produce qualitative insights about the shareholder structure.

        Only the ownership-stability section is computed from the input; the
        governance, dividend-policy and investment sections are fixed text.

        Args:
            shareholder_data: Mapping that may contain ``"major_shareholders"``.
            market_context: Accepted for interface compatibility; not used.

        Returns:
            Dict with ``ownership_stability`` and the fixed insight lists.
        """
        if market_context is None:
            market_context = {}
        major_shareholders = shareholder_data.get("major_shareholders", [])

        institutional_pct = sum(
            sh.get("percentage", 0) for sh in major_shareholders
            if sh.get("type") == "institutional"
        )
        stability_score = min(100, max(0, institutional_pct * 1.5 + 30))

        if stability_score > 75:
            assessment = "high"
        elif stability_score > 50:
            assessment = "moderate"
        else:
            assessment = "low"

        if major_shareholders:
            # Largest stake by value, independent of list order.
            top_pct = max(sh.get("percentage", 0) for sh in major_shareholders)
            top_factor = f"Top shareholder concentration: {top_pct:.1f}%"
        else:
            top_factor = "No major shareholders"

        ownership_stability = {
            "stability_score": round(stability_score, 1),
            "assessment": assessment,
            "key_factors": [
                f"Institutional ownership: {institutional_pct:.1f}%",
                top_factor,
            ],
        }

        governance_strengths = [
            "Professional institutional investors provide oversight",
            "Diversified ownership structure reduces single-party control risk",
        ]
        governance_concerns = [
            "Potential for short-term pressure from institutional investors",
            "Complex ownership structure may reduce transparency",
        ]
        dividend_policy_assessment = {
            "shareholder_friendliness": "moderate",
            "sustainability_outlook": "stable",
            "policy_consistency": "good",
        }
        investment_implications = [
            "Strong institutional backing provides stability",
            "Governance structure supports long-term value creation",
            "Dividend policy aligned with shareholder interests",
        ]

        return {
            "ownership_stability": ownership_stability,
            "governance_strengths": governance_strengths,
            "governance_concerns": governance_concerns,
            "dividend_policy_assessment": dividend_policy_assessment,
            "investment_implications": investment_implications,
        }

    async def comprehensive_shareholder_analysis(
        self,
        company_code: str,
        include_all_metrics: bool = True
    ) -> Dict[str, Any]:
        """Run every analysis for one company and assemble a summary.

        Any exception raised along the way is logged and reported under an
        ``"error"`` key; sections completed before the failure are kept.

        Args:
            company_code: Code passed to ``get_shareholder_data``.
            include_all_metrics: Accepted for interface compatibility; not
                used.

        Returns:
            Dict with ownership, governance, (optional) dividend and
            concentration sections plus a ``shareholder_summary``.
        """
        comprehensive_analysis: Dict[str, Any] = {}
        try:
            shareholder_data = await self.get_shareholder_data(company_code)

            ownership_analysis = await self.analyze_major_shareholders(shareholder_data)
            comprehensive_analysis["ownership_analysis"] = ownership_analysis

            # Governance inputs are fixed placeholder values, not fetched.
            governance_data = {
                "board_composition": {
                    "total_directors": 11, "independent_directors": 5,
                    "female_directors": 2, "foreign_directors": 1,
                },
                "committee_composition": {
                    "audit_committee": {"total": 3, "independent": 3},
                    "compensation_committee": {"total": 4, "independent": 3},
                },
            }
            governance_analysis = await self.calculate_governance_metrics(governance_data)
            comprehensive_analysis["governance_analysis"] = governance_analysis

            if "dividend_history" in shareholder_data:
                dividend_analysis = await self.analyze_dividend_history(
                    shareholder_data["dividend_history"]
                )
                comprehensive_analysis["dividend_analysis"] = dividend_analysis

            concentration_analysis = await self.analyze_shareholder_concentration(shareholder_data)
            comprehensive_analysis["concentration_analysis"] = concentration_analysis

            insights = await self.generate_shareholder_insights(shareholder_data)

            comprehensive_analysis["shareholder_summary"] = {
                "key_findings": [
                    f"Top 5 shareholders control {ownership_analysis.get('top_5_concentration', 0):.1f}% of shares",
                    f"Governance score: {governance_analysis.get('overall_governance_score', 0):.1f}/100 ({governance_analysis.get('governance_grade', 'N/A')})",
                    f"Ownership concentration: {concentration_analysis.get('concentration_level', 'unknown')}",
                ],
                "governance_score": governance_analysis.get("overall_governance_score", 0),
                "ownership_quality": insights["ownership_stability"]["assessment"],
            }
        except Exception as e:
            # Top-level boundary: report the failure in the result instead of
            # propagating it, keeping whatever sections were already built.
            self.logger.error("Error in comprehensive shareholder analysis: %s", e)
            comprehensive_analysis["error"] = str(e)
        return comprehensive_analysis

    async def get_shareholder_data(self, company_code: str) -> Dict[str, Any]:
        """Return shareholder data for a company (mock implementation).

        Args:
            company_code: ``"005930"`` returns the Samsung Electronics
                fixture; any other code returns a generic fixture.

        Returns:
            Dict with ``major_shareholders``, ``ownership_structure``,
            ``dividend_history`` and share counts.
        """
        if company_code == "005930":  # Samsung Electronics
            return {
                "company_code": company_code,
                "major_shareholders": [
                    {"name": "국민연금공단", "shares": 148_712_953, "percentage": 9.57, "type": "institutional"},
                    {"name": "삼성물산 주식회사", "shares": 133_284_628, "percentage": 8.58, "type": "corporate"},
                    {"name": "이재용", "shares": 18_352_174, "percentage": 1.18, "type": "individual"},
                    {"name": "홍라희", "shares": 15_248_921, "percentage": 0.98, "type": "individual"},
                    {"name": "이부진", "shares": 12_419_832, "percentage": 0.80, "type": "individual"},
                ],
                "ownership_structure": {
                    "free_float": 82.45,
                    "foreign_ownership": 52.37,
                    "institutional_ownership": 31.28,
                    "individual_ownership": 16.35,
                },
                "dividend_history": [
                    {"year": 2023, "dividend_per_share": 1640, "dividend_yield": 2.18, "payout_ratio": 23.5, "total_dividend": 9787803622000},
                    {"year": 2022, "dividend_per_share": 1444, "dividend_yield": 2.89, "payout_ratio": 28.7, "total_dividend": 8617977438800},
                    {"year": 2021, "dividend_per_share": 1378, "dividend_yield": 1.95, "payout_ratio": 22.1, "total_dividend": 8225556394900},
                ],
                "total_shares": 5_969_782_550,
                "treasury_shares": 64_726_919,
            }

        # Default mock data for other companies
        return {
            "company_code": company_code,
            "major_shareholders": [
                {"name": "주요주주1", "shares": 50_000_000, "percentage": 10.0, "type": "institutional"},
                {"name": "주요주주2", "shares": 30_000_000, "percentage": 6.0, "type": "corporate"},
                {"name": "주요주주3", "shares": 25_000_000, "percentage": 5.0, "type": "individual"},
            ],
            "ownership_structure": {
                "free_float": 79.0,
                "foreign_ownership": 45.0,
                "institutional_ownership": 25.0,
                "individual_ownership": 30.0,
            },
            "dividend_history": [
                {"year": 2023, "dividend_per_share": 1000, "dividend_yield": 2.0, "payout_ratio": 30.0},
                {"year": 2022, "dividend_per_share": 950, "dividend_yield": 1.9, "payout_ratio": 28.5},
                {"year": 2021, "dividend_per_share": 900, "dividend_yield": 1.8, "payout_ratio": 27.0},
            ],
            "total_shares": 500_000_000,
        }