Excel MCP Server
by fish0710
Verified
import logging
import sys
import os
from typing import Any, List, Dict
from mcp.server.fastmcp import FastMCP
# Import exceptions
from excel_mcp.exceptions import (
ValidationError,
WorkbookError,
SheetError,
DataError,
FormattingError,
CalculationError,
PivotError,
ChartError
)
# Import from excel_mcp package with consistent _impl suffixes
from excel_mcp.validation import (
validate_formula_in_cell_operation as validate_formula_impl,
validate_range_in_sheet_operation as validate_range_impl
)
from excel_mcp.chart import create_chart_in_sheet as create_chart_impl
from excel_mcp.workbook import get_workbook_info
from excel_mcp.data import write_data
from excel_mcp.pivot import create_pivot_table as create_pivot_table_impl
from excel_mcp.sheet import (
copy_sheet,
delete_sheet,
rename_sheet,
merge_range,
unmerge_range,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("excel-mcp.log")
],
force=True
)
logger = logging.getLogger("excel-mcp")
# Get Excel files path from environment or use default
EXCEL_FILES_PATH = os.environ.get("EXCEL_FILES_PATH", "./excel_files")
# Create the directory if it doesn't exist
os.makedirs(EXCEL_FILES_PATH, exist_ok=True)
# Initialize FastMCP server
mcp = FastMCP(
"excel-mcp",
version="0.1.0",
description="Excel MCP Server for manipulating Excel files",
dependencies=["openpyxl>=3.1.2"],
env_vars={
"EXCEL_FILES_PATH": {
"description": "Path to Excel files directory",
"required": False,
"default": EXCEL_FILES_PATH
}
}
)
def get_excel_path(filename: str) -> str:
"""Get full path to Excel file.
Args:
filename: Name of Excel file
Returns:
Full path to Excel file
"""
# If filename is already an absolute path, return it
if os.path.isabs(filename):
return filename
# Use the configured Excel files path
return os.path.join(EXCEL_FILES_PATH, filename)
@mcp.tool()
def apply_formula(
filepath: str,
sheet_name: str,
cell: str,
formula: str,
) -> str:
"""Apply Excel formula to cell."""
try:
full_path = get_excel_path(filepath)
# First validate the formula
validation = validate_formula_impl(full_path, sheet_name, cell, formula)
if isinstance(validation, dict) and "error" in validation:
return f"Error: {validation['error']}"
# If valid, apply the formula
from excel_mcp.calculations import apply_formula as apply_formula_impl
result = apply_formula_impl(full_path, sheet_name, cell, formula)
return result["message"]
except (ValidationError, CalculationError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error applying formula: {e}")
raise
@mcp.tool()
def validate_formula_syntax(
filepath: str,
sheet_name: str,
cell: str,
formula: str,
) -> str:
"""Validate Excel formula syntax without applying it."""
try:
full_path = get_excel_path(filepath)
result = validate_formula_impl(full_path, sheet_name, cell, formula)
return result["message"]
except (ValidationError, CalculationError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error validating formula: {e}")
raise
@mcp.tool()
def format_range(
filepath: str,
sheet_name: str,
start_cell: str,
end_cell: str = None,
bold: bool = False,
italic: bool = False,
underline: bool = False,
font_size: int = None,
font_color: str = None,
bg_color: str = None,
border_style: str = None,
border_color: str = None,
number_format: str = None,
alignment: str = None,
wrap_text: bool = False,
merge_cells: bool = False,
protection: Dict[str, Any] = None,
conditional_format: Dict[str, Any] = None
) -> str:
"""Apply formatting to a range of cells."""
try:
full_path = get_excel_path(filepath)
from excel_mcp.formatting import format_range as format_range_func
result = format_range_func(
filepath=full_path,
sheet_name=sheet_name,
start_cell=start_cell,
end_cell=end_cell,
bold=bold,
italic=italic,
underline=underline,
font_size=font_size,
font_color=font_color,
bg_color=bg_color,
border_style=border_style,
border_color=border_color,
number_format=number_format,
alignment=alignment,
wrap_text=wrap_text,
merge_cells=merge_cells,
protection=protection,
conditional_format=conditional_format
)
return "Range formatted successfully"
except (ValidationError, FormattingError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error formatting range: {e}")
raise
@mcp.tool()
def read_data_from_excel(
filepath: str,
sheet_name: str,
start_cell: str = "A1",
end_cell: str = None,
preview_only: bool = False
) -> str:
"""Read data from Excel worksheet."""
try:
full_path = get_excel_path(filepath)
from excel_mcp.data import read_excel_range
result = read_excel_range(full_path, sheet_name, start_cell, end_cell, preview_only)
if not result:
return "No data found in specified range"
# Convert the list of dicts to a formatted string
data_str = "\n".join([str(row) for row in result])
return data_str
except Exception as e:
logger.error(f"Error reading data: {e}")
raise
@mcp.tool()
def write_data_to_excel(
filepath: str,
sheet_name: str,
data: List[Dict],
start_cell: str = "A1",
write_headers: bool = True,
) -> str:
"""Write data to Excel worksheet."""
try:
full_path = get_excel_path(filepath)
result = write_data(full_path, sheet_name, data, start_cell, write_headers)
return result["message"]
except (ValidationError, DataError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error writing data: {e}")
raise
@mcp.tool()
def create_workbook(filepath: str, upload: bool = True) -> str:
"""Create new Excel workbook and optionally upload it to file server."""
try:
full_path = get_excel_path(filepath)
from excel_mcp.workbook import create_workbook as create_workbook_impl
result = create_workbook_impl(full_path, upload=upload)
if upload and "file_url" in result:
return f"Created workbook at {full_path} and uploaded to {result['file_url']}"
else:
return f"Created workbook at {full_path}"
except WorkbookError as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error creating workbook: {e}")
raise
@mcp.tool()
def create_worksheet(filepath: str, sheet_name: str) -> str:
"""Create new worksheet in workbook."""
try:
full_path = get_excel_path(filepath)
from excel_mcp.workbook import create_sheet as create_worksheet_impl
result = create_worksheet_impl(full_path, sheet_name)
return result["message"]
except (ValidationError, WorkbookError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error creating worksheet: {e}")
raise
@mcp.tool()
def create_chart(
filepath: str,
sheet_name: str,
data_range: str,
chart_type: str,
target_cell: str,
title: str = "",
x_axis: str = "",
y_axis: str = ""
) -> str:
"""Create chart in worksheet."""
try:
full_path = get_excel_path(filepath)
result = create_chart_impl(
filepath=full_path,
sheet_name=sheet_name,
data_range=data_range,
chart_type=chart_type,
target_cell=target_cell,
title=title,
x_axis=x_axis,
y_axis=y_axis
)
return result["message"]
except (ValidationError, ChartError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error creating chart: {e}")
raise
@mcp.tool()
def create_pivot_table(
filepath: str,
sheet_name: str,
data_range: str,
rows: List[str],
values: List[str],
columns: List[str] = None,
agg_func: str = "mean"
) -> str:
"""Create pivot table in worksheet."""
try:
full_path = get_excel_path(filepath)
result = create_pivot_table_impl(
filepath=full_path,
sheet_name=sheet_name,
data_range=data_range,
rows=rows,
values=values,
columns=columns or [],
agg_func=agg_func
)
return result["message"]
except (ValidationError, PivotError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error creating pivot table: {e}")
raise
@mcp.tool()
def copy_worksheet(
filepath: str,
source_sheet: str,
target_sheet: str
) -> str:
"""Copy worksheet within workbook."""
try:
full_path = get_excel_path(filepath)
result = copy_sheet(full_path, source_sheet, target_sheet)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error copying worksheet: {e}")
raise
@mcp.tool()
def delete_worksheet(
filepath: str,
sheet_name: str
) -> str:
"""Delete worksheet from workbook."""
try:
full_path = get_excel_path(filepath)
result = delete_sheet(full_path, sheet_name)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error deleting worksheet: {e}")
raise
@mcp.tool()
def rename_worksheet(
filepath: str,
old_name: str,
new_name: str
) -> str:
"""Rename worksheet in workbook."""
try:
full_path = get_excel_path(filepath)
result = rename_sheet(full_path, old_name, new_name)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error renaming worksheet: {e}")
raise
@mcp.tool()
def get_workbook_metadata(
filepath: str,
include_ranges: bool = False
) -> str:
"""Get metadata about workbook including sheets, ranges, etc."""
try:
full_path = get_excel_path(filepath)
result = get_workbook_info(full_path, include_ranges=include_ranges)
return str(result)
except WorkbookError as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error getting workbook metadata: {e}")
raise
@mcp.tool()
def merge_cells(filepath: str, sheet_name: str, start_cell: str, end_cell: str) -> str:
"""Merge a range of cells."""
try:
full_path = get_excel_path(filepath)
result = merge_range(full_path, sheet_name, start_cell, end_cell)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error merging cells: {e}")
raise
@mcp.tool()
def unmerge_cells(filepath: str, sheet_name: str, start_cell: str, end_cell: str) -> str:
"""Unmerge a range of cells."""
try:
full_path = get_excel_path(filepath)
result = unmerge_range(full_path, sheet_name, start_cell, end_cell)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error unmerging cells: {e}")
raise
@mcp.tool()
def copy_range(
filepath: str,
sheet_name: str,
source_start: str,
source_end: str,
target_start: str,
target_sheet: str = None
) -> str:
"""Copy a range of cells to another location."""
try:
full_path = get_excel_path(filepath)
from excel_mcp.sheet import copy_range_operation
result = copy_range_operation(
full_path,
sheet_name,
source_start,
source_end,
target_start,
target_sheet
)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error copying range: {e}")
raise
@mcp.tool()
def delete_range(
filepath: str,
sheet_name: str,
start_cell: str,
end_cell: str,
shift_direction: str = "up"
) -> str:
"""Delete a range of cells and shift remaining cells."""
try:
full_path = get_excel_path(filepath)
from excel_mcp.sheet import delete_range_operation
result = delete_range_operation(
full_path,
sheet_name,
start_cell,
end_cell,
shift_direction
)
return result["message"]
except (ValidationError, SheetError) as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error deleting range: {e}")
raise
@mcp.tool()
def validate_excel_range(
filepath: str,
sheet_name: str,
start_cell: str,
end_cell: str = None
) -> str:
"""Validate if a range exists and is properly formatted."""
try:
full_path = get_excel_path(filepath)
range_str = start_cell if not end_cell else f"{start_cell}:{end_cell}"
result = validate_range_impl(full_path, sheet_name, range_str)
return result["message"]
except ValidationError as e:
return f"Error: {str(e)}"
except Exception as e:
logger.error(f"Error validating range: {e}")
raise
@mcp.tool()
def process_excel_from_url(url: str, operation: str, operation_params: Dict[str, Any]) -> str:
"""Download Excel file from URL, process it with specified operation, and upload back to server.
Args:
url: URL of the Excel file to download and process
operation: Name of the operation to perform (e.g., 'format_range', 'apply_formula')
operation_params: Dictionary containing parameters for the specified operation
Returns:
Result message with the URL of the processed file
"""
try:
# 生成临时文件路径
import tempfile
import uuid
temp_dir = tempfile.gettempdir()
temp_filename = f"temp_excel_{uuid.uuid4()}.xlsx"
temp_filepath = os.path.join(temp_dir, temp_filename)
# 下载文件
from excel_mcp.workbook import download_file_from_url
download_file_from_url(url, temp_filepath)
logger.info(f"Downloaded file from {url} to {temp_filepath}")
# 获取文件名用于上传后的文件名
original_filename = os.path.basename(url.split('/')[-1])
processed_filename = f"processed_{original_filename}"
processed_filepath = os.path.join(EXCEL_FILES_PATH, processed_filename)
# 复制文件到Excel文件目录
import shutil
shutil.copy2(temp_filepath, processed_filepath)
# 打印operation_params
logger.info(f"operation_params: {operation_params}")
# 确保operation_params是字典类型
if isinstance(operation_params, str):
import json
try:
operation_params = json.loads(operation_params)
except json.JSONDecodeError:
logger.error(f"Invalid JSON in operation_params: {operation_params}")
raise ValueError(f"Invalid JSON format in operation_params")
# 执行指定操作
result_message = ""
if operation == "format_range":
# 替换filepath参数为处理后的文件路径
params = {k: v for k, v in operation_params.items() if k != "filepath"}
result_message = format_range(processed_filepath, **params)
elif operation == "apply_formula":
params = {k: v for k, v in operation_params.items() if k != "filepath"}
result_message = apply_formula(processed_filepath, **params)
elif operation == "write_data_to_excel":
params = {k: v for k, v in operation_params.items() if k != "filepath"}
result_message = write_data_to_excel(processed_filepath, **params)
elif operation == "create_chart":
params = {k: v for k, v in operation_params.items() if k != "filepath"}
result_message = create_chart(processed_filepath, **params)
elif operation == "create_pivot_table":
params = {k: v for k, v in operation_params.items() if k != "filepath"}
result_message = create_pivot_table(processed_filepath, **params)
else:
raise ValueError(f"Unsupported operation: {operation}")
# 上传处理后的文件
from excel_mcp.workbook import upload_file_to_server
upload_result = upload_file_to_server(processed_filepath)
# 清理临时文件
os.remove(temp_filepath)
return f"Operation '{operation}' completed successfully. Processed file available at: {upload_result['file_url']}\nOperation result: {result_message}"
except Exception as e:
logger.error(f"Error processing Excel from URL: {e}")
raise WorkbookError(f"Failed to process Excel from URL: {str(e)}")
async def run_server():
"""Run the Excel MCP server."""
try:
logger.info(f"Starting Excel MCP server (files directory: {EXCEL_FILES_PATH})")
await mcp.run_sse_async()
except KeyboardInterrupt:
logger.info("Server stopped by user")
await mcp.shutdown()
except Exception as e:
logger.error(f"Server failed: {e}")
raise
finally:
logger.info("Server shutdown complete")