Excel MCP Server

by fish0710
Verified
import logging
import os
from pathlib import Path
from typing import Any

import requests
from openpyxl import Workbook, load_workbook
from openpyxl.utils import get_column_letter

from .exceptions import WorkbookError

logger = logging.getLogger(__name__)

# Static file server configuration
FILE_SERVER_URL = "http://localhost:3001"
UPLOAD_ENDPOINT = f"{FILE_SERVER_URL}/upload"
FILES_LIST_ENDPOINT = f"{FILE_SERVER_URL}/files/list"
FILE_ACCESS_BASE_URL = f"{FILE_SERVER_URL}/files/"

# Seconds to wait for the server to connect / send data before giving up.
# Without a timeout, requests may block forever on an unresponsive server.
REQUEST_TIMEOUT = 30


def upload_file_to_server(filepath: str) -> dict[str, Any]:
    """Upload a file to the static file server.

    The file is POSTed to ``UPLOAD_ENDPOINT`` as the ``file`` form field.

    Args:
        filepath: Path to the file to upload.

    Returns:
        Dictionary with the keys ``message``, ``file_url`` and ``filename``.
        ``file_url`` is ``FILE_ACCESS_BASE_URL`` followed by the file's
        base name.

    Raises:
        WorkbookError: If the file does not exist, the server answers with a
            status other than 200, or the request itself fails.
    """
    try:
        if not os.path.exists(filepath):
            raise WorkbookError(f"File not found: {filepath}")

        filename = os.path.basename(filepath)
        with open(filepath, "rb") as file:
            files = {"file": (filename, file)}
            response = requests.post(
                UPLOAD_ENDPOINT, files=files, timeout=REQUEST_TIMEOUT
            )

        if response.status_code != 200:
            raise WorkbookError(f"Failed to upload file: {response.text}")

        # Build the URL under which the uploaded file can be fetched.
        file_url = f"{FILE_ACCESS_BASE_URL}{filename}"
        return {
            "message": "File uploaded successfully",
            "file_url": file_url,
            "filename": filename,
        }
    except WorkbookError as e:
        # Already a domain error with a complete message; re-raise unchanged
        # so the "Failed to upload file:" prefix is not duplicated.
        logger.error(str(e))
        raise
    except Exception as e:
        logger.error("Failed to upload file: %s", e)
        raise WorkbookError(f"Failed to upload file: {e!s}") from e


