"""mcp-excel-server (module: mcp_excel_server): an MCP server for reading, writing, and analyzing Excel data."""
import os
import io
import json
import pandas as pd
from typing import Optional, Dict, Tuple, Any
from datetime import datetime
from mcp.server.fastmcp import FastMCP, Image
# Create the MCP server
mcp = FastMCP("Excel Data Manager")
# Helper functions
def _read_excel_file(file_path: str) -> Tuple[pd.DataFrame, str]:
"""
Read an Excel file and return a DataFrame and the file extension.
Supports .xlsx, .xls, .csv, and other formats pandas can read.
"""
# Check if file exists
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Get file extension
_, ext = os.path.splitext(file_path)
ext = ext.lower()
# Read based on file extension
if ext in ['.xlsx', '.xls', '.xlsm']:
df = pd.read_excel(file_path)
elif ext == '.csv':
df = pd.read_csv(file_path)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t')
elif ext == '.json':
df = pd.read_json(file_path)
else:
raise ValueError(f"Unsupported file extension: {ext}")
return df, ext
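# Illustrative use of the helper above (hypothetical path, for illustration only):
#   df, ext = _read_excel_file("/data/report.csv")   # -> (DataFrame, ".csv")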
def _get_dataframe_info(df: pd.DataFrame) -> Dict[str, Any]:
"""Generate summary information about a DataFrame."""
# Basic info
info = {
"shape": df.shape,
"columns": list(df.columns),
"dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
"missing_values": df.isnull().sum().to_dict(),
"total_memory_usage": df.memory_usage(deep=True).sum(),
}
# Sample data (first 5 rows)
info["sample"] = df.head(5).to_dict(orient='records')
# Numeric column stats
numeric_cols = df.select_dtypes(include=['number']).columns
if len(numeric_cols) > 0:
info["numeric_stats"] = {}
for col in numeric_cols:
info["numeric_stats"][col] = {
"min": float(df[col].min()) if not pd.isna(df[col].min()) else None,
"max": float(df[col].max()) if not pd.isna(df[col].max()) else None,
"mean": float(df[col].mean()) if not pd.isna(df[col].mean()) else None,
"median": float(df[col].median()) if not pd.isna(df[col].median()) else None,
"std": float(df[col].std()) if not pd.isna(df[col].std()) else None
}
return info
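# The dict returned above looks roughly like this (abridged, hypothetical values):
#   {"shape": (100, 3), "columns": ["id", "price", "region"],
#    "dtypes": {"price": "float64", ...}, "missing_values": {"price": 2, ...},
#    "sample": [{...}, ...], "numeric_stats": {"price": {"min": 1.0, ...}}}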
# Resource Handlers
@mcp.resource("excel://{file_path}")
def get_excel_file(file_path: str) -> str:
"""
Retrieve content of an Excel file as a formatted text representation.
Args:
file_path: Path to the Excel file to read
Returns:
String representation of the Excel data
"""
df, _ = _read_excel_file(file_path)
return df.to_string(index=False)
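# Example resource read (hypothetical path; the exact URI escaping of the path
# segment depends on the client):
#   excel://data/report.xlsx -> full table rendered via DataFrame.to_string()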
@mcp.resource("excel://{file_path}/info")
def get_excel_info(file_path: str) -> str:
"""
Retrieve information about an Excel file including structure and stats.
Args:
file_path: Path to the Excel file to analyze
Returns:
JSON string with information about the Excel file
"""
df, ext = _read_excel_file(file_path)
info = _get_dataframe_info(df)
info["file_path"] = file_path
info["file_type"] = ext
return json.dumps(info, indent=2, default=str)
@mcp.resource("excel://{file_path}/sheet_names")
def get_sheet_names(file_path: str) -> str:
"""
Get the names of all sheets in an Excel workbook.
Args:
file_path: Path to the Excel file
Returns:
JSON string with sheet names
"""
_, ext = os.path.splitext(file_path)
ext = ext.lower()
if ext not in ['.xlsx', '.xls', '.xlsm']:
return json.dumps({"error": "File is not an Excel workbook"})
    # Use a context manager so the workbook handle is closed after reading
    with pd.ExcelFile(file_path) as xls:
        return json.dumps({"sheet_names": xls.sheet_names})
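# Example (hypothetical workbook with three sheets):
#   excel://data/report.xlsx/sheet_names
#   -> '{"sheet_names": ["Summary", "Q1", "Q2"]}'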
@mcp.resource("excel://{file_path}/preview")
def get_excel_preview(file_path: str) -> Image:
"""
Generate a visual preview of an Excel file.
Args:
file_path: Path to the Excel file
Returns:
Image of the data visualization
"""
import matplotlib.pyplot as plt
import seaborn as sns
df, _ = _read_excel_file(file_path)
# Create a styled preview
plt.figure(figsize=(10, 6))
# If DataFrame is small enough, show as a table
if df.shape[0] <= 10 and df.shape[1] <= 10:
plt.axis('tight')
plt.axis('off')
table = plt.table(cellText=df.values,
colLabels=df.columns,
cellLoc='center',
loc='center')
table.auto_set_font_size(False)
table.set_fontsize(9)
table.scale(1.2, 1.2)
    else:
        # For larger DataFrames, show a heatmap of the numeric part of the
        # first 10x10 section
        preview_df = df.iloc[:10, :10].select_dtypes(include=['number'])
        if preview_df.empty:
            # Guard: seaborn cannot draw a heatmap of an empty frame
            plt.axis('off')
            plt.text(0.5, 0.5, "No numeric data to preview",
                     horizontalalignment='center', fontsize=12)
        else:
            sns.heatmap(preview_df,
                        cmap='viridis',
                        annot=False,
                        linewidths=.5)
plt.title(f"Preview of {os.path.basename(file_path)}")
# Save to bytes buffer
buf = io.BytesIO()
plt.savefig(buf, format='png', bbox_inches='tight')
buf.seek(0)
# Convert to Image
plt.close()
return Image(data=buf.getvalue(), format="png")
# Tool Handlers
@mcp.tool()
def read_excel(file_path: str, sheet_name: Optional[str] = None,
nrows: Optional[int] = None, header: Optional[int] = 0) -> str:
"""
Read an Excel file and return its contents as a string.
Args:
file_path: Path to the Excel file
        sheet_name: Name of the sheet to read (Excel workbooks only: .xlsx, .xls, .xlsm)
nrows: Maximum number of rows to read
header: Row to use as header (0-indexed)
Returns:
String representation of the Excel data
"""
    if not os.path.exists(file_path):
        return f"File not found: {file_path}"
    _, ext = os.path.splitext(file_path)
    ext = ext.lower()
read_params = {"header": header}
if nrows is not None:
read_params["nrows"] = nrows
if ext in ['.xlsx', '.xls', '.xlsm']:
if sheet_name is not None:
read_params["sheet_name"] = sheet_name
df = pd.read_excel(file_path, **read_params)
elif ext == '.csv':
df = pd.read_csv(file_path, **read_params)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t', **read_params)
    elif ext == '.json':
        df = pd.read_json(file_path)
        # pd.read_json has no nrows parameter, so truncate after reading
        if nrows is not None:
            df = df.head(nrows)
else:
return f"Unsupported file extension: {ext}"
return df.to_string(index=False)
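# Example tool call (hypothetical file and sheet):
#   read_excel("/data/report.xlsx", sheet_name="Q1", nrows=20)
#   -> first 20 rows of the Q1 sheet as plain text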
@mcp.tool()
def write_excel(file_path: str, data: str, sheet_name: str = "Sheet1",
                format: str = "csv") -> str:
"""
Write data to an Excel file.
Args:
file_path: Path to save the Excel file
data: Data in CSV or JSON format
sheet_name: Name of the sheet (for Excel files)
format: Format of the input data ('csv' or 'json')
Returns:
Confirmation message
"""
try:
if format.lower() == 'csv':
df = pd.read_csv(io.StringIO(data))
elif format.lower() == 'json':
df = pd.read_json(io.StringIO(data))
else:
return f"Unsupported data format: {format}"
_, ext = os.path.splitext(file_path)
ext = ext.lower()
if ext in ['.xlsx', '.xls', '.xlsm']:
df.to_excel(file_path, sheet_name=sheet_name, index=False)
elif ext == '.csv':
df.to_csv(file_path, index=False)
elif ext == '.tsv':
df.to_csv(file_path, sep='\t', index=False)
elif ext == '.json':
df.to_json(file_path, orient='records')
else:
return f"Unsupported output file extension: {ext}"
return f"Successfully wrote data to {file_path}"
except Exception as e:
return f"Error writing data: {str(e)}"
@mcp.tool()
def update_excel(file_path: str, data: str, sheet_name: str = "Sheet1",
                 format: str = "csv") -> str:
"""
Update an existing Excel file with new data.
Args:
file_path: Path to the Excel file to update
data: New data in CSV or JSON format
sheet_name: Name of the sheet to update (for Excel files)
format: Format of the input data ('csv' or 'json')
Returns:
Confirmation message
"""
try:
# Check if file exists
if not os.path.exists(file_path):
return f"File not found: {file_path}"
# Load new data
if format.lower() == 'csv':
new_df = pd.read_csv(io.StringIO(data))
elif format.lower() == 'json':
new_df = pd.read_json(io.StringIO(data))
else:
return f"Unsupported data format: {format}"
# Get file extension
_, ext = os.path.splitext(file_path)
ext = ext.lower()
# Read existing file
        if ext in ['.xlsx', '.xls', '.xlsm']:
            # Read every existing sheet into memory *before* opening the
            # writer, so the source handle is closed by the time the
            # workbook is rewritten in place
            with pd.ExcelFile(file_path) as excel_file:
                other_sheets = {
                    sheet: pd.read_excel(excel_file, sheet_name=sheet)
                    for sheet in excel_file.sheet_names
                    if sheet != sheet_name
                }
            with pd.ExcelWriter(file_path) as writer:
                # Copy the untouched sheets, then write the new data
                for sheet, sheet_df in other_sheets.items():
                    sheet_df.to_excel(writer, sheet_name=sheet, index=False)
                new_df.to_excel(writer, sheet_name=sheet_name, index=False)
elif ext == '.csv':
new_df.to_csv(file_path, index=False)
elif ext == '.tsv':
new_df.to_csv(file_path, sep='\t', index=False)
elif ext == '.json':
new_df.to_json(file_path, orient='records')
else:
return f"Unsupported file extension: {ext}"
return f"Successfully updated {file_path}"
except Exception as e:
return f"Error updating file: {str(e)}"
@mcp.tool()
def analyze_excel(file_path: str, columns: Optional[str] = None,
sheet_name: Optional[str] = None) -> str:
"""
Perform statistical analysis on Excel data.
Args:
file_path: Path to the Excel file
columns: Comma-separated list of columns to analyze (analyzes all numeric columns if None)
sheet_name: Name of the sheet to analyze (for Excel files)
Returns:
JSON string with statistical analysis
"""
try:
# Read file
_, ext = os.path.splitext(file_path)
ext = ext.lower()
read_params = {}
if ext in ['.xlsx', '.xls', '.xlsm'] and sheet_name is not None:
read_params["sheet_name"] = sheet_name
if ext in ['.xlsx', '.xls', '.xlsm']:
df = pd.read_excel(file_path, **read_params)
elif ext == '.csv':
df = pd.read_csv(file_path)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t')
elif ext == '.json':
df = pd.read_json(file_path)
else:
return f"Unsupported file extension: {ext}"
# Filter columns if specified
if columns:
column_list = [c.strip() for c in columns.split(',')]
df = df[column_list]
# Select only numeric columns for analysis
numeric_df = df.select_dtypes(include=['number'])
if numeric_df.empty:
return json.dumps({"error": "No numeric columns found for analysis"})
# Perform analysis
analysis = {
"descriptive_stats": numeric_df.describe().to_dict(),
"correlation": numeric_df.corr().to_dict(),
"missing_values": numeric_df.isnull().sum().to_dict(),
"unique_values": {col: int(numeric_df[col].nunique()) for col in numeric_df.columns}
}
return json.dumps(analysis, indent=2, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
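# Example (hypothetical columns); the response is a JSON document with
# "descriptive_stats", "correlation", "missing_values", and "unique_values":
#   analyze_excel("/data/report.xlsx", columns="price, quantity")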
@mcp.tool()
def filter_excel(file_path: str, query: str, sheet_name: Optional[str] = None) -> str:
"""
Filter Excel data using a pandas query string.
Args:
file_path: Path to the Excel file
query: Pandas query string (e.g., "Age > 30 and Department == 'Sales'")
sheet_name: Name of the sheet to filter (for Excel files)
Returns:
Filtered data as string
"""
try:
# Read file
_, ext = os.path.splitext(file_path)
ext = ext.lower()
read_params = {}
if ext in ['.xlsx', '.xls', '.xlsm'] and sheet_name is not None:
read_params["sheet_name"] = sheet_name
if ext in ['.xlsx', '.xls', '.xlsm']:
df = pd.read_excel(file_path, **read_params)
elif ext == '.csv':
df = pd.read_csv(file_path)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t')
elif ext == '.json':
df = pd.read_json(file_path)
else:
return f"Unsupported file extension: {ext}"
# Apply filter
filtered_df = df.query(query)
# Return results
if filtered_df.empty:
return "No data matches the filter criteria."
return filtered_df.to_string(index=False)
except Exception as e:
return f"Error filtering data: {str(e)}"
@mcp.tool()
def pivot_table(file_path: str, index: str, columns: Optional[str] = None,
                values: Optional[str] = None, aggfunc: str = "mean",
                sheet_name: Optional[str] = None) -> str:
"""
Create a pivot table from Excel data.
Args:
file_path: Path to the Excel file
index: Column to use as the pivot table index
columns: Optional column to use as the pivot table columns
values: Column to use as the pivot table values
aggfunc: Aggregation function ('mean', 'sum', 'count', etc.)
sheet_name: Name of the sheet to pivot (for Excel files)
Returns:
Pivot table as string
"""
try:
# Read file
_, ext = os.path.splitext(file_path)
ext = ext.lower()
read_params = {}
if ext in ['.xlsx', '.xls', '.xlsm'] and sheet_name is not None:
read_params["sheet_name"] = sheet_name
if ext in ['.xlsx', '.xls', '.xlsm']:
df = pd.read_excel(file_path, **read_params)
elif ext == '.csv':
df = pd.read_csv(file_path)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t')
elif ext == '.json':
df = pd.read_json(file_path)
else:
return f"Unsupported file extension: {ext}"
# Configure pivot table params
pivot_params = {"index": index}
if columns:
pivot_params["columns"] = columns
if values:
pivot_params["values"] = values
        # Pass the aggregation function to pandas by name; string aggfuncs
        # are the supported form, and NumPy callables are deprecated here
        # (note: "count" counts non-null values)
        if aggfunc not in ("mean", "sum", "count", "min", "max"):
            return f"Unsupported aggregation function: {aggfunc}"
        pivot_params["aggfunc"] = aggfunc
# Create pivot table
pivot = pd.pivot_table(df, **pivot_params)
return pivot.to_string()
except Exception as e:
return f"Error creating pivot table: {str(e)}"
@mcp.tool()
def export_chart(file_path: str, x_column: str, y_column: str,
chart_type: str = "line", sheet_name: Optional[str] = None) -> Image:
"""
Create a chart from Excel data and return as an image.
Args:
file_path: Path to the Excel file
x_column: Column to use for x-axis
y_column: Column to use for y-axis
chart_type: Type of chart ('line', 'bar', 'scatter', 'hist')
sheet_name: Name of the sheet to chart (for Excel files)
Returns:
Chart as image
"""
import matplotlib.pyplot as plt
import seaborn as sns
try:
# Read file
_, ext = os.path.splitext(file_path)
ext = ext.lower()
read_params = {}
if ext in ['.xlsx', '.xls', '.xlsm'] and sheet_name is not None:
read_params["sheet_name"] = sheet_name
if ext in ['.xlsx', '.xls', '.xlsm']:
df = pd.read_excel(file_path, **read_params)
elif ext == '.csv':
df = pd.read_csv(file_path)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t')
elif ext == '.json':
df = pd.read_json(file_path)
else:
raise ValueError(f"Unsupported file extension: {ext}")
# Create chart
plt.figure(figsize=(10, 6))
if chart_type == "line":
sns.lineplot(data=df, x=x_column, y=y_column)
elif chart_type == "bar":
sns.barplot(data=df, x=x_column, y=y_column)
elif chart_type == "scatter":
sns.scatterplot(data=df, x=x_column, y=y_column)
elif chart_type == "hist":
df[y_column].hist()
plt.xlabel(y_column)
else:
raise ValueError(f"Unsupported chart type: {chart_type}")
plt.title(f"{chart_type.capitalize()} Chart: {y_column} by {x_column}")
plt.tight_layout()
# Save to bytes buffer
buf = io.BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
# Convert to Image
plt.close()
return Image(data=buf.getvalue(), format="png")
    except Exception as e:
        # Close any partially drawn figure, then render the error as an image
        plt.close('all')
        plt.figure(figsize=(8, 2))
        plt.text(0.5, 0.5, f"Error creating chart: {str(e)}",
                 horizontalalignment='center', fontsize=12, color='red')
        plt.axis('off')
buf = io.BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
plt.close()
return Image(data=buf.getvalue(), format="png")
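# Example (hypothetical columns): a scatter plot of price against quantity
#   export_chart("/data/sales.csv", x_column="quantity", y_column="price",
#                chart_type="scatter")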
@mcp.tool()
def data_summary(file_path: str, sheet_name: Optional[str] = None) -> str:
"""
Generate a comprehensive summary of the data in an Excel file.
Args:
file_path: Path to the Excel file
sheet_name: Name of the sheet to summarize (for Excel files)
Returns:
Comprehensive data summary as string
"""
try:
# Read file
_, ext = os.path.splitext(file_path)
ext = ext.lower()
read_params = {}
if ext in ['.xlsx', '.xls', '.xlsm'] and sheet_name is not None:
read_params["sheet_name"] = sheet_name
if ext in ['.xlsx', '.xls', '.xlsm']:
df = pd.read_excel(file_path, **read_params)
elif ext == '.csv':
df = pd.read_csv(file_path)
elif ext == '.tsv':
df = pd.read_csv(file_path, sep='\t')
elif ext == '.json':
df = pd.read_json(file_path)
else:
return f"Unsupported file extension: {ext}"
# Basic file info
file_info = {
"file_name": os.path.basename(file_path),
"file_type": ext,
"file_size": f"{os.path.getsize(file_path) / 1024:.2f} KB",
"last_modified": datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S')
}
# Data structure
data_structure = {
"rows": df.shape[0],
"columns": df.shape[1],
"column_names": list(df.columns),
"column_types": {col: str(dtype) for col, dtype in df.dtypes.items()},
"memory_usage": f"{df.memory_usage(deep=True).sum() / 1024:.2f} KB"
}
# Data quality
data_quality = {
"missing_values": {col: int(count) for col, count in df.isnull().sum().items()},
"missing_percentage": {col: f"{count/len(df)*100:.2f}%" for col, count in df.isnull().sum().items()},
"duplicate_rows": int(df.duplicated().sum()),
"unique_values": {col: int(df[col].nunique()) for col in df.columns}
}
# Statistical summary
numeric_cols = df.select_dtypes(include=['number']).columns
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
datetime_cols = df.select_dtypes(include=['datetime', 'datetime64']).columns
statistics = {}
if len(numeric_cols) > 0:
statistics["numeric"] = df[numeric_cols].describe().to_dict()
if len(categorical_cols) > 0:
statistics["categorical"] = {
col: {
"unique_values": int(df[col].nunique()),
"top_values": df[col].value_counts().head(5).to_dict()
} for col in categorical_cols
}
if len(datetime_cols) > 0:
statistics["datetime"] = {
col: {
"min": df[col].min().strftime('%Y-%m-%d') if pd.notna(df[col].min()) else None,
"max": df[col].max().strftime('%Y-%m-%d') if pd.notna(df[col].max()) else None,
"range_days": (df[col].max() - df[col].min()).days if pd.notna(df[col].min()) and pd.notna(df[col].max()) else None
} for col in datetime_cols
}
# Combine all info
summary = {
"file_info": file_info,
"data_structure": data_structure,
"data_quality": data_quality,
"statistics": statistics
}
return json.dumps(summary, indent=2, default=str)
except Exception as e:
return f"Error generating summary: {str(e)}"
# Add prompt templates for common Excel operations
@mcp.prompt()
def analyze_excel_data(file_path: str) -> str:
"""
Create a prompt for analyzing Excel data
"""
return f"""
I have an Excel file at {file_path} that I'd like to analyze.
Could you help me understand the data structure, perform basic statistical analysis,
and identify any patterns or insights in the data?
"""
@mcp.prompt()
def create_chart(file_path: str) -> str:
"""
Create a prompt for generating charts from Excel data
"""
return f"""
I have an Excel file at {file_path} and I want to create some visualizations.
Could you suggest some appropriate charts based on the data and help me create them?
"""
@mcp.prompt()
def data_cleaning(file_path: str) -> str:
"""
Create a prompt for cleaning and preprocessing Excel data
"""
return f"""
I have an Excel file at {file_path} that needs some cleaning and preprocessing.
Could you help me identify and fix issues like missing values, outliers,
inconsistent formatting, and other data quality problems?
"""
def main():
    mcp.run()


# Run the server when executed directly
if __name__ == "__main__":
    main()