Skip to main content
Glama
test_mcp_tools_direct.py26.4 kB
#!/usr/bin/env python3 """ Прямое тестирование всех MCP инструментов Вызывает методы напрямую без MCP протокола для быстрой проверки """ import asyncio import json import sys from pathlib import Path from datetime import datetime from typing import Dict, List, Any, Optional import traceback import pandas as pd # Добавляем путь к mcp_server sys.path.insert(0, str(Path(__file__).parent / "mcp_server")) from loguru import logger from trading_operations import TradingOperations, get_all_account_balances from technical_analysis import TechnicalAnalysis from market_scanner import MarketScanner from position_monitor import PositionMonitor from bybit_client import BybitClient from signal_tracker import SignalTracker from signal_price_monitor import SignalPriceMonitor from quality_metrics import QualityMetrics from signal_reports import SignalReports # Настройка логирования logger.remove() logger.add( sys.stderr, level="INFO", format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>" ) # Глобальные клиенты trading_ops: Optional[TradingOperations] = None technical_analysis: Optional[TechnicalAnalysis] = None market_scanner: Optional[MarketScanner] = None position_monitor: Optional[PositionMonitor] = None bybit_client: Optional[BybitClient] = None signal_tracker: Optional[SignalTracker] = None signal_monitor: Optional[SignalPriceMonitor] = None quality_metrics: Optional[QualityMetrics] = None signal_reports: Optional[SignalReports] = None def load_credentials() -> Dict[str, Any]: """Загрузка credentials""" config_path = Path(__file__).parent / "config" / "credentials.json" try: with open(config_path, 'r') as f: return json.load(f) except FileNotFoundError: logger.error(f"Credentials not found: {config_path}") raise except json.JSONDecodeError as e: logger.error(f"Invalid JSON: {e}") raise async def initialize_components(): """Инициализация всех компонентов""" global trading_ops, technical_analysis, market_scanner, position_monitor, bybit_client global signal_tracker, signal_monitor, quality_metrics, signal_reports logger.info("Инициализация компонентов...") credentials = load_credentials() bybit_creds = credentials["bybit"] trading_ops = TradingOperations( api_key=bybit_creds["api_key"], api_secret=bybit_creds["api_secret"], testnet=bybit_creds.get("testnet", False) ) bybit_client = BybitClient( api_key=bybit_creds["api_key"], api_secret=bybit_creds["api_secret"], testnet=bybit_creds.get("testnet", False) ) technical_analysis = TechnicalAnalysis(bybit_client) market_scanner = MarketScanner(bybit_client, technical_analysis) position_monitor = PositionMonitor( api_key=bybit_creds["api_key"], api_secret=bybit_creds["api_secret"], testnet=bybit_creds.get("testnet", False) ) signal_tracker = SignalTracker() signal_monitor = SignalPriceMonitor(signal_tracker, bybit_client, check_interval=300) quality_metrics = QualityMetrics(signal_tracker) signal_reports = SignalReports(signal_tracker, quality_metrics) logger.info("✅ Все компоненты инициализированы") async def test_tool_direct(tool_name: str, test_func, *args, **kwargs) -> Dict[str, Any]: """Прямое тестирование инструмента""" result = { "tool": tool_name, "status": "pending", "error": None, "response_time": None, "has_data": False, "sample_response": None } try: start_time = datetime.now() # Вызываем функцию response = await test_func(*args, **kwargs) end_time = datetime.now() response_time = (end_time - start_time).total_seconds() result["response_time"] = round(response_time, 2) result["has_data"] = response is not None # Проверяем тип ответа if isinstance(response, dict): result["sample_response"] = json.dumps(response, indent=2, ensure_ascii=False)[:500] # Проверяем на ошибки if response.get("success") == False: result["status"] = "error" result["error"] = response.get("error", "Unknown error") elif "error" in response and response["error"]: result["status"] = "error" result["error"] = response["error"] else: result["status"] = "success" elif isinstance(response, list): result["sample_response"] = f"List with {len(response)} items" result["status"] = "success" if len(response) > 0 else "warning" if len(response) == 0: result["error"] = "Empty list returned" else: result["sample_response"] = str(response)[:500] result["status"] = "success" except Exception as e: result["status"] = "error" result["error"] = str(e) result["traceback"] = traceback.format_exc() logger.error(f"Ошибка при тестировании {tool_name}: {e}") return result # Тестовые функции для каждого инструмента TEST_FUNCTIONS = { # ═══════════════════════════════════════ # 📊 РЫНОЧНЫЕ ДАННЫЕ # ═══════════════════════════════════════ "get_market_overview": { "func": lambda: trading_ops.get_market_overview("spot"), "description": "Полный обзор рынка" }, "get_all_tickers": { "func": lambda: bybit_client.get_all_tickers("spot", "volume"), "description": "Все торговые пары" }, "get_asset_price": { "func": lambda: bybit_client.get_asset_price("BTC/USDT"), "description": "Текущая цена актива" }, # ═══════════════════════════════════════ # 📈 ТЕХНИЧЕСКИЙ АНАЛИЗ # ═══════════════════════════════════════ "analyze_asset": { "func": lambda: technical_analysis.analyze_asset("BTC/USDT", ["1h", "4h"], True), "description": "Полный анализ актива" }, "calculate_indicators": { "func": lambda: technical_analysis._analyze_timeframe("BTC/USDT", "1h", False), "description": "Расчет индикаторов" }, "detect_patterns": { "async_func": lambda: _test_detect_patterns(), "description": "Поиск паттернов" }, "find_support_resistance": { "async_func": lambda: _test_find_support_resistance(), "description": "Уровни поддержки и сопротивления" }, "get_btc_correlation": { "func": lambda: technical_analysis.get_btc_correlation("ETH/USDT", 24, "1h"), "description": "Корреляция с BTC" }, "get_funding_rate": { "func": lambda: bybit_client.get_funding_rate("BTC/USDT:USDT"), "description": "Funding rate для фьючерсов" }, "check_tf_alignment": { "func": lambda: technical_analysis.check_tf_alignment("BTC/USDT", ["1h", "4h", "1d"]), "description": "Проверка alignment таймфреймов" }, # ═══════════════════════════════════════ # 🔍 СКАНИРОВАНИЕ РЫНКА # ═══════════════════════════════════════ "scan_market": { "func": lambda: market_scanner.scan_market( {"market_type": "spot", "min_volume_24h": 1000000}, limit=5 ), "description": "Сканирование рынка" }, "find_oversold_assets": { "func": lambda: market_scanner.find_oversold_assets("spot", 1000000), "description": "Перепроданные активы" }, "find_overbought_assets": { "func": lambda: market_scanner.find_overbought_assets("spot", 1000000), "description": "Перекупленные активы" }, "find_breakout_opportunities": { "func": lambda: market_scanner.find_breakout_opportunities("spot", 1000000), "description": "Возможности пробоя" }, "find_trend_reversals": { "func": lambda: market_scanner.find_trend_reversals("spot", 1000000), "description": "Развороты тренда" }, # ═══════════════════════════════════════ # 🎯 ВАЛИДАЦИЯ ВХОДА # ═══════════════════════════════════════ "check_liquidity": { "func": lambda: technical_analysis.check_liquidity("BTC/USDT"), "description": "Проверка ликвидности" }, "validate_entry": { "func": lambda: technical_analysis.validate_entry( "BTC/USDT", "long", 50000, 49000, 52000, 0.01, signal_tracker ), "description": "Валидация точки входа" }, "get_open_interest": { "func": lambda: bybit_client.get_open_interest("BTCUSDT", "linear"), "description": "Open Interest для futures" }, # ═══════════════════════════════════════ # 💰 СЧЁТ И ПОЗИЦИИ # ═══════════════════════════════════════ "get_account_info": { "async_func": lambda: _test_get_account_info(), "description": "Информация о счёте" }, "get_open_positions": { "async_func": lambda: _test_get_open_positions(), "description": "Открытые позиции" }, "get_order_history": { "async_func": lambda: _test_get_order_history(), "description": "История ордеров" }, # ═══════════════════════════════════════ # 📡 REAL-TIME МОНИТОРИНГ # ═══════════════════════════════════════ "start_position_monitoring": { "async_func": lambda: _test_start_monitoring(), "description": "Запуск мониторинга позиций" }, "stop_position_monitoring": { "func": lambda: position_monitor.stop_monitoring(), "description": "Остановка мониторинга" }, # ═══════════════════════════════════════ # 🛠️ ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ # ═══════════════════════════════════════ "move_to_breakeven": { "func": lambda: trading_ops.move_to_breakeven("BTCUSDT", 50000, "linear"), "description": "Перевод SL в breakeven", "skip_if_no_position": True }, "activate_trailing_stop": { "func": lambda: trading_ops.activate_trailing_stop("BTCUSDT", 2.0, "linear"), "description": "Активация trailing stop", "skip_if_no_position": True }, # ═══════════════════════════════════════ # 📊 КОНТРОЛЬ КАЧЕСТВА СИГНАЛОВ # ═══════════════════════════════════════ "track_signal": { "func": lambda: signal_tracker.record_signal( "BTC/USDT", "long", 50000, 49000, 52000, 8.5, 0.75 ), "description": "Запись сигнала" }, "get_signal_quality_metrics": { "func": lambda: quality_metrics.calculate_overall_metrics(days=30), "description": "Метрики качества сигналов" }, "get_signal_performance_report": { "func": lambda: signal_reports.generate_summary_report(days=30, format="summary"), "description": "Отчет о производительности" }, "get_active_signals": { "func": lambda: signal_tracker.get_active_signals(), "description": "Активные сигналы" }, "get_signal_details": { "async_func": lambda: _test_get_signal_details(), "description": "Детали сигнала", "skip_if_no_signals": True }, } async def _test_detect_patterns(): """Вспомогательная функция для detect_patterns""" ohlcv = await bybit_client.get_ohlcv("BTC/USDT", "1h") df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) return technical_analysis._detect_patterns(df) async def _test_find_support_resistance(): """Вспомогательная функция для find_support_resistance""" ohlcv = await bybit_client.get_ohlcv("BTC/USDT", "1h", limit=50) df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) return technical_analysis._find_support_resistance(df) async def _test_get_account_info(): """Вспомогательная функция для get_account_info""" all_balances = get_all_account_balances(trading_ops.session, coin="USDT") positions = [] test_symbols = ["BTCUSDT", "ETHUSDT"] for test_symbol in test_symbols: try: test_response = trading_ops.session.get_positions(category="linear", symbol=test_symbol) if test_response.get("retCode") == 0: positions_list = test_response.get("result", {}).get("list", []) positions.extend([p for p in positions_list if float(p.get("size", 0)) != 0]) except: continue positions = [p for p in positions if float(p.get("size", 0)) != 0] total_equity = all_balances.get("total", 0.0) available = all_balances.get("available", 0.0) used_margin = sum(float(p.get("positionIM", 0) or 0) for p in positions) unrealized_pnl = sum(float(p.get("unrealisedPnl", 0) or 0) for p in positions) return { "balance": { "spot": all_balances.get("spot", {"total": 0.0, "available": 0.0, "success": False}), "contract": all_balances.get("contract", {"total": 0.0, "available": 0.0, "success": False}), "unified": all_balances.get("unified", {"total": 0.0, "available": 0.0, "success": False}), "total": total_equity, "available": available, "used_margin": used_margin, "unrealized_pnl": unrealized_pnl }, "positions": positions, "risk_metrics": { "total_risk_pct": (used_margin / total_equity * 100) if total_equity > 0 else 0, "positions_count": len(positions), "max_drawdown": "N/A" } } async def _test_get_open_positions(): """Вспомогательная функция для get_open_positions""" all_positions = [] test_symbols = ["BTCUSDT", "ETHUSDT"] for test_symbol in test_symbols: try: test_response = trading_ops.session.get_positions(category="linear", symbol=test_symbol) if test_response.get("retCode") == 0: positions_list = test_response.get("result", {}).get("list", []) all_positions.extend([p for p in positions_list if float(p.get("size", 0)) != 0]) except: continue if all_positions: open_positions = [ { "symbol": p.get("symbol"), "side": p.get("side"), "size": float(p.get("size", 0)), "entry_price": float(p.get("avgPrice", 0)), "current_price": float(p.get("markPrice", 0)), "unrealized_pnl": float(p.get("unrealisedPnl", 0)), "unrealized_pnl_pct": float(p.get("unrealisedPnl", 0)) / float(p.get("positionValue", 1)) * 100 if float(p.get("positionValue", 0)) != 0 else 0, "leverage": p.get("leverage"), "margin": float(p.get("positionIM", 0)), "liquidation_price": float(p.get("liqPrice", 0)) } for p in all_positions if float(p.get("size", 0)) != 0 ] return open_positions else: return [] async def _test_get_order_history(): """Вспомогательная функция для get_order_history""" response = trading_ops.session.get_order_history(category="spot", limit="10") if response.get("retCode") == 0: return response.get("result", {}) else: return {"success": False, "error": response.get("retMsg", "Unknown error")} async def _test_start_monitoring(): """Вспомогательная функция для start_position_monitoring""" await position_monitor.start_monitoring({ "move_to_breakeven_at": 1.0, "enable_trailing_at": 2.0, "exit_on_reversal": True, "max_time_in_trade": 12 }) return {"success": True, "message": "Position monitoring started"} async def _test_get_signal_details(): """Вспомогательная функция для get_signal_details""" active_signals = await signal_tracker.get_active_signals() if not active_signals: return {"success": False, "error": "No active signals"} signal_id = active_signals[0]["signal_id"] signal = await signal_tracker.get_signal(signal_id) if signal: snapshots = await signal_tracker.get_price_snapshots(signal_id, limit=100) return { "success": True, "signal": signal, "price_snapshots": snapshots, "snapshots_count": len(snapshots) } else: return {"success": False, "error": f"Signal {signal_id} not found"} async def main(): """Главная функция тестирования""" logger.info("=" * 70) logger.info("🔍 КОМПЛЕКСНОЕ ТЕСТИРОВАНИЕ ВСЕХ MCP ИНСТРУМЕНТОВ") logger.info("=" * 70) # Инициализация await initialize_components() # Результаты тестирования results = { "timestamp": datetime.now().isoformat(), "total_tools": len(TEST_FUNCTIONS), "tested": 0, "success": 0, "error": 0, "warning": 0, "skipped": 0, "tools": [] } logger.info(f"\nНачинаем тестирование {len(TEST_FUNCTIONS)} инструментов...\n") # Тестируем каждый инструмент for i, (tool_name, test_config) in enumerate(TEST_FUNCTIONS.items(), 1): logger.info(f"[{i}/{len(TEST_FUNCTIONS)}] Тестирую: {tool_name}...") try: # Проверяем нужно ли пропустить if test_config.get("skip_if_no_position"): try: positions = await _test_get_open_positions() if not positions: result = { "tool": tool_name, "status": "skipped", "error": "Нет открытых позиций для тестирования", "description": test_config.get("description", "") } results["tools"].append(result) results["skipped"] += 1 results["tested"] += 1 logger.info(f" ⏭️ {tool_name}: Пропущен (нет позиций)") continue except: pass if test_config.get("skip_if_no_signals") and tool_name == "get_signal_details": try: active_signals = await signal_tracker.get_active_signals() if not active_signals: result = { "tool": tool_name, "status": "skipped", "error": "Нет активных сигналов для тестирования", "description": test_config.get("description", "") } results["tools"].append(result) results["skipped"] += 1 results["tested"] += 1 logger.info(f" ⏭️ {tool_name}: Пропущен (нет сигналов)") continue except: pass # Вызываем функцию if "async_func" in test_config: result = await test_tool_direct(tool_name, test_config["async_func"]) else: result = await test_tool_direct(tool_name, test_config["func"]) result["description"] = test_config.get("description", "") results["tools"].append(result) results["tested"] += 1 if result["status"] == "success": results["success"] += 1 logger.info(f" ✅ {tool_name}: Успешно ({result['response_time']}s)") elif result["status"] == "warning": results["warning"] += 1 logger.warning(f" ⚠️ {tool_name}: Предупреждение - {result.get('error', '')}") else: results["error"] += 1 logger.error(f" ❌ {tool_name}: Ошибка - {result.get('error', 'Unknown')}") except Exception as e: logger.error(f" ❌ {tool_name}: Критическая ошибка - {e}") results["tools"].append({ "tool": tool_name, "status": "error", "error": str(e), "traceback": traceback.format_exc(), "description": test_config.get("description", "") }) results["error"] += 1 results["tested"] += 1 # Небольшая задержка между тестами await asyncio.sleep(0.5) # Сохраняем результаты report_file = Path(__file__).parent / f"mcp_tools_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(report_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) # Выводим итоговый отчет logger.info("\n" + "=" * 70) logger.info("📊 ИТОГОВЫЙ ОТЧЕТ") logger.info("=" * 70) logger.info(f"Всего инструментов: {results['total_tools']}") logger.info(f"Протестировано: {results['tested']}") logger.info(f"✅ Успешно: {results['success']}") logger.info(f"⚠️ Предупреждения: {results['warning']}") logger.info(f"⏭️ Пропущено: {results['skipped']}") logger.info(f"❌ Ошибки: {results['error']}") success_rate = (results['success'] / results['tested'] * 100) if results['tested'] > 0 else 0 logger.info(f"\n📈 Успешность: {success_rate:.1f}%") logger.info(f"\n💾 Отчет сохранен: {report_file}") # Детальный отчет по ошибкам if results["error"] > 0: logger.info("\n" + "=" * 70) logger.info("❌ ДЕТАЛИ ОШИБОК") logger.info("=" * 70) for tool_result in results["tools"]: if tool_result["status"] == "error": logger.error(f"\n🔴 {tool_result['tool']}:") logger.error(f" Описание: {tool_result.get('description', 'N/A')}") logger.error(f" Ошибка: {tool_result.get('error', 'Unknown')}") # Детальный отчет по предупреждениям if results["warning"] > 0: logger.info("\n" + "=" * 70) logger.info("⚠️ ДЕТАЛИ ПРЕДУПРЕЖДЕНИЙ") logger.info("=" * 70) for tool_result in results["tools"]: if tool_result["status"] == "warning": logger.warning(f"\n🟡 {tool_result['tool']}:") logger.warning(f" Описание: {tool_result.get('description', 'N/A')}") logger.warning(f" Предупреждение: {tool_result.get('error', 'Unknown')}") logger.info("\n" + "=" * 70) # Закрываем соединения if bybit_client: await bybit_client.close() return results if __name__ == "__main__": try: results = asyncio.run(main()) sys.exit(0 if results["error"] == 0 else 1) except KeyboardInterrupt: logger.info("\n⚠️ Тестирование прервано пользователем") sys.exit(1) except Exception as e: logger.error(f"❌ Критическая ошибка: {e}", exc_info=True) sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheMacroeconomicDao/bybit-ai-trader'

If you have feedback or need assistance with the MCP directory API, please join our Discord server