Skip to main content
Glama
tableau_converter_MCP.py20.9 kB
""" Tableau CSV Converter MCP 서버 (D1 패키지명 포함) - Platform 기반 CPU 추출 (파일명 의존성 제거) - Device Type, Product Type, Platform 자동 추출 - D1 셀 패키지명을 파일명에 포함 (예: AP196#1_com.pubg.krmobile) - 프로젝트명 기반 파일명: fps_{project_name}.csv, summary_{project_name}.csv """ import asyncio import json import sys import os from pathlib import Path import logging # Windows UTF-8 설정 if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') import pandas as pd from mcp.server import Server from mcp.server.stdio import stdio_server import mcp.types as types # 로깅 LOG_FILE = Path("C:/Users/kimjeonghyun/Desktop/설치/MCP/tableau_converter.log") logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(message)s', handlers=[ logging.FileHandler(LOG_FILE, encoding='utf-8'), logging.StreamHandler() ] ) log = logging.getLogger(__name__) server = Server("tableau-converter") # ======================================== # 스키마 정의 # ======================================== EXPECTED_SUMMARY_COLS = [ "Filename","Measure Date","Project Name","Version", "Device Type","Product Type","Platform", "Avg FPS","Median FPS","Max FPS","1% Low FPS", "MedRange(FPS)[%]","Stability (%)","Std(FPS)","Smooth Index","Drop FPS (/h)", "Jank(/10min)","BigJank(/10min)", "Avg Memory [MB]","Peak Memory [MB]", "Avg CPU [%]","Max CPU [%]", ] EXPECTED_TS_COLS = [ "TimeSec","FPS","Smooth","1%Low(FPS)","Stutter[%]", "AppCPU[%]","TotalCPU[%]","Memory[MB]","AvailableMemory[MB]", "BigJank","Jank","SwapMemory[MB]","GL[MB]","Gfx[MB]","EGL mtrack[MB]", "BTemp[℃]","GTemp[℃]","ScreenShot", "Filename","Version","Measure Date","Project Name","Processing", "Device Type","Product Type","Platform", "Recv[KB/s]","Send[KB/s]","Render[%]","Tiler[%]", "CPUEnergy","GPUEnergy","DisplayEnergy","OverheadEnergy","NetworkEnergy", "CSwitch" ] # ======================================== # 핵심 추출 함수들 # ======================================== def extract_package_name(df_all): """D1 셀에서 패키지명 추출""" try: d1_value = df_all.iloc[0, 3] # D1 = 0행 3열 if pd.notna(d1_value): package_name = str(d1_value).strip() if package_name and package_name.lower() not in ['nan', 'none', '']: return package_name except Exception as e: log.warning(f"D1 셀 패키지명 추출 실패: {e}") return None def extract_device_info(df_all): """DeviceInfo 섹션에서 Device Type과 Product Type 추출""" device_info = { "Device Type": None, "Product Type": None, "Platform": None } deviceinfo_row = None for i in range(min(30, len(df_all))): if str(df_all.iat[i, 0]).strip().lower() == "deviceinfo": deviceinfo_row = i break if deviceinfo_row is None: return device_info try: headers = df_all.iloc[deviceinfo_row + 1].tolist() values = df_all.iloc[deviceinfo_row + 2].tolist() for idx, header in enumerate(headers): header_str = str(header).strip() if header_str == "Device Type" and idx < len(values): device_type = str(values[idx]).strip() if device_type not in ["nan", "None", "", "unknown"]: device_info["Device Type"] = device_type elif header_str == "Product Type" and idx < len(values): product_type = str(values[idx]).strip() if product_type not in ["nan", "None", "", "unknown"]: device_info["Product Type"] = product_type # Platform 판단 if device_info["Product Type"]: device_info["Platform"] = "iOS" elif device_info["Device Type"]: device_info["Platform"] = "Android" except Exception as e: log.warning(f"Device Info 추출 오류: {e}") return device_info def extract_summary_cpu_and_fps_summary_row(path): """Summary 데이터 추출 - Platform 기반 CPU 추출 + D1 패키지명 포함""" df_all = pd.read_excel(path, sheet_name="all", header=None) device_info = extract_device_info(df_all) package_name = extract_package_name(df_all) def _find_appcpu_values(df): """AppCPU[%] 컬럼에서 Avg, Max 추출 (Normalized 제외)""" appcpu_col = None for c in range(df.shape[1]): for r in range(df.shape[0]): v = str(df.iat[r, c]).replace("\n", "").replace(" ", "").lower() if "appcpu" in v and "%" in v and "normalized" not in v: appcpu_col = c break if appcpu_col is not None: break avg_val = max_val = None if appcpu_col is not None: for r in range(df.shape[0]): label = str(df.iat[r, 0]).strip().lower() try: if avg_val is None and label.startswith("avg"): avg_val = float(df.iat[r, appcpu_col]) elif max_val is None and label.startswith("max"): max_val = float(df.iat[r, appcpu_col]) except Exception: continue if avg_val is not None and max_val is not None: break return avg_val, max_val def _find_normalized_appcpu_values(df): """AppCPU[%] (Normalized) 컬럼에서 Avg, Max 추출""" norm_col = None for c in range(df.shape[1]): for r in range(df.shape[0]): v = str(df.iat[r, c]).replace("\n", "").replace(" ", "").lower() if "appcpu" in v and "normalized" in v: norm_col = c break if norm_col is not None: break avg_val = max_val = None if norm_col is not None: for r in range(df.shape[0]): label = str(df.iat[r, 0]).strip().lower() try: if avg_val is None and label.startswith("avg"): avg_val = float(df.iat[r, norm_col]) elif max_val is None and label.startswith("max"): max_val = float(df.iat[r, norm_col]) except Exception: continue if avg_val is not None and max_val is not None: break return avg_val, max_val def _find_cpu_core_count(df): """CPU CoreNum 찾기""" core = None for c in range(df.shape[1]): for r in range(df.shape[0]): if str(df.iat[r, c]).strip().lower() == "cpu corenum": try: core = float(df.iat[r + 1, c]) except Exception: core = None break if core is not None: break return core # 메타데이터 measure_date = str(df_all.iloc[0, 0]).strip() project_name = str(df_all.iloc[0, 1]).strip() version = str(df_all.iloc[0, 2]).strip() # 파일명 생성: Case명_패키지명 (예: "IP036#1_com.pubg.krmobile") full_filename = os.path.basename(path).replace(".xlsx", "") if "_" in full_filename: # 마지막 underscore 이후만 추출 (Case명) case_name = full_filename.split("_")[-1] else: case_name = full_filename # 패키지명이 있으면 추가 if package_name: filename = f"{case_name}_{package_name}" else: filename = case_name metrics = { "Filename": filename, "Measure Date": measure_date, "Project Name": project_name, "Version": version, "Device Type": device_info["Device Type"], "Product Type": device_info["Product Type"], "Platform": device_info["Platform"] } # FPS 메트릭 summary_row = None for i in range(len(df_all)): if str(df_all.iat[i, 0]).strip().lower() == "summary": summary_row = i break if summary_row is not None: fps_col = None for c in range(df_all.shape[1]): if str(df_all.iat[summary_row, c]).strip().lower() == "fps": fps_col = c break if fps_col is not None: try: metrics["Avg FPS"] = float(df_all.iat[summary_row + 1, fps_col]) metrics["Max FPS"] = float(df_all.iat[summary_row + 2, fps_col]) except Exception: pass # 라벨 매핑 mapping = { "Median(FPS)": "Median FPS", "1%Low(FPS)": "1% Low FPS", "MedRange(FPS)[%]": "MedRange(FPS)[%]", "Std(FPS)": "Std(FPS)", "Jank(/10min)": "Jank(/10min)", "BigJank(/10min)": "BigJank(/10min)", "Smooth": "Smooth Index", "Drop(FPS)[/h]": "Drop FPS (/h)", "Avg(Memory)[MB]": "Avg Memory [MB]", "Peak(Memory)[MB]": "Peak Memory [MB]", } str_df = df_all.applymap(str) for key, field in mapping.items(): m = str_df.eq(key) if m.any().any(): r, c = m.stack()[lambda x: x].index[0] try: v = float(df_all.iat[r + 1, c]) if field not in ["1% Low FPS","Smooth Index"]: v = round(v, 2) metrics[field] = v except Exception: pass # ======================================== # CPU 메트릭 (Platform 기반) # ======================================== platform = device_info.get("Platform") if platform == "iOS": # iOS: AppCPU * Core 수 avg_val, max_val = _find_appcpu_values(df_all) core = _find_cpu_core_count(df_all) if core and avg_val is not None: metrics["Avg CPU [%]"] = round(avg_val * core, 2) if core and max_val is not None: metrics["Max CPU [%]"] = round(max_val * core, 2) elif platform == "Android": # Android: Normalized AppCPU 우선, 없으면 AppCPU * Core norm_avg, norm_max = _find_normalized_appcpu_values(df_all) if norm_avg is not None: # Normalized 값이 있으면 그대로 사용 metrics["Avg CPU [%]"] = round(norm_avg, 2) metrics["Max CPU [%]"] = round(norm_max, 2) if norm_max is not None else None else: # Normalized 없으면 AppCPU * Core avg_val, max_val = _find_appcpu_values(df_all) core = _find_cpu_core_count(df_all) if core and avg_val is not None: metrics["Avg CPU [%]"] = round(avg_val * core, 2) if core and max_val is not None: metrics["Max CPU [%]"] = round(max_val * core, 2) # 누락 컬럼 기본값 for col in EXPECTED_SUMMARY_COLS: metrics.setdefault(col, None) return pd.DataFrame([metrics])[EXPECTED_SUMMARY_COLS] def extract_timeseries_cleaned(path): """TimeSeries 데이터 추출 - D1 패키지명 포함""" df_raw = pd.read_excel(path, sheet_name="all", header=None) device_info = extract_device_info(df_raw) package_name = extract_package_name(df_raw) header_row_index = None for i in range(min(60, len(df_raw))): row_values = set(str(v).strip() for v in df_raw.iloc[i]) if {"time","FPS","AppCPU[%]","Memory[MB]"}.intersection(row_values): header_row_index = i break if header_row_index is None: raise ValueError("시계열 헤더 행을 찾지 못했습니다.") df = pd.read_excel(path, sheet_name="all", header=header_row_index) if "time" in df.columns: df["TimeSec"] = pd.to_numeric(df["time"], errors="coerce") / 1000.0 meta = pd.read_excel(path, sheet_name="all", header=None, nrows=1, usecols="A:C") measure_date = str(meta.iloc[0, 0]).strip() project_name = str(meta.iloc[0, 1]).strip() version = str(meta.iloc[0, 2]).strip() # 파일명 생성: Case명_패키지명 (예: "IP036#1_com.pubg.krmobile") full_filename = os.path.basename(path).replace(".xlsx", "") if "_" in full_filename: case_name = full_filename.split("_")[-1] else: case_name = full_filename # 패키지명이 있으면 추가 if package_name: filename = f"{case_name}_{package_name}" else: filename = case_name processing = os.path.basename(path) out = pd.DataFrame() for col in EXPECTED_TS_COLS: if col in {"Filename","Version","Measure Date","Project Name","Processing", "Device Type","Product Type","Platform"}: continue out[col] = df[col] if col in df.columns else pd.NA keep = [c for c in ["TimeSec","FPS"] if c in out.columns] if keep: out = out.dropna(how="all", subset=keep) out = out.reset_index(drop=True) n = len(out) out["Filename"] = [filename] * n out["Version"] = [version] * n out["Measure Date"] = [measure_date] * n out["Project Name"] = [project_name] * n out["Processing"] = [processing] * n out["Device Type"] = [device_info["Device Type"]] * n out["Product Type"] = [device_info["Product Type"]] * n out["Platform"] = [device_info["Platform"]] * n out = out[EXPECTED_TS_COLS] return out # ======================================== # MCP 도구 # ======================================== @server.list_tools() async def list_tools(): return [ types.Tool( name="convert_to_tableau", description="PerfDog Excel 파일들을 Tableau용 CSV로 변환 (Summary + TimeSeries). 저장 위치와 프로젝트명을 반드시 지정해야 합니다.", inputSchema={ "type": "object", "properties": { "excel_folder": { "type": "string", "description": "PerfDog Excel 파일들이 있는 폴더 경로" }, "output_folder": { "type": "string", "description": "CSV 파일을 저장할 폴더 경로 (필수 - 매번 다른 위치 지정 가능)" }, "project_name": { "type": "string", "description": "프로젝트명 (파일명에 사용됨: fps_{project_name}.csv, summary_{project_name}.csv)" }, "summary_filename": { "type": "string", "description": "Summary CSV 파일명 (선택 - 미지정시 summary_{project_name}.csv 사용)" }, "timeseries_filename": { "type": "string", "description": "TimeSeries CSV 파일명 (선택 - 미지정시 fps_{project_name}.csv 사용)" } }, "required": ["excel_folder", "output_folder", "project_name"] } ) ] @server.call_tool() async def call_tool(name, args): log.info(f"[TOOL] {name}") try: if name == "convert_to_tableau": project_name = args["project_name"] # 기본 파일명 summary_filename = args.get("summary_filename", f"summary_{project_name}.csv") timeseries_filename = args.get("timeseries_filename", f"fps_{project_name}.csv") result = await convert_to_tableau( args["excel_folder"], args["output_folder"], summary_filename, timeseries_filename, project_name ) else: result = {"status": "error", "msg": f"Unknown tool: {name}"} return [types.TextContent( type="text", text=json.dumps(result, ensure_ascii=False, indent=2) )] except Exception as e: log.error(f"[ERROR] {e}", exc_info=True) return [types.TextContent( type="text", text=json.dumps({"status": "error", "msg": str(e)}, ensure_ascii=False) )] async def convert_to_tableau( excel_folder, output_folder, summary_filename, timeseries_filename, project_name ): """Excel → CSV 변환 메인 로직""" log.info(f"[CONVERT] 시작") log.info(f" 프로젝트: {project_name}") log.info(f" Excel 폴더: {excel_folder}") log.info(f" 출력 폴더: {output_folder}") try: excel_path = Path(excel_folder) output_path = Path(output_folder) if not excel_path.exists(): return {"status": "error", "msg": f"Excel 폴더 없음: {excel_folder}"} output_path.mkdir(parents=True, exist_ok=True) excel_files = [f for f in excel_path.glob("*.xlsx") if not f.name.startswith("~$")] excel_files.sort(key=lambda p: p.name.lower()) if not excel_files: return {"status": "error", "msg": "Excel 파일 없음"} log.info(f"[CONVERT] {len(excel_files)}개 파일 발견") summary_all = [] ts_all = [] errors = [] for i, file_path in enumerate(excel_files, start=1): filename = file_path.name log.info(f"[{i}/{len(excel_files)}] 처리 중: {filename}") try: df_summary = extract_summary_cpu_and_fps_summary_row(str(file_path)) df_ts = extract_timeseries_cleaned(str(file_path)) summary_all.append(df_summary) ts_all.append(df_ts) log.info(f" ✅ {filename}") except Exception as e: log.error(f" ❌ {filename}: {e}") errors.append({"file": filename, "error": str(e)}) # CSV 저장 summary_path = output_path / summary_filename ts_path = output_path / timeseries_filename summary_rows = 0 timeseries_rows = 0 if summary_all: df_summary = pd.concat(summary_all, ignore_index=True) for col in EXPECTED_SUMMARY_COLS: if col not in df_summary.columns: df_summary[col] = pd.NA df_summary = df_summary[EXPECTED_SUMMARY_COLS] df_summary.to_csv(summary_path, index=False, encoding="utf-8-sig") summary_rows = len(df_summary) log.info(f"[SAVE] ✅ Summary: {summary_path} ({summary_rows}행)") if ts_all: df_ts_all = pd.concat(ts_all, ignore_index=True) for col in EXPECTED_TS_COLS: if col not in df_ts_all.columns: df_ts_all[col] = pd.NA df_ts_all = df_ts_all[EXPECTED_TS_COLS] df_ts_all.to_csv(ts_path, index=False, encoding="utf-8-sig") timeseries_rows = len(df_ts_all) log.info(f"[SAVE] ✅ TimeSeries: {ts_path} ({timeseries_rows}행)") result = { "status": "success", "project_name": project_name, "total_files": len(excel_files), "processed": len(summary_all), "failed": len(errors), "summary_csv": str(summary_path) if summary_all else None, "timeseries_csv": str(ts_path) if ts_all else None, "summary_rows": summary_rows, "timeseries_rows": timeseries_rows, "output_folder": str(output_path) } if errors: result["errors"] = errors[:5] log.info(f"[CONVERT] ✅ 완료: {len(summary_all)}/{len(excel_files)}") return result except Exception as e: log.error(f"[CONVERT ERROR] {e}", exc_info=True) return {"status": "error", "msg": str(e)} # ======================================== # 서버 시작 # ======================================== async def main(): log.info("=" * 60) log.info("Tableau CSV Converter MCP Server (D1 패키지명 포함)") log.info("Platform 기반 CPU 추출") log.info("=" * 60) async with stdio_server() as (read, write): log.info("✅ MCP server ready") await server.run(read, write, server.create_initialization_options()) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kimjeonghyun225-cpu/perfdog_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server