Skip to main content
Glama

VitalDB MCP Server

by yejelim
handlers.py13.1 kB
""" VitalDB MCP Server - Analysis Handlers 고급 분석 기능 구현 """ import logging import asyncio import json logger = logging.getLogger("vitaldb-handlers") async def handle_filter_cases_by_statistics(arguments): """통계 조건으로 케이스 필터링""" from mcp.types import TextContent track_name = arguments["track_name"] statistic = arguments["statistic"] condition = arguments["condition"] candidate_cases = arguments.get("candidate_cases") max_cases = arguments.get("max_cases", 100) interval = arguments.get("interval", 1.0) logger.info(f"Filtering cases by {track_name} {statistic} {condition}") def _work(): import importlib import numpy as np # noqa: F401 (used implicitly via utils computations) vitaldb = importlib.import_module("vitaldb") from utils import load_case_cached, compute_statistics, evaluate_condition # 후보 케이스 결정 nonlocal candidate_cases if candidate_cases is None: candidate_cases = vitaldb.find_cases([track_name]) if len(candidate_cases) > max_cases: local_candidates = candidate_cases[:max_cases] else: local_candidates = candidate_cases filtered_cases = [] case_stats = {} for case_id in local_candidates: try: vals = load_case_cached(case_id, [track_name], interval) track_data = vals[:, 0] stats_dict = compute_statistics(track_data) if "error" not in stats_dict: stat_value = stats_dict.get(statistic) if stat_value is not None and evaluate_condition(stat_value, condition): filtered_cases.append(case_id) case_stats[case_id] = stats_dict except Exception: # 개별 케이스 실패는 무시하고 계속 진행 continue return { "filter_criteria": { "track": track_name, "statistic": statistic, "condition": condition, }, "total_checked": len(candidate_cases), "matched_cases": len(filtered_cases), "case_ids": filtered_cases, "statistics": case_stats, } result = await asyncio.to_thread(_work) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] async def handle_batch_analyze_cases(arguments): """일괄 케이스 분석""" from mcp.types import TextContent case_ids = arguments["case_ids"] track_names = arguments["track_names"] interval = arguments.get("interval", 1.0) logger.info(f"Batch analyzing {len(case_ids)} cases") def _work(): from utils import load_case_cached, compute_statistics results = {} for case_id in case_ids: try: vals = load_case_cached(case_id, track_names, interval) results[case_id] = {} for i, tname in enumerate(track_names): track_data = vals[:, i] stats_dict = compute_statistics(track_data) results[case_id][tname] = stats_dict except Exception as e: results[case_id] = {"error": str(e)} return results results = await asyncio.to_thread(_work) return [TextContent(type="text", text=json.dumps(results, indent=2, ensure_ascii=False))] async def handle_analyze_correlation(arguments): """상관관계 분석""" from mcp.types import TextContent case_ids = arguments["case_ids"] track1 = arguments["track1"] track2 = arguments["track2"] interval = arguments.get("interval", 1.0) logger.info(f"Analyzing correlation between {track1} and {track2}") def _work(): import numpy as np from utils import load_case_cached all_data1 = [] all_data2 = [] case_correlations = {} for case_id in case_ids: try: vals = load_case_cached(case_id, [track1, track2], interval) data1 = vals[:, 0] data2 = vals[:, 1] mask = ~(np.isnan(data1) | np.isnan(data2)) valid1 = data1[mask] valid2 = data2[mask] if len(valid1) > 10: corr = float(np.corrcoef(valid1, valid2)[0, 1]) case_correlations[case_id] = corr all_data1.extend(valid1.tolist()) all_data2.extend(valid2.tolist()) except Exception: continue overall_corr = ( float(np.corrcoef(all_data1, all_data2)[0, 1]) if len(all_data1) > 0 else None ) return { "track1": track1, "track2": track2, "overall_correlation": overall_corr, "case_correlations": case_correlations, "total_samples": len(all_data1), } result = await asyncio.to_thread(_work) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] async def handle_compare_groups(arguments): """그룹 비교""" from mcp.types import TextContent group1_cases = arguments["group1_cases"] group2_cases = arguments["group2_cases"] track_name = arguments["track_name"] interval = arguments.get("interval", 1.0) logger.info(f"Comparing groups for {track_name}") def _work(): import numpy as np from scipy import stats from utils import load_case_cached, compute_statistics group1_data = [] group2_data = [] for case_id in group1_cases: try: vals = load_case_cached(case_id, [track_name], interval) valid_data = vals[~np.isnan(vals[:, 0]), 0] group1_data.extend(valid_data.tolist()) except Exception: pass for case_id in group2_cases: try: vals = load_case_cached(case_id, [track_name], interval) valid_data = vals[~np.isnan(vals[:, 0]), 0] group2_data.extend(valid_data.tolist()) except Exception: pass group1_stats = compute_statistics(np.array(group1_data)) group2_stats = compute_statistics(np.array(group2_data)) t_stat, p_value = stats.ttest_ind(group1_data, group2_data, equal_var=False, nan_policy='omit') return { "track_name": track_name, "group1": { "n_cases": len(group1_cases), "n_samples": len(group1_data), "statistics": group1_stats, }, "group2": { "n_cases": len(group2_cases), "n_samples": len(group2_data), "statistics": group2_stats, }, "statistical_test": { "test": "Welch t-test", "t_statistic": float(t_stat), "p_value": float(p_value), "significant": bool(p_value < 0.05), }, } result = await asyncio.to_thread(_work) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] async def handle_detect_anomalies(arguments): """이상치 탐지""" from mcp.types import TextContent, ImageContent case_id = arguments["case_id"] track_name = arguments["track_name"] method = arguments.get("method", "zscore") threshold = arguments.get("threshold", 3.0) interval = arguments.get("interval", 0.01) logger.info(f"Detecting anomalies in case {case_id}, {track_name}") def _work(): import os os.environ.setdefault("MPLBACKEND", "Agg") import numpy as np import matplotlib.pyplot as plt from utils import load_case_cached, create_plot_image vals = load_case_cached(case_id, [track_name], interval) track_data = vals[:, 0] valid_mask = ~np.isnan(track_data) valid_data = track_data[valid_mask] if method == "zscore": mean = np.mean(valid_data) std = np.std(valid_data) if np.std(valid_data) != 0 else 1e-9 z_scores = np.abs((valid_data - mean) / std) anomalies_idx = np.where(z_scores > threshold)[0] elif method == "iqr": q1 = np.percentile(valid_data, 25) q3 = np.percentile(valid_data, 75) iqr = q3 - q1 lower_bound = q1 - threshold * iqr upper_bound = q3 + threshold * iqr anomalies_idx = np.where((valid_data < lower_bound) | (valid_data > upper_bound))[0] else: return {"error": f"지원하지 않는 방법: {method}"}, None # 원본 인덱스로 변환 import numpy as np valid_indices = np.where(valid_mask)[0] anomaly_indices = valid_indices[anomalies_idx] anomaly_values = track_data[anomaly_indices] # 시각화 fig, ax = plt.subplots(figsize=(15, 6)) time_axis = np.arange(len(track_data)) * interval ax.plot(time_axis, track_data, linewidth=0.5, alpha=0.7, label='Normal') ax.scatter(time_axis[anomaly_indices], anomaly_values, color='red', s=20, zorder=5, label='Anomalies') ax.set_xlabel('Time (seconds)', fontsize=12) ax.set_ylabel(track_name, fontsize=12) ax.set_title(f'Anomaly Detection - Case {case_id}, {track_name}\n{len(anomaly_indices)} anomalies detected', fontsize=14) ax.legend() ax.grid(True, alpha=0.3) plt.tight_layout() img_base64 = create_plot_image(fig) result = { "case_id": case_id, "track_name": track_name, "method": method, "threshold": threshold, "total_samples": int(len(track_data)), "anomalies_detected": int(len(anomaly_indices)), "anomaly_ratio": f"{(len(anomaly_indices)/max(1, len(valid_data))*100):.2f}%", "anomaly_times": (time_axis[anomaly_indices][:20]).tolist(), } return result, img_base64 result, img = await asyncio.to_thread(_work) if isinstance(result, dict) and "error" in result: return [TextContent(type="text", text=result["error"])] return [ TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False)), ImageContent(type="image", data=img, mimeType="image/png"), ] async def handle_time_window_analysis(arguments): """시간 구간 분석""" from mcp.types import TextContent case_id = arguments["case_id"] track_name = arguments["track_name"] start_time = arguments["start_time"] end_time = arguments["end_time"] interval = arguments.get("interval", 1.0) logger.info(f"Time window analysis: case {case_id}, {start_time}s - {end_time}s") def _work(): from utils import load_case_cached, compute_statistics vals = load_case_cached(case_id, [track_name], interval) start_idx = int(start_time / interval) end_idx = int(end_time / interval) end_idx = min(end_idx, vals.shape[0]) window_data = vals[start_idx:end_idx, 0] stats_dict = compute_statistics(window_data) return { "case_id": case_id, "track_name": track_name, "time_window": { "start": start_time, "end": end_time, "duration": end_time - start_time, }, "statistics": stats_dict, } result = await asyncio.to_thread(_work) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] async def handle_export_to_csv(arguments): """CSV 내보내기""" from mcp.types import TextContent case_ids = arguments["case_ids"] track_names = arguments["track_names"] output_path = arguments["output_path"] interval = arguments.get("interval", 1.0) logger.info(f"Exporting data to {output_path}") def _work(): import pandas as pd from utils import load_case_cached all_rows = [] for case_id in case_ids: try: vals = load_case_cached(case_id, track_names, interval) for i in range(vals.shape[0]): row = {"case_id": case_id, "time": i * interval} for j, tname in enumerate(track_names): row[tname] = vals[i, j] all_rows.append(row) except Exception: continue df = pd.DataFrame(all_rows) df.to_csv(output_path, index=False) return { "output_path": output_path, "total_cases": len(case_ids), "total_rows": len(all_rows), "columns": list(df.columns), } result = await asyncio.to_thread(_work) return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yejelim/vitaldb-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server