main.py (12.2 kB)
# main.py
"""
MCP Server Implementation for COMS 6998
- Supports CSV / Parquet summarization
- Includes a synthetic dataset generator
- Provides analysis tools (describe, head, columns, info, shape, nulls)
"""

import io
import os
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from mcp.server.fastmcp import FastMCP

DATA_DIR = str(Path(__file__).resolve().parent / "data_files")
os.makedirs(DATA_DIR, exist_ok=True)


# -------------------------------
# Error-handling utilities
# -------------------------------
def validate_file_exists(file_name: str) -> str:
    """Validate that the file exists and return its full path."""
    file_path = os.path.join(DATA_DIR, file_name)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File '{file_name}' not found in data directory")
    if not os.path.isfile(file_path):
        raise ValueError(f"'{file_name}' is not a file")
    return file_path


def validate_file_format(file_name: str, expected_format: str):
    """Validate that the file has the expected extension."""
    if not file_name.lower().endswith(f".{expected_format.lower()}"):
        raise ValueError(f"File '{file_name}' is not a {expected_format.upper()} file")


# -------------------------------
# Utility Functions
# -------------------------------
def load_csv(path):
    """Load a CSV file with error handling."""
    try:
        return pd.read_csv(path)
    except Exception as e:
        raise RuntimeError(f"Error reading CSV: {e}")


def load_parquet(path):
    """Load a Parquet file with error handling."""
    try:
        return pd.read_parquet(path)
    except Exception as e:
        raise RuntimeError(f"Error reading Parquet: {e}")


def create_sample_data():
    """Create a small synthetic dataset in both CSV and Parquet form."""
    df = pd.DataFrame({
        "id": range(1, 6),
        "value": [10, 20, 30, 25, 40],
        "category": ["A", "B", "A", "C", "B"],
    })
    df.to_csv(f"{DATA_DIR}/sample.csv", index=False)
    df.to_parquet(f"{DATA_DIR}/sample.parquet")
    return "Sample CSV + Parquet created."


# -------------------------------
# MCP Server Definition
# -------------------------------
server = FastMCP("coms6998-mcp-server")


@server.tool()
def list_data_files() -> dict:
    """Return all available data files with metadata."""
    try:
        files = os.listdir(DATA_DIR)
        csv_files = [f for f in files if f.endswith('.csv')]
        parquet_files = [f for f in files if f.endswith('.parquet')]
        return {
            "total_files": len(files),
            "csv_files": csv_files,
            "parquet_files": parquet_files,
            "all_files": files
        }
    except Exception as e:
        return {"error": f"Failed to list files: {str(e)}"}


@server.tool()
def summarize_csv(file_name: str) -> dict:
    """Summarize CSV content (rows, columns, head preview)."""
    try:
        validate_file_format(file_name, "csv")
        file_path = validate_file_exists(file_name)
        df = load_csv(file_path)
        return {
            "file_name": file_name,
            "rows": len(df),
            "columns": df.columns.tolist(),
            "column_count": len(df.columns),
            "head": df.head().to_dict(),
            "dtypes": df.dtypes.astype(str).to_dict(),
            "memory_usage_mb": round(df.memory_usage(deep=True).sum() / 1024**2, 2)
        }
    except FileNotFoundError as e:
        return {"error": str(e)}
    except ValueError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Unexpected error: {str(e)}"}


@server.tool()
def summarize_parquet(file_name: str) -> dict:
    """Summarize a Parquet file (rows, columns, head preview)."""
    df = load_parquet(os.path.join(DATA_DIR, file_name))
    return {
        "rows": len(df),
        "columns": df.columns.tolist(),
        "head": df.head().to_dict()
    }


@server.tool()
def analyze_csv(file_name: str, operation: str) -> dict:
    """Perform a single analysis operation: describe, head, columns, info, shape, or nulls."""
    try:
        validate_file_format(file_name, "csv")
        file_path = validate_file_exists(file_name)
        df = load_csv(file_path)

        valid_operations = ["describe", "head", "columns", "info", "shape", "nulls"]
        if operation not in valid_operations:
            return {
                "error": f"Invalid operation '{operation}'. Valid operations: {', '.join(valid_operations)}"
            }

        if operation == "describe":
            return {"describe": df.describe().to_dict()}
        elif operation == "head":
            return {"head": df.head().to_dict()}
        elif operation == "columns":
            return {
                "columns": df.columns.tolist(),
                "column_info": {col: str(dtype) for col, dtype in df.dtypes.items()}
            }
        elif operation == "info":
            # df.info() writes to a buffer; capture it as a string
            buffer = io.StringIO()
            df.info(buf=buffer)
            return {"info": buffer.getvalue()}
        elif operation == "shape":
            return {"shape": {"rows": len(df), "columns": len(df.columns)}}
        elif operation == "nulls":
            return {"null_counts": df.isnull().sum().to_dict()}
    except FileNotFoundError as e:
        return {"error": str(e)}
    except ValueError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Analysis failed: {str(e)}"}


@server.tool()
def comprehensive_analysis(file_name: str) -> dict:
    """
    Perform comprehensive multi-step analysis on a CSV file.
    Returns: summary, statistics, data types, null counts, and sample data.
    """
    try:
        validate_file_format(file_name, "csv")
        file_path = validate_file_exists(file_name)
        df = load_csv(file_path)

        # Step 1: Basic summary
        summary = {
            "rows": len(df),
            "columns": len(df.columns),
            "column_names": df.columns.tolist()
        }

        # Step 2: Statistical summary (only for numeric columns)
        numeric_cols = df.select_dtypes(include=['number']).columns
        statistics = {}
        if len(numeric_cols) > 0:
            statistics = df[numeric_cols].describe().to_dict()

        # Step 3: Data types
        data_types = {col: str(dtype) for col, dtype in df.dtypes.items()}

        # Step 4: Null value analysis
        null_counts = df.isnull().sum().to_dict()
        null_percentages = {col: round((count / len(df)) * 100, 2)
                            for col, count in null_counts.items() if count > 0}

        # Step 5: Sample data
        sample_data = df.head(5).to_dict()

        # Step 6: Memory usage
        memory_mb = round(df.memory_usage(deep=True).sum() / 1024**2, 2)

        return {
            "file_name": file_name,
            "summary": summary,
            "statistics": statistics,
            "data_types": data_types,
            "null_analysis": {
                "null_counts": null_counts,
                "columns_with_nulls": null_percentages,
                "total_null_values": sum(null_counts.values())
            },
            "sample_data": sample_data,
            "memory_usage_mb": memory_mb
        }
    except FileNotFoundError as e:
        return {"error": str(e)}
    except ValueError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Comprehensive analysis failed: {str(e)}"}


@server.tool()
def compare_files(file1: str, file2: str) -> dict:
    """
    Compare two CSV files side by side.
    Returns: comparison of structure and columns.
    """
    try:
        validate_file_format(file1, "csv")
        validate_file_format(file2, "csv")
        file_path1 = validate_file_exists(file1)
        file_path2 = validate_file_exists(file2)
        df1 = load_csv(file_path1)
        df2 = load_csv(file_path2)

        # Compare structure
        structure_comparison = {
            "file1": {"rows": len(df1), "columns": len(df1.columns)},
            "file2": {"rows": len(df2), "columns": len(df2.columns)}
        }

        # Compare columns
        cols1 = set(df1.columns)
        cols2 = set(df2.columns)
        column_comparison = {
            "common_columns": list(cols1 & cols2),
            "file1_only": list(cols1 - cols2),
            "file2_only": list(cols2 - cols1)
        }

        return {
            "file1": file1,
            "file2": file2,
            "structure": structure_comparison,
            "columns": column_comparison,
            "same_structure": len(df1.columns) == len(df2.columns) and cols1 == cols2
        }
    except FileNotFoundError as e:
        return {"error": str(e)}
    except ValueError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Comparison failed: {str(e)}"}


@server.tool()
def create_custom_dataset(
    rows: int,
    file_name: str,
    columns: list = None,
    data_types: dict = None
) -> dict:
    """
    Create a custom dataset with specified parameters.

    Args:
        rows: Number of rows to generate
        file_name: Output filename (must end with .csv or .parquet)
        columns: List of column names (optional)
        data_types: Dict mapping columns to types: 'int', 'float', 'str', 'date', 'bool'
    """
    try:
        if rows < 1 or rows > 100000:
            return {"error": "Rows must be between 1 and 100,000"}

        if not (file_name.endswith('.csv') or file_name.endswith('.parquet')):
            return {"error": "File name must end with .csv or .parquet"}

        # Default columns if not provided
        if columns is None:
            columns = ["id", "value", "category", "date", "active"]

        if data_types is None:
            data_types = {
                "id": "int",
                "value": "float",
                "category": "str",
                "date": "date",
                "active": "bool"
            }

        import numpy as np
        from datetime import datetime, timedelta

        data = {}
        np.random.seed(42)

        for col in columns:
            dtype = data_types.get(col, "str")
            if dtype == "int":
                data[col] = np.random.randint(1, 1000, rows)
            elif dtype == "float":
                data[col] = np.round(np.random.uniform(0, 100, rows), 2)
            elif dtype == "str":
                data[col] = [f"item_{i}" for i in range(rows)]
            elif dtype == "date":
                start_date = datetime(2024, 1, 1)
                data[col] = [start_date + timedelta(days=np.random.randint(0, 365))
                             for _ in range(rows)]
            elif dtype == "bool":
                data[col] = np.random.choice([True, False], rows)
            else:
                data[col] = [f"value_{i}" for i in range(rows)]

        df = pd.DataFrame(data)
        file_path = os.path.join(DATA_DIR, file_name)

        if file_name.endswith('.csv'):
            df.to_csv(file_path, index=False)
        else:
            df.to_parquet(file_path, index=False)

        return {
            "success": True,
            "file_name": file_name,
            "rows": len(df),
            "columns": list(df.columns),
            "message": f"Created {file_name} with {rows} rows"
        }
    except ValueError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Failed to create dataset: {str(e)}"}


@server.tool()
def create_sample() -> dict:
    """Generate the synthetic sample dataset and report the created files."""
    try:
        result = create_sample_data()
        files = os.listdir(DATA_DIR)
        sample_files = [f for f in files if f.startswith('sample.')]
        return {
            "success": True,
            "message": result,
            "created_files": sample_files,
            "location": DATA_DIR
        }
    except Exception as e:
        return {"error": f"Failed to create sample data: {str(e)}"}


if __name__ == "__main__":
    print("🚀 Starting MCP server...")
    server.run()  # FastMCP defaults to the stdio transport
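
FastMCP serves these tools over stdio by default, so an MCP client can launch main.py as a subprocess and call the tools by name. The following is a minimal sketch using the mcp Python SDK's stdio client; it assumes the package is installed, that main.py is in the working directory, and the exact client API may differ slightly between SDK versions.

# client_example.py — hypothetical client sketch for exercising this server
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Launch the server as a subprocess over stdio (the path to main.py is an assumption)
server_params = StdioServerParameters(command="python", args=["main.py"])


async def main():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Discover the tools registered with @server.tool()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

            # Create the sample files, then summarize one of them
            await session.call_tool("create_sample", {})
            result = await session.call_tool("summarize_csv", {"file_name": "sample.csv"})
            print(result.content)


asyncio.run(main())

Because every tool returns a plain dict, failures come back as {"error": ...} payloads rather than raised exceptions, which keeps a client loop like this simple.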

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/iramk11/claude-data-buddy'
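
The same lookup works from any HTTP client. Here is a minimal Python sketch using the requests library; the response schema is not documented on this page, so the example simply prints the returned JSON.

import requests

# Fetch this server's directory entry from the Glama MCP API
url = "https://glama.ai/api/mcp/v1/servers/iramk11/claude-data-buddy"
response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.json())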

If you have feedback or need assistance with the MCP directory API, please join our Discord server.