kospi-kosdaq
by dragon1086
Verified
import json
import logging
from datetime import datetime
from typing import Dict, Any, Union
from mcp.server.fastmcp import FastMCP
from pykrx.stock.stock_api import get_market_ohlcv, get_nearest_business_day_in_a_week, get_market_cap, \
get_market_fundamental_by_date, get_market_trading_volume_by_date
from pykrx.website.krx.market.wrap import get_market_ticker_and_name
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Create MCP server (add pykrx dependency)
mcp = FastMCP(
"kospi-kosdaq-stock-server",
dependencies=["pykrx"]
)
# Global variable to store ticker information in memory
TICKER_MAP: Dict[str, str] = {}
@mcp.tool()
def load_all_tickers() -> Dict[str, str]:
"""Loads all ticker symbols and names for KOSPI and KOSDAQ into memory.
Returns:
Dict[str, str]: A dictionary mapping tickers to stock names.
Example: {"005930": "삼성전자", "035720": "카카오", ...}
"""
try:
global TICKER_MAP
# If TICKER_MAP already has data, return it
if TICKER_MAP:
logging.debug(f"Returning cached ticker information with {len(TICKER_MAP)} stocks")
return TICKER_MAP
logging.debug("No cached data found. Loading KOSPI/KOSDAQ ticker symbols")
# Retrieve data based on today's date
today = get_nearest_business_day_in_a_week()
logging.debug(f"Reference date: {today}")
# get_market_ticker_and_name() returns a Series,
# where the index is the ticker and the values are the stock names
kospi_series = get_market_ticker_and_name(today, market="KOSPI")
kosdaq_series = get_market_ticker_and_name(today, market="KOSDAQ")
# Convert Series to dictionaries and merge them
TICKER_MAP.update(kospi_series.to_dict())
TICKER_MAP.update(kosdaq_series.to_dict())
logging.debug(f"Successfully stored information for {len(TICKER_MAP)} stocks")
return TICKER_MAP
except Exception as e:
error_message = f"Failed to retrieve ticker information: {str(e)}"
logging.error(error_message)
return {"error": error_message}
@mcp.resource("stock://tickers")
def get_ticker_map() -> str:
"""Retrieves the stored ticker symbol-name mapping information."""
try:
if not TICKER_MAP:
return json.dumps({"message": "No ticker information stored. Please run the load_all_tickers() tool first to load ticker information."})
# Return formatted for better readability
# result = ["[Ticker Symbol - Stock Name Mapping]"]
# for ticker, name in TICKER_MAP.items():
# result.append(f"- {ticker}: {name}")
# return "\n".join(result)
return json.dumps(TICKER_MAP)
except Exception as e:
return json.dumps({"error": f"Failed to retrieve ticker information: {str(e)}"})
@mcp.prompt()
def search_stock_data_prompt() -> str:
"""Prompt template for searching stock data."""
return """
Step-by-step guide for searching stock data by stock name:
1. First, load the ticker information for all stocks:
load_all_tickers()
2. Check the code of the desired stock from the loaded ticker information:
Refer to the stock://tickers resource to find the ticker corresponding to the stock name.
3. Retrieve the desired data using the found ticker:
Retrieve OHLCV (Open/High/Low/Close/Volume) data:
get_stock_ohlcv("start_date", "end_date", "ticker", adjusted=True)
Retrieve market capitalization data:
get_stock_market_cap("start_date", "end_date", "ticker")
Retrieve fundamental indicators (PER/PBR/Dividend Yield):
get_stock_fundamental("start_date", "end_date", "ticker")
Retrieve trading volume by investor type:
get_stock_trading_volume("start_date", "end_date", "ticker")
Example) To retrieve data for Samsung Electronics in January 2024:
1. load_all_tickers() # Load all tickers
2. Refer to stock://tickers # Check Samsung Electronics = 005930
3. get_stock_ohlcv("20240101", "20240131", "005930") # Retrieve OHLCV data
or
get_stock_market_cap("20240101", "20240131", "005930") # Retrieve market cap data
or
get_stock_fundamental("20240101", "20240131", "005930") # Retrieve fundamental data
or
get_stock_trading_volume("20240101", "20240131", "005930") # Retrieve trading volume
"""
@mcp.tool()
def get_stock_ohlcv(fromdate: Union[str, int], todate: Union[str, int], ticker: str, adjusted: bool = True) -> Dict[str, Any]:
"""Retrieves OHLCV (Open/High/Low/Close/Volume) data for a specific stock.
Args:
fromdate (str): Start date for retrieval (YYYYMMDD)
todate (str): End date for retrieval (YYYYMMDD)
ticker (str): Stock ticker symbol
adjusted (bool, optional): Whether to use adjusted prices (True: adjusted, False: unadjusted). Defaults to True.
Returns:
DataFrame:
>> get_stock_ohlcv("20210118", "20210126", "005930")
Open High Low Close Volume
Date
2021-01-26 89500 94800 89500 93800 46415214
2021-01-25 87300 89400 86800 88700 25577517
2021-01-22 89000 89700 86800 86800 30861661
2021-01-21 87500 88600 86500 88100 25318011
2021-01-20 89000 89000 86500 87200 25211127
2021-01-19 84500 88000 83600 87000 39895044
2021-01-18 86600 87300 84100 85000 43227951
"""
# Validate and convert date format
def validate_date(date_str: Union[str, int]) -> str:
try:
if isinstance(date_str, int):
date_str = str(date_str)
# Convert if in YYYY-MM-DD format
if '-' in date_str:
parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
return parsed_date.strftime('%Y%m%d')
# Validate if in YYYYMMDD format
datetime.strptime(date_str, '%Y%m%d')
return date_str
except ValueError:
raise ValueError(f"Date must be in YYYYMMDD format. Input value: {date_str}")
try:
fromdate = validate_date(fromdate)
todate = validate_date(todate)
logging.debug(f"Retrieving stock OHLCV data: {ticker}, {fromdate}-{todate}, adjusted={adjusted}")
# Call get_market_ohlcv (changed adj -> adjusted)
df = get_market_ohlcv(fromdate, todate, ticker, adjusted=adjusted)
# Convert DataFrame to dictionary
result = df.to_dict(orient='index')
# Convert datetime index to string and sort in reverse
sorted_items = sorted(
((k.strftime('%Y-%m-%d'), v) for k, v in result.items()),
reverse=True
)
result = dict(sorted_items)
return json.dumps(result)
except Exception as e:
error_message = f"Data retrieval failed: {str(e)}"
logging.error(error_message)
return json.dumps({"error": error_message})
@mcp.resource("stock://format-guide")
def get_format_guide() -> str:
"""Provides a guide for date format and ticker symbol input."""
return """
[Input Format Guide]
1. Ticker symbol: 6-digit number (e.g., 005930 - Samsung Electronics)
2. Date format: YYYYMMDD (e.g., 20240301) or YYYY-MM-DD (e.g., 2024-03-01)
[Notes]
- The start date must be earlier than the end date.
- If adjusted=True, adjusted prices are retrieved; if False, unadjusted prices are retrieved.
"""
@mcp.resource("stock://popular-tickers")
def get_popular_tickers() -> str:
"""Provides a list of frequently queried ticker symbols."""
return """
[Frequently Queried Ticker Symbols]
- 005930: 삼성전자
- 000660: SK하이닉스
- 373220: LG에너지솔루션
- 035420: NAVER
- 035720: 카카오
"""
@mcp.prompt()
def get_stock_data_prompt() -> str:
"""Prompt template for retrieving stock data."""
return """
Please enter the following information to retrieve stock OHLCV data:
1. Ticker symbol: 6-digit number (e.g., 005930)
2. Start date: YYYYMMDD format (e.g., 20240101)
3. End date: YYYYMMDD format (e.g., 20240301)
4. Adjusted price: True/False (default: True)
Example) get_stock_ohlcv("20240101", "20240301", "005930", adjusted=True)
"""
@mcp.tool()
def get_stock_market_cap(fromdate: Union[str, int], todate: Union[str, int], ticker: str) -> Dict[str, Any]:
"""Retrieves market capitalization data for a specific stock.
Args:
fromdate (str): Start date for retrieval (YYYYMMDD)
todate (str): End date for retrieval (YYYYMMDD)
ticker (str): Stock ticker symbol
Returns:
DataFrame:
>> get_stock_market_cap("20150720", "20150724", "005930")
Market Cap Volume Trading Value Listed Shares
Date
2015-07-24 181030885173000 196584 241383636000 147299337
2015-07-23 181767381858000 208965 259446564000 147299337
2015-07-22 184566069261000 268323 333813094000 147299337
2015-07-21 186039062631000 194055 244129106000 147299337
2015-07-20 187806654675000 128928 165366199000 147299337
"""
# Validate and convert date format
def validate_date(date_str: Union[str, int]) -> str:
try:
if isinstance(date_str, int):
date_str = str(date_str)
# Convert if in YYYY-MM-DD format
if '-' in date_str:
parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
return parsed_date.strftime('%Y%m%d')
# Validate if in YYYYMMDD format
datetime.strptime(date_str, '%Y%m%d')
return date_str
except ValueError:
raise ValueError(f"Date must be in YYYYMMDD format. Input value: {date_str}")
try:
fromdate = validate_date(fromdate)
todate = validate_date(todate)
logging.debug(f"Retrieving stock market capitalization data: {ticker}, {fromdate}-{todate}")
# Call get_market_cap
df = get_market_cap(fromdate, todate, ticker)
# Convert DataFrame to dictionary
result = df.to_dict(orient='index')
# Convert datetime index to string and sort in reverse
sorted_items = sorted(
((k.strftime('%Y-%m-%d'), v) for k, v in result.items()),
reverse=True
)
result = dict(sorted_items)
return json.dumps(result)
except Exception as e:
error_message = f"Data retrieval failed: {str(e)}"
logging.error(error_message)
return json.dumps({"error": error_message})
@mcp.tool()
def get_stock_fundamental(fromdate: Union[str, int], todate: Union[str, int], ticker: str) -> Dict[str, Any]:
"""Retrieves fundamental data (PER/PBR/Dividend Yield) for a specific stock.
Args:
fromdate (str): Start date for retrieval (YYYYMMDD)
todate (str): End date for retrieval (YYYYMMDD)
ticker (str): Stock ticker symbol
Returns:
DataFrame:
>> get_stock_fundamental("20210104", "20210108", "005930")
BPS PER PBR EPS DIV DPS
Date
2021-01-08 37528 28.046875 2.369141 3166 1.589844 1416
2021-01-07 37528 26.187500 2.210938 3166 1.709961 1416
2021-01-06 37528 25.953125 2.189453 3166 1.719727 1416
2021-01-05 37528 26.500000 2.240234 3166 1.690430 1416
2021-01-04 37528 26.218750 2.210938 3166 1.709961 1416
"""
# Validate and convert date format
def validate_date(date_str: Union[str, int]) -> str:
try:
if isinstance(date_str, int):
date_str = str(date_str)
if '-' in date_str:
parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
return parsed_date.strftime('%Y%m%d')
datetime.strptime(date_str, '%Y%m%d')
return date_str
except ValueError:
raise ValueError(f"Date must be in YYYYMMDD format. Input value: {date_str}")
try:
fromdate = validate_date(fromdate)
todate = validate_date(todate)
logging.debug(f"Retrieving stock fundamental data: {ticker}, {fromdate}-{todate}")
# Call get_market_fundamental_by_date
df = get_market_fundamental_by_date(fromdate, todate, ticker)
# Convert DataFrame to dictionary
result = df.to_dict(orient='index')
# Convert datetime index to string and sort in reverse
sorted_items = sorted(
((k.strftime('%Y-%m-%d'), v) for k, v in result.items()),
reverse=True
)
result = dict(sorted_items)
return json.dumps(result)
except Exception as e:
error_message = f"Data retrieval failed: {str(e)}"
logging.error(error_message)
return json.dumps({"error": error_message})
@mcp.tool()
def get_stock_trading_volume(fromdate: Union[str, int], todate: Union[str, int], ticker: str) -> Dict[str, Any]:
"""Retrieves trading volume by investor type for a specific stock.
Args:
fromdate (str): Start date for retrieval (YYYYMMDD)
todate (str): End date for retrieval (YYYYMMDD)
ticker (str): Stock ticker symbol
Returns:
DataFrame with columns:
- Volume (Sell/Buy/Net Buy)
- Trading Value (Sell/Buy/Net Buy)
Broken down by investor types (Financial Investment, Insurance, Trust, etc.)
"""
# Validate and convert date format
def validate_date(date_str: Union[str, int]) -> str:
try:
if isinstance(date_str, int):
date_str = str(date_str)
if '-' in date_str:
parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
return parsed_date.strftime('%Y%m%d')
datetime.strptime(date_str, '%Y%m%d')
return date_str
except ValueError:
raise ValueError(f"Date must be in YYYYMMDD format. Input value: {date_str}")
try:
fromdate = validate_date(fromdate)
todate = validate_date(todate)
logging.debug(f"Retrieving stock trading volume by investor type: {ticker}, {fromdate}-{todate}")
# Call get_market_trading_volume_by_date
df = get_market_trading_volume_by_date(fromdate, todate, ticker)
# Convert DataFrame to dictionary
result = df.to_dict(orient='index')
# Convert datetime index to string and sort in reverse
sorted_items = sorted(
((k.strftime('%Y-%m-%d'), v) for k, v in result.items()),
reverse=True
)
result = dict(sorted_items)
return result
except Exception as e:
error_message = f"Data retrieval failed: {str(e)}"
logging.error(error_message)
return {"error": error_message}
def main():
mcp.run()
if __name__ == "__main__":
main()