Excel MCP Server

by fish0710
Verified
from pathlib import Path from typing import Any import logging from openpyxl import load_workbook from openpyxl.styles import Font from openpyxl.worksheet.worksheet import Worksheet from openpyxl.utils import get_column_letter from .exceptions import DataError from .cell_utils import parse_cell_range logger = logging.getLogger(__name__) def read_excel_range( filepath: Path | str, sheet_name: str, start_cell: str = "A1", end_cell: str | None = None, preview_only: bool = False ) -> list[dict[str, Any]]: """Read data from Excel range with optional preview mode""" try: wb = load_workbook(filepath, read_only=True) if sheet_name not in wb.sheetnames: raise DataError(f"Sheet '{sheet_name}' not found") ws = wb[sheet_name] # Parse start cell if ':' in start_cell: start_cell, end_cell = start_cell.split(':') # Get start coordinates try: start_coords = parse_cell_range(f"{start_cell}:{start_cell}") if not start_coords or not all(coord is not None for coord in start_coords[:2]): raise DataError(f"Invalid start cell reference: {start_cell}") start_row, start_col = start_coords[0], start_coords[1] except ValueError as e: raise DataError(f"Invalid start cell format: {str(e)}") # Determine end coordinates if end_cell: try: end_coords = parse_cell_range(f"{end_cell}:{end_cell}") if not end_coords or not all(coord is not None for coord in end_coords[:2]): raise DataError(f"Invalid end cell reference: {end_cell}") end_row, end_col = end_coords[0], end_coords[1] except ValueError as e: raise DataError(f"Invalid end cell format: {str(e)}") else: # For single cell, use same coordinates end_row, end_col = start_row, start_col # Validate range bounds if start_row > ws.max_row or start_col > ws.max_column: raise DataError( f"Start cell out of bounds. Sheet dimensions are " f"A1:{get_column_letter(ws.max_column)}{ws.max_row}" ) data = [] # If it's a single cell or single row, just read the values directly if start_row == end_row: row_data = {} for col in range(start_col, end_col + 1): cell = ws.cell(row=start_row, column=col) col_name = f"Column_{col}" row_data[col_name] = cell.value if any(v is not None for v in row_data.values()): data.append(row_data) else: # Multiple rows - use header row headers = [] for col in range(start_col, end_col + 1): cell_value = ws.cell(row=start_row, column=col).value headers.append(str(cell_value) if cell_value is not None else f"Column_{col}") # Get data rows max_rows = min(start_row + 5, end_row) if preview_only else end_row for row in range(start_row + 1, max_rows + 1): row_data = {} for col, header in enumerate(headers, start=start_col): cell = ws.cell(row=row, column=col) row_data[header] = cell.value if any(v is not None for v in row_data.values()): data.append(row_data) wb.close() return data except DataError as e: logger.error(str(e)) raise except Exception as e: logger.error(f"Failed to read Excel range: {e}") raise DataError(str(e)) def write_data( filepath: str, sheet_name: str | None, data: list[dict[str, Any]] | None, start_cell: str = "A1", write_headers: bool = True, ) -> dict[str, str]: """Write data to Excel sheet with workbook handling""" try: if not data: raise DataError("No data provided to write") wb = load_workbook(filepath) # If no sheet specified, use active sheet if not sheet_name: sheet_name = wb.active.title elif sheet_name not in wb.sheetnames: wb.create_sheet(sheet_name) ws = wb[sheet_name] # Validate start cell try: start_coords = parse_cell_range(start_cell) if not start_coords or not all(coord is not None for coord in start_coords[:2]): raise DataError(f"Invalid start cell reference: {start_cell}") except ValueError as e: raise DataError(f"Invalid start cell format: {str(e)}") if len(data) > 0: # Check if first row of data contains headers first_row = data[0] has_headers = all( isinstance(value, str) and value.strip() == key.strip() for key, value in first_row.items() ) # If first row contains headers, skip it when write_headers is True if has_headers and write_headers: data = data[1:] _write_data_to_worksheet(ws, data, start_cell, write_headers) wb.save(filepath) wb.close() return {"message": f"Data written to {sheet_name}", "active_sheet": sheet_name} except DataError as e: logger.error(str(e)) raise except Exception as e: logger.error(f"Failed to write data: {e}") raise DataError(str(e)) def _write_data_to_worksheet( worksheet: Worksheet, data: list[dict[str, Any]], start_cell: str = "A1", write_headers: bool = True, ) -> None: """Write data to worksheet - internal helper function""" try: if not data: raise DataError("No data provided to write") try: start_coords = parse_cell_range(start_cell) if not start_coords or not all(x is not None for x in start_coords[:2]): raise DataError(f"Invalid start cell reference: {start_cell}") start_row, start_col = start_coords[0], start_coords[1] except ValueError as e: raise DataError(f"Invalid start cell format: {str(e)}") # Validate data structure if not all(isinstance(row, dict) for row in data): raise DataError("All data rows must be dictionaries") # Write headers if requested headers = list(data[0].keys()) if write_headers: for i, header in enumerate(headers): cell = worksheet.cell(row=start_row, column=start_col + i) cell.value = header cell.font = Font(bold=True) start_row += 1 # Move start row down if headers were written # Write data for i, row_dict in enumerate(data): if not all(h in row_dict for h in headers): raise DataError(f"Row {i+1} is missing required headers") for j, header in enumerate(headers): cell = worksheet.cell(row=start_row + i, column=start_col + j) cell.value = row_dict.get(header, "") except DataError as e: logger.error(str(e)) raise except Exception as e: logger.error(f"Failed to write worksheet data: {e}") raise DataError(str(e))