Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
test_anomaly_tools.py • 17.5 kB
"""์ด์ƒ ์ง•ํ›„ ํƒ์ง€ ๋„๊ตฌ ํ…Œ์ŠคํŠธ""" import pytest import random import math from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock from src.tools.anomaly_tools import AnomalyDetectionTool from src.exceptions import DataValidationError, DatabaseConnectionError class TestAnomalyDetectionTool: """์ด์ƒ ์ง•ํ›„ ํƒ์ง€ ๋„๊ตฌ ํ…Œ์ŠคํŠธ""" @pytest.fixture def mock_db_manager(self): """Mock ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋งค๋‹ˆ์ €""" return AsyncMock() @pytest.fixture def mock_cache_manager(self): """Mock ์บ์‹œ ๋งค๋‹ˆ์ €""" return AsyncMock() @pytest.fixture def anomaly_tool(self, mock_db_manager, mock_cache_manager): """์ด์ƒ ์ง•ํ›„ ํƒ์ง€ ๋„๊ตฌ ์ธ์Šคํ„ด์Šค""" return AnomalyDetectionTool(mock_db_manager, mock_cache_manager) @pytest.fixture def sample_market_data(self): """์ƒ˜ํ”Œ ์‹œ์žฅ ๋ฐ์ดํ„ฐ (์ •์ƒ ํŒจํ„ด + ์ด์ƒ์น˜ ํฌํ•จ)""" base_date = datetime.now().date() data = [] for i in range(100): # 100์ผ ๋ฐ์ดํ„ฐ date = base_date - timedelta(days=i) # ๋Œ€๋ถ€๋ถ„ ์ •์ƒ ๋ฐ์ดํ„ฐ if i < 90: price = 2650 + random.gauss(0, 30) # ์ •์ƒ ๋ณ€๋™ volume = 450000000 + random.gauss(0, 50000000) # ์ •์ƒ ๊ฑฐ๋ž˜๋Ÿ‰ volatility = 0.015 + random.uniform(-0.005, 0.005) # ์ •์ƒ ๋ณ€๋™์„ฑ else: # ์ด์ƒ์น˜ ๋ฐ์ดํ„ฐ (๋งˆ์ง€๋ง‰ 10์ผ) price = 2650 + random.gauss(0, 150) # ํฐ ๋ณ€๋™ volume = 450000000 + random.gauss(0, 200000000) # ๊ฑฐ๋ž˜๋Ÿ‰ ๊ธ‰์ฆ volatility = 0.035 + random.uniform(-0.015, 0.015) # ๋†’์€ ๋ณ€๋™์„ฑ data.append({ "date": date, "market": "KOSPI", "close_price": max(price, 100), # ์Œ์ˆ˜ ๋ฐฉ์ง€ "volume": max(int(volume), 1000000), # ์ตœ์†Œ๊ฐ’ ๋ณด์žฅ "daily_return": random.gauss(0.001, 0.02), "volatility": max(volatility, 0.001), # ์–‘์ˆ˜ ๋ณด์žฅ "vix": 20 + random.gauss(0, 5), "put_call_ratio": 0.8 + random.uniform(-0.3, 0.3) }) return data def test_tool_initialization(self, anomaly_tool, mock_db_manager, mock_cache_manager): """๋„๊ตฌ ์ดˆ๊ธฐํ™” ํ…Œ์ŠคํŠธ""" assert anomaly_tool.name == "detect_market_anomalies" assert 
anomaly_tool.description is not None assert "์ด์ƒ ์ง•ํ›„" in anomaly_tool.description or "anomaly" in anomaly_tool.description.lower() assert anomaly_tool.db_manager == mock_db_manager assert anomaly_tool.cache_manager == mock_cache_manager def test_tool_definition(self, anomaly_tool): """๋„๊ตฌ ์ •์˜ ํ…Œ์ŠคํŠธ""" definition = anomaly_tool.get_tool_definition() assert definition.name == "detect_market_anomalies" assert definition.description is not None assert definition.inputSchema is not None # ์ž…๋ ฅ ์Šคํ‚ค๋งˆ ๊ฒ€์ฆ schema = definition.inputSchema assert schema["type"] == "object" assert "properties" in schema properties = schema["properties"] assert "market" in properties assert "detection_methods" in properties assert "lookback_period" in properties assert "sensitivity" in properties assert "include_realtime_alerts" in properties # detection_methods ํŒŒ๋ผ๋ฏธํ„ฐ ๊ฒ€์ฆ methods_prop = properties["detection_methods"] assert methods_prop["type"] == "array" assert "statistical" in str(methods_prop) assert "isolation_forest" in str(methods_prop) assert "timeseries" in str(methods_prop) @pytest.mark.asyncio async def test_execute_statistical_anomaly_detection(self, anomaly_tool, sample_market_data): """ํ†ต๊ณ„์  ์ด์ƒ ํƒ์ง€ ํ…Œ์ŠคํŠธ""" # ์บ์‹œ ๋ฏธ์Šค anomaly_tool.cache_manager.get.return_value = None # ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์‘๋‹ต ์„ค์ • anomaly_tool.db_manager.fetch_all.return_value = sample_market_data # ์‹คํ–‰ result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical"], "lookback_period": "30d", "sensitivity": 2.0 }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ assert len(result) == 1 content = result[0] assert content.type == "text" # JSON ํŒŒ์‹ฑํ•˜์—ฌ ๋‚ด์šฉ ํ™•์ธ import json data = json.loads(content.text) assert "timestamp" in data assert "market" in data assert "anomaly_detection_results" in data # ํ†ต๊ณ„์  ์ด์ƒ ํƒ์ง€ ๊ฒฐ๊ณผ ๊ฒ€์ฆ results = data["anomaly_detection_results"] assert "statistical" in results stat_results = results["statistical"] 
assert "z_score_anomalies" in stat_results assert "iqr_anomalies" in stat_results assert "anomaly_count" in stat_results assert "anomaly_percentage" in stat_results @pytest.mark.asyncio async def test_execute_isolation_forest_detection(self, anomaly_tool, sample_market_data): """Isolation Forest ์ด์ƒ ํƒ์ง€ ํ…Œ์ŠคํŠธ""" anomaly_tool.cache_manager.get.return_value = None anomaly_tool.db_manager.fetch_all.return_value = sample_market_data # ์‹คํ–‰ result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["isolation_forest"], "lookback_period": "60d", "contamination": 0.1 }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ content = result[0] import json data = json.loads(content.text) results = data["anomaly_detection_results"] assert "isolation_forest" in results if_results = results["isolation_forest"] assert "anomaly_scores" in if_results assert "anomalies_detected" in if_results assert "feature_importance" in if_results assert "model_stats" in if_results @pytest.mark.asyncio async def test_execute_timeseries_anomaly_detection(self, anomaly_tool, sample_market_data): """์‹œ๊ณ„์—ด ์ด์ƒ ํŒจํ„ด ํƒ์ง€ ํ…Œ์ŠคํŠธ""" anomaly_tool.cache_manager.get.return_value = None anomaly_tool.db_manager.fetch_all.return_value = sample_market_data # ์‹คํ–‰ result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["timeseries"], "lookback_period": "90d" }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ content = result[0] import json data = json.loads(content.text) results = data["anomaly_detection_results"] assert "timeseries" in results ts_results = results["timeseries"] assert "seasonal_anomalies" in ts_results assert "trend_breaks" in ts_results assert "volatility_regimes" in ts_results assert "structural_breaks" in ts_results @pytest.mark.asyncio async def test_comprehensive_anomaly_detection(self, anomaly_tool, sample_market_data): """์ข…ํ•ฉ ์ด์ƒ ํƒ์ง€ ํ…Œ์ŠคํŠธ (๋ชจ๋“  ๋ฐฉ๋ฒ•)""" anomaly_tool.cache_manager.get.return_value = None anomaly_tool.db_manager.fetch_all.return_value = 
sample_market_data # ์‹คํ–‰ result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical", "isolation_forest", "timeseries"], "lookback_period": "60d", "sensitivity": 2.5, "contamination": 0.05, "include_feature_analysis": True, "include_risk_assessment": True }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ content = result[0] import json data = json.loads(content.text) assert "anomaly_detection_results" in data assert "anomaly_summary" in data assert "risk_assessment" in data assert "feature_analysis" in data # ๋ชจ๋“  ํƒ์ง€ ๋ฐฉ๋ฒ• ๊ฒฐ๊ณผ ํ™•์ธ results = data["anomaly_detection_results"] assert "statistical" in results assert "isolation_forest" in results assert "timeseries" in results # ์ข…ํ•ฉ ์š”์•ฝ ํ™•์ธ summary = data["anomaly_summary"] assert "total_anomalies_detected" in summary assert "severity_distribution" in summary assert "confidence_scores" in summary def test_z_score_calculation(self, anomaly_tool): """Z-score ์ด์ƒ ํƒ์ง€ ๊ณ„์‚ฐ ํ…Œ์ŠคํŠธ""" # ์ •์ƒ ๋ฐ์ดํ„ฐ + ์ด์ƒ์น˜ data = [1, 2, 3, 2, 1, 2, 3, 50, 2, 1] # 50์ด ์ด์ƒ์น˜ anomalies = anomaly_tool._detect_z_score_anomalies(data, threshold=2.0) assert len(anomalies) > 0 assert any(item["value"] == 50 for item in anomalies) assert all(-1 <= item["z_score"] or item["z_score"] >= 1 for item in anomalies) def test_iqr_anomaly_detection(self, anomaly_tool): """IQR ์ด์ƒ ํƒ์ง€ ํ…Œ์ŠคํŠธ""" # ์ด์ƒ์น˜๊ฐ€ ํฌํ•จ๋œ ๋ฐ์ดํ„ฐ data = list(range(10, 20)) + [100, 105] # 100, 105๊ฐ€ ์ด์ƒ์น˜ anomalies = anomaly_tool._detect_iqr_anomalies(data) assert len(anomalies) >= 2 assert any(item["value"] == 100 for item in anomalies) assert any(item["value"] == 105 for item in anomalies) def test_isolation_forest_model(self, anomaly_tool): """Isolation Forest ๋ชจ๋ธ ํ…Œ์ŠคํŠธ""" # ๋‹ค์ฐจ์› ํ”ผ์ฒ˜ ๋ฐ์ดํ„ฐ features = [ [1, 2, 3], [2, 3, 4], [1, 1, 2], [2, 2, 3], [10, 15, 20], # ์ด์ƒ์น˜ [1, 2, 2], [2, 3, 3] ] anomalies, scores, model_stats = anomaly_tool._detect_isolation_forest_anomalies( features, 
contamination=0.2 ) assert len(anomalies) > 0 assert len(scores) == len(features) assert "n_estimators" in model_stats assert "contamination" in model_stats def test_volatility_regime_detection(self, anomaly_tool): """๋ณ€๋™์„ฑ ์ฒด์ œ ํƒ์ง€ ํ…Œ์ŠคํŠธ""" # ๋ณ€๋™์„ฑ ๋ฐ์ดํ„ฐ (๋‚ฎ์Œ -> ๋†’์Œ -> ๋‚ฎ์Œ) volatility_data = ([0.01] * 30 + [0.05] * 20 + [0.01] * 30) regimes = anomaly_tool._detect_volatility_regimes(volatility_data) assert len(regimes) >= 2 # ์ตœ์†Œ 2๊ฐœ์˜ ์ฒด์ œ ๋ณ€ํ™” assert all("start_index" in regime for regime in regimes) assert all("regime_type" in regime for regime in regimes) assert all("volatility_level" in regime for regime in regimes) def test_structural_break_detection(self, anomaly_tool): """๊ตฌ์กฐ์  ๋ณ€ํ™”์  ํƒ์ง€ ํ…Œ์ŠคํŠธ""" # ๊ตฌ์กฐ์  ๋ณ€ํ™”๊ฐ€ ์žˆ๋Š” ์‹œ๊ณ„์—ด (ํ‰๊ท  ๋ณ€ํ™”) data = [10] * 50 + [20] * 50 # 50์ผ ํ›„ ํ‰๊ท  ๋ณ€ํ™” breaks = anomaly_tool._detect_structural_breaks(data) assert len(breaks) > 0 # ๋ณ€ํ™”์ ์ด ๋Œ€๋žต 50 ๊ทผ์ฒ˜์—์„œ ๋ฐœ๊ฒฌ๋˜์–ด์•ผ ํ•จ assert any(40 < break_info["breakpoint"] < 60 for break_info in breaks) @pytest.mark.asyncio async def test_realtime_alert_system(self, anomaly_tool): """์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ ์‹œ์Šคํ…œ ํ…Œ์ŠคํŠธ""" # ์‹ฌ๊ฐํ•œ ์ด์ƒ ์ง•ํ›„ ๋ฐ์ดํ„ฐ severe_anomaly_data = [{ "date": datetime.now().date(), "market": "KOSPI", "close_price": 1000, # ๊ธ‰๋ฝ "volume": 1500000000, # ๊ฑฐ๋ž˜๋Ÿ‰ ๊ธ‰์ฆ "volatility": 0.08, # ๋†’์€ ๋ณ€๋™์„ฑ "daily_return": -0.15, # ํฐ ํ•˜๋ฝ "vix": 45, # ๊ณตํฌ์ง€์ˆ˜ ์ƒ์Šน "put_call_ratio": 2.0 # Put ๊ธ‰์ฆ }] anomaly_tool.cache_manager.get.return_value = None anomaly_tool.db_manager.fetch_all.return_value = severe_anomaly_data # ์‹คํ–‰ (์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ ํฌํ•จ) result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical"], "include_realtime_alerts": True, "alert_threshold": "high" }) content = result[0] import json data = json.loads(content.text) # ์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ ์ •๋ณด ํ™•์ธ if "realtime_alerts" in data: alerts = 
data["realtime_alerts"] assert "critical_anomalies" in alerts assert "alert_level" in alerts assert "recommended_actions" in alerts @pytest.mark.asyncio async def test_cache_functionality(self, anomaly_tool): """์บ์‹œ ๊ธฐ๋Šฅ ํ…Œ์ŠคํŠธ""" # ์บ์‹œ ํžˆํŠธ ์‹œ๋‚˜๋ฆฌ์˜ค cached_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "anomaly_detection_results": { "statistical": {"anomaly_count": 5} } } anomaly_tool.cache_manager.get.return_value = cached_data # ์‹คํ–‰ result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical"], "lookback_period": "30d" }) # ์บ์‹œ์—์„œ ๋ฐ์ดํ„ฐ ๋ฐ˜ํ™˜ ํ™•์ธ content = result[0] import json data = json.loads(content.text) assert data == cached_data # ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ํ˜ธ์ถœ ์—†์Œ ํ™•์ธ anomaly_tool.db_manager.fetch_all.assert_not_called() @pytest.mark.asyncio async def test_error_handling(self, anomaly_tool): """์—๋Ÿฌ ์ฒ˜๋ฆฌ ํ…Œ์ŠคํŠธ""" anomaly_tool.cache_manager.get.return_value = None anomaly_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB ์—ฐ๊ฒฐ ์‹คํŒจ") with pytest.raises(DatabaseConnectionError): await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical"] }) @pytest.mark.asyncio async def test_invalid_parameters(self, anomaly_tool): """์ž˜๋ชป๋œ ํŒŒ๋ผ๋ฏธํ„ฐ ํ…Œ์ŠคํŠธ""" # ์ž˜๋ชป๋œ ์‹œ์žฅ with pytest.raises(ValueError, match="Invalid market"): await anomaly_tool.execute({ "market": "INVALID", "detection_methods": ["statistical"] }) # ๋นˆ ํƒ์ง€ ๋ฐฉ๋ฒ• ๋ชฉ๋ก with pytest.raises(ValueError, match="At least one detection method"): await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": [] }) # ์ž˜๋ชป๋œ ๋ฏผ๊ฐ๋„ ๊ฐ’ with pytest.raises(ValueError, match="Invalid sensitivity"): await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical"], "sensitivity": -1.0 }) @pytest.mark.asyncio async def test_insufficient_data_handling(self, anomaly_tool): """๋ฐ์ดํ„ฐ ๋ถ€์กฑ ์ฒ˜๋ฆฌ ํ…Œ์ŠคํŠธ""" # ๋ฐ์ดํ„ฐ๊ฐ€ 
๋ถ€์กฑํ•œ ๊ฒฝ์šฐ (5์ผ ๋ฐ์ดํ„ฐ) insufficient_data = [ { "date": datetime.now().date() - timedelta(days=i), "market": "KOSPI", "close_price": 2650, "volume": 450000000, "volatility": 0.015 } for i in range(5) ] anomaly_tool.cache_manager.get.return_value = None anomaly_tool.db_manager.fetch_all.return_value = insufficient_data result = await anomaly_tool.execute({ "market": "KOSPI", "detection_methods": ["statistical"], "lookback_period": "30d" }) content = result[0] import json data = json.loads(content.text) assert "warning" in data or "insufficient data" in str(data).lower() def test_feature_extraction(self, anomaly_tool, sample_market_data): """ํ”ผ์ฒ˜ ์ถ”์ถœ ํ…Œ์ŠคํŠธ""" features = anomaly_tool._extract_features(sample_market_data[:30]) assert len(features) == 30 assert len(features[0]) > 5 # ์ตœ์†Œ 5๊ฐœ ํ”ผ์ฒ˜ assert all(isinstance(row, list) for row in features) assert all(isinstance(val, (int, float)) for row in features for val in row) def test_risk_scoring(self, anomaly_tool): """๋ฆฌ์Šคํฌ ์Šค์ฝ”์–ด๋ง ํ…Œ์ŠคํŠธ""" # ๋‹ค์–‘ํ•œ ์ด์ƒ ํƒ์ง€ ๊ฒฐ๊ณผ detection_results = { "statistical": { "anomaly_count": 10, "severe_anomalies": 3 }, "isolation_forest": { "anomalies_detected": 8, "avg_anomaly_score": -0.15 } } risk_assessment = anomaly_tool._assess_risk_level(detection_results) assert "overall_risk_level" in risk_assessment assert "risk_factors" in risk_assessment assert "confidence_score" in risk_assessment assert risk_assessment["overall_risk_level"] in ["low", "medium", "high", "critical"]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server