def download_file_from_url(url: str, save_path: str) -> str:
    """Download a file from a URL and save it to the given path.

    The body is streamed to disk in 8 KiB chunks. Missing parent directories
    of ``save_path`` are created.

    Args:
        url: URL of the file to download.
        save_path: Path where the downloaded file is written.

    Returns:
        ``save_path``, unchanged.

    Raises:
        WorkbookError: If the request fails, the server returns an error
            status, or the file cannot be written.
    """
    try:
        # The context manager releases the streamed connection even when
        # writing the file fails part-way through.
        with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
            response.raise_for_status()

            # dirname() is "" for a bare file name, and makedirs("") raises,
            # so only create the directory when there is one to create.
            parent_dir = os.path.dirname(save_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            with open(save_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)

        return save_path
    except Exception as e:
        logger.error("Failed to download file: %s", e)
        raise WorkbookError(f"Failed to download file: {e!s}") from e


def create_workbook(
    filepath: str, sheet_name: str = "Sheet1", upload: bool = False
) -> dict[str, Any]:
    """Create a new Excel workbook and optionally upload it to the server.

    Args:
        filepath: Where to save the workbook; missing parent directories are
            created.
        sheet_name: Title given to the workbook's sheet.
        upload: When true, the saved file is passed to
            ``upload_file_to_server`` and its URL is added to the result.

    Returns:
        Dictionary with ``message``, ``active_sheet`` and ``workbook`` (the
        in-memory ``Workbook`` object), plus ``file_url`` when ``upload`` is
        true.

    Raises:
        WorkbookError: If creating, saving or uploading the workbook fails.
    """
    try:
        wb = Workbook()

        # Rename the default sheet rather than leaving an extra empty one.
        if "Sheet" in wb.sheetnames:
            sheet = wb["Sheet"]
            sheet.title = sheet_name
        else:
            wb.create_sheet(sheet_name)

        path = Path(filepath)
        path.parent.mkdir(parents=True, exist_ok=True)
        wb.save(str(path))

        result = {
            "message": f"Created workbook: {filepath}",
            "active_sheet": sheet_name,
            "workbook": wb,
        }

        # Upload the file if requested.
        if upload:
            upload_result = upload_file_to_server(filepath)
            result["file_url"] = upload_result["file_url"]
            result["message"] = (
                f"Created and uploaded workbook: {filepath}. "
                f"URL: {upload_result['file_url']}"
            )

        return result
    except Exception as e:
        logger.error("Failed to create workbook: %s", e)
        raise WorkbookError(f"Failed to create workbook: {e!s}") from e


def get_or_create_workbook(filepath: str) -> Workbook:
    """Return the workbook at ``filepath``, creating it if the file is missing.

    Args:
        filepath: Path of the workbook to load or create.

    Returns:
        The loaded workbook, or a newly created one (saved via
        ``create_workbook`` with its default sheet name).
    """
    try:
        return load_workbook(filepath)
    except FileNotFoundError:
        return create_workbook(filepath)["workbook"]


def create_sheet(filepath: str, sheet_name: str) -> dict:
    """Add a new worksheet to an existing workbook and save it.

    Args:
        filepath: Path of the workbook to modify.
        sheet_name: Title of the sheet to add.

    Returns:
        Dictionary with a ``message`` confirming the sheet was created.

    Raises:
        WorkbookError: If a sheet with that name already exists, or loading
            or saving the workbook fails.
    """
    try:
        wb = load_workbook(filepath)
        try:
            if sheet_name in wb.sheetnames:
                raise WorkbookError(f"Sheet {sheet_name} already exists")

            wb.create_sheet(sheet_name)
            wb.save(filepath)
        finally:
            # Close on every path, including the "already exists" error.
            wb.close()

        return {"message": f"Sheet {sheet_name} created successfully"}
    except WorkbookError as e:
        logger.error(str(e))
        raise
    except Exception as e:
        logger.error("Failed to create sheet: %s", e)
        raise WorkbookError(str(e)) from e


def get_workbook_info(filepath: str, include_ranges: bool = False) -> dict[str, Any]:
    """Return metadata about a workbook.

    Args:
        filepath: Path of the workbook to inspect.
        include_ranges: When true, add a ``used_ranges`` mapping of sheet
            name to an ``A1:<last column><last row>`` range string.

    Returns:
        Dictionary with ``filename``, ``sheets``, ``size`` (``st_size``) and
        ``modified`` (``st_mtime``), plus ``used_ranges`` when requested.
        Sheets whose dimensions are unknown or empty are omitted from
        ``used_ranges``.

    Raises:
        WorkbookError: If the file does not exist or cannot be read.
    """
    try:
        path = Path(filepath)
        if not path.exists():
            raise WorkbookError(f"File not found: {filepath}")

        wb = load_workbook(filepath, read_only=True)
        try:
            file_stat = path.stat()
            info = {
                "filename": path.name,
                "sheets": wb.sheetnames,
                "size": file_stat.st_size,
                "modified": file_stat.st_mtime,
            }

            if include_ranges:
                ranges = {}
                for sheet_name in wb.sheetnames:
                    ws = wb[sheet_name]
                    # Read-only sheets report None when the file carries no
                    # dimension info; treat that the same as an empty sheet.
                    max_row = ws.max_row or 0
                    max_column = ws.max_column or 0
                    if max_row > 0 and max_column > 0:
                        ranges[sheet_name] = (
                            f"A1:{get_column_letter(max_column)}{max_row}"
                        )
                info["used_ranges"] = ranges
        finally:
            # A read-only workbook keeps the file open until closed.
            wb.close()

        return info
    except WorkbookError as e:
        logger.error(str(e))
        raise
    except Exception as e:
        logger.error("Failed to get workbook info: %s", e)
        raise WorkbookError(str(e)) from e