Skip to main content
Glama

VitalDB MCP Server

by yejelim
server.py7.77 kB
#!/usr/bin/env python3 """ VitalDB MCP Server - Advanced Version 고급 데이터 분석 및 자연어 탐색을 위한 MCP 서버 """ import asyncio import json import logging import os from logging.handlers import RotatingFileHandler from typing import Any # MCP SDK from mcp.server import Server from mcp.types import TextContent from mcp.server.stdio import stdio_server # 로컬 모듈은 지연 로딩(lazy import)로 각 분기에서만 임포트합니다. # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger("vitaldb-mcp-server") # 파일 로깅 핸들러 (프로젝트 루트에 로그를 남깁니다) log_path = os.path.join(os.path.dirname(__file__), "vitaldb-mcp-server.log") file_handler = RotatingFileHandler(log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8') file_handler.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s") file_handler.setFormatter(formatter) # 루트 로거에 파일 핸들러를 추가하면 다른 모듈의 로거들도 파일에 남깁니다 root_logger = logging.getLogger() root_logger.addHandler(file_handler) # MCP 서버 인스턴스 app = Server("vitaldb-mcp-server-advanced") @app.list_tools() async def list_tools(): """사용 가능한 도구 목록 반환""" # 지연 임포트: 호출 시점에만 가져옵니다 from tools import get_all_tools return get_all_tools() @app.call_tool() async def call_tool(name: str, arguments: Any): """도구 호출 처리""" try: # 기본 도구들 if name == "find_cases": track_names = arguments["track_names"] logger.info(f"Finding cases with tracks: {track_names}") # 지연 임포트 + 블로킹 작업은 스레드로 import importlib vitaldb = importlib.import_module("vitaldb") caseids = await asyncio.to_thread(vitaldb.find_cases, track_names) result = { "track_names": track_names, "total_cases": len(caseids), "case_ids": caseids, } return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] elif name == "get_case_info": case_id = arguments["case_id"] logger.info(f"Getting info for case {case_id}") import importlib import traceback import numpy as np vitaldb = importlib.import_module("vitaldb") def _get_track_info(_case_id: int): logger.info(f"Attempting to get track info for case {_case_id}") try: # VitalFile의 케이스 ID 기반 로드가 미구현이므로 다른 방법 사용 # 먼저 find_cases를 통해 이 케이스가 존재하는지 확인 all_cases = vitaldb.find_cases([]) if _case_id not in all_cases: raise ValueError(f"Case {_case_id} not found in VitalDB") # 모든 트랙을 찾기 위해 짧은 구간만 로드 # maxlen=1로 최소 데이터만 가져오기 vf = vitaldb.VitalFile(_case_id, maxlen=1) tracks = sorted(vf.get_track_names()) logger.info(f"Successfully loaded {len(tracks)} tracks") return tracks except Exception as e: logger.error(f"Error loading case {_case_id}: {type(e).__name__}: {str(e)}") logger.error(f"Full traceback: {traceback.format_exc()}") raise try: track_names = await asyncio.to_thread(_get_track_info, case_id) result = { "case_id": case_id, "total_tracks": len(track_names), "available_tracks": track_names, } return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] except Exception as e: error_msg = f"케이스 {case_id}를 찾을 수 없습니다: {type(e).__name__}: {str(e)}" logger.error(error_msg) return [TextContent(type="text", text=error_msg)] elif name == "search_available_tracks": # 지연 임포트 from utils import get_common_tracks common_tracks = get_common_tracks() result = { "common_tracks": common_tracks, "usage_note": "실제 사용 가능한 트랙은 케이스마다 다를 수 있습니다.", } return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] # 고급 분석 도구들 (필요 시점에만 임포트) elif name == "filter_cases_by_statistics": from handlers import handle_filter_cases_by_statistics return await handle_filter_cases_by_statistics(arguments) elif name == "batch_analyze_cases": from handlers import handle_batch_analyze_cases return await handle_batch_analyze_cases(arguments) elif name == "analyze_correlation": from handlers import handle_analyze_correlation return await handle_analyze_correlation(arguments) elif name == "compare_groups": from handlers import handle_compare_groups return await handle_compare_groups(arguments) elif name == "detect_anomalies": # 시각화가 포함된 핸들러이므로 백엔드 설정 후 임포트 os.environ.setdefault("MPLBACKEND", "Agg") from handlers import handle_detect_anomalies return await handle_detect_anomalies(arguments) elif name == "time_window_analysis": from handlers import handle_time_window_analysis return await handle_time_window_analysis(arguments) elif name == "export_to_csv": from handlers import handle_export_to_csv return await handle_export_to_csv(arguments) # 시각화 도구들: 임포트 전에 Agg 백엔드 설정 elif name == "plot_multiple_cases": os.environ.setdefault("MPLBACKEND", "Agg") from visualization import handle_plot_multiple_cases return await handle_plot_multiple_cases(arguments) elif name == "plot_distribution": os.environ.setdefault("MPLBACKEND", "Agg") from visualization import handle_plot_distribution return await handle_plot_distribution(arguments) elif name == "plot_scatter_correlation": os.environ.setdefault("MPLBACKEND", "Agg") from visualization import handle_plot_scatter_correlation return await handle_plot_scatter_correlation(arguments) elif name == "plot_heatmap": os.environ.setdefault("MPLBACKEND", "Agg") from visualization import handle_plot_heatmap return await handle_plot_heatmap(arguments) else: return [TextContent(type="text", text=f"알 수 없는 도구: {name}")] except Exception as e: logger.error(f"Error in {name}: {str(e)}", exc_info=True) return [TextContent(type="text", text=f"오류 발생: {str(e)}")] async def main(): """MCP 서버 시작""" logger.info("Starting VitalDB MCP Server (Advanced)...") async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yejelim/vitaldb-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server