# market_tools.py • 8.82 kB
"""Market overview tool."""
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class MarketOverviewTool(BaseTool):
    """Tool that reports an overall snapshot of the Korean stock market.

    Index rows are read from the ``market_indices`` table through
    ``self.db_manager`` and the assembled payload is cached through
    ``self.cache_manager`` for ``cache_ttl`` seconds.
    """

    # Markets accepted by ``execute``; also used as the schema ``enum``.
    _VALID_MARKETS = ("KOSPI", "KOSDAQ", "ALL")

    def __init__(self, db_manager, cache_manager):
        """Store the managers via the base class and set logger and cache TTL."""
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # seconds (5 minutes)

    @property
    def name(self) -> str:
        """Identifier under which the tool is registered."""
        return "get_market_overview"

    @property
    def description(self) -> str:
        """Human-readable description of the tool (Korean)."""
        return "한국 주식시장의 전체적인 현황과 주요 지표를 제공합니다."

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool schema: name, description and JSON input schema.

        Both input properties are optional; ``market`` defaults to ``"ALL"``
        and ``include_details`` defaults to ``False``.
        """
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": list(self._VALID_MARKETS),
                        "default": "ALL",
                        "description": "조회할 시장 (KOSPI, KOSDAQ, ALL)"
                    },
                    "include_details": {
                        "type": "boolean",
                        "default": False,
                        "description": "상세 정보 포함 여부 (시장 폭, 신고가/신저가, 거래 상위 종목 등)"
                    }
                },
                "required": []
            }
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Return the market overview as a single JSON ``TextContent``.

        Args:
            arguments: Tool arguments; reads ``market`` (default ``"ALL"``)
                and ``include_details`` (default ``False``).

        Returns:
            A one-element list whose text is the JSON-encoded overview,
            served from the cache when a fresh entry exists.

        Raises:
            ValueError: If ``market`` is not KOSPI, KOSDAQ or ALL.
            DatabaseConnectionError: If fetching the data fails.
        """
        try:
            # Extract and validate parameters.
            market = arguments.get("market", "ALL")
            include_details = arguments.get("include_details", False)
            if market not in self._VALID_MARKETS:
                raise ValueError(f"Invalid market: {market}")

            # Serve from cache when a sufficiently recent entry exists.
            cache_key = self._generate_cache_key(market, include_details)
            cached_data = await self.cache_manager.get(cache_key)
            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                return self._as_text_content(cached_data)

            # Cache miss (or stale entry): query the database and re-cache.
            data = await self._fetch_market_data(market, include_details)
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
            self.logger.info(f"Market overview data fetched for {market}")
            return self._as_text_content(data)
        except Exception as e:
            # Log at the tool boundary, then let the caller handle the error.
            self.logger.error(f"Error in market overview tool: {e}")
            raise

    def _as_text_content(self, payload: Dict[str, Any]) -> List[TextContent]:
        """Serialise *payload* identically for cached and fresh responses."""
        return [TextContent(text=json.dumps(payload, ensure_ascii=False, indent=2))]

    def _generate_cache_key(self, market: str, include_details: bool) -> str:
        """Build the cache key for a (market, include_details) pair."""
        return f"market_overview:{market}:{include_details}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True when ``data["timestamp"]`` is younger than ``cache_ttl``.

        A missing, unparsable or otherwise incomparable timestamp counts as
        stale.
        """
        if "timestamp" not in data:
            return False
        try:
            timestamp = datetime.fromisoformat(data["timestamp"])
            # Use the same TTL that is passed to the cache so they cannot drift.
            return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)
        except (ValueError, TypeError):
            return False

    async def _fetch_market_data(self, market: str, include_details: bool) -> Dict[str, Any]:
        """Query today's index rows and assemble the overview payload.

        Args:
            market: ``"ALL"`` for every index, otherwise the index code used
                to filter ``market_indices``.
            include_details: When true and rows were found, merge the result
                of ``_fetch_detailed_data`` into the payload.

        Returns:
            Dict with ``timestamp``, ``market_summary`` and ``indices``; plus
            ``message`` when no rows were found and detail keys on request.

        Raises:
            DatabaseConnectionError: On any failure while querying or
                assembling the payload (original error chained as cause).
        """
        try:
            # Main index data: up to 10 newest rows overall, or 5 for one index.
            if market == "ALL":
                indices_data = await self.db_manager.fetch_all(
                    """
                    SELECT index_code, current_value, change, change_rate,
                           volume, market_cap, timestamp
                    FROM market_indices
                    WHERE DATE(timestamp) = CURRENT_DATE
                    ORDER BY timestamp DESC
                    LIMIT 10
                    """
                )
            else:
                indices_data = await self.db_manager.fetch_all(
                    """
                    SELECT index_code, current_value, change, change_rate,
                           volume, market_cap, timestamp
                    FROM market_indices
                    WHERE index_code = %s AND DATE(timestamp) = CURRENT_DATE
                    ORDER BY timestamp DESC
                    LIMIT 5
                    """,
                    market
                )

            # Assemble the payload.
            result = {
                "timestamp": datetime.now().isoformat(),
                "market_summary": self._calculate_market_summary(indices_data),
                "indices": [self._format_index_data(idx) for idx in indices_data]
            }

            # No rows for today: return an explicit message and an empty list.
            if not indices_data:
                result["message"] = "요청한 시장의 데이터가 없습니다"
                result["indices"] = []

            # Optional detail section (only meaningful when rows exist).
            if include_details and indices_data:
                details = await self._fetch_detailed_data(market)
                result.update(details)

            return result
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            if isinstance(e, DatabaseConnectionError):
                raise
            # Chain the cause so the original traceback is not lost.
            raise DatabaseConnectionError(f"Failed to fetch market data: {e}") from e

    def _calculate_market_summary(self, indices_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Sum market cap and volume over the given index rows.

        Missing or NULL (``None``) values count as 0. With no rows, only the
        two zero totals are returned (no ``index_count``).

        NOTE(review): rows are summed exactly as given; if the query returns
        several snapshots of the same index the totals include each one —
        confirm this is intended.
        """
        if not indices_data:
            return {"total_market_cap": 0, "total_volume": 0}

        # ``or 0`` also covers keys that are present but hold None.
        total_market_cap = sum(idx.get("market_cap") or 0 for idx in indices_data)
        total_volume = sum(idx.get("volume") or 0 for idx in indices_data)
        return {
            "total_market_cap": total_market_cap,
            "total_volume": total_volume,
            "index_count": len(indices_data)
        }

    def _format_index_data(self, index_data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one index row into JSON-serialisable primitives.

        Numeric fields that are missing or ``None`` become 0. The timestamp
        is rendered with ``isoformat()`` when available, with ``str()``
        otherwise, and is ``None`` when absent.
        """
        timestamp = index_data.get("timestamp")
        if not timestamp:
            formatted_timestamp = None
        elif hasattr(timestamp, "isoformat"):
            formatted_timestamp = timestamp.isoformat()
        else:
            # Value is not a date/datetime object (e.g. already a string).
            formatted_timestamp = str(timestamp)

        return {
            "index_code": index_data.get("index_code"),
            "current_value": float(index_data.get("current_value") or 0),
            "change": float(index_data.get("change") or 0),
            "change_rate": float(index_data.get("change_rate") or 0),
            "volume": int(index_data.get("volume") or 0),
            "market_cap": int(index_data.get("market_cap") or 0),
            "timestamp": formatted_timestamp
        }

    async def _fetch_detailed_data(self, market: str) -> Dict[str, Any]:
        """Fetch optional detail sections on a best-effort basis.

        Returns:
            Dict that may contain ``market_breadth`` and ``new_highs_lows``.
            If a query raises, the sections gathered so far are kept and
            ``detailed_data_error`` is added instead of propagating the error.
        """
        details = {}
        try:
            # Market breadth (number of advancing / declining issues).
            # NOTE(review): "ALL" is looked up as "KOSPI" here — confirm that
            # reporting KOSPI breadth for the combined view is intended.
            breadth_data = await self.db_manager.fetch_one(
                """
                SELECT advancing, declining, unchanged
                FROM market_breadth
                WHERE market = %s AND DATE(timestamp) = CURRENT_DATE
                ORDER BY timestamp DESC
                LIMIT 1
                """,
                market if market != "ALL" else "KOSPI"
            )
            if breadth_data:
                details["market_breadth"] = {
                    "advancing": breadth_data.get("advancing", 0),
                    "declining": breadth_data.get("declining", 0),
                    "unchanged": breadth_data.get("unchanged", 0)
                }

            # New highs / new lows for today (query is not filtered by market).
            highs_lows = await self.db_manager.fetch_one(
                """
                SELECT new_highs, new_lows
                FROM daily_highs_lows
                WHERE DATE(date) = CURRENT_DATE
                LIMIT 1
                """
            )
            if highs_lows:
                details["new_highs_lows"] = {
                    "new_highs": highs_lows.get("new_highs", 0),
                    "new_lows": highs_lows.get("new_lows", 0)
                }
        except Exception as e:
            # Details are optional: degrade to an error marker, do not fail.
            self.logger.warning(f"Failed to fetch detailed data: {e}")
            details["detailed_data_error"] = "상세 정보 조회 중 오류 발생"
        return details