Claude MCP Data Explorer

by tofunori
This module implements the server's `load-csv` tool: it loads a CSV file into a named, globally registered pandas DataFrame, switching to chunked reads for files over 100 MB, and returns per-column summary statistics as `TextContent`.

```python
import asyncio
import logging
import os
import traceback

import numpy as np
import pandas as pd
from mcp.types import TextContent

# Registry of loaded dataframes, keyed by name
_dataframes = {}
_df_counter = 1


async def handle_load_csv(arguments):
    """Handle the load-csv tool."""
    global _df_counter

    csv_path = arguments.get("csv_path")
    df_name = arguments.get("df_name")

    if not csv_path:
        return [TextContent(type="text", text="Error: csv_path is required")]

    # Normalize the path for Windows
    csv_path = os.path.normpath(csv_path)

    # Generate a default name if none was provided
    if not df_name:
        df_name = f"df_{_df_counter}"
        _df_counter += 1

    try:
        file_size_mb = os.path.getsize(csv_path) / (1024 * 1024)
        logging.info(f"Loading CSV file: {csv_path} ({file_size_mb:.2f} MB)")

        # Use chunked reading for files larger than 100 MB
        if file_size_mb > 100:
            return await _load_large_csv(csv_path, df_name)
        return await _load_small_csv(csv_path, df_name)
    except FileNotFoundError:
        return [TextContent(type="text", text=f"Error: File not found: {csv_path}")]
    except Exception as e:
        logging.error(f"Error loading CSV: {e}\n{traceback.format_exc()}")
        return [TextContent(type="text", text=f"Error loading CSV: {e}")]


async def _load_small_csv(csv_path, df_name):
    """Load a small CSV file in one read."""
    loop = asyncio.get_running_loop()

    # Run the blocking pandas read in a thread pool so the event loop stays free
    df = await loop.run_in_executor(
        None, lambda: pd.read_csv(csv_path, low_memory=False)
    )

    _dataframes[df_name] = df
    summary = await _generate_summary(df)
    return [TextContent(
        type="text",
        text=f"Successfully loaded {csv_path} as {df_name}\n\n{summary}",
    )]


async def _load_large_csv(csv_path, df_name):
    """Load a large CSV file in chunks to bound the memory used per read."""
    loop = asyncio.get_running_loop()

    def load_full_file():
        chunks = []
        for chunk in pd.read_csv(csv_path, chunksize=100_000):
            chunks.append(chunk)
        return pd.concat(chunks, ignore_index=True)

    logging.info(f"Loading large file in chunks: {csv_path}")
    df = await loop.run_in_executor(None, load_full_file)

    _dataframes[df_name] = df
    summary = await _generate_summary(df)
    return [TextContent(
        type="text",
        text=f"Successfully loaded {csv_path} as {df_name} ({len(df)} rows)\n\n{summary}",
    )]


async def _generate_summary(df):
    """Generate a per-column summary of the dataframe."""
    loop = asyncio.get_running_loop()

    def create_summary():
        summary = [f"Shape: {df.shape[0]} rows × {df.shape[1]} columns"]
        summary.append("\nColumns:")
        for col in df.columns:
            dtype = df[col].dtype
            n_unique = df[col].nunique()
            n_missing = df[col].isna().sum()
            if np.issubdtype(dtype, np.number):
                summary.append(
                    f"  - {col}: {dtype} (unique: {n_unique}, missing: {n_missing}, "
                    f"min: {df[col].min()}, max: {df[col].max()}, "
                    f"mean: {df[col].mean():.2f})"
                )
            else:
                summary.append(
                    f"  - {col}: {dtype} (unique: {n_unique}, missing: {n_missing})"
                )
        return "\n".join(summary)

    return await loop.run_in_executor(None, create_summary)


# Accessors so other modules (e.g. a script-runner tool) can use loaded frames
def get_dataframe(name):
    """Get a loaded dataframe by name."""
    return _dataframes.get(name)


def get_all_dataframes():
    """Get all loaded dataframes."""
    return _dataframes
```
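The module above only defines the tool handler; it still has to be registered with an MCP server. Below is a minimal wiring sketch using the low-level `mcp.server.Server` API over stdio. The server name, the module name `csv_tools`, and the tool description/schema are assumptions for illustration, not taken from the original project.

```python
# Wiring sketch: server name, schema, and the csv_tools module name are
# illustrative assumptions, not taken from the original project.
import asyncio

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

import csv_tools  # assumed module name for the handler code above

server = Server("data-explorer")


@server.list_tools()
async def list_tools() -> list[Tool]:
    # Advertise the load-csv tool and its input schema to the client
    return [
        Tool(
            name="load-csv",
            description="Load a CSV file into a named pandas DataFrame",
            inputSchema={
                "type": "object",
                "properties": {
                    "csv_path": {"type": "string"},
                    "df_name": {"type": "string"},
                },
                "required": ["csv_path"],
            },
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    # Dispatch tool invocations to the matching handler
    if name == "load-csv":
        return await csv_tools.handle_load_csv(arguments)
    raise ValueError(f"Unknown tool: {name}")


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream, write_stream, server.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())
```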
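Once a file is loaded, other handlers can reach the shared registry through `get_dataframe`. A hypothetical companion tool, sketched under the same `csv_tools` module-name assumption (the `describe-df` tool itself is illustrative, not part of the source):

```python
# Hypothetical companion tool; "describe-df" is an illustrative name.
from mcp.types import TextContent

import csv_tools  # assumed module name for the handler code above


async def handle_describe_df(arguments):
    """Return pandas describe() output for a previously loaded dataframe."""
    df_name = arguments.get("df_name")
    df = csv_tools.get_dataframe(df_name)
    if df is None:
        return [TextContent(type="text", text=f"Error: no dataframe named {df_name}")]
    # describe() is cheap relative to loading, so it runs inline here
    return [TextContent(type="text", text=df.describe(include="all").to_string())]
```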