# market_breadth_tools.py • 12.3 kB
"""Market breadth indicator tools."""
import json
import logging
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class MarketBreadthTool(BaseTool):
    """Market breadth analysis tool.

    Reads rows from the ``market_breadth`` table via ``db_manager.fetch_all``,
    derives a summary (plus optional trend and volume sections), caches the
    assembled report via ``cache_manager`` and returns it as JSON text.

    NOTE(review): the Korean user-facing literals in this class were
    reconstructed from mis-decoded text (UTF-8 read through a Greek code
    page); confirm them against the authoritative copy of the file.
    """

    # Accepted ``market`` values; "ALL" disables the market filter in the query.
    _VALID_MARKETS = ("KOSPI", "KOSDAQ", "ALL")
    # Look-back window in days for every accepted ``period`` value.
    _PERIOD_DAYS = {"1d": 1, "1w": 7, "1m": 30, "3m": 90}

    def __init__(self, db_manager, cache_manager):
        """Hand both managers to the base class and set logger and cache TTL."""
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # seconds (5 minutes)

    @property
    def name(self) -> str:
        """Identifier under which the tool is exposed."""
        return "get_market_breadth"

    @property
    def description(self) -> str:
        """Human-readable (Korean) description of the tool."""
        return (
            "시장 폭 지표(상승/하락 종목 비율, A/D Line 등)를 분석하여 "
            "시장의 강도와 방향성을 제공합니다."
        )

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool schema: name, description and JSON input schema."""
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": list(self._VALID_MARKETS),
                        "default": "KOSPI",
                        "description": "조회할 시장",
                    },
                    "period": {
                        "type": "string",
                        "enum": list(self._PERIOD_DAYS),
                        "default": "1d",
                        "description": "조회 기간 (1d: 1일, 1w: 1주, 1m: 1개월, 3m: 3개월)",
                    },
                    "include_volume_analysis": {
                        "type": "boolean",
                        "default": False,
                        "description": "거래량 분석 포함 여부",
                    },
                },
                "required": [],
            },
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Fetch (or load from cache) the market breadth report.

        Args:
            arguments: Optional keys ``market`` (default ``"KOSPI"``),
                ``period`` (default ``"1d"``) and ``include_volume_analysis``
                (default ``False``).

        Returns:
            A single-element list whose ``TextContent`` holds the report as
            indented JSON.

        Raises:
            ValueError: If ``market`` or ``period`` is not an accepted value.
            DatabaseConnectionError: If fetching or assembling the data fails.
        """
        try:
            # Extract and validate parameters.
            market = arguments.get("market", "KOSPI")
            period = arguments.get("period", "1d")
            include_volume_analysis = arguments.get("include_volume_analysis", False)
            self._validate_parameters(market, period)

            # Serve from cache while the cached report is still fresh.
            cache_key = self._generate_cache_key(market, period, include_volume_analysis)
            cached_data = await self.cache_manager.get(cache_key)
            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                # Same formatting as the cache-miss path so the output does not
                # depend on cache state.
                return [TextContent(text=json.dumps(cached_data, ensure_ascii=False, indent=2))]

            # Cache miss: query the database and store the fresh report.
            data = await self._fetch_breadth_data(market, period, include_volume_analysis)
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
            self.logger.info(f"Market breadth data fetched for {market}/{period}")
            return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]
        except Exception as e:
            # Boundary handler: log and propagate unchanged.
            self.logger.error(f"Error in market breadth tool: {e}")
            raise

    def _validate_parameters(self, market: str, period: str) -> None:
        """Raise ``ValueError`` unless ``market`` and ``period`` are accepted values."""
        if market not in self._VALID_MARKETS:
            raise ValueError(f"Invalid market: {market}")
        if period not in self._PERIOD_DAYS:
            raise ValueError(f"Invalid period: {period}")

    def _generate_cache_key(self, market: str, period: str, include_volume: bool) -> str:
        """Build the cache key from the three request parameters."""
        return f"market_breadth:{market}:{period}:{include_volume}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True when ``data["timestamp"]`` is younger than ``cache_ttl``.

        A missing, malformed or non-string timestamp counts as stale.
        """
        if "timestamp" not in data:
            return False
        try:
            created_at = datetime.fromisoformat(data["timestamp"])
            # Use the configured TTL rather than a second hard-coded 5 minutes.
            return datetime.now() - created_at < timedelta(seconds=self.cache_ttl)
        except (ValueError, TypeError):
            return False

    async def _fetch_breadth_data(self, market: str, period: str, include_volume: bool) -> Dict[str, Any]:
        """Query ``market_breadth`` and assemble the report dictionary.

        Args:
            market: One of the accepted markets; ``"ALL"`` skips the filter.
            period: Accepted period key, converted to a look-back in days.
            include_volume: Whether to add the ``volume_analysis`` section.

        Returns:
            Dict with ``timestamp``, ``period``, ``market``, ``breadth_data``
            and ``summary``; plus ``message`` when no rows matched,
            ``trend_analysis`` for multi-day periods and ``volume_analysis``
            when requested.

        Raises:
            DatabaseConnectionError: For any failure inside this method
                (non-database errors are wrapped, with the cause chained).
        """
        try:
            days = self._get_period_days(period)

            # NOTE(review): the first placeholder sits inside a quoted SQL
            # literal; whether it is bound correctly depends on db_manager's
            # parameter style -- confirm against the driver in use.
            query = """
                SELECT date, market, advancing, declining, unchanged, total_issues,
                advance_decline_ratio, advance_volume, decline_volume, timestamp
                FROM market_breadth
                WHERE date >= CURRENT_DATE - INTERVAL '%s days'
            """
            params = [days]

            # Market filter.
            if market != "ALL":
                query += " AND market = %s"
                params.append(market)
            query += " ORDER BY date DESC, market"

            # Treat a None result the same as "no rows".
            breadth_data = await self.db_manager.fetch_all(query, *params) or []

            result = {
                "timestamp": datetime.now().isoformat(),
                "period": period,
                "market": market,
                "breadth_data": [self._format_breadth_data(row) for row in breadth_data],
                "summary": self._calculate_summary(breadth_data),
            }

            if not breadth_data:
                result["message"] = "요청한 기간의 시장 폭 데이터가 없습니다"
                return result

            # Trend analysis only makes sense for multi-day periods.
            if period in ["1w", "1m", "3m"]:
                result["trend_analysis"] = self._analyze_trend(breadth_data)
            if include_volume:
                result["volume_analysis"] = self._analyze_volume(breadth_data)
            return result
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            if isinstance(e, DatabaseConnectionError):
                raise
            raise DatabaseConnectionError(f"Failed to fetch market breadth data: {e}") from e

    def _get_period_days(self, period: str) -> int:
        """Convert a period key to days; unknown keys fall back to 1."""
        return self._PERIOD_DAYS.get(period, 1)

    @staticmethod
    def _as_int(value: Any) -> int:
        """Coerce a row value to ``int``, treating NULL (``None``) as 0."""
        return 0 if value is None else int(value)

    @staticmethod
    def _as_float(value: Any) -> float:
        """Coerce a row value to ``float``, treating NULL (``None``) as 0.0."""
        return 0.0 if value is None else float(value)

    def _format_breadth_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one row into a JSON-serializable dict.

        Counts and volumes become ``int``, the ratio becomes ``float`` and the
        ``date``/``timestamp`` values are rendered with ``isoformat()``.
        Missing or NULL numeric columns are reported as zero; missing dates as
        ``None``.
        """
        row_date = data.get("date")
        row_timestamp = data.get("timestamp")
        return {
            "date": row_date.isoformat() if row_date else None,
            "market": data.get("market"),
            "advancing": self._as_int(data.get("advancing")),
            "declining": self._as_int(data.get("declining")),
            "unchanged": self._as_int(data.get("unchanged")),
            "total_issues": self._as_int(data.get("total_issues")),
            "advance_decline_ratio": self._as_float(data.get("advance_decline_ratio")),
            "advance_volume": self._as_int(data.get("advance_volume")),
            "decline_volume": self._as_int(data.get("decline_volume")),
            "timestamp": row_timestamp.isoformat() if row_timestamp else None,
        }

    def _calculate_ad_ratio(self, advancing: int, declining: int) -> float:
        """Return advancing/declining.

        With no decliners the ratio is ``inf`` when there are advancers and
        ``0.0`` when both sides are zero.
        """
        if declining == 0:
            return float("inf") if advancing > 0 else 0.0
        return advancing / declining

    def _interpret_market_sentiment(self, ratio: float) -> str:
        """Map an advance/decline ratio to a Korean sentiment label."""
        if ratio >= 2.0:
            return "강한 상승세"
        if ratio >= 1.2:
            return "상승세"
        if ratio >= 0.8:
            return "보합"
        if ratio >= 0.5:
            return "하락세"
        return "강한 하락세"

    def _calculate_summary(self, breadth_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarize the rows: mean A/D ratio, sentiment and latest counts.

        Returns an empty dict for empty input. Rows whose ratio is missing or
        NULL are left out of the mean; with no usable ratio the mean is 0.0.
        """
        if not breadth_data:
            return {}

        # Coerce to float so database numerics stay JSON-serializable.
        ratios = [
            float(row["advance_decline_ratio"])
            for row in breadth_data
            if row.get("advance_decline_ratio") is not None
        ]
        avg_ratio = statistics.mean(ratios) if ratios else 0.0

        # "Latest" is the first row as returned by the caller.
        # NOTE(review): for market="ALL" this is a single market's row, not an
        # aggregate across markets -- confirm that this is intended.
        latest = breadth_data[0]
        return {
            "avg_advance_decline_ratio": round(avg_ratio, 2),
            "market_sentiment": self._interpret_market_sentiment(avg_ratio),
            "latest_advancing": self._as_int(latest.get("advancing")),
            "latest_declining": self._as_int(latest.get("declining")),
            "latest_unchanged": self._as_int(latest.get("unchanged")),
            "data_points": len(breadth_data),
        }

    def _analyze_trend(self, breadth_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compare the mean A/D ratio of the older half against the newer half.

        Rows are ordered oldest-first (rows without a date sort first), NULL
        ratios are skipped, and the difference ``newer - older`` is classified:
        beyond +/-0.1 is a rise/fall, beyond +/-0.2 a strong one. With fewer
        than two rows an ``error`` entry is returned; with fewer than two
        usable ratios direction and strength are reported as unknown.
        """
        if len(breadth_data) < 2:
            return {"error": "트렌드 분석을 위한 충분한 데이터가 없습니다"}

        # The (has_date, date) key keeps None dates from being compared with
        # real dates.
        sorted_data = sorted(
            breadth_data,
            key=lambda row: (row.get("date") is not None, row.get("date")),
        )
        ratios = [
            float(row["advance_decline_ratio"])
            for row in sorted_data
            if row.get("advance_decline_ratio") is not None
        ]

        trend_change = 0.0
        if len(ratios) >= 2:
            midpoint = len(ratios) // 2
            older_avg = statistics.mean(ratios[:midpoint])
            newer_avg = statistics.mean(ratios[midpoint:])
            trend_change = newer_avg - older_avg
            if trend_change > 0.1:
                direction = "상승"
                strength = "강함" if trend_change > 0.2 else "약함"
            elif trend_change < -0.1:
                direction = "하락"
                strength = "강함" if trend_change < -0.2 else "약함"
            else:
                direction = "보합"
                strength = "보통"
        else:
            direction = "불명"
            strength = "불명"

        return {
            "direction": direction,
            "strength": strength,
            "trend_change": round(trend_change, 3),
            "days_analyzed": len(sorted_data),
        }

    def _analyze_volume(self, breadth_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze advancing vs. declining volume of the first (latest) row.

        Returns an empty dict for empty input. NULL volumes count as zero. A
        ratio of at least 1.5 means advancing volume dominates, at least 0.67
        is balanced, anything lower means declining volume dominates. With no
        volume on either side the ratio is 0.0 and the trend is balanced; an
        infinite ratio (no declining volume) is rendered as a Korean label.
        """
        if not breadth_data:
            return {}

        latest = breadth_data[0]
        advance_volume = self._as_int(latest.get("advance_volume"))
        decline_volume = self._as_int(latest.get("decline_volume"))
        total_volume = advance_volume + decline_volume
        volume_ratio = self._calculate_ad_ratio(advance_volume, decline_volume)

        if total_volume <= 0:
            # No volume at all: neither side can dominate.
            volume_trend = "균형"
        elif volume_ratio >= 1.5:
            volume_trend = "상승 거래량 우세"
        elif volume_ratio >= 0.67:
            volume_trend = "균형"
        else:
            volume_trend = "하락 거래량 우세"

        return {
            "advance_volume": advance_volume,
            "decline_volume": decline_volume,
            "total_volume": total_volume,
            # json.dumps would emit the non-standard token Infinity, so use a label.
            "volume_ratio": "무한대" if volume_ratio == float("inf") else round(volume_ratio, 2),
            "volume_trend": volume_trend,
            "advance_volume_pct": round(advance_volume / total_volume * 100, 1) if total_volume > 0 else 0.0,
        }