Skip to main content
Glama
operations.py68.2 kB
"""Excel operations for the MCP filesystem server.""" import json import os import shutil import time from datetime import datetime from functools import partial from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple import anyio from mcp.server.fastmcp.utilities.logging import get_logger from openpyxl import Workbook, load_workbook from openpyxl.chart import BarChart, LineChart, PieChart, Reference, Series, ScatterChart from openpyxl.formula import Tokenizer from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from openpyxl.utils import get_column_letter, column_index_from_string, range_boundaries from openpyxl.utils import ( column_index_from_string, get_column_letter, range_boundaries, ) from ..security import PathValidator from ..operations import format_timestamp from .converters import read_csv, write_csv from .utils import format_read_result, format_size, normalize_output_format logger = get_logger(__name__) # Force INFO level logging for debugging import logging logger.setLevel(logging.INFO) class ExcelOperations: """Excel file operations with path validation.""" def __init__( self, validator: PathValidator, config: Optional[Dict[str, Any]] = None, ): self.validator = validator config = config or {} self.default_max_rows: int = int( os.environ.get("MCP_EXCEL_MAX_ROWS", config.get("default_max_rows", 1000)) ) self.max_file_size_mb: float = float( os.environ.get("MCP_EXCEL_MAX_SIZE_MB", config.get("max_file_size_mb", 50)) ) self.supported_formats: List[str] = [ fmt.lower() for fmt in config.get("supported_formats", [".xlsx", ".xls", ".csv"]) ] self.enable_formulas: bool = bool(config.get("enable_formulas", True)) self._project_root = Path(__file__).resolve().parent.parent.parent self.templates_config_path: Optional[Path] = None self.templates: List[Dict[str, Any]] = [] self._template_error: Optional[str] = None self._load_templates_config(config) async def read_excel( self, path: str, sheet: Optional[str] = None, range_str: Optional[str] = None, max_rows: Optional[int] = None, output_format: str = "markdown", ) -> Dict[str, Any]: """Read an Excel file and return structured data. Args: path: Virtual path string to the Excel file (NOT a Path object) sheet: Sheet name (None or empty string means use first sheet) range_str: Cell range (e.g., "A1:C10") or None for entire sheet """ # Normalize sheet parameter: empty string -> None if sheet == "": sheet = None stage_start = time.perf_counter() def _log_stage(label: str) -> None: nonlocal stage_start now = time.perf_counter() logger.info(f"read_excel: {label} took {now - stage_start:.3f}s") stage_start = now logger.info(f"read_excel: Starting to read Excel file: path={path}, sheet={sheet}, range={range_str}, max_rows={max_rows}, output_format={output_format}") if not isinstance(path, str): error_msg = f"read_excel expects str (virtual path), got {type(path).__name__}" logger.error(f"read_excel: {error_msg}") raise TypeError(error_msg) try: logger.info(f"read_excel: Validating Excel path: {path}") abs_path = await self._validated_excel_path(path) logger.info(f"read_excel: Validated path: {abs_path}") _log_stage("path validation") except Exception as e: logger.error(f"read_excel: Path validation failed for '{path}': {type(e).__name__}: {e}", exc_info=True) raise try: logger.info(f"read_excel: Checking file size: {abs_path}") self._check_file_size(abs_path) logger.info(f"read_excel: File size check passed") _log_stage("size check") except Exception as e: logger.error(f"read_excel: File size check failed for '{path}': {type(e).__name__}: {e}", exc_info=True) raise if abs_path.suffix.lower() == ".csv": return await self._read_csv_file(abs_path, sheet, range_str, max_rows, output_format) # 先读取值(data_only=True),获取计算结果 wb_values = None ws_values = None try: logger.info(f"Loading workbook with data_only=True: {abs_path}") wb_values = await self._load_workbook(abs_path, read_only=True, data_only=True) logger.info(f"Workbook loaded, available sheets: {wb_values.sheetnames}") ws_values = self._get_sheet(wb_values, sheet) logger.info(f"Successfully loaded workbook with data_only=True, sheet: {ws_values.title}") _log_stage("load workbook data_only=True") except Exception as e: logger.warning(f"Failed to load workbook with data_only=True: {type(e).__name__}: {e}", exc_info=True) # 如果失败,回退到 data_only=False try: logger.info(f"Falling back to data_only=False: {abs_path}") wb_values = await self._load_workbook(abs_path, read_only=True, data_only=False) logger.info(f"Workbook loaded (data_only=False), available sheets: {wb_values.sheetnames}") ws_values = self._get_sheet(wb_values, sheet) logger.info(f"Successfully loaded workbook with data_only=False, sheet: {ws_values.title}") _log_stage("load workbook data_only=False (fallback)") except Exception as e2: logger.error(f"Failed to load workbook even with data_only=False: {type(e2).__name__}: {e2}", exc_info=True) raise try: min_row, max_row, min_col, max_col = self._parse_range(ws_values, range_str) total_rows = max(0, max_row - min_row + 1) logger.debug(f"Parsed range: min_row={min_row}, max_row={max_row}, min_col={min_col}, max_col={max_col}, total_rows={total_rows}") _log_stage("parse range") row_limit = max_rows or self.default_max_rows row_limit = row_limit if row_limit > 0 else self.default_max_rows rows: List[List[Any]] = [] truncated = False iter_start = time.perf_counter() # 读取公式(data_only=False)仅在行数不大于 100 且启用了公式模式时进行 wb_formulas = None ws_formulas = None use_separate_formulas = False formula_enabled = self.enable_formulas and total_rows <= 100 if formula_enabled: try: logger.info( "Attempting to load workbook with data_only=False for formulas: %s (total_rows=%d)", abs_path, total_rows, ) wb_formulas = await self._load_workbook(abs_path, read_only=True, data_only=False) ws_formulas = self._get_sheet(wb_formulas, sheet) use_separate_formulas = True logger.info( "Successfully loaded separate workbook for formulas, sheet: %s", ws_formulas.title, ) _log_stage("load workbook for formulas (data_only=False)") except Exception as e: logger.warning( "Failed to load separate workbook for formulas (will use values workbook): %s: %s", type(e).__name__, e, exc_info=True, ) wb_formulas = wb_values ws_formulas = ws_values use_separate_formulas = False logger.info("Using values workbook for formulas as fallback") else: wb_formulas = wb_values ws_formulas = ws_values use_separate_formulas = False reason = "enable_formulas=False" if not self.enable_formulas else f"row_count_exceeds_limit({total_rows}>100)" logger.info("Formula workbook loading skipped (%s)", reason) values_iter = ws_values.iter_rows( min_row=min_row, max_row=max_row, min_col=min_col, max_col=max_col, values_only=True, ) if use_separate_formulas: formula_iter = ws_formulas.iter_rows( min_row=min_row, max_row=max_row, min_col=min_col, max_col=max_col, values_only=False, ) for row_idx, (val_row, formula_row) in enumerate(zip(values_iter, formula_iter), start=1): if row_idx > row_limit: truncated = True break row_data: List[Any] = [] for val_cell, formula_cell in zip(val_row, formula_row): if val_cell is None and getattr(formula_cell, "data_type", "") == "f" and formula_cell.value: row_data.append(formula_cell.value) else: row_data.append(val_cell) rows.append(row_data) else: for row_idx, val_row in enumerate(values_iter, start=1): if row_idx > row_limit: truncated = True break rows.append(list(val_row)) result: Dict[str, Any] = { "sheet": ws_values.title, "sheets": wb_values.sheetnames, # 添加所有工作表列表 "rows": rows, "total_rows": total_rows, "returned_rows": len(rows), "truncated": truncated, } logger.info( "read_excel: iterated rows %d/%d (truncated=%s) in %.3fs", len(rows), total_rows, truncated, time.perf_counter() - iter_start, ) return result finally: # 确保关闭 workbook try: if wb_values is not None: wb_values.close() logger.info("Closed values workbook") except Exception as e: logger.warning(f"Error closing values workbook: {e}") try: if use_separate_formulas and wb_formulas is not None and wb_formulas != wb_values: wb_formulas.close() logger.info("Closed formulas workbook") except Exception as e: logger.warning(f"Error closing formulas workbook: {e}") # 如果没有使用独立的 formulas workbook,只需要关闭一个 if not use_separate_formulas and wb_formulas is not None and wb_formulas != wb_values: try: wb_formulas.close() logger.info("Closed workbook") except Exception as e: logger.warning(f"Error closing workbook: {e}") async def write_excel( self, path: str, data: List[List[Any]], sheet: str = "Sheet1", overwrite: bool = False, ) -> Dict[str, Any]: """Create or overwrite an Excel file. Args: path: Virtual path string to the Excel file (NOT a Path object) data: 2D array of data. If you want headers, include them as the first row. """ if not isinstance(path, str): raise TypeError(f"write_excel expects str (virtual path), got {type(path).__name__}") abs_path = await self._validated_excel_path(path) if abs_path.exists() and not overwrite: raise FileExistsError(f"File already exists: {path}") if not abs_path.parent.exists(): await anyio.to_thread.run_sync(lambda: abs_path.parent.mkdir(parents=True, exist_ok=True)) wb = Workbook() ws = wb.active ws.title = sheet for row in data: ws.append(row) await anyio.to_thread.run_sync(wb.save, abs_path) return { "success": True, "path": self.validator.real_to_virtual(abs_path), "sheet": sheet, "rows_written": len(data), "message": "Excel file created successfully", } async def update_excel_cells( self, path: str, updates: List[Dict[str, Any]], sheet: Optional[str] = None, create_if_missing: bool = False, ) -> Dict[str, Any]: """Update specific cells or ranges in an Excel file.""" abs_path, allowed = await self.validator.validate_path(path) if not allowed: raise ValueError(f"Path outside allowed directories: {path}") if not abs_path.exists(): if not create_if_missing: raise FileNotFoundError(f"File not found: {path}") wb = Workbook() else: wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if sheet: ws = wb[sheet] if sheet in wb.sheetnames else wb.create_sheet(title=sheet) else: ws = wb.active updated_cells = 0 for update in updates: updated_cells += self._apply_update(ws, update) await anyio.to_thread.run_sync(wb.save, abs_path) return { "success": True, "path": self.validator.real_to_virtual(abs_path), "updated_cells": updated_cells, "message": "Cells updated successfully", } async def list_sheets(self, path: str) -> Dict[str, Any]: """List worksheets with basic statistics.""" abs_path = await self._validated_excel_path(path, must_exist=True) if self._is_csv(abs_path): rows = await read_csv(abs_path) rows_count = len(rows) cols_count = max((len(r) for r in rows), default=0) sheet_info = { "name": abs_path.stem or "Sheet1", "index": 0, "rows": rows_count, "columns": cols_count, "is_active": True, } return { "path": self.validator.real_to_virtual(abs_path), "sheets": [sheet_info], "total_sheets": 1, } wb = await self._load_workbook(abs_path, read_only=True, data_only=True) sheets: List[Dict[str, Any]] = [] active_title = wb.active.title for idx, sheet_name in enumerate(wb.sheetnames): ws = wb[sheet_name] sheets.append( { "name": sheet_name, "index": idx, "rows": ws.max_row, "columns": ws.max_column, "is_active": sheet_name == active_title, } ) return { "path": self.validator.real_to_virtual(abs_path), "sheets": sheets, "total_sheets": len(sheets), } async def get_excel_info(self, path: str) -> Dict[str, Any]: """Get metadata for an Excel file.""" abs_path = await self._validated_excel_path(path, must_exist=True) self._check_file_size(abs_path) stat = abs_path.stat() if self._is_csv(abs_path): rows = await read_csv(abs_path) rows_count = len(rows) cols_count = max((len(r) for r in rows), default=0) return { "path": self.validator.real_to_virtual(abs_path), "file_size": format_size(stat.st_size), "created_time": format_timestamp(stat.st_ctime), "modified_time": format_timestamp(stat.st_mtime), "sheets": [ {"name": abs_path.stem or "Sheet1", "rows": rows_count, "columns": cols_count} ], "total_sheets": 1, "has_formulas": False, } wb = await self._load_workbook(abs_path, read_only=True, data_only=False) sheets_info = [] has_formulas = False formula_scan_limit = 2000 scanned_cells = 0 for ws in wb.worksheets: sheets_info.append( { "name": ws.title, "rows": ws.max_row, "columns": ws.max_column, } ) if not has_formulas and self.enable_formulas: for row in ws.iter_rows(values_only=False): for cell in row: scanned_cells += 1 if cell.data_type == "f": has_formulas = True break if has_formulas or scanned_cells >= formula_scan_limit: break if has_formulas or scanned_cells >= formula_scan_limit: break return { "path": self.validator.real_to_virtual(abs_path), "file_size": format_size(stat.st_size), "created_time": format_timestamp(stat.st_ctime), "modified_time": format_timestamp(stat.st_mtime), "sheets": sheets_info, "total_sheets": len(sheets_info), "has_formulas": has_formulas, } async def excel_to_csv( self, path: str, sheet: Optional[str] = None, output_path: Optional[str] = None, delimiter: str = ",", encoding: str = "utf-8", ) -> Dict[str, Any]: """Convert an Excel sheet to CSV.""" abs_path = await self._validated_excel_path(path, must_exist=True) self._check_file_size(abs_path) if self._is_csv(abs_path): raise ValueError("Input file is already a CSV file") wb = await self._load_workbook(abs_path, read_only=True, data_only=True) ws = self._get_sheet(wb, sheet) target_path = await self._resolve_output_path( source_path=abs_path, provided_output=output_path, default_suffix=".csv" ) rows = ws.iter_rows(values_only=True) await write_csv(target_path, rows, delimiter=delimiter, encoding=encoding) return { "success": True, "path": self.validator.real_to_virtual(abs_path), "output_path": self.validator.real_to_virtual(target_path), "sheet": ws.title, "message": "Excel exported to CSV successfully", } async def csv_to_excel( self, path: str, output_path: Optional[str] = None, sheet: str = "Sheet1", delimiter: str = ",", encoding: str = "utf-8", overwrite: bool = False, ) -> Dict[str, Any]: """Convert a CSV file to Excel.""" abs_path, allowed = await self.validator.validate_path(path) if not allowed: raise ValueError(f"Path outside allowed directories: {path}") if not abs_path.exists(): raise FileNotFoundError(f"CSV file not found: {path}") if not self._is_csv(abs_path): raise ValueError("csv_to_excel expects a CSV input file") target_path = await self._resolve_output_path( source_path=abs_path, provided_output=output_path, default_suffix=".xlsx" ) if target_path.exists() and not overwrite: raise FileExistsError(f"File already exists: {target_path}") rows = await read_csv(abs_path, delimiter=delimiter, encoding=encoding) wb = Workbook() ws = wb.active ws.title = sheet for row in rows: ws.append(row) await anyio.to_thread.run_sync(wb.save, target_path) return { "success": True, "path": self.validator.real_to_virtual(abs_path), "output_path": self.validator.real_to_virtual(target_path), "sheet": sheet, "rows_written": len(rows), "message": "CSV converted to Excel successfully", } async def list_templates(self) -> Dict[str, Any]: """List configured Excel templates (title + description only).""" self._ensure_templates_available() templates = [{"title": tpl["title"], "desc": tpl.get("desc", "")} for tpl in self.templates] return {"success": True, "templates": templates, "count": len(templates)} async def create_from_template( self, template_title: str, directory: Optional[str] = None, file_name: Optional[str] = None, ) -> Dict[str, Any]: """Create a new Excel file from a configured template.""" self._ensure_templates_available() template = self._get_template_by_title(template_title) source_path: Path = template["source"] if not source_path.exists(): raise FileNotFoundError(f"Template source not found: {self._friendly_path(source_path)}") self._ensure_supported_extension(source_path) target_path = await self._resolve_template_target_path( directory=directory, file_name=file_name, template_source=source_path, template_title=template["title"], ) unique_target = await self._make_unique_path(target_path) if not unique_target.parent.exists(): await anyio.to_thread.run_sync(lambda: unique_target.parent.mkdir(parents=True, exist_ok=True)) await anyio.to_thread.run_sync(shutil.copy2, source_path, unique_target) # Convert to virtual path virtual_path = self.validator.real_to_virtual(unique_target) # Debug logging logger.debug(f"create_from_template: real_path={unique_target}, virtual_root={self.validator.virtual_root}, virtual_path={virtual_path}") return { "success": True, "path": virtual_path, "file_name": unique_target.name, "message": "Excel file created from template", } # ---------- Helpers ---------- def _load_templates_config(self, config: Dict[str, Any]) -> None: """Load template definitions from config or templates.json.""" templates_data = config.get("templates") templates_file = config.get("templates_file") templates_path: Optional[Path] = None if templates_file: try: templates_path = self._resolve_config_path(templates_file, self._project_root) except Exception as e: self._template_error = f"Invalid templates_file path: {e}" logger.warning(self._template_error) return else: templates_path = self._project_root / "excel_templates" / "templates.json" self.templates_config_path = templates_path if templates_data is None: if templates_path and templates_path.exists(): try: with open(templates_path, "r", encoding="utf-8") as f: templates_data = json.load(f) except Exception as e: self._template_error = f"Failed to load Excel templates: {e}" logger.warning(self._template_error) return else: self.templates = [] self._template_error = ( f"No Excel templates configured (expected {self._friendly_path(templates_path)})" ) return try: self.templates = self._normalize_templates(templates_data, templates_path) self._template_error = None if self.templates: logger.info( "Loaded %d Excel templates from %s", len(self.templates), self._friendly_path(templates_path) if templates_path else "config", ) except Exception as e: self._template_error = str(e) self.templates = [] def _normalize_templates( self, templates_data: Any, config_path: Optional[Path] ) -> List[Dict[str, Any]]: """Normalize template entries and resolve paths.""" if templates_data is None: return [] if not isinstance(templates_data, list): raise ValueError("Excel templates config must be a list") base_dir = config_path.parent if config_path else self._project_root normalized: List[Dict[str, Any]] = [] seen_titles = set() for idx, entry in enumerate(templates_data): if not isinstance(entry, dict): raise ValueError(f"Template entry at index {idx} must be an object") title = str(entry.get("title") or "").strip() desc = str(entry.get("desc") or "").strip() source_value = entry.get("source") if not title: raise ValueError(f"Template at index {idx} is missing 'title'") if not source_value: raise ValueError(f"Template '{title}' is missing 'source'") if title in seen_titles: raise ValueError(f"Duplicate template title: {title}") seen_titles.add(title) source_path = self._resolve_config_path(source_value, base_dir) if not source_path.exists(): logger.warning("Excel template source not found: %s", self._friendly_path(source_path)) normalized.append({"title": title, "desc": desc, "source": source_path}) return normalized def _resolve_config_path(self, path_value: Any, base_dir: Optional[Path]) -> Path: """Resolve paths from config, supporting relative paths to the base directory.""" candidate = Path(str(path_value)) if str(candidate).startswith(("http://", "https://")): raise ValueError("Remote template sources are not supported; provide a local file path") if not candidate.is_absolute(): candidate = (base_dir or self._project_root) / candidate return candidate.expanduser().resolve() def _friendly_path(self, path: Optional[Path]) -> str: """Return path relative to project root when possible.""" if path is None: return "" try: return str(path.relative_to(self._project_root)) except Exception: return str(path) def _ensure_templates_available(self) -> None: if self._template_error: raise ValueError(self._template_error) if not self.templates: hint = "No Excel templates configured" if self.templates_config_path: hint = f"No Excel templates configured (expected {self._friendly_path(self.templates_config_path)})" raise ValueError(hint) def _get_template_by_title(self, template_title: str) -> Dict[str, Any]: normalized = (template_title or "").strip() for tpl in self.templates: if tpl.get("title") == normalized: return tpl raise ValueError(f"Template not found: {template_title}") async def _resolve_template_target_path( self, directory: Optional[str], file_name: Optional[str], template_source: Path, template_title: str, ) -> Path: name_source = (file_name or template_title or "").strip() if not name_source: raise ValueError("file_name cannot be empty") logger.info(f"_resolve_template_target_path: directory={directory}, file_name={file_name}, template_title={template_title}") # Build virtual path string (not real Path object) if directory: # Validate directory first dir_path, allowed = await self.validator.validate_path(directory) if not allowed: raise ValueError(f"Path outside allowed directories: {directory}") # Convert to virtual path if self.validator.virtual_root: try: dir_virtual = self.validator.real_to_virtual(dir_path) except ValueError: # If conversion fails, use the directory as-is (might be already virtual) dir_virtual = directory else: dir_virtual = str(dir_path) # Build virtual path string virtual_path_str = f"{dir_virtual.rstrip('/')}/{name_source}" else: # Use root virtual path virtual_path_str = f"/{name_source}" logger.info(f"_resolve_template_target_path: Constructed virtual_path_str={virtual_path_str}") # Add suffix if missing if not Path(virtual_path_str).suffix: suffix = template_source.suffix or ".xlsx" virtual_path_str = virtual_path_str + suffix logger.info(f"_resolve_template_target_path: Added suffix, new virtual_path_str={virtual_path_str}") # Validate and convert to real path target_path, allowed = await self.validator.validate_path(virtual_path_str) if not allowed: raise ValueError(f"Path outside allowed directories: {virtual_path_str}") self._ensure_supported_extension(target_path) logger.info(f"_resolve_template_target_path: Final target_path={target_path}") return target_path async def _resolve_target_directory(self, directory: Optional[str]) -> Path: if directory: dir_path, allowed = await self.validator.validate_path(directory) if not allowed: raise ValueError(f"Path outside allowed directories: {directory}") else: dir_path = self._default_directory() if not dir_path.exists(): await anyio.to_thread.run_sync(lambda: dir_path.mkdir(parents=True, exist_ok=True)) if not dir_path.is_dir(): raise NotADirectoryError(f"Target directory is not a folder: {dir_path}") return dir_path def _default_directory(self) -> Path: if self.validator.virtual_root: return self.validator.virtual_root if self.validator.allowed_dirs: sorted_dirs = sorted(self.validator.allowed_dirs) if sorted_dirs: return Path(sorted_dirs[0]) raise ValueError("No allowed directories available for template creation") async def _make_unique_path(self, path: Path) -> Path: """Return a non-conflicting path by appending (N) if needed. Note: This method expects path to already be a validated REAL path, not a virtual path that needs conversion. """ # Path is already a real path from _resolve_template_target_path # Do NOT call validate_path again as it will treat it as virtual path # and cause path duplication! # Just verify it's within allowed directories abs_path = path.absolute() normalized = self.validator._normalize_case(str(abs_path)) is_allowed = any(normalized.startswith(allowed_dir) for allowed_dir in self.validator.allowed_dirs) if not is_allowed: raise ValueError(f"Path outside allowed directories: {path}") logger.info(f"_make_unique_path: input path={path}, abs_path={abs_path}") if not abs_path.exists(): logger.info(f"_make_unique_path: Path doesn't exist, returning as-is") return abs_path # File exists, find a unique name stem = abs_path.stem suffix = abs_path.suffix parent = abs_path.parent counter = 1 while True: candidate = parent / f"{stem}({counter}){suffix}" # Again, don't call validate_path, just check if it exists if not candidate.exists(): logger.info(f"_make_unique_path: Found unique name: {candidate}") return candidate counter += 1 if counter > 1000: # Safety limit raise ValueError(f"Could not find unique path after 1000 attempts") async def _validated_excel_path(self, path: str, must_exist: bool = False) -> Path: """Validate Excel file path. Args: path: Virtual path string (NOT a Path object) """ logger.debug(f"_validated_excel_path: Validating path: {path}, must_exist={must_exist}") if not isinstance(path, str): error_msg = ( f"_validated_excel_path expects str (virtual path), got {type(path).__name__}. " "Do not pass Path objects - use virtual path strings like '/file.xlsx'" ) logger.error(f"_validated_excel_path: {error_msg}") raise TypeError(error_msg) try: abs_path, allowed = await self.validator.validate_path(path) logger.debug(f"_validated_excel_path: Validation result - abs_path={abs_path}, allowed={allowed}") if not allowed: error_msg = f"Path outside allowed directories: {path}" logger.error(f"_validated_excel_path: {error_msg}") raise ValueError(error_msg) if must_exist and not abs_path.exists(): error_msg = f"File not found: {path} (real path: {abs_path})" logger.error(f"_validated_excel_path: {error_msg}") raise FileNotFoundError(error_msg) logger.debug(f"_validated_excel_path: Checking file extension: {abs_path}") self._ensure_supported_extension(abs_path) logger.debug(f"_validated_excel_path: Path validation successful: {abs_path}") return abs_path except Exception as e: logger.error(f"_validated_excel_path: Validation failed for '{path}': {type(e).__name__}: {e}", exc_info=True) raise async def _load_workbook( self, path: Path, read_only: bool = True, data_only: bool = True ): return await anyio.to_thread.run_sync( partial(load_workbook, path, read_only=read_only, data_only=data_only) ) def _get_sheet(self, wb, sheet: Optional[str]): """Get a worksheet from workbook. Args: wb: Workbook object sheet: Sheet name (None or empty string means use first sheet) Returns: Worksheet object """ # Normalize sheet parameter: empty string -> None if sheet == "": sheet = None logger.debug(f"_get_sheet: Requested sheet: {sheet}, Available sheets: {wb.sheetnames}") if sheet: if sheet not in wb.sheetnames: available_sheets = ", ".join(wb.sheetnames) if wb.sheetnames else "(no sheets)" error_msg = ( f"Sheet '{sheet}' not found in workbook. " f"Available sheets: {available_sheets}" ) logger.error(f"_get_sheet: {error_msg}") raise ValueError(error_msg) logger.debug(f"_get_sheet: Returning sheet: {sheet}") return wb[sheet] # No sheet specified, use first sheet if not wb.sheetnames: error_msg = "Workbook has no sheets" logger.error(f"_get_sheet: {error_msg}") raise ValueError(error_msg) first_sheet_name = wb.sheetnames[0] logger.info(f"_get_sheet: No sheet specified, using first sheet: {first_sheet_name}") return wb[first_sheet_name] def _parse_range(self, ws, range_str: Optional[str]) -> Tuple[int, int, int, int]: """Parse a range string and return (min_row, max_row, min_col, max_col).""" max_row = ws.max_row or 1 max_col = ws.max_column or 1 return self._parse_range_bounds(max_row, max_col, range_str) def _parse_range_bounds( self, max_row: int, max_col: int, range_str: Optional[str] ) -> Tuple[int, int, int, int]: """Parse a range string against provided bounds.""" max_row = max_row or 1 max_col = max_col or 1 if not range_str: return 1, max_row, 1, max_col try: min_col, min_row, max_col_parsed, max_row_parsed = range_boundaries(range_str) return min_row, max_row_parsed, min_col, max_col_parsed except ValueError: pass # Handle column-only or row-only ranges if ":" in range_str: start, end = range_str.split(":", 1) if start.isdigit() and end.isdigit(): min_row = int(start) max_row = int(end) return min_row, max_row, 1, max_col min_col = column_index_from_string(start) max_col = column_index_from_string(end) return 1, max_row, min_col, max_col # Single column without row numbers if range_str.isdigit(): row_idx = int(range_str) return row_idx, row_idx, 1, max_col col_idx = column_index_from_string(range_str) return 1, max_row, col_idx, col_idx def _check_file_size(self, path: Path) -> None: """Validate file size against configured limit.""" if not path.exists(): return max_bytes = self.max_file_size_mb * 1024 * 1024 size = path.stat().st_size if size > max_bytes: raise ValueError( f"File too large ({format_size(size)}), limit is {self.max_file_size_mb} MB" ) def _ensure_supported_extension(self, path: Path) -> None: suffix = path.suffix.lower() if suffix and self.supported_formats and suffix not in self.supported_formats: raise ValueError(f"Unsupported file format: {suffix}") async def _resolve_output_path( self, source_path: Path, provided_output: Optional[str], default_suffix: str ) -> Path: """Resolve output path for conversion operations. Args: source_path: Source file path (already a validated REAL path) provided_output: User-provided output path (virtual path string) or None default_suffix: Default suffix to use if no output path provided Returns: Real path for output file """ if provided_output: # User provided output path as virtual path string output_path, allowed = await self.validator.validate_path(provided_output) if not allowed: raise ValueError(f"Output path outside allowed directories: {provided_output}") else: # Generate default output path from source path # source_path is already a real path, so just change suffix output_path = source_path.with_suffix(default_suffix) # Verify the generated path is within allowed directories # Do NOT call validate_path as it would treat real path as virtual abs_path = output_path.absolute() normalized = self.validator._normalize_case(str(abs_path)) is_allowed = any(normalized.startswith(allowed_dir) for allowed_dir in self.validator.allowed_dirs) if not is_allowed: raise ValueError(f"Output path outside allowed directories: {output_path}") logger.info(f"_resolve_output_path: Generated default output_path={output_path}") if not output_path.parent.exists(): await anyio.to_thread.run_sync(lambda: output_path.parent.mkdir(parents=True, exist_ok=True)) return output_path def _apply_update(self, ws, update: Dict[str, Any]) -> int: """Apply a single update instruction to a worksheet.""" # Support both "cell" and "range" parameter names for compatibility cell_ref = update.get("cell") or update.get("range") if not cell_ref: raise ValueError("Missing 'cell' or 'range' in update instruction") value = update.get("value") formula = update.get("formula") if ":" in cell_ref: min_row, max_row, min_col, max_col = self._parse_range(ws, cell_ref) rows_count = max_row - min_row + 1 cols_count = max_col - min_col + 1 values_matrix = self._expand_values(value, rows_count, cols_count, formula) for row_offset, row_values in enumerate(values_matrix): for col_offset, cell_value in enumerate(row_values): target_row = min_row + row_offset target_col = min_col + col_offset cell_coord = f"{get_column_letter(target_col)}{target_row}" ws[cell_coord] = cell_value return rows_count * cols_count if formula: ws[cell_ref] = formula else: ws[cell_ref] = value return 1 def _expand_values( self, value: Any, rows: int, cols: int, formula: Optional[str], ) -> List[List[Any]]: """Expand the provided value into a matrix matching the target range.""" if formula is not None: return [[formula for _ in range(cols)] for _ in range(rows)] if isinstance(value, list): # List of lists if value and isinstance(value[0], list): normalized: List[List[Any]] = [] for row in value: # Pad or trim to expected width padded = list(row[:cols]) + [None] * max(0, cols - len(row)) normalized.append(padded[:cols]) # Pad missing rows while len(normalized) < rows: normalized.append([None] * cols) return normalized[:rows] # Flat list - fill row-major order flattened = list(value) normalized: List[List[Any]] = [] idx = 0 for _ in range(rows): row_values: List[Any] = [] for _ in range(cols): row_values.append(flattened[idx] if idx < len(flattened) else None) idx += 1 normalized.append(row_values) return normalized # Single scalar - broadcast return [[value for _ in range(cols)] for _ in range(rows)] def format_read_result(self, result: Dict[str, Any], output_format: str) -> str: """Public formatter wrapper for read_excel outputs.""" fmt = normalize_output_format(output_format) result["format"] = fmt return format_read_result(result, fmt) # ---------- Advanced Excel operations inspired by excel-mcp-server ---------- async def apply_formula(self, path: str, sheet: str, cell: str, formula: str) -> Dict[str, Any]: """Apply an Excel formula to a cell.""" abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if sheet not in wb.sheetnames: raise ValueError(f"Sheet not found: {sheet}") # Basic syntax validation try: Tokenizer(formula) except Exception as e: raise ValueError(f"Invalid formula: {e}") ws = wb[sheet] ws[cell] = formula await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": f"Formula applied to {cell}", "path": self.validator.real_to_virtual(abs_path)} async def validate_formula_syntax(self, path: str, sheet: str, cell: str, formula: str) -> Dict[str, Any]: """Validate formula without applying.""" await self._validated_excel_path(path, must_exist=True) try: Tokenizer(formula) except Exception as e: return {"success": False, "message": f"Invalid formula: {e}"} return {"success": True, "message": "Formula syntax looks valid"} async def format_range( self, path: str, sheet: str, start_cell: str, end_cell: Optional[str] = None, bold: bool = False, italic: bool = False, underline: bool = False, font_size: Optional[int] = None, font_color: Optional[str] = None, bg_color: Optional[str] = None, border_style: Optional[str] = None, border_color: Optional[str] = None, number_format: Optional[str] = None, alignment: Optional[str] = None, wrap_text: bool = False, merge_cells: bool = False, protection: Optional[Dict[str, Any]] = None, conditional_format: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Apply formatting to a cell range.""" abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if sheet not in wb.sheetnames: raise ValueError(f"Sheet not found: {sheet}") ws = wb[sheet] rng = f"{start_cell}:{end_cell}" if end_cell else start_cell cells = ws[rng] font = Font( bold=bold or None, italic=italic or None, underline="single" if underline else None, size=font_size, color=font_color if font_color else None, ) fill = PatternFill(fill_type="solid", fgColor=bg_color) if bg_color else None align_map = { "left": ("left", None), "center": ("center", None), "right": ("right", None), "top": (None, "top"), "middle": (None, "center"), "bottom": (None, "bottom"), } horizontal = vertical = None if alignment: horizontal, vertical = align_map.get(alignment, (None, None)) align = Alignment(horizontal=horizontal, vertical=vertical, wrap_text=wrap_text or None) border = None if border_style: side = Side(style=border_style, color=border_color or "000000") border = Border(left=side, right=side, top=side, bottom=side) for row in cells: for c in row: c.font = font.copy() if font else c.font c.fill = fill.copy() if fill else c.fill c.alignment = align if border: c.border = border if number_format: c.number_format = number_format if merge_cells and end_cell: ws.merge_cells(rng) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Range formatted successfully", "path": self.validator.real_to_virtual(abs_path)} async def read_data_with_metadata( self, path: str, sheet: str, start_cell: str = "A1", end_cell: Optional[str] = None, preview_only: bool = False, ) -> Dict[str, Any]: """Read cells with metadata and validation info.""" abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=True, data_only=False) if sheet not in wb.sheetnames: raise ValueError(f"Sheet not found: {sheet}") ws = wb[sheet] rng = f"{start_cell}:{end_cell}" if end_cell else start_cell min_col, min_row, max_col, max_row = range_boundaries(rng) cells_data = [] for row in ws.iter_rows(min_row=min_row, max_row=max_row, min_col=min_col, max_col=max_col): for c in row: cells_data.append( { "address": c.coordinate, "value": c.value, "row": c.row, "column": c.column, "data_type": c.data_type, "number_format": c.number_format, } ) return { "success": True, "sheet": sheet, "cells": cells_data if not preview_only else cells_data[: min(20, len(cells_data))], "path": self.validator.real_to_virtual(abs_path), } async def write_data_to_excel( self, path: str, sheet: str, data: List[List[Any]], start_cell: str = "A1", ) -> Dict[str, Any]: """Write a 2D list to sheet starting at start_cell without clearing existing data.""" abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if sheet not in wb.sheetnames: raise ValueError(f"Sheet not found: {sheet}") ws = wb[sheet] start_col_idx, start_row_idx = range_boundaries(f"{start_cell}:{start_cell}")[:2] for r_idx, row in enumerate(data): for c_idx, value in enumerate(row): target = ws.cell(row=start_row_idx + r_idx, column=start_col_idx + c_idx) target.value = value await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Data written", "path": self.validator.real_to_virtual(abs_path)} async def create_workbook(self, path: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path) if abs_path.exists(): raise FileExistsError(f"File already exists: {path}") wb = Workbook() await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Workbook created", "path": self.validator.real_to_virtual(abs_path)} async def create_worksheet(self, path: str, sheet_name: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if sheet_name in wb.sheetnames: raise ValueError("Sheet already exists") wb.create_sheet(title=sheet_name) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": f"Sheet '{sheet_name}' created", "path": self.validator.real_to_virtual(abs_path)} async def copy_worksheet(self, path: str, source_sheet: str, target_sheet: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if source_sheet not in wb.sheetnames: raise ValueError("Source sheet not found") if target_sheet in wb.sheetnames: raise ValueError("Target sheet already exists") source = wb[source_sheet] new_ws = wb.copy_worksheet(source) new_ws.title = target_sheet await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": f"Sheet copied to '{target_sheet}'", "path": self.validator.real_to_virtual(abs_path)} async def delete_worksheet(self, path: str, sheet_name: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if sheet_name not in wb.sheetnames: raise ValueError("Sheet not found") wb.remove(wb[sheet_name]) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": f"Sheet '{sheet_name}' deleted", "path": self.validator.real_to_virtual(abs_path)} async def rename_worksheet(self, path: str, old_name: str, new_name: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) if old_name not in wb.sheetnames: raise ValueError("Sheet not found") if new_name in wb.sheetnames: raise ValueError("Target name already exists") wb[old_name].title = new_name await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": f"Sheet renamed to '{new_name}'", "path": self.validator.real_to_virtual(abs_path)} async def get_workbook_metadata(self, path: str, include_ranges: bool = False) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=True, data_only=False) sheets = [{"name": s, "index": idx} for idx, s in enumerate(wb.sheetnames)] named_ranges = list(wb.defined_names.keys()) if include_ranges else [] stat = abs_path.stat() return { "success": True, "path": self.validator.real_to_virtual(abs_path), "sheets": sheets, "named_ranges": named_ranges, "size": stat.st_size, "size_human": format_size(stat.st_size), } async def merge_cells(self, path: str, sheet: str, start_cell: str, end_cell: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] ws.merge_cells(f"{start_cell}:{end_cell}") await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Cells merged", "path": self.validator.real_to_virtual(abs_path)} async def unmerge_cells(self, path: str, sheet: str, start_cell: str, end_cell: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] ws.unmerge_cells(f"{start_cell}:{end_cell}") await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Cells unmerged", "path": self.validator.real_to_virtual(abs_path)} async def get_merged_cells(self, path: str, sheet: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=True, data_only=False) ws = wb[sheet] merged = [str(rng) for rng in ws.merged_cells.ranges] return {"success": True, "ranges": merged, "path": self.validator.real_to_virtual(abs_path)} async def copy_range( self, path: str, sheet: str, source_start: str, source_end: str, target_start: str, target_sheet: Optional[str] = None, ) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) src_ws = wb[sheet] tgt_ws = wb[target_sheet] if target_sheet else src_ws src_min_col, src_min_row, src_max_col, src_max_row = range_boundaries(f"{source_start}:{source_end}") tgt_start_col, tgt_start_row = range_boundaries(f"{target_start}:{target_start}")[:2] for r_off, row in enumerate(src_ws.iter_rows(min_row=src_min_row, max_row=src_max_row, min_col=src_min_col, max_col=src_max_col)): for c_off, cell in enumerate(row): tgt = tgt_ws.cell(row=tgt_start_row + r_off, column=tgt_start_col + c_off) tgt.value = cell.value tgt._style = cell._style await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Range copied", "path": self.validator.real_to_virtual(abs_path)} async def delete_range( self, path: str, sheet: str, start_cell: str, end_cell: str, shift_direction: str = "up", ) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] min_col, min_row, max_col, max_row = range_boundaries(f"{start_cell}:{end_cell}") if shift_direction in ("up", "down"): ws.delete_rows(min_row, max_row - min_row + 1) else: ws.delete_cols(min_col, max_col - min_col + 1) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Range deleted", "path": self.validator.real_to_virtual(abs_path)} async def validate_excel_range(self, path: str, sheet: str, start_cell: str, end_cell: Optional[str] = None) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=True, data_only=False) if sheet not in wb.sheetnames: raise ValueError("Sheet not found") rng = start_cell if not end_cell else f"{start_cell}:{end_cell}" try: range_boundaries(rng) except ValueError as e: return {"success": False, "message": str(e)} return {"success": True, "message": "Range is valid"} async def get_data_validation_info(self, path: str, sheet: str) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=True, data_only=False) ws = wb[sheet] validations = [] for dv in ws.data_validations.dataValidation: validations.append( { "type": dv.type, "sqref": str(dv.sqref), "formula1": dv.formula1, "formula2": dv.formula2, "allow_blank": dv.allow_blank, "showErrorMessage": dv.showErrorMessage, } ) return {"success": True, "validations": validations, "path": self.validator.real_to_virtual(abs_path)} async def insert_rows(self, path: str, sheet: str, start_row: int, count: int = 1) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] ws.insert_rows(start_row, count) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Rows inserted", "path": self.validator.real_to_virtual(abs_path)} async def insert_columns(self, path: str, sheet: str, start_col: int, count: int = 1) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] ws.insert_cols(start_col, count) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Columns inserted", "path": self.validator.real_to_virtual(abs_path)} async def delete_rows(self, path: str, sheet: str, start_row: int, count: int = 1) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] ws.delete_rows(start_row, count) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Rows deleted", "path": self.validator.real_to_virtual(abs_path)} async def delete_columns(self, path: str, sheet: str, start_col: int, count: int = 1) -> Dict[str, Any]: abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] ws.delete_cols(start_col, count) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Columns deleted", "path": self.validator.real_to_virtual(abs_path)} async def create_chart( self, path: str, sheet: str, data_range: str, chart_type: str, target_cell: str, title: str = "", x_axis: str = "", y_axis: str = "", ) -> Dict[str, Any]: """Create a basic chart from a data range.""" abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] min_col, min_row, max_col, max_row = range_boundaries(data_range) data_ref = Reference(ws, min_col=min_col, min_row=min_row, max_col=max_col, max_row=max_row) chart_type_lower = chart_type.lower() if chart_type_lower in ("column", "bar"): chart = BarChart() elif chart_type_lower == "line": chart = LineChart() elif chart_type_lower == "pie": chart = PieChart() elif chart_type_lower == "scatter": chart = ScatterChart() chart.series = [Series(data_ref, title=title or None)] else: raise ValueError("Unsupported chart type") if chart_type_lower != "scatter": chart.add_data(data_ref, titles_from_data=True) chart.title = title chart.x_axis.title = x_axis chart.y_axis.title = y_axis ws.add_chart(chart, target_cell) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Chart created", "path": self.validator.real_to_virtual(abs_path)} async def create_table( self, path: str, sheet: str, data_range: str, table_name: Optional[str] = None, table_style: str = "TableStyleMedium9", ) -> Dict[str, Any]: from openpyxl.worksheet.table import Table, TableStyleInfo abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=False) ws = wb[sheet] tbl = Table(displayName=table_name or f"Table_{len(ws._tables)+1}", ref=data_range) style = TableStyleInfo(name=table_style, showFirstColumn=False, showLastColumn=False, showRowStripes=True, showColumnStripes=False) tbl.tableStyleInfo = style ws.add_table(tbl) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Table created", "path": self.validator.real_to_virtual(abs_path)} async def create_pivot_table( self, path: str, sheet: str, data_range: str, rows: List[str], values: List[str], columns: Optional[List[str]] = None, agg_func: str = "mean", target_sheet: Optional[str] = None, target_cell: str = "A1", ) -> Dict[str, Any]: """Simplified pivot: load data into memory and aggregate.""" abs_path = await self._validated_excel_path(path, must_exist=True) wb = await self._load_workbook(abs_path, read_only=False, data_only=True) src_ws = wb[sheet] min_col, min_row, max_col, max_row = range_boundaries(data_range) headers = [cell.value for cell in src_ws[min_row]] data_rows = [] for row in src_ws.iter_rows(min_row=min_row + 1, max_row=max_row, min_col=min_col, max_col=max_col, values_only=True): data_rows.append(dict(zip(headers, row))) def agg(values_list): vals = [v for v in values_list if isinstance(v, (int, float))] if not vals: return None if agg_func == "sum": return sum(vals) if agg_func == "max": return max(vals) if agg_func == "min": return min(vals) if agg_func == "count": return len(vals) # default mean return sum(vals) / len(vals) pivot = {} for row_dict in data_rows: row_key = tuple(row_dict.get(r) for r in rows) col_key = tuple(row_dict.get(c) for c in (columns or [])) if columns else () pivot.setdefault(row_key, {}).setdefault(col_key, {val: [] for val in values}) for val in values: pivot[row_key][col_key][val].append(row_dict.get(val)) out_rows = [] for row_key, col_map in pivot.items(): for col_key, val_map in col_map.items(): entry = {r: rv for r, rv in zip(rows, row_key)} if columns: for c, cv in zip(columns, col_key): entry[c] = cv for val in values: entry[val] = agg(val_map[val]) out_rows.append(entry) tgt_ws = wb[target_sheet] if target_sheet and target_sheet in wb.sheetnames else wb.create_sheet(target_sheet or "Pivot") start_col_idx, start_row_idx = range_boundaries(f"{target_cell}:{target_cell}")[:2] # headers headers_out = list(rows) + (columns or []) + values for c_idx, h in enumerate(headers_out): tgt_ws.cell(row=start_row_idx, column=start_col_idx + c_idx, value=h) for r_offset, row in enumerate(out_rows, start=1): for c_idx, h in enumerate(headers_out): tgt_ws.cell(row=start_row_idx + r_offset, column=start_col_idx + c_idx, value=row.get(h)) await anyio.to_thread.run_sync(wb.save, abs_path) return {"success": True, "message": "Pivot table created", "path": self.validator.real_to_virtual(abs_path), "sheet": tgt_ws.title} def _is_csv(self, path: Path) -> bool: return path.suffix.lower() == ".csv" # ---------- CSV helpers ---------- async def _read_csv_file( self, path: Path, sheet: Optional[str], range_str: Optional[str], max_rows: Optional[int], output_format: str, ) -> Dict[str, Any]: rows = await read_csv(path) total_rows_available = len(rows) total_cols_available = max((len(r) for r in rows), default=0) min_row, max_row, min_col, max_col = self._parse_range_bounds( total_rows_available or 1, total_cols_available or 1, range_str ) sliced_rows = self._slice_rows(rows, min_row, max_row, min_col, max_col) row_limit = max_rows or self.default_max_rows row_limit = row_limit if row_limit > 0 else self.default_max_rows truncated = len(sliced_rows) > row_limit if truncated: sliced_rows = sliced_rows[:row_limit] available_rows = max(0, min(max_row, total_rows_available) - min_row + 1) sheet_name = sheet or path.stem or "Sheet1" result: Dict[str, Any] = { "sheet": sheet_name, "sheets": [sheet_name], # CSV 只有一个 sheet "rows": sliced_rows, "total_rows": available_rows, "returned_rows": len(sliced_rows), "truncated": truncated, } return result def _slice_rows( self, rows: List[List[Any]], min_row: int, max_row: int, min_col: int, max_col: int, ) -> List[List[Any]]: """Slice raw CSV rows using Excel-like coordinates.""" sliced: List[List[Any]] = [] for idx, row in enumerate(rows, start=1): if idx < min_row or idx > max_row: continue padded = list(row) if len(padded) < max_col: padded.extend([None] * (max_col - len(padded))) sliced.append(padded[min_col - 1 : max_col]) return sliced

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/answerlink/MCP-Workspace-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server