# main.py
"""
MCP Server Implementation for COMS 6998
- Supports CSV / Parquet summarization
- Includes synthetic dataset generator
- Provides analysis tools (describe, head, info, columns)
"""
import io
import os
from pathlib import Path

import pandas as pd
import pyarrow  # noqa: F401 - ensures a Parquet engine is available for pandas

from mcp.server.fastmcp import FastMCP
DATA_DIR = str(Path(__file__).resolve().parent / "data_files")
os.makedirs(DATA_DIR, exist_ok=True)
# -------------------------------
# Validation / Error-Handling Helpers
# -------------------------------
def validate_file_exists(file_name: str) -> str:
"""Validate file exists and return full path."""
file_path = os.path.join(DATA_DIR, file_name)
if not os.path.exists(file_path):
raise FileNotFoundError(f"File '{file_name}' not found in data directory")
if not os.path.isfile(file_path):
raise ValueError(f"'{file_name}' is not a file")
return file_path
def validate_file_format(file_name: str, expected_format: str):
"""Validate file has correct extension."""
if not file_name.lower().endswith(f".{expected_format.lower()}"):
raise ValueError(f"File '{file_name}' is not a {expected_format.upper()} file")
# -------------------------------
# Utility Functions
# -------------------------------
def load_csv(path):
"""Load CSV with error handling."""
try:
return pd.read_csv(path)
except Exception as e:
raise RuntimeError(f"Error reading CSV: {e}")
def load_parquet(path):
"""Load Parquet with error handling."""
try:
return pd.read_parquet(path)
except Exception as e:
raise RuntimeError(f"Error reading Parquet: {e}")
def create_sample_data():
"""Create a synthetic dataset on first run."""
df = pd.DataFrame({
"id": range(1, 6),
"value": [10, 20, 30, 25, 40],
"category": ["A", "B", "A", "C", "B"],
})
df.to_csv(f"{DATA_DIR}/sample.csv", index=False)
df.to_parquet(f"{DATA_DIR}/sample.parquet")
return "Sample CSV + Parquet created."
# -------------------------------
# MCP Server Definition
# -------------------------------
server = FastMCP("coms6998-mcp-server")
@server.tool()
def list_data_files() -> dict:
"""Return all data files available with metadata."""
try:
files = os.listdir(DATA_DIR)
csv_files = [f for f in files if f.endswith('.csv')]
parquet_files = [f for f in files if f.endswith('.parquet')]
return {
"total_files": len(files),
"csv_files": csv_files,
"parquet_files": parquet_files,
"all_files": files
}
except Exception as e:
return {"error": f"Failed to list files: {str(e)}"}
@server.tool()
def summarize_csv(file_name: str) -> dict:
"""Summarize CSV content (rows, columns, head preview)."""
try:
validate_file_format(file_name, "csv")
file_path = validate_file_exists(file_name)
df = load_csv(file_path)
return {
"file_name": file_name,
"rows": len(df),
"columns": df.columns.tolist(),
"column_count": len(df.columns),
"head": df.head().to_dict(),
"dtypes": df.dtypes.astype(str).to_dict(),
"memory_usage_mb": round(df.memory_usage(deep=True).sum() / 1024**2, 2)
}
except FileNotFoundError as e:
return {"error": str(e)}
except ValueError as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Unexpected error: {str(e)}"}
@server.tool()
def summarize_parquet(file_name: str) -> dict:
"""Summarize Parquet file."""
df = load_parquet(os.path.join(DATA_DIR, file_name))
return {
"rows": len(df),
"columns": df.columns.tolist(),
"head": df.head().to_dict()
}
@server.tool()
def analyze_csv(file_name: str, operation: str) -> dict:
"""Perform analysis: describe, head, info, columns."""
try:
validate_file_format(file_name, "csv")
file_path = validate_file_exists(file_name)
df = load_csv(file_path)
valid_operations = ["describe", "head", "columns", "info", "shape", "nulls"]
if operation not in valid_operations:
return {
"error": f"Invalid operation '{operation}'. Valid operations: {', '.join(valid_operations)}"
}
if operation == "describe":
return {"describe": df.describe().to_dict()}
elif operation == "head":
return {"head": df.head().to_dict()}
elif operation == "columns":
return {
"columns": df.columns.tolist(),
"column_info": {col: str(dtype) for col, dtype in df.dtypes.items()}
}
elif operation == "info":
buffer = []
df.info(buf=buffer.append)
return {"info": "\n".join(buffer)}
elif operation == "shape":
return {"shape": {"rows": len(df), "columns": len(df.columns)}}
elif operation == "nulls":
return {"null_counts": df.isnull().sum().to_dict()}
except FileNotFoundError as e:
return {"error": str(e)}
except ValueError as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Analysis failed: {str(e)}"}
@server.tool()
def comprehensive_analysis(file_name: str) -> dict:
"""
Perform comprehensive multi-step analysis on a CSV file.
Returns: summary, statistics, data types, null counts, and sample data.
"""
try:
validate_file_format(file_name, "csv")
file_path = validate_file_exists(file_name)
df = load_csv(file_path)
# Step 1: Basic summary
summary = {
"rows": len(df),
"columns": len(df.columns),
"column_names": df.columns.tolist()
}
# Step 2: Statistical summary (only for numeric columns)
numeric_cols = df.select_dtypes(include=['number']).columns
statistics = {}
if len(numeric_cols) > 0:
statistics = df[numeric_cols].describe().to_dict()
# Step 3: Data types
data_types = {col: str(dtype) for col, dtype in df.dtypes.items()}
# Step 4: Null value analysis
null_counts = df.isnull().sum().to_dict()
null_percentages = {col: round((count/len(df))*100, 2)
for col, count in null_counts.items() if count > 0}
# Step 5: Sample data
sample_data = df.head(5).to_dict()
# Step 6: Memory usage
memory_mb = round(df.memory_usage(deep=True).sum() / 1024**2, 2)
return {
"file_name": file_name,
"summary": summary,
"statistics": statistics,
"data_types": data_types,
"null_analysis": {
"null_counts": null_counts,
"columns_with_nulls": null_percentages,
"total_null_values": sum(null_counts.values())
},
"sample_data": sample_data,
"memory_usage_mb": memory_mb
}
except FileNotFoundError as e:
return {"error": str(e)}
except ValueError as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Comprehensive analysis failed: {str(e)}"}
@server.tool()
def compare_files(file1: str, file2: str) -> dict:
"""
Compare two CSV files side by side.
Returns: comparison of structure, columns, and basic statistics.
"""
try:
validate_file_format(file1, "csv")
validate_file_format(file2, "csv")
file_path1 = validate_file_exists(file1)
file_path2 = validate_file_exists(file2)
df1 = load_csv(file_path1)
df2 = load_csv(file_path2)
# Compare structure
structure_comparison = {
"file1": {"rows": len(df1), "columns": len(df1.columns)},
"file2": {"rows": len(df2), "columns": len(df2.columns)}
}
# Compare columns
cols1 = set(df1.columns)
cols2 = set(df2.columns)
column_comparison = {
"common_columns": list(cols1 & cols2),
"file1_only": list(cols1 - cols2),
"file2_only": list(cols2 - cols1)
}
return {
"file1": file1,
"file2": file2,
"structure": structure_comparison,
"columns": column_comparison,
"same_structure": len(df1.columns) == len(df2.columns) and cols1 == cols2
}
except FileNotFoundError as e:
return {"error": str(e)}
except ValueError as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Comparison failed: {str(e)}"}
@server.tool()
def create_custom_dataset(
rows: int,
file_name: str,
columns: list = None,
data_types: dict = None
) -> dict:
"""
Create a custom dataset with specified parameters.
Args:
rows: Number of rows to generate
file_name: Output filename (must end with .csv or .parquet)
columns: List of column names (optional)
data_types: Dict mapping columns to types: 'int', 'float', 'str', 'date', 'bool'
"""
try:
if rows < 1 or rows > 100000:
return {"error": "Rows must be between 1 and 100,000"}
if not (file_name.endswith('.csv') or file_name.endswith('.parquet')):
return {"error": "File name must end with .csv or .parquet"}
# Default columns if not provided
if columns is None:
columns = ["id", "value", "category", "date", "active"]
if data_types is None:
data_types = {
"id": "int",
"value": "float",
"category": "str",
"date": "date",
"active": "bool"
}
import numpy as np
from datetime import datetime, timedelta
data = {}
np.random.seed(42)
for col in columns:
dtype = data_types.get(col, "str")
if dtype == "int":
data[col] = np.random.randint(1, 1000, rows)
elif dtype == "float":
data[col] = np.round(np.random.uniform(0, 100, rows), 2)
elif dtype == "str":
data[col] = [f"item_{i}" for i in range(rows)]
elif dtype == "date":
start_date = datetime(2024, 1, 1)
data[col] = [start_date + timedelta(days=np.random.randint(0, 365))
for _ in range(rows)]
elif dtype == "bool":
data[col] = np.random.choice([True, False], rows)
else:
data[col] = [f"value_{i}" for i in range(rows)]
df = pd.DataFrame(data)
file_path = os.path.join(DATA_DIR, file_name)
if file_name.endswith('.csv'):
df.to_csv(file_path, index=False)
else:
df.to_parquet(file_path, index=False)
return {
"success": True,
"file_name": file_name,
"rows": len(df),
"columns": list(df.columns),
"message": f"Created {file_name} with {rows} rows"
}
except ValueError as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Failed to create dataset: {str(e)}"}
@server.tool()
def create_sample() -> dict:
"""Generate synthetic dataset with enhanced information."""
try:
result = create_sample_data()
files = os.listdir(DATA_DIR)
sample_files = [f for f in files if f.startswith('sample.')]
return {
"success": True,
"message": result,
"created_files": sample_files,
"location": DATA_DIR
}
except Exception as e:
return {"error": f"Failed to create sample data: {str(e)}"}
if __name__ == "__main__":
print("🚀 Starting MCP server...")
server.run()
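
# Connecting from an MCP client
# -----------------------------
# A stdio-based client configuration is typically all that is needed. The JSON
# below is a sketch of a Claude Desktop-style entry; the command and path are
# placeholders and depend on the local environment:
#
#   {
#     "mcpServers": {
#       "coms6998-mcp-server": {
#         "command": "python",
#         "args": ["/absolute/path/to/main.py"]
#       }
#     }
#   }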