"""
KRX Data Client - ํตํฉ ์ฃผ์ ๋ฐ์ดํฐ ํด๋ผ์ด์ธํธ
pykrx์ ๋์ผํ ์ธํฐํ์ด์ค๋ก KRX Data Marketplace์์ ๋ฐ์ดํฐ๋ฅผ ์กฐํํฉ๋๋ค.
์นด์นด์ค ๋ก๊ทธ์ธ์ด ํ์ํ๋ฉฐ, 2์ฐจ์ธ์ฆ์ ๋นํ์ฑํ๋์ด ์์ด์ผ ํฉ๋๋ค.
์ํคํ
์ฒ:
- KakaoAuthManager: ์นด์นด์ค ๋ก๊ทธ์ธ ๊ด๋ฆฌ, ์ธ์
์ ์ง, ์๋ ์ฌ๋ก๊ทธ์ธ
- KRXDataClient: ์ค์ ๋ฐ์ดํฐ ์กฐํ API
ํ๊ฒฝ๋ณ์:
KAKAO_ID: ์นด์นด์ค ์์ด๋
KAKAO_PW: ์นด์นด์ค ๋น๋ฐ๋ฒํธ
์ฌ์ฉ๋ฒ:
from krx_data_client import KRXDataClient
client = KRXDataClient()
# pykrx์ ๋์ผํ ์ธํฐํ์ด์ค
df = client.get_market_ohlcv("20240101", "20240131", "005930")
df = client.get_market_fundamental("20240101", "20240131", "005930")
df = client.get_market_trading_volume("20240101", "20240131", "005930")
"""
import os
import json
import logging
import asyncio
import functools
import time
import fcntl
from datetime import datetime, timedelta, date
from pathlib import Path
from typing import Dict, Any, Optional, List, Callable, TypeVar
from dataclasses import dataclass
from holidays.countries import KR
# MCP ์๋ฒ ๋ฑ ์ด๋ฏธ ์ด๋ฒคํธ ๋ฃจํ๊ฐ ์คํ ์ค์ธ ํ๊ฒฝ์์ ์ค์ฒฉ ์คํ ํ์ฉ
import nest_asyncio
nest_asyncio.apply()
import requests
import pandas as pd
from pandas import DataFrame
logger = logging.getLogger(__name__)
# ํ์
ํํธ์ฉ
T = TypeVar('T')
class KRXAuthError(Exception):
"""์ธ์ฆ ๊ด๋ จ ์๋ฌ"""
pass
class KRX2FARequiredError(KRXAuthError):
"""2์ฐจ์ธ์ฆ์ด ํ์ฑํ๋์ด ์์"""
def __init__(self):
super().__init__(
"์นด์นด์ค 2์ฐจ์ธ์ฆ์ด ํ์ฑํ๋์ด ์์ต๋๋ค.\n"
"2์ฐจ์ธ์ฆ์ ๋นํ์ฑํํ์ธ์:\n"
" - ์นด์นด์คํก > ์ค์ > ์นด์นด์ค๊ณ์ > 2๋จ๊ณ ์ธ์ฆ > ํด์ \n"
" - ๋๋ https://accounts.kakao.com > ๊ณ์ ๋ณด์ > 2๋จ๊ณ ์ธ์ฆ > ํด์ "
)
class KRXSessionExpiredError(KRXAuthError):
"""์ธ์
๋ง๋ฃ"""
pass
class KRXDataError(Exception):
"""๋ฐ์ดํฐ ์กฐํ ์๋ฌ"""
pass
@dataclass
class SessionInfo:
"""์ธ์
์ ๋ณด"""
cookies: Dict[str, str]
last_login: datetime
expires_at: Optional[datetime] = None
def retry_on_session_expired(max_retries: int = 3, delay: float = 1.0):
"""์ธ์
๋ง๋ฃ ์ ์ฌ์๋ํ๋ ๋ฐ์ฝ๋ ์ดํฐ"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@functools.wraps(func)
def wrapper(self, *args, **kwargs) -> T:
last_exception = None
for attempt in range(max_retries):
try:
return func(self, *args, **kwargs)
except KRXSessionExpiredError as e:
last_exception = e
logger.warning(f"์ธ์
๋ง๋ฃ (์๋ {attempt + 1}/{max_retries}), ์ฌ๋ก๊ทธ์ธ...")
time.sleep(delay * (attempt + 1)) # exponential backoff
try:
# ์ธ์
ํ์ผ ์ญ์ ํ ์ฌ๋ก๊ทธ์ธ
self._auth_manager._cleanup_session_files()
self._auth_manager.login(force=True)
except Exception as login_error:
logger.error(f"์ฌ๋ก๊ทธ์ธ ์คํจ: {login_error}")
if attempt == max_retries - 1:
raise
except Exception as e:
# ๋ค๋ฅธ ์๋ฌ๋ ์ฌ์๋ํ์ง ์์
raise
raise last_exception
return wrapper
return decorator
class KakaoAuthManager:
"""
์นด์นด์ค ๋ก๊ทธ์ธ ๊ด๋ฆฌ์
- ์นด์นด์ค ๋ก๊ทธ์ธ/๋ก๊ทธ์์
- 2์ฐจ์ธ์ฆ ์ํ ์ฒดํฌ ๋ฐ ์๋ฌ ๋ฐ์
- ์ธ์
์ฟ ํค ์ ์ฅ/๋ก๋
- ์ธ์
๋ง๋ฃ ์ฒดํฌ ๋ฐ ์๋ ๊ฐฑ์
- ํ์ผ ๋ฝ์ ํตํ ๋์ ๋ก๊ทธ์ธ ๋ฐฉ์ง
"""
COOKIE_PATH = Path.home() / ".krx_session.json"
LEGACY_COOKIE_PATH = Path.home() / ".krx_cookies.json" # ๊ธฐ์กด ์ฟ ํค ํ์ผ
LOCK_PATH = Path.home() / ".krx_session.lock" # ๋ก๊ทธ์ธ ๋ฝ ํ์ผ
SESSION_TIMEOUT = timedelta(hours=4) # ์ธ์
ํ์์์ (๋ณด์์ ์ค์ )
SESSION_REFRESH_THRESHOLD = timedelta(hours=3) # ์ด ์๊ฐ ์ดํ๋ฉด ์ ์ ์ ๊ฐฑ์
VALIDATION_SKIP_THRESHOLD = timedelta(minutes=5) # ์ด ์๊ฐ ๋ด ๊ฒ์ฆ๋์ผ๋ฉด ์ฌ๊ฒ์ฆ ์๋ต
# Playwright ํ์์์ ์ค์ (์ด์ ์์ ์ฑ์ ์ํด ์ถฉ๋ถํ ๊ธธ๊ฒ)
PAGE_LOAD_TIMEOUT = 60000 # 60์ด
LOGIN_WAIT_TIMEOUT = 30000 # 30์ด
MAX_LOGIN_RETRIES = 3 # ๋ก๊ทธ์ธ ์ฌ์๋ ํ์
LOCK_WAIT_TIMEOUT = 120 # ๋ฝ ๋๊ธฐ ํ์์์ (์ด) - ๋ก๊ทธ์ธ์ 2๋ถ ์ด์ ๊ฑธ๋ฆด ์ ์์
def __init__(
self,
kakao_id: Optional[str] = None,
kakao_pw: Optional[str] = None,
headless: bool = True,
):
self.kakao_id = kakao_id or os.environ.get("KAKAO_ID")
self.kakao_pw = kakao_pw or os.environ.get("KAKAO_PW")
self.headless = headless
self._session: Optional[requests.Session] = None
self._session_info: Optional[SessionInfo] = None
self._last_validated: Optional[datetime] = None # ๋ง์ง๋ง ์ธ์
๊ฒ์ฆ ์๊ฐ (ํ์ผ์์ ๋ก๋)
self._browser = None
self._playwright = None
if not self.kakao_id or not self.kakao_pw:
raise KRXAuthError(
"์นด์นด์ค ๋ก๊ทธ์ธ ์ ๋ณด๊ฐ ํ์ํฉ๋๋ค.\n"
"KAKAO_ID, KAKAO_PW ํ๊ฒฝ๋ณ์๋ฅผ ์ค์ ํ์ธ์."
)
@property
def is_logged_in(self) -> bool:
"""๋ก๊ทธ์ธ ์ํ ํ์ธ"""
if not self._session_info:
return False
# ์ธ์
ํ์์์ ์ฒดํฌ
if datetime.now() - self._session_info.last_login > self.SESSION_TIMEOUT:
logger.info("์ธ์
ํ์์์")
return False
return True
@property
def session(self) -> requests.Session:
"""requests ์ธ์
๋ฐํ"""
if not self._session:
self._session = requests.Session()
self._session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
"Origin": "http://data.krx.co.kr",
"Referer": "http://data.krx.co.kr/",
})
return self._session
def _load_session(self) -> bool:
"""์ ์ฅ๋ ์ธ์
๋ก๋"""
# ์ ํ์ ํ์ผ ์๋
if self.COOKIE_PATH.exists():
try:
# ํ์ผ ์์ ์๊ฐ๋ ํ์ธ (race condition ๋์)
file_mtime = datetime.fromtimestamp(self.COOKIE_PATH.stat().st_mtime)
file_is_fresh = datetime.now() - file_mtime < self.VALIDATION_SKIP_THRESHOLD
data = json.loads(self.COOKIE_PATH.read_text())
cookies = data.get("cookies", {})
krx_cookies = data.get("krx_cookies", []) # domain ํฌํจ๋ ์ฟ ํค ์ ๋ณด
last_login_str = data.get("last_login")
last_validated_str = data.get("last_validated") # ๋ง์ง๋ง ๊ฒ์ฆ ์๊ฐ
if cookies and last_login_str:
last_login = datetime.fromisoformat(last_login_str)
# ํ์์์ ์ฒดํฌ
if datetime.now() - last_login <= self.SESSION_TIMEOUT:
# ์ธ์
์ ์ฟ ํค ์ ์ฉ
if krx_cookies:
# ์ ํ์: domain, path ํฌํจ๋ ์ฟ ํค ์ฌ์ฉ
for cookie in krx_cookies:
self.session.cookies.set(
cookie["name"],
cookie["value"],
domain=cookie.get("domain", "data.krx.co.kr"),
path=cookie.get("path", "/")
)
logger.debug(f"KRX ์ฟ ํค {len(krx_cookies)}๊ฐ ๋ก๋๋จ")
else:
# ๊ธฐ์กด ํ์: domain ํ๋์ฝ๋ฉ
for name, value in cookies.items():
self.session.cookies.set(
name, value,
domain="data.krx.co.kr",
path="/"
)
self._session_info = SessionInfo(
cookies=cookies,
last_login=last_login
)
# ๋ง์ง๋ง ๊ฒ์ฆ ์๊ฐ ๋ก๋ (ํ์ผ ์์ ์๊ฐ์ผ๋ก ๋์ฒด ๊ฐ๋ฅ)
if last_validated_str:
self._last_validated = datetime.fromisoformat(last_validated_str)
elif file_is_fresh:
# last_validated ์์ด๋ ํ์ผ์ด ์ต๊ทผ ์์ ๋์ผ๋ฉด ์ ๋ขฐ
self._last_validated = file_mtime
logger.info(f"ํ์ผ ์์ ์๊ฐ์ผ๋ก ์ธ์
์ ๋ขฐ: {file_mtime}")
logger.info("์ ์ฅ๋ ์ธ์
์ ๋ก๋ํ์ต๋๋ค.")
return True
else:
logger.info("์ ์ฅ๋ ์ธ์
์ด ๋ง๋ฃ๋์์ต๋๋ค.")
except Exception as e:
logger.warning(f"์ธ์
๋ก๋ ์คํจ: {e}")
# ๊ธฐ์กด ํ์ ํ์ผ (krx_crawler_client ํธํ)
if self.LEGACY_COOKIE_PATH.exists():
try:
cookies_list = json.loads(self.LEGACY_COOKIE_PATH.read_text())
if isinstance(cookies_list, list) and cookies_list:
cookies = {c["name"]: c["value"] for c in cookies_list}
# ์ธ์
์ ์ฟ ํค ์ ์ฉ (domain ํ์!)
for name, value in cookies.items():
self.session.cookies.set(
name, value,
domain="data.krx.co.kr",
path="/"
)
self._session_info = SessionInfo(
cookies=cookies,
last_login=datetime.now() - timedelta(hours=1) # 1์๊ฐ ์ ์ผ๋ก ์ค์
)
logger.info("๊ธฐ์กด ์ฟ ํค ํ์ผ์ ๋ก๋ํ์ต๋๋ค.")
return True
except Exception as e:
logger.warning(f"๊ธฐ์กด ์ฟ ํค ๋ก๋ ์คํจ: {e}")
return False
def _save_session(self, cookies: Dict[str, str], update_validated: bool = True, krx_cookies: List[Dict] = None):
"""์ธ์
์ ์ฅ"""
try:
now = datetime.now()
data = {
"cookies": cookies,
"krx_cookies": krx_cookies or [], # domain, path ํฌํจ๋ ์ ์ฒด ์ฟ ํค ์ ๋ณด
"last_login": now.isoformat(),
"last_validated": now.isoformat() if update_validated else None
}
self.COOKIE_PATH.write_text(json.dumps(data, indent=2))
if update_validated:
self._last_validated = now
logger.info("์ธ์
์ ์ ์ฅํ์ต๋๋ค.")
except Exception as e:
logger.warning(f"์ธ์
์ ์ฅ ์คํจ: {e}")
def _update_last_validated(self):
"""์ธ์
ํ์ผ์ last_validated๋ง ์
๋ฐ์ดํธ (๊ฒ์ฆ ์ฑ๊ณต ์ ํธ์ถ)"""
try:
if not self.COOKIE_PATH.exists():
return
data = json.loads(self.COOKIE_PATH.read_text())
now = datetime.now()
data["last_validated"] = now.isoformat()
self.COOKIE_PATH.write_text(json.dumps(data, indent=2))
self._last_validated = now
logger.debug("์ธ์
๊ฒ์ฆ ์๊ฐ ์
๋ฐ์ดํธ")
except Exception as e:
logger.warning(f"์ธ์
๊ฒ์ฆ ์๊ฐ ์
๋ฐ์ดํธ ์คํจ: {e}")
def _cleanup_session_files(self):
"""์ธ์
ํ์ผ ์ญ์ (์์/๋ง๋ฃ ์ ํธ์ถ)"""
for path in [self.COOKIE_PATH, self.LEGACY_COOKIE_PATH]:
try:
if path.exists():
path.unlink()
logger.info(f"์ธ์
ํ์ผ ์ญ์ : {path}")
except Exception as e:
logger.warning(f"์ธ์
ํ์ผ ์ญ์ ์คํจ: {path}, {e}")
# ์ธ์
์ ๋ณด ์ด๊ธฐํ
self._session_info = None
if self._session:
self._session.cookies.clear()
def _get_recent_business_day(self) -> str:
"""๊ฐ์ฅ ์ต๊ทผ ์์
์ผ ๋ฐํ (์ธ์
๊ฒ์ฆ์ฉ)"""
kr_holidays = KR()
dt = date.today()
# ์ฅ ์์ ์ (09:00 ์ด์ )์ด๋ฉด ์ ์ผ๋ถํฐ ํ์
if datetime.now().hour < 9:
dt -= timedelta(days=1)
# ์ต๋ 10์ผ ์ ๊น์ง ํ์
for _ in range(10):
# ์ฃผ๋ง ์ฒดํฌ
if dt.weekday() >= 5:
dt -= timedelta(days=1)
continue
# ๊ณตํด์ผ ์ฒดํฌ
if dt in kr_holidays:
dt -= timedelta(days=1)
continue
# ์ฐ๋ง(12/31), ๋
ธ๋์ (5/1) ์ฒดํฌ
if (dt.month == 12 and dt.day == 31) or (dt.month == 5 and dt.day == 1):
dt -= timedelta(days=1)
continue
return dt.strftime("%Y%m%d")
dt -= timedelta(days=1)
return dt.strftime("%Y%m%d")
def _validate_session(self) -> bool:
"""์ธ์
์ ํจ์ฑ ๊ฒ์ฆ (์ค์ API ํธ์ถ๋ก ์ฒดํฌ)"""
try:
# ๊ฐ์ฅ ์ต๊ทผ ์์
์ผ ๊ณ์ฐ (์ฅ ์์ ์ /ํด์ผ์๋ ๋์)
check_date = self._get_recent_business_day()
# ๊ฐ๋จํ API ํธ์ถ๋ก ์ธ์
์ ํจ์ฑ ์ฒดํฌ
resp = self.session.post(
"http://data.krx.co.kr/comm/bldAttendant/getJsonData.cmd",
data={
"bld": "dbms/MDC/STAT/standard/MDCSTAT03501",
"mktId": "STK",
"trdDd": check_date,
},
timeout=10
)
# ์๋ต์ด ๋น์ด์๊ฑฐ๋ HTML์ธ ๊ฒฝ์ฐ (๋ก๊ทธ์ธ ํ์)
content_type = resp.headers.get("Content-Type", "")
if "text/html" in content_type:
logger.info("HTML ์๋ต - ๋ก๊ทธ์ธ ํ์")
return False
if not resp.text.strip():
logger.info("๋น ์๋ต - ๋ก๊ทธ์ธ ํ์")
return False
try:
data = resp.json()
except:
logger.info("JSON ํ์ฑ ์คํจ - ๋ก๊ทธ์ธ ํ์")
return False
# ๋ก๊ทธ์์ ์ํ ์ฒดํฌ
if isinstance(data, dict) and data.get("RESULT") == "LOGOUT":
return False
# ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด ์ฑ๊ณต
if "output" in data or "OutBlock_1" in data:
return True
# ๋น ๋ฐ์ดํฐ๋ ์ฑ๊ณต์ผ๋ก ์ฒ๋ฆฌ (ํด์ผ ๋ฑ)
if isinstance(data, dict):
return True
return False
except Exception as e:
logger.warning(f"์ธ์
๊ฒ์ฆ ์คํจ: {e}")
return False
def _acquire_lock(self, lock_file, timeout: float) -> bool:
"""
ํ์ผ ๋ฝ ํ๋ ์๋ (ํ์์์ ํฌํจ)
Args:
lock_file: ๋ฝ ํ์ผ ํธ๋ค
timeout: ํ์์์ (์ด)
Returns:
๋ฝ ํ๋ ์ฑ๊ณต ์ฌ๋ถ
"""
start_time = time.time()
while True:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except BlockingIOError:
# ๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๋ฝ์ ๋ณด์ ์ค
elapsed = time.time() - start_time
if elapsed >= timeout:
logger.warning(f"๋ฝ ํ๋ ํ์์์ ({timeout}์ด)")
return False
logger.debug(f"๋ฝ ๋๊ธฐ ์ค... ({elapsed:.1f}์ด)")
time.sleep(1) # 1์ด ๊ฐ๊ฒฉ์ผ๋ก ์ฌ์๋
except Exception as e:
logger.warning(f"๋ฝ ํ๋ ์คํจ: {e}")
return False
def login(self, force: bool = False) -> bool:
"""
์นด์นด์ค ๋ก๊ทธ์ธ (์๋ ์ฌ์๋ ํฌํจ, ํ์ผ ๋ฝ์ผ๋ก ๋์ ๋ก๊ทธ์ธ ๋ฐฉ์ง)
Args:
force: True๋ฉด ๊ธฐ์กด ์ธ์
๋ฌด์ํ๊ณ ์ฌ๋ก๊ทธ์ธ
Returns:
๋ก๊ทธ์ธ ์ฑ๊ณต ์ฌ๋ถ
Raises:
KRX2FARequiredError: 2์ฐจ์ธ์ฆ์ด ํ์ฑํ๋ ๊ฒฝ์ฐ
"""
# ๊ธฐ์กด ์ธ์
์ฒดํฌ (๋ฝ ์์ด)
if not force and self._load_session():
# ์ต๊ทผ ๊ฒ์ฆ๋์ผ๋ฉด ์ฌ๊ฒ์ฆ ์๋ต (๋ค๋ฅธ ํ๋ก์ธ์ค์์ ๊ฒ์ฆํ ๊ฒฝ์ฐ ํฌํจ)
now = datetime.now()
if self._last_validated:
elapsed = now - self._last_validated
logger.debug(f"์ธ์
๊ฒ์ฆ ์๊ฐ ์ฒดํฌ: last_validated={self._last_validated}, elapsed={elapsed}, threshold={self.VALIDATION_SKIP_THRESHOLD}")
if elapsed < self.VALIDATION_SKIP_THRESHOLD:
logger.info(f"์ต๊ทผ ๊ฒ์ฆ๋ ์ธ์
์ฌ์ฉ (๊ฒ์ฆ ์๊ฐ: {self._last_validated})")
return True
else:
logger.debug("last_validated๊ฐ ์์ - ๊ฒ์ฆ ํ์")
if self._validate_session():
self._update_last_validated() # ๊ฒ์ฆ ์ฑ๊ณต ์ ํ์ผ์ ๊ธฐ๋ก
logger.info("๊ธฐ์กด ์ธ์
์ด ์ ํจํฉ๋๋ค.")
return True
logger.info("๊ธฐ์กด ์ธ์
์ด ๋ง๋ฃ๋์ด ์ฌ๋ก๊ทธ์ธํฉ๋๋ค.")
# ์ฌ๋ก๊ทธ์ธ์ด ํ์ํ ๊ฒฝ์ฐ, ํ์ผ ๋ฝ์ ํ๋ํ์ฌ ๋์ ๋ก๊ทธ์ธ ๋ฐฉ์ง
return self._login_with_lock(force)
def _login_with_lock(self, force: bool = False) -> bool:
"""ํ์ผ ๋ฝ์ ์ฌ์ฉํ ๋ก๊ทธ์ธ (๋์ ๋ก๊ทธ์ธ ๋ฐฉ์ง)"""
# ๋ฝ ํ์ผ ์์ฑ/์ด๊ธฐ
self.LOCK_PATH.touch(exist_ok=True)
with open(self.LOCK_PATH, 'w') as lock_file:
logger.debug("๋ก๊ทธ์ธ ๋ฝ ํ๋ ์๋...")
if not self._acquire_lock(lock_file, self.LOCK_WAIT_TIMEOUT):
# ๋ฝ ํ๋ ์คํจ - ํ์์์
raise KRXAuthError("๋ก๊ทธ์ธ ๋ฝ ํ๋ ํ์์์ - ๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๋ก๊ทธ์ธ ์ค์ผ ์ ์์")
logger.debug("๋ก๊ทธ์ธ ๋ฝ ํ๋ ์ฑ๊ณต")
try:
# ๋ฝ ํ๋ ํ ๋ค์ ์ธ์
์ฒดํฌ (๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๋ก๊ทธ์ธํ์ ์ ์์)
if not force and self._load_session():
now = datetime.now()
if self._last_validated:
elapsed = now - self._last_validated
if elapsed < self.VALIDATION_SKIP_THRESHOLD:
logger.info(f"๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๋ก๊ทธ์ธ ์๋ฃ - ์ธ์
์ฌ์ฌ์ฉ (๊ฒ์ฆ ์๊ฐ: {self._last_validated})")
return True
if self._validate_session():
self._update_last_validated()
logger.info("๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๋ก๊ทธ์ธ ์๋ฃ - ์ธ์
์ ํจ")
return True
# ์ธ์
ํ์ผ ์ ๋ฆฌ ํ ๋ก๊ทธ์ธ
self._cleanup_session_files()
# Playwright๋ก ๋ก๊ทธ์ธ (์ฌ์๋ ๋ก์ง ํฌํจ)
last_error = None
for attempt in range(self.MAX_LOGIN_RETRIES):
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(self._login_async())
if result:
return True
finally:
loop.close()
except KRX2FARequiredError:
# 2FA ์๋ฌ๋ ์ฌ์๋ํด๋ ์๋ฏธ ์์
raise
except Exception as e:
last_error = e
logger.warning(f"๋ก๊ทธ์ธ ์๋ {attempt + 1}/{self.MAX_LOGIN_RETRIES} ์คํจ: {e}")
if attempt < self.MAX_LOGIN_RETRIES - 1:
wait_time = (attempt + 1) * 5 # 5์ด, 10์ด, 15์ด...
logger.info(f"{wait_time}์ด ํ ์ฌ์๋...")
time.sleep(wait_time)
self._cleanup_session_files()
raise KRXAuthError(f"๋ก๊ทธ์ธ ์คํจ (์ต๋ ์ฌ์๋ ํ์ ์ด๊ณผ): {last_error}")
finally:
# ๋ฝ ํด์ (with ๋ฌธ ์ข
๋ฃ ์ ์๋ ํด์ ๋์ง๋ง ๋ช
์์ ์ผ๋ก)
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
logger.debug("๋ก๊ทธ์ธ ๋ฝ ํด์ ")
except:
pass
async def _login_async(self) -> bool:
"""๋น๋๊ธฐ ๋ก๊ทธ์ธ ์ฒ๋ฆฌ"""
try:
from playwright.async_api import async_playwright
except ImportError:
raise KRXAuthError(
"playwright๊ฐ ์ค์น๋์ง ์์์ต๋๋ค.\n"
"'pip install playwright && playwright install chromium'์ ์คํํ์ธ์."
)
self._playwright = await async_playwright().start()
self._browser = await self._playwright.chromium.launch(headless=self.headless)
context = await self._browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36",
viewport={"width": 1920, "height": 1080},
locale="ko-KR",
)
page = await context.new_page()
try:
# KRX ๋ก๊ทธ์ธ ํ์ด์ง
login_url = "https://data.krx.co.kr/contents/MDC/COMS/client/MDCCOMS001.cmd"
await page.goto(login_url, wait_until="networkidle", timeout=self.PAGE_LOAD_TIMEOUT)
await asyncio.sleep(2)
# iframe์์ ์นด์นด์ค ๋ก๊ทธ์ธ ๋ฒํผ ํด๋ฆญ
iframe = await page.query_selector('iframe')
if not iframe:
raise KRXAuthError("๋ก๊ทธ์ธ iframe์ ์ฐพ์ ์ ์์ต๋๋ค.")
frame = await iframe.content_frame()
kakao_btn = await frame.wait_for_selector(
'a.ms-kakao, a:has-text("์นด์นด์ค๋ก ๋ก๊ทธ์ธ")',
timeout=self.LOGIN_WAIT_TIMEOUT
)
await kakao_btn.click()
# ์นด์นด์ค ๋ก๊ทธ์ธ ํ์ด์ง ๋๊ธฐ
await page.wait_for_url("**/accounts.kakao.com/**", timeout=self.LOGIN_WAIT_TIMEOUT)
await page.wait_for_load_state("networkidle")
await asyncio.sleep(1)
# ์์ด๋/๋น๋ฐ๋ฒํธ ์
๋ ฅ
await page.fill('input[name="loginId"], input#loginId', self.kakao_id)
await asyncio.sleep(0.3)
await page.fill('input[name="password"], input#password', self.kakao_pw)
await asyncio.sleep(0.3)
# ๋ก๊ทธ์ธ ๋ฒํผ ํด๋ฆญ
await page.click('button[type="submit"], button.submit')
logger.info("๋ก๊ทธ์ธ ๋ฒํผ ํด๋ฆญ๋จ. 2FA ํ์ธ ๋๊ธฐ ์ค...")
# ๋ก๊ทธ์ธ ๊ฒฐ๊ณผ ๋๊ธฐ (์ต๋ 120์ด - 2FA ํ์ธ ์๊ฐ ํฌํจ)
two_fa_detected = False
krx_redirected = False
for i in range(120):
await asyncio.sleep(1)
current_url = page.url
# KRX๋ก ๋ฆฌ๋ค์ด๋ ํธ ์ฑ๊ณต
if current_url.startswith("http://data.krx.co.kr") or \
current_url.startswith("https://data.krx.co.kr"):
logger.info("KRX ๋ก๊ทธ์ธ ์ฑ๊ณต!")
krx_redirected = True
break
if i % 10 == 0:
logger.info(f"2FA ํ์ธ ๋๊ธฐ ์ค... ({i}์ด)")
# "๊ณ์ํ๊ธฐ" ๋ฒํผ ์ฒ๋ฆฌ (2FA ํ ๋ํ๋จ)
try:
continue_btn = await page.query_selector('button:has-text("๊ณ์ํ๊ธฐ")')
if continue_btn:
logger.info("'๊ณ์ํ๊ธฐ' ๋ฒํผ ๋ฐ๊ฒฌ, ํด๋ฆญ...")
await continue_btn.click()
await asyncio.sleep(2)
continue
except:
pass
# ๋์ ํ๋ฉด ์ฒ๋ฆฌ
try:
agree_btn = await page.query_selector(
'button:has-text("๋์ํ๊ณ ๊ณ์ํ๊ธฐ"), button:has-text("์ ์ฒด ๋์")'
)
if agree_btn:
logger.info("๋์ ๋ฒํผ ํด๋ฆญ...")
await agree_btn.click()
await asyncio.sleep(2)
except:
pass
if not krx_redirected:
# 2์ฐจ์ธ์ฆ ํ๋ฉด ๊ฐ์ง
try:
tfa_indicators = [
'text="์นด์นด์คํก์ผ๋ก ์ธ์ฆ"',
'text="์ธ์ฆ ์์ฒญ"',
'text="๋ณธ์ธํ์ธ"',
'text="2๋จ๊ณ ์ธ์ฆ"',
]
for indicator in tfa_indicators:
elem = await page.query_selector(indicator)
if elem:
two_fa_detected = True
break
except:
pass
# 2์ฐจ์ธ์ฆ ๊ฐ์ง ์ ์๋ฌ ๋ฐ์
if two_fa_detected:
await self._cleanup_browser()
raise KRX2FARequiredError()
# ๋ก๊ทธ์ธ ์ฑ๊ณต ํ์ธ
if not krx_redirected:
current_url = page.url
await self._cleanup_browser()
raise KRXAuthError(
f"๋ก๊ทธ์ธ ์คํจ. 2FA ํ์ธ ๋๋ ์ธ์ฆ ์ ๋ณด๋ฅผ ํ์ธํ์ธ์.\n"
f"ํ์ฌ URL: {current_url[:100]}..."
)
# ์ฟ ํค ์ถ์ถ ๋ฐ ์ ์ฅ
cookies = await context.cookies()
cookie_dict = {}
krx_cookies = []
# KRX ๊ด๋ จ ์ฟ ํค๋ง ํํฐ๋ง ๋ฐ ์ ์ฉ
for cookie in cookies:
name = cookie["name"]
value = cookie["value"]
domain = cookie.get("domain", "")
path = cookie.get("path", "/")
# KRX ๋๋ฉ์ธ ์ฟ ํค๋ง ์ ์ฅ (์นด์นด์ค ์ฟ ํค๋ ์ ์ธ)
if "krx.co.kr" in domain:
logger.debug(f"KRX ์ฟ ํค ๋ฐ๊ฒฌ: {name}={value[:20]}..., domain={domain}")
self.session.cookies.set(
name, value,
domain=domain,
path=path
)
cookie_dict[name] = value
krx_cookies.append({"name": name, "value": value, "domain": domain, "path": path})
logger.info(f"KRX ์ฟ ํค {len(krx_cookies)}๊ฐ ์ ์ฅ๋จ")
self._session_info = SessionInfo(
cookies=cookie_dict,
last_login=datetime.now()
)
self._save_session(cookie_dict, krx_cookies=krx_cookies)
logger.info("์นด์นด์ค ๋ก๊ทธ์ธ ์ฑ๊ณต")
return True
except KRX2FARequiredError:
raise
except Exception as e:
logger.error(f"๋ก๊ทธ์ธ ์คํจ: {e}")
raise KRXAuthError(f"๋ก๊ทธ์ธ ์คํจ: {e}")
finally:
await self._cleanup_browser()
async def _cleanup_browser(self):
"""๋ธ๋ผ์ฐ์ ์ ๋ฆฌ"""
if self._browser:
await self._browser.close()
self._browser = None
if self._playwright:
await self._playwright.stop()
self._playwright = None
def _needs_refresh(self) -> bool:
"""์ธ์
์ ์ ์ ๊ฐฑ์ ์ด ํ์ํ์ง ํ์ธ"""
if not self._session_info:
return True
elapsed = datetime.now() - self._session_info.last_login
return elapsed >= self.SESSION_REFRESH_THRESHOLD
def check_session(self) -> bool:
"""
์ธ์
์ํ ์ฒดํฌ ๋ฐ ํ์์ ์ฌ๋ก๊ทธ์ธ
Returns:
์ธ์
์ ํจ ์ฌ๋ถ
"""
if not self.is_logged_in:
return self.login()
# ์ ์ ์ ๊ฐฑ์ : ๋ง๋ฃ๋๊ธฐ ์ ์ ๋ฏธ๋ฆฌ ๊ฐฑ์
if self._needs_refresh():
logger.info("์ธ์
๋ง๋ฃ ์์ , ์ ์ ์ ๊ฐฑ์ ์๋...")
if not self._validate_session():
logger.info("์ธ์
์ด ๋ง๋ฃ๋์ด ์ฌ๋ก๊ทธ์ธํฉ๋๋ค.")
self._cleanup_session_files()
return self.login(force=True)
if not self._validate_session():
logger.info("์ธ์
์ด ๋ง๋ฃ๋์ด ์ฌ๋ก๊ทธ์ธํฉ๋๋ค.")
self._cleanup_session_files()
return self.login(force=True)
return True
class KRXDataClient:
"""
KRX ๋ฐ์ดํฐ ํด๋ผ์ด์ธํธ
pykrx์ ๋์ผํ ์ธํฐํ์ด์ค๋ก KRX Data Marketplace์์ ๋ฐ์ดํฐ๋ฅผ ์กฐํํฉ๋๋ค.
"""
API_URL = "http://data.krx.co.kr/comm/bldAttendant/getJsonData.cmd"
ISIN_CACHE_PATH = Path.home() / ".krx_isin_cache.json" # ISIN ์บ์ ํ์ผ
ISIN_CACHE_TTL = timedelta(hours=12) # ์บ์ ์ ํจ ์๊ฐ
# bld ํ๋ผ๋ฏธํฐ (pykrx ๋ถ์ ๊ฒฐ๊ณผ)
BLD = {
# ์ข
๋ชฉ ๊ฒ์
"finder_stkisu": "dbms/comm/finder/finder_stkisu",
# ๊ฐ๋ณ์ข
๋ชฉ ์์ธ (OHLCV)
"ohlcv": "dbms/MDC/STAT/standard/MDCSTAT01701",
# ์ ์ข
๋ชฉ ์์ธ
"ohlcv_all": "dbms/MDC/STAT/standard/MDCSTAT01501",
# PER/PBR - ์ ์ข
๋ชฉ
"fundamental_all": "dbms/MDC/STAT/standard/MDCSTAT03501",
# PER/PBR - ๊ฐ๋ณ์ข
๋ชฉ ๊ธฐ๊ฐ์กฐํ
"fundamental": "dbms/MDC/STAT/standard/MDCSTAT03502",
# ํฌ์์๋ณ ๊ฑฐ๋ - ๊ธฐ๊ฐํฉ๊ณ
"investor_summary": "dbms/MDC/STAT/standard/MDCSTAT02301",
# ํฌ์์๋ณ ๊ฑฐ๋ - ์ผ๋ณ์ถ์ด (์ผ๋ฐ: 5๊ฐ ํฌ์์ ์ ํ)
"investor_daily": "dbms/MDC/STAT/standard/MDCSTAT02302",
# ํฌ์์๋ณ ๊ฑฐ๋ - ์ผ๋ณ์ถ์ด (์์ธ: 12๊ฐ ํฌ์์ ์ ํ)
"investor_daily_detail": "dbms/MDC/STAT/standard/MDCSTAT02303",
# ์ง์ ์์ธ
"index_ohlcv": "dbms/MDC/STAT/standard/MDCSTAT00301",
# ์ง์ ๊ฒ์
"finder_index": "dbms/comm/finder/finder_equidx",
}
# ์์ฅ ์ฝ๋ ๋งคํ
MARKET_CODE = {
"KOSPI": "STK",
"KOSDAQ": "KSQ",
"KONEX": "KNX",
"ALL": "ALL",
}
def __init__(
self,
kakao_id: Optional[str] = None,
kakao_pw: Optional[str] = None,
headless: bool = True,
auto_login: bool = True,
):
"""
ํด๋ผ์ด์ธํธ ์ด๊ธฐํ
Args:
kakao_id: ์นด์นด์ค ์์ด๋
kakao_pw: ์นด์นด์ค ๋น๋ฐ๋ฒํธ
headless: ํค๋๋ฆฌ์ค ๋ธ๋ผ์ฐ์ ๋ชจ๋
auto_login: ์๋ ๋ก๊ทธ์ธ ์ฌ๋ถ
"""
self._auth_manager = KakaoAuthManager(
kakao_id=kakao_id,
kakao_pw=kakao_pw,
headless=headless,
)
# ticker โ ISIN ์บ์
self._isin_cache: Dict[str, str] = {}
self._isin_cache_date: Optional[str] = None
if auto_login:
self._auth_manager.login()
@property
def session(self) -> requests.Session:
"""requests ์ธ์
"""
return self._auth_manager.session
def _ensure_session(self):
"""
์ธ์
์ ํจ์ฑ ํ์ธ - ํด๋ผ์ด์ธํธ๋ ์ด ๋ฉ์๋๋ฅผ ์ง์ ํธ์ถํ ํ์ ์์
์๋์ผ๋ก ์ฒ๋ฆฌ๋๋ ๊ฒ๋ค:
1. ํ๋ก์ธ์ค ๋ด ์บ์๋ ์ธ์
์ฌ์ฌ์ฉ (5๋ถ๊ฐ)
2. ํ์ผ์ ์ ์ฅ๋ ์ธ์
๋ก๋ ๋ฐ ์ฌ์ฌ์ฉ (๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๊ฒ์ฆํ ์ธ์
)
3. ์ธ์
๋ง๋ฃ ์ ์๋ ์ฌ๋ก๊ทธ์ธ (ํ์ผ ๋ฝ์ผ๋ก ๋์ ๋ก๊ทธ์ธ ๋ฐฉ์ง)
"""
global _last_session_check_time
# 1. ํ๋ก์ธ์ค ๋ด ์บ์ ํ์ธ (๊ฐ์ฅ ๋น ๋ฆ)
if _last_session_check_time and datetime.now() - _last_session_check_time < FRESH_SESSION_THRESHOLD:
return
# 2. ํ์ผ์์ ์ธ์
๋ก๋ (๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ๊ฒ์ฆํ ์ธ์
์ฌ์ฌ์ฉ)
# login() ๋ฉ์๋๊ฐ ํ์ผ ๊ธฐ๋ฐ ์ธ์
๊ณต์ , ํ์ผ ๋ฝ, ์๋ ์ฌ๋ก๊ทธ์ธ ๋ชจ๋ ์ฒ๋ฆฌํจ
if not self._auth_manager.login():
raise KRXSessionExpiredError("์ธ์
์ ๋ณต๊ตฌํ ์ ์์ต๋๋ค.")
# ๊ฒ์ฆ ์ฑ๊ณต ์๊ฐ ๊ธฐ๋ก
_last_session_check_time = datetime.now()
def _request(
self,
bld: str,
params: Dict[str, Any],
output_key: str = "output"
) -> List[Dict[str, Any]]:
"""
KRX API ์์ฒญ
Args:
bld: bld ํ๋ผ๋ฏธํฐ
params: ์์ฒญ ํ๋ผ๋ฏธํฐ
output_key: ์๋ต์์ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ ํค
Returns:
์๋ต ๋ฐ์ดํฐ ๋ฆฌ์คํธ
"""
self._ensure_session()
request_data = {"bld": bld, **params}
try:
resp = self.session.post(self.API_URL, data=request_data, timeout=30)
resp.raise_for_status()
data = resp.json()
# ๋ก๊ทธ์์ ์ํ ์ฒดํฌ
if isinstance(data, dict):
if data.get("RESULT") == "LOGOUT":
raise KRXSessionExpiredError("์ธ์
์ด ๋ง๋ฃ๋์์ต๋๋ค.")
# ๋ฐ์ดํฐ ์ถ์ถ
if output_key in data:
return data[output_key]
elif "OutBlock_1" in data:
return data["OutBlock_1"]
elif "block1" in data:
return data["block1"]
else:
return [data] if data else []
elif isinstance(data, list):
return data
else:
return []
except requests.exceptions.HTTPError as e:
if e.response is not None and e.response.status_code == 400:
# 400 ์๋ฌ์ ์ค์ ์์ธ ํ์
response_text = e.response.text[:200] if e.response.text else "(empty)"
content_type = e.response.headers.get("Content-Type", "")
logger.warning(f"400 Bad Request - ์๋ต: {response_text}, Content-Type: {content_type}")
logger.debug(f"์์ฒญ ํ๋ผ๋ฏธํฐ: {request_data}")
# ์ธ์
๋ฌธ์ ์ฌ๋ถ ํ๋จ
is_session_issue = False
if "text/html" in content_type:
# HTML ์๋ต = ๋ก๊ทธ์ธ ํ์ด์ง๋ก ๋ฆฌ๋ค์ด๋ ํธ๋จ
is_session_issue = True
logger.info("HTML ์๋ต ๊ฐ์ง - ์ธ์
๋ง๋ฃ๋ก ํ๋จ")
elif not e.response.text or e.response.text.strip() == "":
# ๋น ์๋ต = ์ธ์
๋ฌธ์ ์ผ ๊ฐ๋ฅ์ฑ
is_session_issue = True
logger.info("๋น ์๋ต ๊ฐ์ง - ์ธ์
๋ง๋ฃ๋ก ํ๋จ")
elif "LOGOUT" in response_text.upper():
is_session_issue = True
logger.info("LOGOUT ๊ฐ์ง - ์ธ์
๋ง๋ฃ๋ก ํ๋จ")
if is_session_issue:
raise KRXSessionExpiredError(f"์ธ์
๋ง๋ฃ: {response_text}")
else:
# ์ธ์
๋ฌธ์ ๊ฐ ์๋ 400 ์๋ฌ (์๋ชป๋ ํ๋ผ๋ฏธํฐ ๋ฑ)
raise KRXDataError(f"API ์์ฒญ ์คํจ (400): {response_text}")
raise KRXDataError(f"API ์์ฒญ ์คํจ: {e}")
except requests.exceptions.RequestException as e:
raise KRXDataError(f"API ์์ฒญ ์คํจ: {e}")
# =========================================================================
# ์ข
๋ชฉ ๊ฒ์
# =========================================================================
@retry_on_session_expired()
def get_market_ticker_list(
self,
date: Optional[str] = None,
market: str = "ALL"
) -> List[str]:
"""
์ข
๋ชฉ ์ฝ๋ ๋ฆฌ์คํธ ์กฐํ
Args:
date: ๊ธฐ์ค์ผ์ (YYYYMMDD), None์ด๋ฉด ์ค๋
market: ์์ฅ (KOSPI/KOSDAQ/KONEX/ALL)
Returns:
์ข
๋ชฉ ์ฝ๋ ๋ฆฌ์คํธ
"""
df = self._get_ticker_info(market)
return df["short_code"].tolist() if not df.empty else []
@retry_on_session_expired()
def get_market_ticker_name(self, date: Optional[str] = None, market: str = "ALL") -> Dict[str, str]:
"""
์ข
๋ชฉ์ฝ๋-์ข
๋ชฉ๋ช
๋งคํ
Args:
date: ๊ธฐ์ค์ผ์ (๋ฏธ์ฌ์ฉ, ํธํ์ฑ์ฉ)
market: ์์ฅ
Returns:
{์ข
๋ชฉ์ฝ๋: ์ข
๋ชฉ๋ช
} ๋์
๋๋ฆฌ
"""
df = self._get_ticker_info(market)
if df.empty:
return {}
return dict(zip(df["short_code"], df["codeName"]))
def _get_ticker_info(self, market: str = "ALL") -> DataFrame:
"""์ข
๋ชฉ ์ ๋ณด ์กฐํ (๋ด๋ถ์ฉ)"""
mktsel = self.MARKET_CODE.get(market.upper(), "ALL")
items = self._request(
self.BLD["finder_stkisu"],
{"locale": "ko_KR", "mktsel": mktsel, "searchText": "", "typeNo": 0}
)
if not items:
return DataFrame()
return DataFrame(items)
def _load_isin_cache(self) -> bool:
"""ํ์ผ์์ ISIN ์บ์ ๋ก๋"""
try:
if not self.ISIN_CACHE_PATH.exists():
return False
data = json.loads(self.ISIN_CACHE_PATH.read_text())
cache_date = data.get("date")
cache_time_str = data.get("cached_at")
cache = data.get("cache", {})
if not cache_date or not cache_time_str or not cache:
return False
# TTL ์ฒดํฌ
cached_at = datetime.fromisoformat(cache_time_str)
if datetime.now() - cached_at > self.ISIN_CACHE_TTL:
logger.debug("ISIN ์บ์ ๋ง๋ฃ (TTL ์ด๊ณผ)")
return False
self._isin_cache = cache
self._isin_cache_date = cache_date
logger.debug(f"ISIN ์บ์ ํ์ผ์์ ๋ก๋: {len(cache)}๊ฐ ์ข
๋ชฉ (๋ ์ง: {cache_date})")
return True
except Exception as e:
logger.warning(f"ISIN ์บ์ ๋ก๋ ์คํจ: {e}")
return False
def _save_isin_cache(self, date: str):
"""ISIN ์บ์๋ฅผ ํ์ผ์ ์ ์ฅ"""
try:
data = {
"date": date,
"cached_at": datetime.now().isoformat(),
"cache": self._isin_cache
}
self.ISIN_CACHE_PATH.write_text(json.dumps(data, ensure_ascii=False))
logger.debug(f"ISIN ์บ์ ํ์ผ ์ ์ฅ: {len(self._isin_cache)}๊ฐ ์ข
๋ชฉ")
except Exception as e:
logger.warning(f"ISIN ์บ์ ์ ์ฅ ์คํจ: {e}")
def _build_isin_cache(self, date: str):
"""ISIN ์บ์ ๊ตฌ์ถ (๋ฉ๋ชจ๋ฆฌ โ ํ์ผ โ API ์์๋ก ํ์ธ)"""
# 1. ๋ฉ๋ชจ๋ฆฌ ์บ์ ํ์ธ
if self._isin_cache and self._isin_cache_date == date:
return
# 2. ํ์ผ ์บ์ ํ์ธ
if self._load_isin_cache():
if self._isin_cache_date == date:
return
# ๋ ์ง๊ฐ ๋ค๋ฅด๋ฉด ์ฌ๊ตฌ์ถ ํ์
logger.debug(f"ISIN ์บ์ ๋ ์ง ๋ถ์ผ์น: {self._isin_cache_date} vs {date}")
# 3. API์์ ์๋ก ๊ตฌ์ถ
logger.info(f"ISIN ์บ์ ๊ตฌ์ถ ์ค... (๋ ์ง: {date})")
items = self._request(
self.BLD["fundamental_all"],
{"mktId": "ALL", "trdDd": date}
)
self._isin_cache = {}
for item in items:
ticker = item.get("ISU_SRT_CD", "")
isin = item.get("ISU_CD", "")
if ticker and isin:
self._isin_cache[ticker] = isin
self._isin_cache_date = date
logger.info(f"ISIN ์บ์ ๊ตฌ์ถ ์๋ฃ: {len(self._isin_cache)}๊ฐ ์ข
๋ชฉ")
# ํ์ผ์ ์ ์ฅ (๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ์ฌ์ฌ์ฉ ๊ฐ๋ฅ)
self._save_isin_cache(date)
def _get_isin(self, ticker: str, date: str) -> Optional[str]:
"""ticker์์ ISIN ์กฐํ"""
# ISIN ์บ์๋ ๊ฐ์ฅ ์ต๊ทผ ์์
์ผ ๊ธฐ์ค์ผ๋ก ๊ตฌ์ถ (์ฅ ์์ ์ /ํด์ผ์๋ ๋์)
cache_date = self.get_nearest_business_day(date)
self._build_isin_cache(cache_date)
isin = self._isin_cache.get(ticker)
# fundamental_all์์ ๋ชป ์ฐพ์ผ๋ฉด finder_stkisu์์ ๊ฒ์ (์ธ๊ตญ ์์ฅ์ฌ ๋ฑ)
if not isin:
items = self._request(
self.BLD["finder_stkisu"],
{"locale": "ko_KR", "mktsel": "ALL", "searchText": ticker, "typeNo": 0}
)
for item in items:
if item.get("short_code") == ticker:
isin = item.get("full_code")
# ์บ์์ ์ถ๊ฐ
if isin:
self._isin_cache[ticker] = isin
logger.debug(f"finder_stkisu์์ ISIN ์ฐพ์: {ticker} -> {isin}")
break
return isin
# =========================================================================
# OHLCV (์์ธ)
# =========================================================================
@retry_on_session_expired()
def get_market_ohlcv(
self,
fromdate: str,
todate: str,
ticker: str,
adjusted: bool = True
) -> DataFrame:
"""
๊ฐ๋ณ์ข
๋ชฉ OHLCV ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ข
๋ชฉ์ฝ๋ (6์๋ฆฌ)
adjusted: ์์ ์ฃผ๊ฐ ์ฌ๋ถ
Returns:
DataFrame: ๋ ์ง๋ณ OHLCV
- Open, High, Low, Close, Volume
"""
isin = self._get_isin(ticker, todate)
if not isin:
raise KRXDataError(f"์ข
๋ชฉ์ ์ฐพ์ ์ ์์ต๋๋ค: {ticker}")
items = self._request(
self.BLD["ohlcv"],
{
"isuCd": isin,
"strtDd": fromdate,
"endDd": todate,
"adjStkPrc": 2 if adjusted else 1, # 2: ์์ ์ฃผ๊ฐ, 1: ๋จ์์ฃผ๊ฐ
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ (pykrx ํ์)
column_map = {
"TRD_DD": "๋ ์ง",
"TDD_OPNPRC": "์๊ฐ",
"TDD_HGPRC": "๊ณ ๊ฐ",
"TDD_LWPRC": "์ ๊ฐ",
"TDD_CLSPRC": "์ข
๊ฐ",
"ACC_TRDVOL": "๊ฑฐ๋๋",
"ACC_TRDVAL": "๊ฑฐ๋๋๊ธ",
"MKTCAP": "์๊ฐ์ด์ก",
}
df = df.rename(columns=column_map)
# pykrx ์๋ฌธ ์ปฌ๋ผ๋ช
์ผ๋ก ๋ณํ
eng_map = {
"์๊ฐ": "Open",
"๊ณ ๊ฐ": "High",
"์ ๊ฐ": "Low",
"์ข
๊ฐ": "Close",
"๊ฑฐ๋๋": "Volume",
"๊ฑฐ๋๋๊ธ": "Amount",
"์๊ฐ์ด์ก": "MarketCap",
}
df = df.rename(columns=eng_map)
# ์ซ์ ๋ณํ
numeric_cols = ["Open", "High", "Low", "Close", "Volume", "Amount", "MarketCap"]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(
df[col].astype(str).str.replace(",", ""),
errors="coerce"
)
# ๋ ์ง ์ธ๋ฑ์ค
if "๋ ์ง" in df.columns:
df["๋ ์ง"] = pd.to_datetime(df["๋ ์ง"], format="%Y/%m/%d")
df = df.set_index("๋ ์ง")
df.index.name = None
df = df.sort_index()
# pykrx์ ๋์ผํ ์ปฌ๋ผ๋ง ๋ฐํ
result_cols = ["Open", "High", "Low", "Close", "Volume", "Amount", "MarketCap"]
available = [c for c in result_cols if c in df.columns]
return df[available] if available else df
# =========================================================================
# ์๊ฐ์ด์ก
# =========================================================================
@retry_on_session_expired()
def get_market_cap(
self,
fromdate: str,
todate: str,
ticker: str
) -> DataFrame:
"""
์๊ฐ์ด์ก ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ข
๋ชฉ์ฝ๋
Returns:
DataFrame: ์๊ฐ์ด์ก, ๊ฑฐ๋๋, ๊ฑฐ๋๋๊ธ, ์์ฅ์ฃผ์์
"""
df = self.get_market_ohlcv(fromdate, todate, ticker)
if df.empty:
return df
# ์๊ฐ์ด์ก ๊ด๋ จ ์ปฌ๋ผ๋ง ๋ฐํ
cols = ["MarketCap", "Volume", "Amount"]
available = [c for c in cols if c in df.columns]
return df[available] if available else df
# =========================================================================
# PER/PBR/๋ฐฐ๋น์์ต๋ฅ (Fundamental)
# =========================================================================
@retry_on_session_expired()
def get_market_fundamental(
self,
fromdate: str,
todate: str,
ticker: str
) -> DataFrame:
"""
PER/PBR/๋ฐฐ๋น์์ต๋ฅ ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ข
๋ชฉ์ฝ๋
Returns:
DataFrame: BPS, PER, PBR, EPS, DIV, DPS
"""
isin = self._get_isin(ticker, todate)
if not isin:
raise KRXDataError(f"์ข
๋ชฉ์ ์ฐพ์ ์ ์์ต๋๋ค: {ticker}")
items = self._request(
self.BLD["fundamental"],
{
"isuCd": isin,
"mktId": "ALL",
"strtDd": fromdate,
"endDd": todate,
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ (pykrx ํ์)
column_map = {
"TRD_DD": "๋ ์ง",
"TDD_CLSPRC": "์ข
๊ฐ",
"EPS": "EPS",
"PER": "PER",
"BPS": "BPS",
"PBR": "PBR",
"DPS": "DPS",
"DVD_YLD": "DIV",
}
df = df.rename(columns=column_map)
# ์ซ์ ๋ณํ
numeric_cols = ["์ข
๊ฐ", "EPS", "PER", "BPS", "PBR", "DPS", "DIV"]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(
df[col].astype(str).str.replace(",", ""),
errors="coerce"
)
# ๋ ์ง ์ธ๋ฑ์ค
if "๋ ์ง" in df.columns:
df["๋ ์ง"] = pd.to_datetime(df["๋ ์ง"], format="%Y/%m/%d")
df = df.set_index("๋ ์ง")
df.index.name = None
df = df.sort_index()
# pykrx์ ๋์ผํ ์ปฌ๋ผ๋ง ๋ฐํ
result_cols = ["BPS", "PER", "PBR", "EPS", "DIV", "DPS"]
available = [c for c in result_cols if c in df.columns]
return df[available] if available else df
# =========================================================================
# ํฌ์์๋ณ ๊ฑฐ๋๋
# =========================================================================
@retry_on_session_expired()
def get_market_trading_volume_by_date(
self,
fromdate: str,
todate: str,
ticker: str,
detail: bool = False
) -> DataFrame:
"""
ํฌ์์๋ณ ๊ฑฐ๋๋ ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ข
๋ชฉ์ฝ๋
detail: ์์ธ ํฌ์์ ๊ตฌ๋ถ ์ฌ๋ถ
- False: 5๊ฐ ์ ํ (๊ธฐ๊ดํฉ๊ณ, ๊ธฐํ๋ฒ์ธ, ๊ฐ์ธ, ์ธ๊ตญ์ธํฉ๊ณ, ์ ์ฒด)
- True: 12๊ฐ ์ ํ (๊ธ์ตํฌ์, ๋ณดํ, ํฌ์ , ์ฌ๋ชจ, ์ํ, ๊ธฐํ๊ธ์ต, ์ฐ๊ธฐ๊ธ, ๊ธฐํ๋ฒ์ธ, ๊ฐ์ธ, ์ธ๊ตญ์ธ, ๊ธฐํ์ธ๊ตญ์ธ, ์ ์ฒด)
Returns:
DataFrame: ํฌ์์๋ณ ์๋งค์๋
"""
isin = self._get_isin(ticker, todate)
if not isin:
raise KRXDataError(f"์ข
๋ชฉ์ ์ฐพ์ ์ ์์ต๋๋ค: {ticker}")
# detail ์ฌ๋ถ์ ๋ฐ๋ผ ๋ค๋ฅธ bld ์ฌ์ฉ
bld_key = "investor_daily_detail" if detail else "investor_daily"
items = self._request(
self.BLD[bld_key],
{
"isuCd": isin,
"strtDd": fromdate,
"endDd": todate,
"inqTpCd": 2,
"trdVolVal": 1, # ๊ฑฐ๋๋
"askBid": 3, # ์๋งค์
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ (detail ์ฌ๋ถ์ ๋ฐ๋ผ ๋ค๋ฆ)
if detail:
# ์์ธ: 12๊ฐ ํฌ์์ ์ ํ
column_map = {
"TRD_DD": "๋ ์ง",
"TRDVAL1": "๊ธ์ตํฌ์",
"TRDVAL2": "๋ณดํ",
"TRDVAL3": "ํฌ์ ",
"TRDVAL4": "์ฌ๋ชจ",
"TRDVAL5": "์ํ",
"TRDVAL6": "๊ธฐํ๊ธ์ต",
"TRDVAL7": "์ฐ๊ธฐ๊ธ",
"TRDVAL8": "๊ธฐํ๋ฒ์ธ",
"TRDVAL9": "๊ฐ์ธ",
"TRDVAL10": "์ธ๊ตญ์ธ",
"TRDVAL11": "๊ธฐํ์ธ๊ตญ์ธ",
"TRDVAL_TOT": "์ ์ฒด",
}
else:
# ์ผ๋ฐ: 5๊ฐ ํฌ์์ ์ ํ
column_map = {
"TRD_DD": "๋ ์ง",
"TRDVAL1": "๊ธฐ๊ดํฉ๊ณ",
"TRDVAL2": "๊ธฐํ๋ฒ์ธ",
"TRDVAL3": "๊ฐ์ธ",
"TRDVAL4": "์ธ๊ตญ์ธํฉ๊ณ",
"TRDVAL_TOT": "์ ์ฒด",
}
df = df.rename(columns=column_map)
# ์ซ์ ๋ณํ
numeric_cols = list(column_map.values())[1:]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(
df[col].astype(str).str.replace(",", ""),
errors="coerce"
)
# ๋ ์ง ์ธ๋ฑ์ค
if "๋ ์ง" in df.columns:
df["๋ ์ง"] = pd.to_datetime(df["๋ ์ง"], format="%Y/%m/%d")
df = df.set_index("๋ ์ง")
df.index.name = None
df = df.sort_index()
return df
# =========================================================================
# ์ง์ OHLCV
# =========================================================================
@retry_on_session_expired()
def get_index_ohlcv(
self,
fromdate: str,
todate: str,
ticker: str,
freq: str = "d"
) -> DataFrame:
"""
์ง์ OHLCV ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ง์ ์ฝ๋ (์: 1001=KOSPI, 2001=KOSDAQ)
freq: ๋น๋ (d/m/y) - ํ์ฌ d๋ง ์ง์
Returns:
DataFrame: ์ง์ OHLCV
"""
# pykrx ์ง์ ํฐ์ปค ํ์: 1xxx=KOSPI, 2xxx=KOSDAQ
# API ํ๋ผ๋ฏธํฐ:
# indIdx: ๊ทธ๋ฃน ID (1=KOSPI, 2=KOSDAQ ๋ฑ)
# indIdx2: ์ง์ ์ฝ๋ (001=์ฝ์คํผ/์ฝ์ค๋ฅ, 028=KOSPI 200 ๋ฑ)
ticker_str = str(ticker)
ind_idx = ticker_str[0] # ์ฒซ ๋ฒ์งธ ์๋ฆฌ: ๊ทธ๋ฃน ID
idx_code = ticker_str[1:] # ๋๋จธ์ง: ์ง์ ์ฝ๋
items = self._request(
self.BLD["index_ohlcv"],
{
"indIdx2": idx_code,
"indIdx": ind_idx,
"strtDd": fromdate,
"endDd": todate,
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ
column_map = {
"TRD_DD": "๋ ์ง",
"OPNPRC_IDX": "์๊ฐ",
"HGPRC_IDX": "๊ณ ๊ฐ",
"LWPRC_IDX": "์ ๊ฐ",
"CLSPRC_IDX": "์ข
๊ฐ",
"ACC_TRDVOL": "๊ฑฐ๋๋",
"ACC_TRDVAL": "๊ฑฐ๋๋๊ธ",
}
df = df.rename(columns=column_map)
# pykrx ์๋ฌธ ์ปฌ๋ผ
eng_map = {
"์๊ฐ": "Open",
"๊ณ ๊ฐ": "High",
"์ ๊ฐ": "Low",
"์ข
๊ฐ": "Close",
"๊ฑฐ๋๋": "Volume",
"๊ฑฐ๋๋๊ธ": "Amount",
}
df = df.rename(columns=eng_map)
# ์ซ์ ๋ณํ
numeric_cols = ["Open", "High", "Low", "Close", "Volume", "Amount"]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(
df[col].astype(str).str.replace(",", ""),
errors="coerce"
)
# ๋ ์ง ์ธ๋ฑ์ค
if "๋ ์ง" in df.columns:
df["๋ ์ง"] = pd.to_datetime(df["๋ ์ง"], format="%Y/%m/%d")
df = df.set_index("๋ ์ง")
df.index.name = None
df = df.sort_index()
# pykrx์ ๋์ผํ ์ปฌ๋ผ๋ง ๋ฐํ
result_cols = ["Open", "High", "Low", "Close", "Volume", "Amount"]
available = [c for c in result_cols if c in df.columns]
return df[available] if available else df
# =========================================================================
# ์ ์ฒด ์ข
๋ชฉ ์กฐํ (pykrx ํธํ)
# =========================================================================
@retry_on_session_expired()
def get_market_ohlcv_by_ticker(self, date: str, market: str = "ALL") -> DataFrame:
"""
ํน์ ์ผ ์ ์ฒด ์ข
๋ชฉ์ OHLCV ์กฐํ (pykrx ํธํ)
Args:
date: ์กฐํ์ผ (YYYYMMDD)
market: ์์ฅ๊ตฌ๋ถ ("ALL", "KOSPI", "KOSDAQ", "KONEX")
Returns:
DataFrame: ์ข
๋ชฉ์ฝ๋ ์ธ๋ฑ์ค, OHLCV ์ปฌ๋ผ
"""
# ๊ฐ์ฅ ์ต๊ทผ ์์
์ผ๋ก ๋ณํ (์ฅ ์์ ์ /ํด์ผ์๋ ๋์)
query_date = self.get_nearest_business_day(date)
market_map = {
"ALL": "ALL",
"KOSPI": "STK",
"KOSDAQ": "KSQ",
"KONEX": "KNX",
}
mkt_id = market_map.get(market.upper(), "ALL")
items = self._request(
"dbms/MDC/STAT/standard/MDCSTAT01501",
{
"mktId": mkt_id,
"trdDd": query_date,
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ (pykrx ํธํ ์๋ฌธ ์ปฌ๋ผ๋ช
)
column_map = {
"ISU_SRT_CD": "Ticker",
"TDD_OPNPRC": "Open",
"TDD_HGPRC": "High",
"TDD_LWPRC": "Low",
"TDD_CLSPRC": "Close",
"ACC_TRDVOL": "Volume",
"ACC_TRDVAL": "Amount",
}
df = df.rename(columns=column_map)
# ์ซ์ ๋ณํ
numeric_cols = ["Open", "High", "Low", "Close", "Volume", "Amount"]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(
df[col].astype(str).str.replace(",", ""),
errors="coerce"
)
# ํฐ์ปค ์ธ๋ฑ์ค
if "Ticker" in df.columns:
df = df.set_index("Ticker")
# ํ์ํ ์ปฌ๋ผ๋ง ๋ฐํ (pykrx ํธํ ์๋ฌธ ์ปฌ๋ผ๋ช
)
result_cols = ["Open", "High", "Low", "Close", "Volume", "Amount"]
available = [c for c in result_cols if c in df.columns]
return df[available] if available else df
@retry_on_session_expired()
def get_market_cap_by_ticker(self, date: str, market: str = "ALL") -> DataFrame:
"""
ํน์ ์ผ ์ ์ฒด ์ข
๋ชฉ์ ์๊ฐ์ด์ก ์กฐํ (pykrx ํธํ)
Args:
date: ์กฐํ์ผ (YYYYMMDD)
market: ์์ฅ๊ตฌ๋ถ ("ALL", "KOSPI", "KOSDAQ", "KONEX")
Returns:
DataFrame: ์ข
๋ชฉ์ฝ๋ ์ธ๋ฑ์ค, ์๊ฐ์ด์ก/๊ฑฐ๋๋/๊ฑฐ๋๋๊ธ/์์ฅ์ฃผ์์ ์ปฌ๋ผ
"""
# ๊ฐ์ฅ ์ต๊ทผ ์์
์ผ๋ก ๋ณํ (์ฅ ์์ ์ /ํด์ผ์๋ ๋์)
query_date = self.get_nearest_business_day(date)
market_map = {
"ALL": "ALL",
"KOSPI": "STK",
"KOSDAQ": "KSQ",
"KONEX": "KNX",
}
mkt_id = market_map.get(market.upper(), "ALL")
# MDCSTAT01501 (์ ์ข
๋ชฉ ์์ธ) endpoint ์ฌ์ฉ - MKTCAP ํฌํจ
items = self._request(
self.BLD["ohlcv_all"],
{
"mktId": mkt_id,
"trdDd": query_date,
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ (pykrx ํ์)
column_map = {
"ISU_SRT_CD": "ํฐ์ปค",
"MKTCAP": "์๊ฐ์ด์ก",
"ACC_TRDVOL": "๊ฑฐ๋๋",
"ACC_TRDVAL": "๊ฑฐ๋๋๊ธ",
"LIST_SHRS": "์์ฅ์ฃผ์์",
}
df = df.rename(columns=column_map)
# ์ซ์ ๋ณํ
numeric_cols = ["์๊ฐ์ด์ก", "๊ฑฐ๋๋", "๊ฑฐ๋๋๊ธ", "์์ฅ์ฃผ์์"]
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(
df[col].astype(str).str.replace(",", ""),
errors="coerce"
)
# ํฐ์ปค ์ธ๋ฑ์ค
if "ํฐ์ปค" in df.columns:
df = df.set_index("ํฐ์ปค")
# ํ์ํ ์ปฌ๋ผ๋ง ๋ฐํ
result_cols = ["์๊ฐ์ด์ก", "๊ฑฐ๋๋", "๊ฑฐ๋๋๊ธ", "์์ฅ์ฃผ์์"]
available = [c for c in result_cols if c in df.columns]
return df[available] if available else df
def get_market_ticker_list(self, date: Optional[str] = None, market: str = "KOSPI") -> List[str]:
"""
ํฐ์ปค ๋ชฉ๋ก ์กฐํ (pykrx ํธํ)
Args:
date: ์กฐํ์ผ (YYYYMMDD), None์ด๋ฉด ์ต๊ทผ ์์
์ผ
market: ์์ฅ๊ตฌ๋ถ ("KOSPI", "KOSDAQ", "KONEX", "ALL")
Returns:
List[str]: ํฐ์ปค ์ฝ๋ ๋ฆฌ์คํธ
"""
if date is None:
date = self.get_nearest_business_day()
tickers = self.get_market_ticker_name(date=date, market=market)
return list(tickers.keys())
@retry_on_session_expired()
def get_market_trading_value_by_investor(
self,
fromdate: str,
todate: str,
ticker: str,
detail: bool = False
) -> DataFrame:
"""
ํฌ์์๋ณ ๊ฑฐ๋๋๊ธ ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ข
๋ชฉ์ฝ๋ (6์๋ฆฌ)
detail: True๋ฉด 12๊ฐ ํฌ์์, False๋ฉด 5๊ฐ ํฉ์ฐ
Returns:
DataFrame: ๋ ์ง๋ณ ํฌ์์๋ณ ์๋งค์๊ธ์ก
"""
isin = self._get_isin(ticker, todate)
if not isin:
raise KRXDataError(f"์ข
๋ชฉ์ ์ฐพ์ ์ ์์ต๋๋ค: {ticker}")
# detail ์ฌ๋ถ์ ๋ฐ๋ผ ๋ค๋ฅธ BLD ์ฌ์ฉ
bld = self.BLD["trading_volume_detail"] if detail else self.BLD["trading_volume"]
items = self._request(
bld,
{
"isuCd": isin,
"strtDd": fromdate,
"endDd": todate,
}
)
if not items:
return DataFrame()
df = DataFrame(items)
# ์ปฌ๋ผ ๋งคํ - ๊ฑฐ๋๋๊ธ(ASK_TRDVAL: ๋งค๋, BID_TRDVAL: ๋งค์)
date_col = "TRD_DD"
if date_col not in df.columns:
return DataFrame()
# ์๋งค์๊ธ์ก ๊ณ์ฐ (๋งค์ - ๋งค๋)
result_data = []
for _, row in df.iterrows():
row_dict = {"๋ ์ง": row[date_col]}
if detail:
# ์์ธ (12๊ฐ ํฌ์์)
investors = [
("๊ธ์ตํฌ์", "TRDVAL1"),
("๋ณดํ", "TRDVAL2"),
("ํฌ์ ", "TRDVAL3"),
("์ฌ๋ชจ", "TRDVAL4"),
("์ํ", "TRDVAL5"),
("๊ธฐํ๊ธ์ต", "TRDVAL6"),
("์ฐ๊ธฐ๊ธ", "TRDVAL7"),
("๊ธฐ๊ดํฉ๊ณ", "TRDVAL_INST"),
("๊ธฐํ๋ฒ์ธ", "TRDVAL8"),
("๊ฐ์ธ", "TRDVAL9"),
("์ธ๊ตญ์ธ", "TRDVAL10"),
("๊ธฐํ์ธ๊ตญ์ธ", "TRDVAL11"),
]
else:
# ํฉ์ฐ (5๊ฐ ํฌ์์)
investors = [
("๊ธฐ๊ดํฉ๊ณ", "TRDVAL_INST"),
("๊ธฐํ๋ฒ์ธ", "TRDVAL1"),
("๊ฐ์ธ", "TRDVAL2"),
("์ธ๊ตญ์ธํฉ๊ณ", "TRDVAL3"),
("์ ์ฒด", "TRDVAL4"),
]
for name, col_prefix in investors:
buy_col = f"BID_{col_prefix}" if f"BID_{col_prefix}" in row else col_prefix
sell_col = f"ASK_{col_prefix}" if f"ASK_{col_prefix}" in row else None
if buy_col in row:
buy = pd.to_numeric(str(row.get(buy_col, 0)).replace(",", ""), errors="coerce") or 0
sell = pd.to_numeric(str(row.get(sell_col, 0)).replace(",", ""), errors="coerce") or 0 if sell_col else 0
row_dict[name] = buy - sell
result_data.append(row_dict)
result_df = DataFrame(result_data)
# ๋ ์ง ์ธ๋ฑ์ค
if "๋ ์ง" in result_df.columns:
result_df["๋ ์ง"] = pd.to_datetime(result_df["๋ ์ง"], format="%Y/%m/%d")
result_df = result_df.set_index("๋ ์ง")
result_df.index.name = None
result_df = result_df.sort_index()
return result_df
@retry_on_session_expired()
def get_market_trading_value_by_date(
self,
fromdate: str,
todate: str,
ticker: str,
on: str = "์๋งค์"
) -> DataFrame:
"""
์ผ์๋ณ ๊ฑฐ๋๋๊ธ ์กฐํ (pykrx ํธํ)
Args:
fromdate: ์์์ผ (YYYYMMDD)
todate: ์ข
๋ฃ์ผ (YYYYMMDD)
ticker: ์ข
๋ชฉ์ฝ๋ (6์๋ฆฌ)
on: "๋งค๋", "๋งค์", "์๋งค์" ์ค ํ๋
Returns:
DataFrame: ๋ ์ง๋ณ ๊ฑฐ๋๋๊ธ
"""
# ํฌ์์๋ณ ๊ฑฐ๋๋๊ธ ์กฐํ ํ ์ง๊ณ
df = self.get_market_trading_value_by_investor(fromdate, todate, ticker)
return df
# =========================================================================
# pykrx ํธํ ๋ณ์นญ
# =========================================================================
def get_market_ohlcv_by_date(
self,
fromdate: str,
todate: str,
ticker: str,
adjusted: bool = True
) -> DataFrame:
"""pykrx ํธํ: get_market_ohlcv์ ๋ณ์นญ"""
return self.get_market_ohlcv(fromdate, todate, ticker, adjusted)
def get_market_cap_by_date(
self,
fromdate: str,
todate: str,
ticker: str
) -> DataFrame:
"""pykrx ํธํ: get_market_cap์ ๋ณ์นญ"""
return self.get_market_cap(fromdate, todate, ticker)
def get_market_fundamental_by_date(
self,
fromdate: str,
todate: str,
ticker: str
) -> DataFrame:
"""pykrx ํธํ: get_market_fundamental์ ๋ณ์นญ"""
return self.get_market_fundamental(fromdate, todate, ticker)
def get_index_ohlcv_by_date(
self,
fromdate: str,
todate: str,
ticker: str,
freq: str = "d"
) -> DataFrame:
"""pykrx ํธํ: get_index_ohlcv์ ๋ณ์นญ"""
return self.get_index_ohlcv(fromdate, todate, ticker, freq)
# =========================================================================
# ์ ํธ๋ฆฌํฐ
# =========================================================================
def get_nearest_business_day(self, target_date: Optional[str] = None) -> str:
"""
๊ฐ์ฅ ๊ฐ๊น์ด ์์
์ผ ์กฐํ (๊ณผ๊ฑฐ ๋ฐฉํฅ์ผ๋ก ํ์)
Args:
target_date: ๊ธฐ์ค์ผ (YYYYMMDD), None์ด๋ฉด ์ค๋
Returns:
์์
์ผ (YYYYMMDD)
Note:
- ์ฅ ์์ ์ (09:00 ์ด์ )์๋ ์ ์์
์ผ์ ๋ฐํ
- ์ฅ ๋ง๊ฐ ํ์๋ ๋น์ผ์ ๋ฐํ
- ์ ๋ฌ๋ ๋ ์ง๊ฐ ์ค๋์ธ ๊ฒฝ์ฐ์๋ ๋์ผํ๊ฒ ์ ์ฉ
"""
today = date.today()
if target_date:
dt = datetime.strptime(target_date, "%Y%m%d").date()
else:
dt = today
kr_holidays = KR()
# ์ค๋ ๋ ์ง์ธ ๊ฒฝ์ฐ, ์ฅ ์์ ์ (09:00 ์ด์ )์ด๋ฉด ์ ์ผ ๊ธฐ์ค์ผ๋ก ํ์
if dt == today:
now = datetime.now()
if now.hour < 9:
# ์ค๋์ด ์์
์ผ์ด์ด๋ ์์ง ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฏ๋ก ์ ์ผ๋ถํฐ ํ์
dt -= timedelta(days=1)
# ์ต๋ 10์ผ ์ ๊น์ง ํ์
for _ in range(10):
if self._is_market_day(dt, kr_holidays):
return dt.strftime("%Y%m%d")
dt -= timedelta(days=1)
return dt.strftime("%Y%m%d")
def _is_market_day(self, dt: date, kr_holidays) -> bool:
"""
ํ๊ตญ ์ฃผ์ ์์ฅ ์์
์ผ ์ฌ๋ถ ํ์ธ
Args:
dt: ํ์ธํ ๋ ์ง
kr_holidays: ํ๊ตญ ๊ณตํด์ผ ๊ฐ์ฒด
Returns:
์์
์ผ์ด๋ฉด True
"""
# ์ฃผ๋ง
if dt.weekday() >= 5:
return False
# ๊ณตํด์ผ
if dt in kr_holidays:
return False
# ๋
ธ๋์ (5/1) - ์ฆ๊ถ์์ฅ ํด์ฅ
if dt.month == 5 and dt.day == 1:
return False
# ์ฐ๋ง (12/31) - ์ฆ๊ถ์์ฅ ํด์ฅ
if dt.month == 12 and dt.day == 31:
return False
# ์ฐ๋๋ณ ํน๋ณ ํด์ฅ์ผ
if dt.year == 2025:
special_holidays = [
(1, 27), # ์ค๋ ์ฐํด ์์๊ณตํด์ผ
(3, 3), # ์ผ์ผ์ ๋์ฒด๊ณตํด์ผ
(5, 6), # ์ด๋ฆฐ์ด๋ /๋ถ์ฒ๋์ค์ ๋ ๋์ฒด๊ณตํด์ผ
(6, 3), # ๋ํต๋ น์ ๊ฑฐ์ผ
(10, 8), # ์ถ์ ๋์ฒด๊ณตํด์ผ
]
if (dt.month, dt.day) in special_holidays:
return False
return True
def is_market_day(self, target_date: Optional[str] = None) -> bool:
"""
ํน์ ๋ ์ง๊ฐ ์์
์ผ์ธ์ง ํ์ธ
Args:
target_date: ํ์ธํ ๋ ์ง (YYYYMMDD), None์ด๋ฉด ์ค๋
Returns:
์์
์ผ์ด๋ฉด True
"""
if target_date:
dt = datetime.strptime(target_date, "%Y%m%d").date()
else:
dt = date.today()
return self._is_market_day(dt, KR())
def close(self):
"""๋ฆฌ์์ค ์ ๋ฆฌ"""
pass # ํ์ฌ๋ ํน๋ณํ ์ ๋ฆฌํ ๊ฒ ์์
def get_nearest_business_day_in_a_week(
self,
target_date: Optional[str] = None,
prev: bool = True
) -> str:
"""
pykrx ํธํ: ์ผ์ฃผ์ผ ๋ด ๊ฐ์ฅ ๊ฐ๊น์ด ์์
์ผ ์กฐํ
Args:
target_date: ๊ธฐ์ค์ผ (YYYYMMDD), None์ด๋ฉด ์ค๋
prev: True๋ฉด ๊ณผ๊ฑฐ ๋ฐฉํฅ, False๋ฉด ๋ฏธ๋ ๋ฐฉํฅ
Returns:
์์
์ผ (YYYYMMDD)
"""
# ํ์ฌ๋ ๊ณผ๊ฑฐ ๋ฐฉํฅ๋ง ์ง์ (prev=True)
return self.get_nearest_business_day(target_date)
# =============================================================================
# ๋ชจ๋ ๋ ๋ฒจ ํธ์ ๊ฐ์ฒด (pykrx ํธํ์ฉ)
# =============================================================================
# ์ฑ๊ธํค ํด๋ผ์ด์ธํธ (lazy initialization)
_default_client: Optional[KRXDataClient] = None
_last_session_check_time: Optional[datetime] = None
# ์ธ์
๊ฒ์ฆ ์๋ต ์๊ณ๊ฐ (์ด ์๊ฐ ๋ด์ ๊ฒ์ฆํ์ผ๋ฉด ์ฌ๊ฒ์ฆ ์๋ต)
FRESH_SESSION_THRESHOLD = timedelta(minutes=5)
def _get_client() -> KRXDataClient:
"""๊ธฐ๋ณธ ํด๋ผ์ด์ธํธ ๋ฐํ (lazy initialization)"""
global _default_client
if _default_client is None:
_default_client = KRXDataClient()
return _default_client
def ensure_session_valid() -> bool:
"""
[DEPRECATED] ์ธ์
์ ํจ์ฑ ํ์ธ - ํธ์ถํ ํ์ ์์
v0.3.19๋ถํฐ ๋ชจ๋ API ํธ์ถ ์ ์๋์ผ๋ก ์ธ์
์ด ๊ด๋ฆฌ๋ฉ๋๋ค:
- ์ธ์
๋ง๋ฃ ์ ์๋ ์ฌ๋ก๊ทธ์ธ
- ํ์ผ ๋ฝ์ผ๋ก ๋์ ๋ก๊ทธ์ธ ๋ฐฉ์ง
- ํ๋ก์ธ์ค ๊ฐ ์ธ์
๊ณต์
ํด๋ผ์ด์ธํธ๋ ๊ทธ๋ฅ ๋ฐ์ดํฐ๋ฅผ ์์ฒญํ๋ฉด ๋ฉ๋๋ค.
์ด ํจ์๋ ํ์ ํธํ์ฑ์ ์ํด ์ ์ง๋๋ฉฐ, ํธ์ถํด๋ ๋ฌดํดํฉ๋๋ค.
Returns:
ํญ์ True (์ธ์
์ด ์๋์ผ๋ก ๋ณต๊ตฌ๋๋ฏ๋ก)
"""
global _last_session_check_time
# ์ต๊ทผ์ ๊ฒ์ฆํ์ผ๋ฉด ์๋ต
if _last_session_check_time and datetime.now() - _last_session_check_time < FRESH_SESSION_THRESHOLD:
logger.debug(f"์ธ์
์ต๊ทผ ๊ฒ์ฆ๋จ ({_last_session_check_time}), ์๋ต")
return True
try:
client = _get_client()
# ์ธ์
์ด ์ด๋ฏธ ๋ก๊ทธ์ธ๋ ์ํ๋ฉด ์ฑ๊ณต
if client._auth_manager.is_logged_in:
_last_session_check_time = datetime.now()
logger.info("์ธ์
ํ๋ฆฌ์๋ฐ ์๋ฃ (๊ธฐ์กด ์ธ์
์ฌ์ฉ)")
return True
# ์ธ์
๊ฒ์ฆ ๋ฐ ํ์์ ๋ก๊ทธ์ธ
result = client._auth_manager.check_session()
if result:
_last_session_check_time = datetime.now()
logger.info("์ธ์
ํ๋ฆฌ์๋ฐ ์๋ฃ (์ธ์
๊ฒ์ฆ/๋ก๊ทธ์ธ)")
return result
except Exception as e:
logger.error(f"์ธ์
ํ๋ฆฌ์๋ฐ ์คํจ: {e}")
return False
def is_session_fresh() -> bool:
"""์ธ์
์ด ์ต๊ทผ์ ๊ฒ์ฆ๋์ด ์ฌ๊ฒ์ฆ์ด ๋ถํ์ํ์ง ํ์ธ"""
global _last_session_check_time
if _last_session_check_time is None:
return False
return datetime.now() - _last_session_check_time < FRESH_SESSION_THRESHOLD
def reset_session_freshness():
"""์ธ์
freshness ๋ฆฌ์
(ํ
์คํธ์ฉ)"""
global _last_session_check_time
_last_session_check_time = None
# pykrx.stock.stock_api ํธํ ํจ์๋ค
def get_market_ohlcv_by_date(fromdate: str, todate: str, ticker: str, adjusted: bool = True) -> DataFrame:
"""pykrx ํธํ: ๊ฐ๋ณ์ข
๋ชฉ OHLCV"""
return _get_client().get_market_ohlcv_by_date(fromdate, todate, ticker, adjusted)
def get_market_ohlcv_by_ticker(date: str, market: str = "ALL") -> DataFrame:
"""pykrx ํธํ: ํน์ ์ผ ์ ์ฒด ์ข
๋ชฉ OHLCV"""
return _get_client().get_market_ohlcv_by_ticker(date, market)
def get_market_cap_by_ticker(date: str, market: str = "ALL") -> DataFrame:
"""pykrx ํธํ: ํน์ ์ผ ์ ์ฒด ์ข
๋ชฉ ์๊ฐ์ด์ก"""
return _get_client().get_market_cap_by_ticker(date, market)
def get_market_cap_by_date(fromdate: str, todate: str, ticker: str) -> DataFrame:
"""pykrx ํธํ: ์๊ฐ์ด์ก"""
return _get_client().get_market_cap_by_date(fromdate, todate, ticker)
def get_market_fundamental_by_date(fromdate: str, todate: str, ticker: str) -> DataFrame:
"""pykrx ํธํ: ๊ธฐ๋ณธ ์งํ"""
return _get_client().get_market_fundamental_by_date(fromdate, todate, ticker)
def get_market_trading_volume_by_date(fromdate: str, todate: str, ticker: str, detail: bool = False) -> DataFrame:
"""pykrx ํธํ: ํฌ์์๋ณ ๊ฑฐ๋๋"""
return _get_client().get_market_trading_volume_by_date(fromdate, todate, ticker, detail)
def get_market_trading_value_by_date(fromdate: str, todate: str, ticker: str, on: str = "์๋งค์") -> DataFrame:
"""pykrx ํธํ: ์ผ์๋ณ ๊ฑฐ๋๋๊ธ"""
return _get_client().get_market_trading_value_by_date(fromdate, todate, ticker, on)
def get_market_trading_volume_by_investor(fromdate: str, todate: str, ticker: str, detail: bool = False) -> DataFrame:
"""pykrx ํธํ: ํฌ์์๋ณ ๊ฑฐ๋๋"""
return _get_client().get_market_trading_volume_by_date(fromdate, todate, ticker, detail)
def get_market_trading_value_by_investor(fromdate: str, todate: str, ticker: str, detail: bool = False) -> DataFrame:
"""pykrx ํธํ: ํฌ์์๋ณ ๊ฑฐ๋๋๊ธ"""
return _get_client().get_market_trading_value_by_investor(fromdate, todate, ticker, detail)
def get_market_ticker_list(date: Optional[str] = None, market: str = "KOSPI") -> List[str]:
"""pykrx ํธํ: ํฐ์ปค ๋ชฉ๋ก"""
return _get_client().get_market_ticker_list(date, market)
def get_market_ticker_name(ticker: str) -> str:
"""pykrx ํธํ: ํฐ์ปค ์ด๋ฆ"""
client = _get_client()
tickers = client.get_market_ticker_name(market="ALL")
return tickers.get(ticker, "")
def get_index_ohlcv_by_date(fromdate: str, todate: str, ticker: str, freq: str = "d") -> DataFrame:
"""pykrx ํธํ: ์ง์ OHLCV"""
return _get_client().get_index_ohlcv_by_date(fromdate, todate, ticker, freq)
def get_nearest_business_day_in_a_week(target_date: Optional[str] = None, prev: bool = True) -> str:
"""pykrx ํธํ: ์ผ์ฃผ์ผ ๋ด ๊ฐ์ฅ ๊ฐ๊น์ด ์์
์ผ"""
return _get_client().get_nearest_business_day_in_a_week(target_date, prev)
# =============================================================================
# ํ
์คํธ
# =============================================================================
def test_client():
"""ํด๋ผ์ด์ธํธ ํ
์คํธ"""
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
print("=" * 60)
print("KRX Data Client ํ
์คํธ")
print("=" * 60)
try:
client = KRXDataClient()
# ์ข
๋ชฉ ์ฝ๋ ์กฐํ
print("\n[1] ์ข
๋ชฉ์ฝ๋ ์กฐํ")
ticker_map = client.get_market_ticker_name(market="KOSPI")
print(f"KOSPI ์ข
๋ชฉ ์: {len(ticker_map)}")
# OHLCV ์กฐํ
print("\n[2] ์ผ์ฑ์ ์ OHLCV (2024-12-01 ~ 2024-12-20)")
df = client.get_market_ohlcv("20241201", "20241220", "005930")
print(df.head())
# PER/PBR ์กฐํ
print("\n[3] ์ผ์ฑ์ ์ PER/PBR (2024-12-01 ~ 2024-12-20)")
df = client.get_market_fundamental("20241201", "20241220", "005930")
print(df.head())
# ํฌ์์๋ณ ๊ฑฐ๋๋
print("\n[4] ์ผ์ฑ์ ์ ํฌ์์๋ณ ๊ฑฐ๋๋ (2024-12-01 ~ 2024-12-20)")
df = client.get_market_trading_volume_by_date("20241201", "20241220", "005930")
print(df.head())
# ์ง์ OHLCV
print("\n[5] KOSPI ์ง์ (2024-12-01 ~ 2024-12-20)")
df = client.get_index_ohlcv("20241201", "20241220", "1001")
print(df.head())
print("\n" + "=" * 60)
print("๋ชจ๋ ํ
์คํธ ์๋ฃ!")
print("=" * 60)
except KRX2FARequiredError as e:
print(f"\n[ERROR] {e}")
except Exception as e:
print(f"\n[ERROR] {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_client()