MCP Tabular Data Analysis Server

by K02D
server.py (59.5 kB)
""" MCP Server for Tabular Data Analysis. Provides tools for: - Dataset description and statistics - Anomaly detection - Correlation computation - Row filtering - SQLite querying - Pivot tables - Data quality reports - Time series analysis - Chart generation - Dataset merging - Statistical testing - Auto-insights - Data export """ import base64 import io import json import sqlite3 from datetime import datetime from pathlib import Path from typing import Any import matplotlib matplotlib.use('Agg') # Non-interactive backend for server use import matplotlib.pyplot as plt import numpy as np import pandas as pd from mcp.server.fastmcp import FastMCP from scipy import stats # Initialize MCP server mcp = FastMCP( "Tabular Data Analysis", dependencies=["pandas", "numpy", "scipy"], ) # Store loaded datasets in memory for efficient re-use _datasets: dict[str, pd.DataFrame] = {} # Get project root directory (parent of src/) _PROJECT_ROOT = Path(__file__).parent.parent.parent def _resolve_path(file_path: str) -> Path: """ Resolve file path relative to project root if it's a relative path. Args: file_path: Absolute or relative file path Returns: Resolved absolute Path """ path = Path(file_path) # If absolute path, use as-is if path.is_absolute(): return path # Otherwise, resolve relative to project root resolved = _PROJECT_ROOT / path return resolved.resolve() def _load_data(file_path: str) -> pd.DataFrame: """Load data from CSV or SQLite file.""" path = _resolve_path(file_path) if not path.exists(): raise FileNotFoundError( f"File not found: {file_path}\n" f"Resolved to: {path}\n" f"Project root: {_PROJECT_ROOT}\n" f"Current working directory: {Path.cwd()}" ) suffix = path.suffix.lower() if suffix == ".csv": return pd.read_csv(str(path)) elif suffix in (".db", ".sqlite", ".sqlite3"): # For SQLite, list tables or load first table conn = sqlite3.connect(str(path)) tables = pd.read_sql_query( "SELECT name FROM sqlite_master WHERE type='table'", conn ) if tables.empty: conn.close() raise ValueError(f"No tables found in SQLite database: {file_path}") first_table = tables.iloc[0]["name"] df = pd.read_sql_query(f"SELECT * FROM {first_table}", conn) conn.close() return df else: raise ValueError(f"Unsupported file format: {suffix}. Use .csv or .db/.sqlite") def _get_numeric_columns(df: pd.DataFrame) -> list[str]: """Get list of numeric column names.""" return df.select_dtypes(include=[np.number]).columns.tolist() @mcp.tool() def describe_dataset(file_path: str, include_all: bool = False) -> dict[str, Any]: """ Generate comprehensive statistics for a tabular dataset. 
Args: file_path: Path to CSV or SQLite file include_all: If True, include statistics for all columns (not just numeric) Returns: Dictionary containing: - shape: (rows, columns) - columns: List of column names with their types - numeric_stats: Descriptive statistics for numeric columns - missing_values: Count of missing values per column - sample: First 5 rows as preview """ df = _load_data(file_path) # Basic info result = { "shape": {"rows": len(df), "columns": len(df.columns)}, "columns": { col: str(df[col].dtype) for col in df.columns }, "missing_values": df.isnull().sum().to_dict(), } # Numeric statistics numeric_cols = _get_numeric_columns(df) if numeric_cols: stats_df = df[numeric_cols].describe() # Add additional stats stats_df.loc["median"] = df[numeric_cols].median() stats_df.loc["skew"] = df[numeric_cols].skew() stats_df.loc["kurtosis"] = df[numeric_cols].kurtosis() result["numeric_stats"] = stats_df.to_dict() # Categorical columns info cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist() if cat_cols: result["categorical_columns"] = { col: { "unique_values": df[col].nunique(), "top_values": df[col].value_counts().head(5).to_dict() } for col in cat_cols } # Sample data result["sample"] = df.head(5).to_dict(orient="records") return result @mcp.tool() def detect_anomalies( file_path: str, column: str, method: str = "zscore", threshold: float = 3.0, ) -> dict[str, Any]: """ Detect anomalies/outliers in a numeric column. Args: file_path: Path to CSV or SQLite file column: Name of the numeric column to analyze method: Detection method - 'zscore' (default), 'iqr', or 'isolation_forest' threshold: Threshold for anomaly detection (default 3.0 for zscore, 1.5 for IQR) Returns: Dictionary containing: - method: Detection method used - anomaly_count: Number of anomalies found - anomaly_indices: Row indices of anomalies - anomalies: The anomalous rows - statistics: Column statistics """ df = _load_data(file_path) if column not in df.columns: raise ValueError(f"Column '{column}' not found. Available: {df.columns.tolist()}") if not np.issubdtype(df[column].dtype, np.number): raise ValueError(f"Column '{column}' is not numeric") col_data = df[column].dropna() if method == "zscore": # Z-score method z_scores = np.abs(stats.zscore(col_data)) anomaly_mask = z_scores > threshold anomaly_indices = col_data[anomaly_mask].index.tolist() elif method == "iqr": # Interquartile Range method q1 = col_data.quantile(0.25) q3 = col_data.quantile(0.75) iqr = q3 - q1 lower_bound = q1 - threshold * iqr upper_bound = q3 + threshold * iqr anomaly_mask = (col_data < lower_bound) | (col_data > upper_bound) anomaly_indices = col_data[anomaly_mask].index.tolist() else: raise ValueError(f"Unknown method: {method}. Use 'zscore' or 'iqr'") anomalies_df = df.loc[anomaly_indices] return { "method": method, "threshold": threshold, "column": column, "anomaly_count": len(anomaly_indices), "anomaly_percentage": round(len(anomaly_indices) / len(col_data) * 100, 2), "anomaly_indices": anomaly_indices, "anomalies": anomalies_df.to_dict(orient="records"), "statistics": { "mean": float(col_data.mean()), "std": float(col_data.std()), "min": float(col_data.min()), "max": float(col_data.max()), "median": float(col_data.median()), } } @mcp.tool() def compute_correlation( file_path: str, columns: list[str] | None = None, method: str = "pearson", ) -> dict[str, Any]: """ Compute correlation matrix between numeric columns. 
Args: file_path: Path to CSV or SQLite file columns: List of columns to include (default: all numeric columns) method: Correlation method - 'pearson' (default), 'spearman', or 'kendall' Returns: Dictionary containing: - method: Correlation method used - correlation_matrix: Full correlation matrix - top_correlations: Top 10 strongest correlations (excluding self-correlations) """ df = _load_data(file_path) # Get numeric columns if columns: # Validate provided columns invalid = [c for c in columns if c not in df.columns] if invalid: raise ValueError(f"Columns not found: {invalid}") numeric_df = df[columns].select_dtypes(include=[np.number]) else: numeric_df = df.select_dtypes(include=[np.number]) if len(numeric_df.columns) < 2: raise ValueError("Need at least 2 numeric columns for correlation") # Compute correlation matrix corr_matrix = numeric_df.corr(method=method) # Find top correlations (excluding diagonal) correlations = [] for i, col1 in enumerate(corr_matrix.columns): for j, col2 in enumerate(corr_matrix.columns): if i < j: # Upper triangle only corr_value = corr_matrix.loc[col1, col2] if not np.isnan(corr_value): correlations.append({ "column1": col1, "column2": col2, "correlation": round(float(corr_value), 4), "strength": _interpret_correlation(abs(corr_value)) }) # Sort by absolute correlation correlations.sort(key=lambda x: abs(x["correlation"]), reverse=True) return { "method": method, "columns_analyzed": corr_matrix.columns.tolist(), "correlation_matrix": corr_matrix.round(4).to_dict(), "top_correlations": correlations[:10], } def _interpret_correlation(value: float) -> str: """Interpret correlation strength.""" if value >= 0.9: return "very_strong" elif value >= 0.7: return "strong" elif value >= 0.5: return "moderate" elif value >= 0.3: return "weak" else: return "negligible" @mcp.tool() def filter_rows( file_path: str, column: str, operator: str, value: str | float | int, limit: int = 100, ) -> dict[str, Any]: """ Filter rows based on a condition. Args: file_path: Path to CSV or SQLite file column: Column name to filter on operator: Comparison operator - 'eq', 'ne', 'gt', 'gte', 'lt', 'lte', 'contains', 'startswith', 'endswith' value: Value to compare against limit: Maximum number of rows to return (default 100) Returns: Dictionary containing: - filter_applied: Description of the filter - original_count: Number of rows before filtering - filtered_count: Number of rows after filtering - rows: Filtered rows (up to limit) """ df = _load_data(file_path) if column not in df.columns: raise ValueError(f"Column '{column}' not found. Available: {df.columns.tolist()}") original_count = len(df) # Apply filter based on operator if operator == "eq": mask = df[column] == value elif operator == "ne": mask = df[column] != value elif operator == "gt": mask = df[column] > float(value) elif operator == "gte": mask = df[column] >= float(value) elif operator == "lt": mask = df[column] < float(value) elif operator == "lte": mask = df[column] <= float(value) elif operator == "contains": mask = df[column].astype(str).str.contains(str(value), case=False, na=False) elif operator == "startswith": mask = df[column].astype(str).str.startswith(str(value), na=False) elif operator == "endswith": mask = df[column].astype(str).str.endswith(str(value), na=False) else: raise ValueError( f"Unknown operator: {operator}. 
Use: eq, ne, gt, gte, lt, lte, contains, startswith, endswith" ) filtered_df = df[mask] return { "filter_applied": f"{column} {operator} {value}", "original_count": original_count, "filtered_count": len(filtered_df), "rows": filtered_df.head(limit).to_dict(orient="records"), "truncated": len(filtered_df) > limit, } @mcp.tool() def query_sqlite( db_path: str, query: str, limit: int = 100, ) -> dict[str, Any]: """ Execute a SQL query on a SQLite database. Args: db_path: Path to SQLite database file query: SQL query to execute (SELECT queries only for safety) limit: Maximum number of rows to return (default 100) Returns: Dictionary containing: - query: The executed query - row_count: Number of rows returned - columns: List of column names - rows: Query results """ # Basic safety check - only allow SELECT query_upper = query.strip().upper() if not query_upper.startswith("SELECT"): raise ValueError("Only SELECT queries are allowed for safety") path = _resolve_path(db_path) if not path.exists(): raise FileNotFoundError( f"Database not found: {db_path}\n" f"Resolved to: {path}\n" f"Project root: {_PROJECT_ROOT}" ) conn = sqlite3.connect(str(path)) try: # Add LIMIT if not present if "LIMIT" not in query_upper: query = f"{query.rstrip(';')} LIMIT {limit}" df = pd.read_sql_query(query, conn) return { "query": query, "row_count": len(df), "columns": df.columns.tolist(), "rows": df.to_dict(orient="records"), } finally: conn.close() @mcp.tool() def list_tables(db_path: str) -> dict[str, Any]: """ List all tables in a SQLite database. Args: db_path: Path to SQLite database file Returns: Dictionary containing table names and their schemas """ path = _resolve_path(db_path) if not path.exists(): raise FileNotFoundError( f"Database not found: {db_path}\n" f"Resolved to: {path}\n" f"Project root: {_PROJECT_ROOT}" ) conn = sqlite3.connect(str(path)) try: # Get table names tables = pd.read_sql_query( "SELECT name FROM sqlite_master WHERE type='table'", conn ) result = {"tables": {}} for table_name in tables["name"]: # Get schema for each table schema = pd.read_sql_query( f"PRAGMA table_info({table_name})", conn ) # Get row count count = pd.read_sql_query( f"SELECT COUNT(*) as cnt FROM {table_name}", conn ).iloc[0]["cnt"] result["tables"][table_name] = { "row_count": int(count), "columns": [ { "name": row["name"], "type": row["type"], "nullable": not row["notnull"], "primary_key": bool(row["pk"]), } for _, row in schema.iterrows() ] } return result finally: conn.close() @mcp.tool() def group_aggregate( file_path: str, group_by: list[str], aggregations: dict[str, list[str]], ) -> dict[str, Any]: """ Group data and compute aggregations. 
Args: file_path: Path to CSV or SQLite file group_by: Columns to group by aggregations: Dictionary mapping column names to list of aggregation functions (e.g., {"sales": ["sum", "mean"], "quantity": ["count", "max"]}) Supported: sum, mean, median, min, max, count, std, var Returns: Dictionary containing grouped and aggregated data """ df = _load_data(file_path) # Validate group_by columns invalid = [c for c in group_by if c not in df.columns] if invalid: raise ValueError(f"Group-by columns not found: {invalid}") # Validate aggregation columns for col in aggregations: if col not in df.columns: raise ValueError(f"Aggregation column '{col}' not found") # Perform groupby grouped = df.groupby(group_by).agg(aggregations) # Flatten column names grouped.columns = ["_".join(col).strip() for col in grouped.columns] grouped = grouped.reset_index() return { "group_by": group_by, "aggregations": aggregations, "group_count": len(grouped), "result": grouped.to_dict(orient="records"), } @mcp.tool() def list_data_files(data_dir: str = "data") -> dict[str, Any]: """ List available data files in the project data directory. Args: data_dir: Relative path to data directory (default: "data") Returns: Dictionary containing list of available CSV and SQLite files """ data_path = _resolve_path(data_dir) if not data_path.exists(): return { "data_directory": str(data_path), "exists": False, "files": [] } csv_files = [] db_files = [] for file_path in sorted(data_path.iterdir()): if file_path.is_file(): suffix = file_path.suffix.lower() file_info = { "name": file_path.name, "path": str(file_path.relative_to(_PROJECT_ROOT)), "size_bytes": file_path.stat().st_size, } if suffix == ".csv": # Try to get basic info about CSV try: df = pd.read_csv(str(file_path), nrows=0) file_info["columns"] = df.columns.tolist() file_info["column_count"] = len(df.columns) except Exception: pass csv_files.append(file_info) elif suffix in (".db", ".sqlite", ".sqlite3"): db_files.append(file_info) return { "data_directory": str(data_path.relative_to(_PROJECT_ROOT)), "absolute_path": str(data_path), "csv_files": csv_files, "sqlite_files": db_files, "total_files": len(csv_files) + len(db_files), } # ============================================================================ # NEW TOOLS: Advanced Analytics & Visualization # ============================================================================ @mcp.tool() def create_pivot_table( file_path: str, index: list[str], columns: list[str] | None = None, values: str | None = None, aggfunc: str = "mean", fill_value: float | None = None, ) -> dict[str, Any]: """ Create a pivot table from tabular data - the most common business analysis operation. Args: file_path: Path to CSV or SQLite file index: Column(s) to use as row labels (grouping) columns: Column(s) to use as column headers (optional) values: Column to aggregate (default: first numeric column) aggfunc: Aggregation function - 'sum', 'mean', 'count', 'min', 'max', 'median', 'std' fill_value: Value to replace missing entries (default: None = show as null) Returns: Dictionary containing the pivot table data and metadata Example: create_pivot_table( file_path="data/sales.csv", index=["region"], columns=["category"], values="revenue", aggfunc="sum" ) """ df = _load_data(file_path) # Validate index columns invalid = [c for c in index if c not in df.columns] if invalid: raise ValueError(f"Index columns not found: {invalid}. 
Available: {df.columns.tolist()}") # Validate columns if provided if columns: invalid = [c for c in columns if c not in df.columns] if invalid: raise ValueError(f"Column headers not found: {invalid}") # Default to first numeric column if values not specified if values is None: numeric_cols = _get_numeric_columns(df) if not numeric_cols: raise ValueError("No numeric columns found for aggregation") values = numeric_cols[0] elif values not in df.columns: raise ValueError(f"Values column '{values}' not found") # Map aggfunc string to function agg_map = { "sum": "sum", "mean": "mean", "count": "count", "min": "min", "max": "max", "median": "median", "std": "std", } if aggfunc not in agg_map: raise ValueError(f"Unknown aggfunc: {aggfunc}. Use: {list(agg_map.keys())}") # Create pivot table pivot = pd.pivot_table( df, values=values, index=index, columns=columns, aggfunc=agg_map[aggfunc], fill_value=fill_value, ) # Reset index for cleaner output pivot_reset = pivot.reset_index() return { "index": index, "columns": columns, "values": values, "aggfunc": aggfunc, "shape": {"rows": len(pivot), "columns": len(pivot.columns)}, "pivot_table": pivot_reset.to_dict(orient="records"), "summary": { "total": float(pivot.values.sum()) if np.issubdtype(pivot.values.dtype, np.number) else None, "grand_mean": float(pivot.values.mean()) if np.issubdtype(pivot.values.dtype, np.number) else None, } } @mcp.tool() def data_quality_report(file_path: str) -> dict[str, Any]: """ Generate a comprehensive data quality assessment report. Essential for understanding data health before analysis. Args: file_path: Path to CSV or SQLite file Returns: Dictionary containing: - completeness: Missing value analysis per column - uniqueness: Duplicate detection - validity: Data type consistency and outlier counts - overall_score: Data quality score (0-100) """ df = _load_data(file_path) total_cells = df.size total_rows = len(df) # Completeness Analysis missing_per_column = df.isnull().sum().to_dict() missing_pct_per_column = (df.isnull().sum() / total_rows * 100).round(2).to_dict() total_missing = df.isnull().sum().sum() completeness_score = 100 - (total_missing / total_cells * 100) # Uniqueness Analysis duplicate_rows = df.duplicated().sum() duplicate_pct = (duplicate_rows / total_rows * 100) if total_rows > 0 else 0 uniqueness_score = 100 - duplicate_pct # Column-level uniqueness column_uniqueness = { col: { "unique_count": df[col].nunique(), "unique_pct": round(df[col].nunique() / total_rows * 100, 2) if total_rows > 0 else 0, "is_potential_id": df[col].nunique() == total_rows, } for col in df.columns } # Validity Analysis validity_issues = [] numeric_cols = _get_numeric_columns(df) for col in numeric_cols: col_data = df[col].dropna() if len(col_data) > 0: # Check for outliers using IQR q1, q3 = col_data.quantile([0.25, 0.75]) iqr = q3 - q1 outlier_count = ((col_data < q1 - 1.5 * iqr) | (col_data > q3 + 1.5 * iqr)).sum() if outlier_count > 0: validity_issues.append({ "column": col, "issue": "outliers", "count": int(outlier_count), "pct": round(outlier_count / len(col_data) * 100, 2), }) # Check for negative values in typically positive columns if col_data.min() < 0: neg_count = (col_data < 0).sum() validity_issues.append({ "column": col, "issue": "negative_values", "count": int(neg_count), "min_value": float(col_data.min()), }) # Check for empty strings in text columns text_cols = df.select_dtypes(include=["object"]).columns for col in text_cols: empty_strings = (df[col] == "").sum() if empty_strings > 0: validity_issues.append({ 
"column": col, "issue": "empty_strings", "count": int(empty_strings), }) validity_score = max(0, 100 - len(validity_issues) * 5) # Overall Data Quality Score overall_score = round((completeness_score * 0.4 + uniqueness_score * 0.3 + validity_score * 0.3), 1) # Quality grade if overall_score >= 90: grade = "A" recommendation = "Excellent data quality. Ready for analysis." elif overall_score >= 80: grade = "B" recommendation = "Good data quality. Minor cleaning recommended." elif overall_score >= 70: grade = "C" recommendation = "Moderate data quality. Cleaning needed before analysis." elif overall_score >= 60: grade = "D" recommendation = "Poor data quality. Significant cleaning required." else: grade = "F" recommendation = "Critical data quality issues. Major data cleaning needed." return { "file": file_path, "shape": {"rows": total_rows, "columns": len(df.columns)}, "overall_quality": { "score": overall_score, "grade": grade, "recommendation": recommendation, }, "completeness": { "score": round(completeness_score, 1), "total_missing_cells": int(total_missing), "missing_by_column": missing_per_column, "missing_pct_by_column": missing_pct_per_column, "columns_with_missing": [col for col, pct in missing_pct_per_column.items() if pct > 0], }, "uniqueness": { "score": round(uniqueness_score, 1), "duplicate_rows": int(duplicate_rows), "duplicate_pct": round(duplicate_pct, 2), "column_uniqueness": column_uniqueness, }, "validity": { "score": validity_score, "issues": validity_issues, }, } @mcp.tool() def analyze_time_series( file_path: str, date_column: str, value_column: str, freq: str = "D", include_forecast: bool = False, ) -> dict[str, Any]: """ Perform time series analysis including trend detection, seasonality, and statistics. Args: file_path: Path to CSV or SQLite file date_column: Name of the date/datetime column value_column: Name of the numeric column to analyze freq: Frequency for resampling - 'D' (daily), 'W' (weekly), 'M' (monthly), 'Q' (quarterly), 'Y' (yearly) include_forecast: If True, include simple moving average forecast Returns: Dictionary containing: - trend: Overall trend direction and statistics - statistics: Time series statistics - moving_averages: 7, 30, 90 period moving averages - seasonality: Day of week / month patterns - forecast: Simple forecast if requested """ df = _load_data(file_path) if date_column not in df.columns: raise ValueError(f"Date column '{date_column}' not found. 
Available: {df.columns.tolist()}") if value_column not in df.columns: raise ValueError(f"Value column '{value_column}' not found") # Parse dates df[date_column] = pd.to_datetime(df[date_column], errors='coerce') df = df.dropna(subset=[date_column, value_column]) df = df.sort_values(date_column) # Create time series ts = df.set_index(date_column)[value_column] if len(ts) < 3: raise ValueError("Need at least 3 data points for time series analysis") # Basic statistics date_range = { "start": str(ts.index.min().date()), "end": str(ts.index.max().date()), "periods": len(ts), "span_days": (ts.index.max() - ts.index.min()).days, } # Trend analysis using linear regression x = np.arange(len(ts)) slope, intercept, r_value, p_value, std_err = stats.linregress(x, ts.values) trend_direction = "increasing" if slope > 0 else "decreasing" if slope < 0 else "flat" trend_strength = abs(r_value) trend = { "direction": trend_direction, "slope": round(float(slope), 4), "r_squared": round(float(r_value ** 2), 4), "strength": "strong" if trend_strength > 0.7 else "moderate" if trend_strength > 0.4 else "weak", "pct_change_total": round(float((ts.iloc[-1] - ts.iloc[0]) / ts.iloc[0] * 100), 2) if ts.iloc[0] != 0 else None, } # Calculate moving averages ma_result = {} for window in [7, 30, 90]: if len(ts) >= window: ma = ts.rolling(window=window).mean() ma_result[f"ma_{window}"] = { "current": round(float(ma.iloc[-1]), 2) if not pd.isna(ma.iloc[-1]) else None, "min": round(float(ma.min()), 2), "max": round(float(ma.max()), 2), } # Resample by frequency resampled = ts.resample(freq).agg(['mean', 'sum', 'count', 'min', 'max']) # Seasonality analysis (if enough data) seasonality = {} if len(df) >= 7: df['dow'] = df[date_column].dt.day_name() dow_stats = df.groupby('dow')[value_column].mean().reindex([ 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday' ]) seasonality['day_of_week'] = dow_stats.round(2).to_dict() best_day = dow_stats.idxmax() worst_day = dow_stats.idxmin() seasonality['insights'] = { 'best_day': best_day, 'worst_day': worst_day, 'variation_pct': round(float((dow_stats.max() - dow_stats.min()) / dow_stats.mean() * 100), 1) if dow_stats.mean() != 0 else 0, } if len(df) >= 30: df['month'] = df[date_column].dt.month_name() month_stats = df.groupby('month')[value_column].mean() seasonality['monthly'] = month_stats.round(2).to_dict() # Statistics statistics = { "mean": round(float(ts.mean()), 2), "median": round(float(ts.median()), 2), "std": round(float(ts.std()), 2), "min": round(float(ts.min()), 2), "max": round(float(ts.max()), 2), "volatility": round(float(ts.std() / ts.mean() * 100), 2) if ts.mean() != 0 else 0, } # Simple forecast using moving average forecast = None if include_forecast and len(ts) >= 7: forecast_window = min(7, len(ts)) forecast_value = ts.tail(forecast_window).mean() forecast = { "method": f"{forecast_window}-period moving average", "next_period_estimate": round(float(forecast_value), 2), "confidence_note": "Simple estimate based on recent average", } # Recent data sample recent_data = ts.tail(10).reset_index() recent_data.columns = ['date', 'value'] recent_data['date'] = recent_data['date'].dt.strftime('%Y-%m-%d') return { "date_column": date_column, "value_column": value_column, "date_range": date_range, "trend": trend, "statistics": statistics, "moving_averages": ma_result, "seasonality": seasonality, "resampled_by": freq, "resampled_periods": len(resampled), "forecast": forecast, "recent_data": recent_data.to_dict(orient='records'), } @mcp.tool() def 
generate_chart( file_path: str, chart_type: str, x_column: str | None = None, y_column: str | None = None, group_by: str | None = None, title: str | None = None, output_format: str = "base64", ) -> dict[str, Any]: """ Generate a chart/visualization from tabular data. Returns chart as base64-encoded PNG for display. Args: file_path: Path to CSV or SQLite file chart_type: Type of chart - 'bar', 'line', 'scatter', 'histogram', 'pie', 'box' x_column: Column for X-axis (not needed for histogram/pie) y_column: Column for Y-axis values group_by: Optional column for grouping/coloring title: Chart title (auto-generated if not provided) output_format: 'base64' (default) or 'file' (saves to data/charts/) Returns: Dictionary containing chart data as base64 or file path """ df = _load_data(file_path) valid_types = ['bar', 'line', 'scatter', 'histogram', 'pie', 'box'] if chart_type not in valid_types: raise ValueError(f"Unknown chart_type: {chart_type}. Use: {valid_types}") # Set up the figure with a clean style plt.style.use('seaborn-v0_8-whitegrid') fig, ax = plt.subplots(figsize=(10, 6)) # Generate chart based on type if chart_type == 'histogram': if y_column is None: numeric_cols = _get_numeric_columns(df) if not numeric_cols: raise ValueError("No numeric columns found for histogram") y_column = numeric_cols[0] ax.hist(df[y_column].dropna(), bins=30, edgecolor='black', alpha=0.7, color='#4C72B0') ax.set_xlabel(y_column) ax.set_ylabel('Frequency') auto_title = f'Distribution of {y_column}' elif chart_type == 'pie': if x_column is None: cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist() if not cat_cols: raise ValueError("No categorical columns found for pie chart") x_column = cat_cols[0] value_counts = df[x_column].value_counts().head(10) colors = plt.cm.Set3(np.linspace(0, 1, len(value_counts))) ax.pie(value_counts.values, labels=value_counts.index, autopct='%1.1f%%', colors=colors) auto_title = f'Distribution of {x_column}' elif chart_type == 'box': if y_column is None: numeric_cols = _get_numeric_columns(df) if not numeric_cols: raise ValueError("No numeric columns found for box plot") y_column = numeric_cols[0] if group_by and group_by in df.columns: groups = df[group_by].unique()[:10] # Limit to 10 groups data = [df[df[group_by] == g][y_column].dropna() for g in groups] ax.boxplot(data, labels=groups) ax.set_xlabel(group_by) else: ax.boxplot(df[y_column].dropna()) ax.set_ylabel(y_column) auto_title = f'Box Plot of {y_column}' elif chart_type in ['bar', 'line', 'scatter']: if x_column is None or y_column is None: raise ValueError(f"Both x_column and y_column are required for {chart_type} chart") if x_column not in df.columns: raise ValueError(f"x_column '{x_column}' not found") if y_column not in df.columns: raise ValueError(f"y_column '{y_column}' not found") if group_by and group_by in df.columns: for name, group in df.groupby(group_by): if chart_type == 'bar': ax.bar(group[x_column].astype(str), group[y_column], label=str(name), alpha=0.7) elif chart_type == 'line': ax.plot(group[x_column], group[y_column], label=str(name), marker='o', markersize=4) else: # scatter ax.scatter(group[x_column], group[y_column], label=str(name), alpha=0.7) ax.legend(title=group_by, loc='best') else: if chart_type == 'bar': # For bar charts, aggregate if needed if df[x_column].dtype == 'object': agg = df.groupby(x_column)[y_column].mean() ax.bar(agg.index.astype(str), agg.values, color='#4C72B0', alpha=0.7) else: ax.bar(df[x_column].astype(str).head(50), df[y_column].head(50), 
color='#4C72B0', alpha=0.7) elif chart_type == 'line': ax.plot(df[x_column], df[y_column], marker='o', markersize=4, color='#4C72B0') else: # scatter ax.scatter(df[x_column], df[y_column], alpha=0.6, color='#4C72B0') ax.set_xlabel(x_column) ax.set_ylabel(y_column) auto_title = f'{y_column} by {x_column}' # Rotate x labels if needed if df[x_column].dtype == 'object' or chart_type == 'bar': plt.xticks(rotation=45, ha='right') # Set title ax.set_title(title or auto_title, fontsize=14, fontweight='bold') plt.tight_layout() # Save chart if output_format == 'file': charts_dir = _PROJECT_ROOT / 'data' / 'charts' charts_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'{chart_type}_{timestamp}.png' filepath = charts_dir / filename plt.savefig(filepath, dpi=150, bbox_inches='tight') plt.close() return { "chart_type": chart_type, "file_path": str(filepath.relative_to(_PROJECT_ROOT)), "absolute_path": str(filepath), } else: # Return as base64 buffer = io.BytesIO() plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight') plt.close() buffer.seek(0) image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') return { "chart_type": chart_type, "format": "png", "encoding": "base64", "image_data": image_base64, "display_note": "Use this base64 string to display the image", } @mcp.tool() def merge_datasets( file_path_left: str, file_path_right: str, on: list[str] | None = None, left_on: str | None = None, right_on: str | None = None, how: str = "inner", preview_limit: int = 50, ) -> dict[str, Any]: """ Merge/join two datasets together - essential for combining data sources. Args: file_path_left: Path to left/primary dataset file_path_right: Path to right/secondary dataset on: Column(s) to join on (if same name in both datasets) left_on: Column name in left dataset to join on right_on: Column name in right dataset to join on how: Join type - 'inner', 'left', 'right', 'outer' preview_limit: Number of rows to return in preview Returns: Dictionary containing merged data preview and statistics """ df_left = _load_data(file_path_left) df_right = _load_data(file_path_right) valid_how = ['inner', 'left', 'right', 'outer'] if how not in valid_how: raise ValueError(f"Unknown join type: {how}. Use: {valid_how}") # Determine join keys if on: invalid_left = [c for c in on if c not in df_left.columns] invalid_right = [c for c in on if c not in df_right.columns] if invalid_left: raise ValueError(f"Columns {invalid_left} not found in left dataset") if invalid_right: raise ValueError(f"Columns {invalid_right} not found in right dataset") merged = pd.merge(df_left, df_right, on=on, how=how) elif left_on and right_on: if left_on not in df_left.columns: raise ValueError(f"Column '{left_on}' not found in left dataset") if right_on not in df_right.columns: raise ValueError(f"Column '{right_on}' not found in right dataset") merged = pd.merge(df_left, df_right, left_on=left_on, right_on=right_on, how=how) else: # Try to find common columns common_cols = list(set(df_left.columns) & set(df_right.columns)) if not common_cols: raise ValueError("No common columns found. 
Specify 'on', or 'left_on' and 'right_on'") on = common_cols[:1] # Use first common column merged = pd.merge(df_left, df_right, on=on, how=how) # Statistics about the merge left_rows = len(df_left) right_rows = len(df_right) merged_rows = len(merged) merge_stats = { "left_rows": left_rows, "right_rows": right_rows, "merged_rows": merged_rows, "join_type": how, "join_keys": on if on else {"left": left_on, "right": right_on}, } if how == "inner": merge_stats["left_match_pct"] = round(merged_rows / left_rows * 100, 1) if left_rows > 0 else 0 merge_stats["right_match_pct"] = round(merged_rows / right_rows * 100, 1) if right_rows > 0 else 0 return { "merge_stats": merge_stats, "merged_columns": merged.columns.tolist(), "merged_shape": {"rows": merged_rows, "columns": len(merged.columns)}, "preview": merged.head(preview_limit).to_dict(orient="records"), "has_more": merged_rows > preview_limit, } @mcp.tool() def statistical_test( file_path: str, test_type: str, column1: str, column2: str | None = None, group_column: str | None = None, alpha: float = 0.05, ) -> dict[str, Any]: """ Perform statistical hypothesis tests on data. Args: file_path: Path to CSV or SQLite file test_type: Type of test: - 'ttest_ind': Independent samples t-test (compare 2 groups) - 'ttest_paired': Paired samples t-test - 'chi_squared': Chi-squared test for categorical independence - 'anova': One-way ANOVA (compare 3+ groups) - 'mann_whitney': Non-parametric alternative to t-test - 'pearson': Pearson correlation test - 'spearman': Spearman correlation test column1: First column for analysis column2: Second column (required for correlation, optional for t-test) group_column: Column defining groups (for t-test, ANOVA) alpha: Significance level (default 0.05) Returns: Dictionary containing test statistic, p-value, and interpretation """ df = _load_data(file_path) if column1 not in df.columns: raise ValueError(f"Column '{column1}' not found") result = { "test_type": test_type, "alpha": alpha, "columns_tested": [column1], } if test_type == "ttest_ind": # Independent samples t-test if group_column is None: raise ValueError("group_column is required for independent t-test") if group_column not in df.columns: raise ValueError(f"Group column '{group_column}' not found") groups = df[group_column].unique() if len(groups) != 2: raise ValueError(f"t-test requires exactly 2 groups, found {len(groups)}: {groups.tolist()}") group1_data = df[df[group_column] == groups[0]][column1].dropna() group2_data = df[df[group_column] == groups[1]][column1].dropna() t_stat, p_value = stats.ttest_ind(group1_data, group2_data) result.update({ "groups": groups.tolist(), "group_means": {str(groups[0]): float(group1_data.mean()), str(groups[1]): float(group2_data.mean())}, "group_sizes": {str(groups[0]): len(group1_data), str(groups[1]): len(group2_data)}, "t_statistic": float(t_stat), "p_value": float(p_value), "significant": p_value < alpha, "interpretation": f"The difference between groups is {'statistically significant' if p_value < alpha else 'not statistically significant'} at α={alpha}", }) elif test_type == "ttest_paired": if column2 is None: raise ValueError("column2 is required for paired t-test") if column2 not in df.columns: raise ValueError(f"Column '{column2}' not found") data1 = df[column1].dropna() data2 = df[column2].dropna() # Align data mask = df[column1].notna() & df[column2].notna() data1 = df.loc[mask, column1] data2 = df.loc[mask, column2] t_stat, p_value = stats.ttest_rel(data1, data2) result.update({ "columns_tested": [column1, 
column2], "means": {column1: float(data1.mean()), column2: float(data2.mean())}, "sample_size": len(data1), "mean_difference": float(data1.mean() - data2.mean()), "t_statistic": float(t_stat), "p_value": float(p_value), "significant": p_value < alpha, "interpretation": f"The paired difference is {'statistically significant' if p_value < alpha else 'not statistically significant'} at α={alpha}", }) elif test_type == "chi_squared": if column2 is None: raise ValueError("column2 is required for chi-squared test") if column2 not in df.columns: raise ValueError(f"Column '{column2}' not found") contingency_table = pd.crosstab(df[column1], df[column2]) chi2, p_value, dof, expected = stats.chi2_contingency(contingency_table) result.update({ "columns_tested": [column1, column2], "chi2_statistic": float(chi2), "degrees_of_freedom": int(dof), "p_value": float(p_value), "significant": p_value < alpha, "interpretation": f"The variables are {'dependent (associated)' if p_value < alpha else 'independent'} at α={alpha}", "contingency_table_shape": contingency_table.shape, }) elif test_type == "anova": if group_column is None: raise ValueError("group_column is required for ANOVA") if group_column not in df.columns: raise ValueError(f"Group column '{group_column}' not found") groups = df[group_column].unique() if len(groups) < 3: raise ValueError(f"ANOVA requires 3+ groups, found {len(groups)}. Use t-test for 2 groups.") group_data = [df[df[group_column] == g][column1].dropna() for g in groups] f_stat, p_value = stats.f_oneway(*group_data) group_means = {str(g): float(df[df[group_column] == g][column1].mean()) for g in groups} result.update({ "groups": groups.tolist(), "group_means": group_means, "group_sizes": {str(g): len(df[df[group_column] == g]) for g in groups}, "f_statistic": float(f_stat), "p_value": float(p_value), "significant": p_value < alpha, "interpretation": f"At least one group mean is {'significantly different' if p_value < alpha else 'not significantly different'} at α={alpha}", }) elif test_type == "mann_whitney": if group_column is None: raise ValueError("group_column is required for Mann-Whitney test") groups = df[group_column].unique() if len(groups) != 2: raise ValueError(f"Mann-Whitney requires exactly 2 groups, found {len(groups)}") group1_data = df[df[group_column] == groups[0]][column1].dropna() group2_data = df[df[group_column] == groups[1]][column1].dropna() u_stat, p_value = stats.mannwhitneyu(group1_data, group2_data, alternative='two-sided') result.update({ "groups": groups.tolist(), "group_medians": {str(groups[0]): float(group1_data.median()), str(groups[1]): float(group2_data.median())}, "u_statistic": float(u_stat), "p_value": float(p_value), "significant": p_value < alpha, "interpretation": f"The distributions are {'significantly different' if p_value < alpha else 'not significantly different'} at α={alpha}", }) elif test_type in ["pearson", "spearman"]: if column2 is None: raise ValueError(f"column2 is required for {test_type} correlation") if column2 not in df.columns: raise ValueError(f"Column '{column2}' not found") mask = df[column1].notna() & df[column2].notna() data1 = df.loc[mask, column1] data2 = df.loc[mask, column2] if test_type == "pearson": corr, p_value = stats.pearsonr(data1, data2) else: corr, p_value = stats.spearmanr(data1, data2) result.update({ "columns_tested": [column1, column2], "correlation": float(corr), "strength": _interpret_correlation(abs(corr)), "direction": "positive" if corr > 0 else "negative" if corr < 0 else "none", "p_value": 
float(p_value), "significant": p_value < alpha, "sample_size": len(data1), "interpretation": f"There is a {_interpret_correlation(abs(corr))} {'positive' if corr > 0 else 'negative'} correlation that is {'statistically significant' if p_value < alpha else 'not statistically significant'} at α={alpha}", }) else: valid_tests = ['ttest_ind', 'ttest_paired', 'chi_squared', 'anova', 'mann_whitney', 'pearson', 'spearman'] raise ValueError(f"Unknown test_type: {test_type}. Use: {valid_tests}") return result @mcp.tool() def auto_insights(file_path: str, max_insights: int = 10) -> dict[str, Any]: """ Automatically generate interesting insights about a dataset. Perfect for quick data exploration and understanding. Args: file_path: Path to CSV or SQLite file max_insights: Maximum number of insights to generate (default 10) Returns: Dictionary containing automatically discovered insights """ df = _load_data(file_path) insights = [] numeric_cols = _get_numeric_columns(df) cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist() # 1. Dataset overview insights.append({ "category": "overview", "title": "Dataset Size", "insight": f"The dataset contains {len(df):,} rows and {len(df.columns)} columns ({len(numeric_cols)} numeric, {len(cat_cols)} categorical).", "importance": "high", }) # 2. Missing data insights missing_total = df.isnull().sum().sum() if missing_total > 0: most_missing_col = df.isnull().sum().idxmax() most_missing_pct = df[most_missing_col].isnull().mean() * 100 insights.append({ "category": "data_quality", "title": "Missing Values Alert", "insight": f"Found {missing_total:,} missing values total. Column '{most_missing_col}' has the most missing data ({most_missing_pct:.1f}%).", "importance": "high" if most_missing_pct > 20 else "medium", }) # 3. Numeric column insights for col in numeric_cols[:5]: # Limit to first 5 col_data = df[col].dropna() if len(col_data) == 0: continue # Skewness insight skew = col_data.skew() if abs(skew) > 1: direction = "right (positive)" if skew > 0 else "left (negative)" insights.append({ "category": "distribution", "title": f"Skewed Distribution: {col}", "insight": f"'{col}' is highly skewed to the {direction} (skewness: {skew:.2f}). Consider log transformation for analysis.", "importance": "medium", }) # Outlier insight q1, q3 = col_data.quantile([0.25, 0.75]) iqr = q3 - q1 outliers = ((col_data < q1 - 1.5 * iqr) | (col_data > q3 + 1.5 * iqr)).sum() if outliers > 0: outlier_pct = outliers / len(col_data) * 100 if outlier_pct > 5: insights.append({ "category": "outliers", "title": f"Outliers Detected: {col}", "insight": f"'{col}' has {outliers} outliers ({outlier_pct:.1f}% of data). Range: {col_data.min():.2f} to {col_data.max():.2f}.", "importance": "medium", }) # 4. Top correlations if len(numeric_cols) >= 2: corr_matrix = df[numeric_cols].corr() for i, col1 in enumerate(corr_matrix.columns): for j, col2 in enumerate(corr_matrix.columns): if i < j: corr_val = corr_matrix.loc[col1, col2] if abs(corr_val) > 0.7 and not np.isnan(corr_val): direction = "positive" if corr_val > 0 else "negative" insights.append({ "category": "correlation", "title": f"Strong Correlation Found", "insight": f"'{col1}' and '{col2}' have a strong {direction} correlation ({corr_val:.2f}).", "importance": "high", }) # 5. 
Categorical column insights for col in cat_cols[:3]: # Limit to first 3 unique_count = df[col].nunique() value_counts = df[col].value_counts() if unique_count <= 1: insights.append({ "category": "data_quality", "title": f"Constant Column: {col}", "insight": f"'{col}' has only {unique_count} unique value(s). Consider removing.", "importance": "medium", }) elif unique_count == len(df): insights.append({ "category": "data_structure", "title": f"Unique Identifier: {col}", "insight": f"'{col}' has all unique values - likely a primary key/identifier.", "importance": "low", }) else: top_value = value_counts.index[0] top_pct = value_counts.iloc[0] / len(df) * 100 if top_pct > 50: insights.append({ "category": "distribution", "title": f"Dominant Category: {col}", "insight": f"'{col}' is dominated by '{top_value}' ({top_pct:.1f}% of data).", "importance": "medium", }) # 6. Date column detection for col in df.columns: if df[col].dtype == 'object': try: parsed = pd.to_datetime(df[col], errors='coerce') if parsed.notna().sum() > len(df) * 0.5: # More than 50% valid dates date_range = parsed.dropna() insights.append({ "category": "temporal", "title": f"Date Column Detected: {col}", "insight": f"'{col}' contains dates from {date_range.min().date()} to {date_range.max().date()} ({(date_range.max() - date_range.min()).days} days span).", "importance": "medium", }) break # Only report first date column except: pass # 7. Summary statistics insight if numeric_cols: main_col = numeric_cols[0] main_data = df[main_col].dropna() if len(main_data) > 0: insights.append({ "category": "statistics", "title": f"Key Metric Summary: {main_col}", "insight": f"'{main_col}' ranges from {main_data.min():,.2f} to {main_data.max():,.2f} with mean {main_data.mean():,.2f} and median {main_data.median():,.2f}.", "importance": "medium", }) # Sort by importance and limit importance_order = {"high": 0, "medium": 1, "low": 2} insights.sort(key=lambda x: importance_order.get(x["importance"], 2)) insights = insights[:max_insights] return { "file": file_path, "total_insights": len(insights), "insights": insights, "recommendation": "Use describe_dataset() for detailed statistics, or specific tools like detect_anomalies() to investigate further.", } @mcp.tool() def export_data( file_path: str, output_name: str, filter_column: str | None = None, filter_operator: str | None = None, filter_value: str | float | None = None, columns: list[str] | None = None, sort_by: str | None = None, sort_ascending: bool = True, limit: int | None = None, ) -> dict[str, Any]: """ Export filtered/transformed data to a new CSV file. 
Args: file_path: Path to source CSV or SQLite file output_name: Name for output file (without extension, saved to data/ folder) filter_column: Optional column to filter on filter_operator: Filter operator - 'eq', 'ne', 'gt', 'gte', 'lt', 'lte', 'contains' filter_value: Value to filter by columns: List of columns to include (default: all) sort_by: Column to sort by sort_ascending: Sort direction (default: ascending) limit: Maximum rows to export Returns: Dictionary containing export details and file path """ df = _load_data(file_path) original_count = len(df) # Apply filter if specified if filter_column and filter_operator and filter_value is not None: if filter_column not in df.columns: raise ValueError(f"Filter column '{filter_column}' not found") if filter_operator == "eq": df = df[df[filter_column] == filter_value] elif filter_operator == "ne": df = df[df[filter_column] != filter_value] elif filter_operator == "gt": df = df[df[filter_column] > float(filter_value)] elif filter_operator == "gte": df = df[df[filter_column] >= float(filter_value)] elif filter_operator == "lt": df = df[df[filter_column] < float(filter_value)] elif filter_operator == "lte": df = df[df[filter_column] <= float(filter_value)] elif filter_operator == "contains": df = df[df[filter_column].astype(str).str.contains(str(filter_value), case=False, na=False)] else: raise ValueError(f"Unknown operator: {filter_operator}") # Select columns if columns: invalid = [c for c in columns if c not in df.columns] if invalid: raise ValueError(f"Columns not found: {invalid}") df = df[columns] # Sort if sort_by: if sort_by not in df.columns: raise ValueError(f"Sort column '{sort_by}' not found") df = df.sort_values(sort_by, ascending=sort_ascending) # Limit rows if limit: df = df.head(limit) # Save file output_dir = _PROJECT_ROOT / 'data' output_dir.mkdir(parents=True, exist_ok=True) # Clean output name and add timestamp clean_name = "".join(c for c in output_name if c.isalnum() or c in ('-', '_')) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = output_dir / f"{clean_name}_{timestamp}.csv" df.to_csv(output_file, index=False) return { "success": True, "source_file": file_path, "output_file": str(output_file.relative_to(_PROJECT_ROOT)), "absolute_path": str(output_file), "original_rows": original_count, "exported_rows": len(df), "exported_columns": df.columns.tolist(), "filter_applied": f"{filter_column} {filter_operator} {filter_value}" if filter_column else None, } def main(): """Run the MCP server.""" mcp.run() if __name__ == "__main__": main()
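To exercise the server end to end, it can be spawned over stdio and its tools called with the MCP Python SDK client. The sketch below is illustrative only and not part of this repository: the script path src/mcp_tabular/server.py is a guess at the project layout (consistent with how _PROJECT_ROOT is computed as three parents above server.py), and data/sales.csv is the sample path borrowed from the create_pivot_table docstring, not a bundled file.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main() -> None:
    # Launch server.py as a stdio subprocess; the path is an assumed layout
    # (project_root/src/<package>/server.py), adjust to match your checkout.
    params = StdioServerParameters(command="python", args=["src/mcp_tabular/server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # List the tools registered with @mcp.tool() above.
            tools = await session.list_tools()
            print("Tools:", [t.name for t in tools.tools])

            # Call one tool; data/sales.csv is a hypothetical sample file.
            result = await session.call_tool(
                "describe_dataset", {"file_path": "data/sales.csv"}
            )
            print(result.content)

if __name__ == "__main__":
    asyncio.run(main())

If the SDK's CLI is installed, the tools can also be exercised interactively during development with mcp dev server.py.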

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/K02D/mcp-tabular'
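The same lookup can be made from Python. This is just an illustrative equivalent of the curl command above (using the requests package); the response schema is not documented on this page, so the snippet simply prints whatever JSON the API returns.

import requests

# Fetch this server's directory entry from the Glama MCP API.
resp = requests.get("https://glama.ai/api/mcp/v1/servers/K02D/mcp-tabular", timeout=10)
resp.raise_for_status()
print(resp.json())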

If you have feedback or need assistance with the MCP directory API, please join our Discord server.