Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
test_liquidity_tools.py (26.5 kB)
"""유동성 분석 도구 테스트""" import pytest import random from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock from typing import Dict, List, Any from src.tools.liquidity_tools import LiquidityAnalysisTool from src.exceptions import DataValidationError, DatabaseConnectionError class TestLiquidityAnalysisTool: """유동성 분석 도구 테스트""" @pytest.fixture def mock_db_manager(self): """Mock 데이터베이스 매니저""" return AsyncMock() @pytest.fixture def mock_cache_manager(self): """Mock 캐시 매니저""" return AsyncMock() @pytest.fixture def liquidity_tool(self, mock_db_manager, mock_cache_manager): """유동성 분석 도구 인스턴스""" return LiquidityAnalysisTool(mock_db_manager, mock_cache_manager) @pytest.fixture def sample_liquidity_data(self): """샘플 유동성 데이터""" base_date = datetime.now().date() data = [] markets = ["KOSPI", "KOSDAQ"] for i in range(30): # 30일 데이터 date = base_date - timedelta(days=i) for market in markets: # 시장별 유동성 패턴 시뮬레이션 if market == "KOSPI": # KOSPI는 더 높은 유동성 daily_volume = random.gauss(500000000000, 100000000000) # 5천억 중심 bid_ask_spread = random.gauss(0.02, 0.005) # 2bp 중심 market_depth = random.gauss(1000000000, 200000000) # 10억 중심 else: # KOSDAQ은 상대적으로 낮은 유동성 daily_volume = random.gauss(200000000000, 50000000000) # 2천억 중심 bid_ask_spread = random.gauss(0.05, 0.01) # 5bp 중심 market_depth = random.gauss(500000000, 100000000) # 5억 중심 data.append({ "date": date, "market": market, "daily_volume": max(daily_volume, 0), "daily_transaction_value": daily_volume * random.gauss(2500, 500), "bid_ask_spread": max(bid_ask_spread, 0.001), "market_depth_buy": max(market_depth, 0), "market_depth_sell": max(market_depth, 0), "turnover_rate": random.gauss(0.15, 0.05), # 15% 중심 "price_impact": random.gauss(0.001, 0.0005), "volatility": random.gauss(0.02, 0.01), "active_stocks_count": random.randint(800, 1200), "block_trade_volume": random.gauss(50000000000, 10000000000) }) return data @pytest.fixture def sample_intraday_data(self): """샘플 일중 유동성 데이터""" base_time = datetime.now() 
data = [] # 하루 중 30분 간격 데이터 for i in range(16): # 9:00 ~ 15:30 timestamp = base_time.replace(hour=9, minute=0) + timedelta(minutes=i*30) # 장 시작/마감 시간대 유동성 패턴 if i < 2 or i > 13: # 장 시작/마감 1시간 volume_multiplier = 1.5 spread_multiplier = 1.3 else: # 중간 시간대 volume_multiplier = 1.0 spread_multiplier = 1.0 data.append({ "timestamp": timestamp, "market": "KOSPI", "volume": random.gauss(30000000000, 5000000000) * volume_multiplier, "bid_ask_spread": random.gauss(0.02, 0.005) * spread_multiplier, "market_depth": random.gauss(1000000000, 200000000), "trade_count": random.randint(5000, 15000), "large_trade_ratio": random.gauss(0.25, 0.05) }) return data def test_tool_initialization(self, liquidity_tool, mock_db_manager, mock_cache_manager): """도구 초기화 테스트""" assert liquidity_tool.name == "analyze_liquidity" assert liquidity_tool.description is not None assert "유동성" in liquidity_tool.description or "liquidity" in liquidity_tool.description.lower() assert liquidity_tool.db_manager == mock_db_manager assert liquidity_tool.cache_manager == mock_cache_manager def test_tool_definition(self, liquidity_tool): """도구 정의 테스트""" definition = liquidity_tool.get_tool_definition() assert definition.name == "analyze_liquidity" assert definition.description is not None assert definition.inputSchema is not None # 입력 스키마 검증 schema = definition.inputSchema assert schema["type"] == "object" assert "properties" in schema properties = schema["properties"] assert "analysis_type" in properties assert "markets" in properties assert "time_period" in properties assert "include_intraday" in properties # analysis_type 파라미터 검증 analysis_prop = properties["analysis_type"] assert analysis_prop["type"] == "array" assert "market_liquidity" in str(analysis_prop) assert "spread_analysis" in str(analysis_prop) assert "depth_analysis" in str(analysis_prop) @pytest.mark.asyncio async def test_execute_market_liquidity_analysis(self, liquidity_tool, sample_liquidity_data): """시장 유동성 분석 테스트""" # 캐시 미스 
liquidity_tool.cache_manager.get.return_value = None # 데이터베이스 응답 설정 liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["market_liquidity"], "markets": ["KOSPI", "KOSDAQ"], "time_period": "30d" }) # 결과 검증 assert len(result) == 1 content = result[0] assert content.type == "text" # JSON 파싱하여 내용 확인 import json data = json.loads(content.text) assert "liquidity_analysis" in data assert "market_liquidity" in data["liquidity_analysis"] # 시장 유동성 결과 검증 market_liquidity = data["liquidity_analysis"]["market_liquidity"] assert "overall_liquidity_score" in market_liquidity assert "market_comparison" in market_liquidity assert "liquidity_trends" in market_liquidity assert "volume_analysis" in market_liquidity # 시장별 비교 데이터 확인 market_comparison = market_liquidity["market_comparison"] assert "KOSPI" in market_comparison assert "KOSDAQ" in market_comparison kospi_data = market_comparison["KOSPI"] assert "liquidity_score" in kospi_data assert "average_volume" in kospi_data assert "average_spread" in kospi_data @pytest.mark.asyncio async def test_execute_spread_analysis(self, liquidity_tool, sample_liquidity_data): """스프레드 분석 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["spread_analysis"], "markets": ["KOSPI"], "time_period": "20d", "include_percentiles": True }) # 결과 검증 content = result[0] import json data = json.loads(content.text) assert "spread_analysis" in data["liquidity_analysis"] spread_analysis = data["liquidity_analysis"]["spread_analysis"] assert "average_spread" in spread_analysis assert "spread_volatility" in spread_analysis assert "spread_percentiles" in spread_analysis assert "time_of_day_patterns" in spread_analysis # 백분위수 확인 percentiles = spread_analysis["spread_percentiles"] assert "p25" in percentiles assert "p50" in percentiles assert 
"p75" in percentiles assert "p95" in percentiles @pytest.mark.asyncio async def test_execute_depth_analysis(self, liquidity_tool, sample_liquidity_data): """시장 깊이 분석 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["depth_analysis"], "markets": ["KOSPI", "KOSDAQ"], "time_period": "15d" }) # 결과 검증 content = result[0] import json data = json.loads(content.text) assert "depth_analysis" in data["liquidity_analysis"] depth_analysis = data["liquidity_analysis"]["depth_analysis"] assert "average_market_depth" in depth_analysis assert "depth_imbalance" in depth_analysis assert "depth_resilience" in depth_analysis assert "market_making_activity" in depth_analysis # 시장 깊이 데이터 확인 avg_depth = depth_analysis["average_market_depth"] assert "total_depth" in avg_depth assert "buy_side_depth" in avg_depth assert "sell_side_depth" in avg_depth @pytest.mark.asyncio async def test_execute_turnover_analysis(self, liquidity_tool, sample_liquidity_data): """회전율 분석 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["turnover_analysis"], "markets": ["KOSPI"], "time_period": "30d", "include_sector_breakdown": True }) # 결과 검증 content = result[0] import json data = json.loads(content.text) assert "turnover_analysis" in data["liquidity_analysis"] turnover_analysis = data["liquidity_analysis"]["turnover_analysis"] assert "average_turnover_rate" in turnover_analysis assert "turnover_volatility" in turnover_analysis assert "high_turnover_stocks" in turnover_analysis assert "sector_turnover" in turnover_analysis # 섹터별 회전율 확인 sector_turnover = turnover_analysis["sector_turnover"] assert isinstance(sector_turnover, dict) @pytest.mark.asyncio async def test_execute_price_impact_analysis(self, liquidity_tool, 
sample_liquidity_data): """가격 충격 분석 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["price_impact"], "markets": ["KOSPI", "KOSDAQ"], "time_period": "30d", "trade_size_analysis": True }) # 결과 검증 content = result[0] import json data = json.loads(content.text) assert "price_impact" in data["liquidity_analysis"] price_impact = data["liquidity_analysis"]["price_impact"] assert "average_price_impact" in price_impact assert "impact_by_trade_size" in price_impact assert "market_resilience" in price_impact assert "temporary_vs_permanent_impact" in price_impact # 거래 규모별 가격 충격 확인 impact_by_size = price_impact["impact_by_trade_size"] assert "small_trades" in impact_by_size assert "medium_trades" in impact_by_size assert "large_trades" in impact_by_size @pytest.mark.asyncio async def test_comprehensive_liquidity_analysis(self, liquidity_tool, sample_liquidity_data): """종합 유동성 분석 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["market_liquidity", "spread_analysis", "depth_analysis", "turnover_analysis"], "markets": ["KOSPI", "KOSDAQ"], "time_period": "30d", "include_risk_metrics": True, "include_forecasts": True }) # 결과 검증 content = result[0] import json data = json.loads(content.text) analysis = data["liquidity_analysis"] assert "market_liquidity" in analysis assert "spread_analysis" in analysis assert "depth_analysis" in analysis assert "turnover_analysis" in analysis # 리스크 메트릭 및 예측 assert "risk_metrics" in data assert "forecasts" in data risk_metrics = data["risk_metrics"] assert "liquidity_risk_score" in risk_metrics assert "market_stress_indicators" in risk_metrics forecasts = data["forecasts"] assert "liquidity_trend_forecast" in forecasts assert "expected_spread_range" in forecasts 
@pytest.mark.asyncio async def test_intraday_liquidity_analysis(self, liquidity_tool, sample_intraday_data): """일중 유동성 분석 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = sample_intraday_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["intraday_patterns"], "markets": ["KOSPI"], "time_period": "1d", "include_intraday": True }) # 결과 검증 content = result[0] import json data = json.loads(content.text) assert "intraday_patterns" in data["liquidity_analysis"] intraday = data["liquidity_analysis"]["intraday_patterns"] assert "hourly_liquidity" in intraday assert "opening_closing_effects" in intraday assert "lunch_break_impact" in intraday assert "volume_concentration" in intraday # 시간별 유동성 패턴 확인 hourly_liquidity = intraday["hourly_liquidity"] assert isinstance(hourly_liquidity, list) assert len(hourly_liquidity) > 0 def test_liquidity_score_calculation(self, liquidity_tool): """유동성 점수 계산 테스트""" # 테스트 데이터 market_data = { "daily_volume": 500000000000, # 5천억 "bid_ask_spread": 0.02, # 2bp "market_depth_buy": 1000000000, # 10억 "market_depth_sell": 1000000000, # 10억 "turnover_rate": 0.15, # 15% "price_impact": 0.001, # 0.1% "volatility": 0.02 # 2% } score = liquidity_tool._calculate_liquidity_score(market_data) assert "overall_score" in score assert "volume_score" in score assert "spread_score" in score assert "depth_score" in score assert "impact_score" in score # 점수는 0-100 범위 assert 0 <= score["overall_score"] <= 100 assert 0 <= score["volume_score"] <= 100 assert 0 <= score["spread_score"] <= 100 def test_spread_percentile_calculation(self, liquidity_tool): """스프레드 백분위수 계산 테스트""" # 테스트 데이터 spread_data = [0.01, 0.015, 0.02, 0.025, 0.03, 0.035, 0.04, 0.045, 0.05, 0.055] percentiles = liquidity_tool._calculate_spread_percentiles(spread_data) assert "p25" in percentiles assert "p50" in percentiles assert "p75" in percentiles assert "p95" in percentiles # 백분위수 순서 확인 assert percentiles["p25"] <= 
percentiles["p50"] assert percentiles["p50"] <= percentiles["p75"] assert percentiles["p75"] <= percentiles["p95"] def test_market_depth_analysis(self, liquidity_tool): """시장 깊이 분석 테스트""" depth_data = [ {"market_depth_buy": 1000000000, "market_depth_sell": 800000000}, {"market_depth_buy": 1200000000, "market_depth_sell": 1000000000}, {"market_depth_buy": 900000000, "market_depth_sell": 1100000000}, ] depth_analysis = liquidity_tool._analyze_market_depth(depth_data) assert "average_total_depth" in depth_analysis assert "buy_sell_imbalance" in depth_analysis assert "depth_stability" in depth_analysis # 평균 깊이 계산 확인 expected_avg_buy = (1000000000 + 1200000000 + 900000000) / 3 expected_avg_sell = (800000000 + 1000000000 + 1100000000) / 3 expected_total = expected_avg_buy + expected_avg_sell assert depth_analysis["average_total_depth"] == expected_total def test_turnover_volatility_calculation(self, liquidity_tool): """회전율 변동성 계산 테스트""" turnover_data = [ {"turnover_rate": 0.10}, {"turnover_rate": 0.15}, {"turnover_rate": 0.20}, {"turnover_rate": 0.12}, {"turnover_rate": 0.18} ] volatility = liquidity_tool._calculate_turnover_volatility(turnover_data) assert "volatility" in volatility assert "coefficient_of_variation" in volatility assert "volatility_trend" in volatility # 변동성은 양수 assert volatility["volatility"] >= 0 assert volatility["coefficient_of_variation"] >= 0 def test_price_impact_by_trade_size(self, liquidity_tool): """거래 규모별 가격 충격 테스트""" impact_data = [ {"trade_size": 1000000, "price_impact": 0.0005}, # 소액 {"trade_size": 10000000, "price_impact": 0.001}, # 중간 {"trade_size": 100000000, "price_impact": 0.005}, # 대형 {"trade_size": 500000000, "price_impact": 0.015}, # 초대형 ] impact_analysis = liquidity_tool._analyze_price_impact_by_size(impact_data) assert "small_trades" in impact_analysis assert "medium_trades" in impact_analysis assert "large_trades" in impact_analysis assert "block_trades" in impact_analysis # 거래 규모가 클수록 가격 충격이 커야 함 assert 
impact_analysis["small_trades"]["avg_impact"] < impact_analysis["large_trades"]["avg_impact"] def test_intraday_pattern_detection(self, liquidity_tool): """일중 패턴 탐지 테스트""" # 시간별 거래량 데이터 (9시~15시) hourly_data = [] hours = [9, 10, 11, 12, 13, 14, 15] volumes = [150, 100, 80, 60, 80, 100, 140] # 장 시작/마감 높음 for hour, volume in zip(hours, volumes): hourly_data.append({ "hour": hour, "volume": volume * 1000000000, "trade_count": volume * 100, "avg_spread": 0.02 if hour in [9, 15] else 0.015 }) patterns = liquidity_tool._detect_intraday_patterns(hourly_data) assert "peak_hours" in patterns assert "low_liquidity_hours" in patterns assert "opening_effect" in patterns assert "closing_effect" in patterns # 장 시작/마감 효과 확인 assert patterns["opening_effect"]["enhanced_volume"] == True assert patterns["closing_effect"]["enhanced_volume"] == True def test_liquidity_risk_assessment(self, liquidity_tool): """유동성 리스크 평가 테스트""" risk_data = { "average_spread": 0.035, # 높은 스프레드 "spread_volatility": 0.015, # 높은 변동성 "market_depth": 500000000, # 낮은 깊이 "turnover_rate": 0.05, # 낮은 회전율 "price_impact": 0.008 # 높은 가격 충격 } risk_assessment = liquidity_tool._assess_liquidity_risk(risk_data) assert "overall_risk_level" in risk_assessment assert "risk_factors" in risk_assessment assert "risk_score" in risk_assessment assert "recommendations" in risk_assessment # 리스크 레벨은 low/medium/high 중 하나 assert risk_assessment["overall_risk_level"] in ["low", "medium", "high"] assert risk_assessment["risk_score"] >= 0 @pytest.mark.asyncio async def test_liquidity_forecast(self, liquidity_tool): """유동성 예측 테스트""" # 시계열 유동성 데이터 historical_data = [] for i in range(20): historical_data.append({ "date": (datetime.now() - timedelta(days=i)).date(), "liquidity_score": 75 + i * 0.5 + random.gauss(0, 2), "volume": 400000000000 + i * 10000000000, "spread": 0.02 + i * 0.0001 }) forecast = await liquidity_tool._forecast_liquidity(historical_data, forecast_days=5) assert "liquidity_scores" in forecast assert "volume_forecast" in 
forecast assert "spread_forecast" in forecast assert "confidence_intervals" in forecast # 5일 예측 확인 assert len(forecast["liquidity_scores"]) == 5 assert len(forecast["volume_forecast"]) == 5 assert len(forecast["spread_forecast"]) == 5 @pytest.mark.asyncio async def test_cache_functionality(self, liquidity_tool): """캐시 기능 테스트""" # 캐시 히트 시나리오 cached_data = { "timestamp": datetime.now().isoformat(), "liquidity_analysis": { "market_liquidity": {"overall_liquidity_score": 85} } } liquidity_tool.cache_manager.get.return_value = cached_data # 실행 result = await liquidity_tool.execute({ "analysis_type": ["market_liquidity"], "markets": ["KOSPI"] }) # 캐시에서 데이터 반환 확인 content = result[0] import json data = json.loads(content.text) assert data["liquidity_analysis"]["market_liquidity"]["overall_liquidity_score"] == 85 # 데이터베이스 호출 없음 확인 liquidity_tool.db_manager.fetch_all.assert_not_called() @pytest.mark.asyncio async def test_error_handling(self, liquidity_tool): """에러 처리 테스트""" liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB 연결 실패") with pytest.raises(DatabaseConnectionError): await liquidity_tool.execute({ "analysis_type": ["market_liquidity"] }) @pytest.mark.asyncio async def test_invalid_parameters(self, liquidity_tool): """잘못된 파라미터 테스트""" # 빈 분석 타입 with pytest.raises(ValueError, match="At least one analysis type"): await liquidity_tool.execute({ "analysis_type": [] }) # 잘못된 분석 타입 with pytest.raises(ValueError, match="Invalid analysis type"): await liquidity_tool.execute({ "analysis_type": ["invalid_analysis"] }) # 잘못된 시간 기간 with pytest.raises(ValueError, match="Invalid time period"): await liquidity_tool.execute({ "analysis_type": ["market_liquidity"], "time_period": "invalid_period" }) @pytest.mark.asyncio async def test_insufficient_data_handling(self, liquidity_tool): """데이터 부족 처리 테스트""" # 데이터가 부족한 경우 insufficient_data = [ { "date": datetime.now().date(), "market": "KOSPI", "daily_volume": 
100000000000, "bid_ask_spread": 0.02 } ] liquidity_tool.cache_manager.get.return_value = None liquidity_tool.db_manager.fetch_all.return_value = insufficient_data result = await liquidity_tool.execute({ "analysis_type": ["market_liquidity"], "time_period": "30d" }) content = result[0] import json data = json.loads(content.text) assert "warning" in data or "insufficient data" in str(data).lower() def test_market_stress_detection(self, liquidity_tool): """시장 스트레스 탐지 테스트""" # 스트레스 상황 데이터 stress_data = { "bid_ask_spread": 0.08, # 매우 높은 스프레드 "market_depth": 200000000, # 매우 낮은 깊이 "volume": 1000000000000, # 비정상적 높은 거래량 "price_impact": 0.02, # 높은 가격 충격 "volatility": 0.05 # 높은 변동성 } stress_indicators = liquidity_tool._detect_market_stress(stress_data) assert "stress_level" in stress_indicators assert "stress_factors" in stress_indicators assert "liquidity_crunch_risk" in stress_indicators # 스트레스 상황이므로 높은 스트레스 레벨 assert stress_indicators["stress_level"] in ["medium", "high"] assert len(stress_indicators["stress_factors"]) > 0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.