server.py•7.77 kB
#!/usr/bin/env python3
"""
VitalDB MCP Server - Advanced Version
고급 데이터 분석 및 자연어 탐색을 위한 MCP 서버
"""
import asyncio
import json
import logging
import os
from logging.handlers import RotatingFileHandler
from typing import Any
# MCP SDK
from mcp.server import Server
from mcp.types import TextContent
from mcp.server.stdio import stdio_server
# 로컬 모듈은 지연 로딩(lazy import)로 각 분기에서만 임포트합니다.
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("vitaldb-mcp-server")
# 파일 로깅 핸들러 (프로젝트 루트에 로그를 남깁니다)
log_path = os.path.join(os.path.dirname(__file__), "vitaldb-mcp-server.log")
file_handler = RotatingFileHandler(log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
file_handler.setFormatter(formatter)
# 루트 로거에 파일 핸들러를 추가하면 다른 모듈의 로거들도 파일에 남깁니다
root_logger = logging.getLogger()
root_logger.addHandler(file_handler)
# MCP 서버 인스턴스
app = Server("vitaldb-mcp-server-advanced")
@app.list_tools()
async def list_tools():
"""사용 가능한 도구 목록 반환"""
# 지연 임포트: 호출 시점에만 가져옵니다
from tools import get_all_tools
return get_all_tools()
@app.call_tool()
async def call_tool(name: str, arguments: Any):
"""도구 호출 처리"""
try:
# 기본 도구들
if name == "find_cases":
track_names = arguments["track_names"]
logger.info(f"Finding cases with tracks: {track_names}")
# 지연 임포트 + 블로킹 작업은 스레드로
import importlib
vitaldb = importlib.import_module("vitaldb")
caseids = await asyncio.to_thread(vitaldb.find_cases, track_names)
result = {
"track_names": track_names,
"total_cases": len(caseids),
"case_ids": caseids,
}
return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))]
elif name == "get_case_info":
case_id = arguments["case_id"]
logger.info(f"Getting info for case {case_id}")
import importlib
import traceback
import numpy as np
vitaldb = importlib.import_module("vitaldb")
def _get_track_info(_case_id: int):
logger.info(f"Attempting to get track info for case {_case_id}")
try:
# VitalFile의 케이스 ID 기반 로드가 미구현이므로 다른 방법 사용
# 먼저 find_cases를 통해 이 케이스가 존재하는지 확인
all_cases = vitaldb.find_cases([])
if _case_id not in all_cases:
raise ValueError(f"Case {_case_id} not found in VitalDB")
# 모든 트랙을 찾기 위해 짧은 구간만 로드
# maxlen=1로 최소 데이터만 가져오기
vf = vitaldb.VitalFile(_case_id, maxlen=1)
tracks = sorted(vf.get_track_names())
logger.info(f"Successfully loaded {len(tracks)} tracks")
return tracks
except Exception as e:
logger.error(f"Error loading case {_case_id}: {type(e).__name__}: {str(e)}")
logger.error(f"Full traceback: {traceback.format_exc()}")
raise
try:
track_names = await asyncio.to_thread(_get_track_info, case_id)
result = {
"case_id": case_id,
"total_tracks": len(track_names),
"available_tracks": track_names,
}
return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))]
except Exception as e:
error_msg = f"케이스 {case_id}를 찾을 수 없습니다: {type(e).__name__}: {str(e)}"
logger.error(error_msg)
return [TextContent(type="text", text=error_msg)]
elif name == "search_available_tracks":
# 지연 임포트
from utils import get_common_tracks
common_tracks = get_common_tracks()
result = {
"common_tracks": common_tracks,
"usage_note": "실제 사용 가능한 트랙은 케이스마다 다를 수 있습니다.",
}
return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))]
# 고급 분석 도구들 (필요 시점에만 임포트)
elif name == "filter_cases_by_statistics":
from handlers import handle_filter_cases_by_statistics
return await handle_filter_cases_by_statistics(arguments)
elif name == "batch_analyze_cases":
from handlers import handle_batch_analyze_cases
return await handle_batch_analyze_cases(arguments)
elif name == "analyze_correlation":
from handlers import handle_analyze_correlation
return await handle_analyze_correlation(arguments)
elif name == "compare_groups":
from handlers import handle_compare_groups
return await handle_compare_groups(arguments)
elif name == "detect_anomalies":
# 시각화가 포함된 핸들러이므로 백엔드 설정 후 임포트
os.environ.setdefault("MPLBACKEND", "Agg")
from handlers import handle_detect_anomalies
return await handle_detect_anomalies(arguments)
elif name == "time_window_analysis":
from handlers import handle_time_window_analysis
return await handle_time_window_analysis(arguments)
elif name == "export_to_csv":
from handlers import handle_export_to_csv
return await handle_export_to_csv(arguments)
# 시각화 도구들: 임포트 전에 Agg 백엔드 설정
elif name == "plot_multiple_cases":
os.environ.setdefault("MPLBACKEND", "Agg")
from visualization import handle_plot_multiple_cases
return await handle_plot_multiple_cases(arguments)
elif name == "plot_distribution":
os.environ.setdefault("MPLBACKEND", "Agg")
from visualization import handle_plot_distribution
return await handle_plot_distribution(arguments)
elif name == "plot_scatter_correlation":
os.environ.setdefault("MPLBACKEND", "Agg")
from visualization import handle_plot_scatter_correlation
return await handle_plot_scatter_correlation(arguments)
elif name == "plot_heatmap":
os.environ.setdefault("MPLBACKEND", "Agg")
from visualization import handle_plot_heatmap
return await handle_plot_heatmap(arguments)
else:
return [TextContent(type="text", text=f"알 수 없는 도구: {name}")]
except Exception as e:
logger.error(f"Error in {name}: {str(e)}", exc_info=True)
return [TextContent(type="text", text=f"오류 발생: {str(e)}")]
async def main():
"""MCP 서버 시작"""
logger.info("Starting VitalDB MCP Server (Advanced)...")
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())