excel_service.py (12.3 kB)
"""Excel Service Module. Provides high-level Excel processing services with intelligent parameter detection, caching, and enhanced error handling. """ import os import pandas as pd from typing import Dict, Any, Optional, List, Tuple, Union from pathlib import Path import hashlib import json from datetime import datetime try: from core.config import get_config from core.exceptions import ( FileAccessError, DataProcessingError, ValidationError, SecurityError, ResourceError ) from core.types import FileInfo, ExcelMetadata, ExecutionResult CORE_AVAILABLE = True except ImportError: CORE_AVAILABLE = False # Fallback types class FileInfo: def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) class ExcelMetadata: def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) class ExecutionResult: def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) class ExcelService: """Excel processing service with intelligent features.""" def __init__(self): """Initialize Excel service.""" self.config = get_config() if CORE_AVAILABLE else None self._file_cache = {} self._metadata_cache = {} def get_file_info(self, file_path: str) -> FileInfo: """Get comprehensive file information. Args: file_path: Path to the file Returns: FileInfo object with file details Raises: FileAccessError: If file cannot be accessed """ try: path = Path(file_path) if not path.exists(): raise FileAccessError(file_path=file_path, reason="文件不存在") stat = path.stat() # Check file size limits if self.config and stat.st_size > self.config.security.max_file_size: raise SecurityError( f"文件大小 ({stat.st_size} 字节) 超过限制 " f"({self.config.security.max_file_size} 字节)" ) return FileInfo( path=str(path.absolute()), name=path.name, size=stat.st_size, extension=path.suffix.lower(), modified_time=datetime.fromtimestamp(stat.st_mtime), is_readable=os.access(path, os.R_OK), is_writable=os.access(path, os.W_OK) ) except Exception as e: if CORE_AVAILABLE: raise FileAccessError(file_path=file_path, reason=f"获取文件信息失败: {e}") else: raise Exception(f"获取文件信息失败: {e}") def detect_excel_structure(self, file_path: str) -> ExcelMetadata: """Intelligently detect Excel file structure. 
    def detect_excel_structure(self, file_path: str) -> ExcelMetadata:
        """Intelligently detect Excel file structure.

        Args:
            file_path: Path to Excel file

        Returns:
            ExcelMetadata with detected structure information
        """
        # Serve from cache when the file has not changed.
        cache_key = self._get_cache_key(file_path)
        if cache_key in self._metadata_cache:
            return self._metadata_cache[cache_key]

        try:
            file_info = self.get_file_info(file_path)

            if file_info.extension in ('.xlsx', '.xls'):
                # Get the sheet names first, then sample the first sheet.
                with pd.ExcelFile(file_path) as excel_file:
                    sheet_names = excel_file.sheet_names
                df_sample = pd.read_excel(file_path, sheet_name=0, nrows=10)
                # Encoding and separator only apply to CSV files.
                encoding = 'utf-8'
                separator = None
            elif file_info.extension == '.csv':
                # Detect CSV encoding and separator, then read a sample.
                encoding = self._detect_encoding(file_path)
                separator = self._detect_separator(file_path, encoding)
                df_sample = pd.read_csv(file_path, encoding=encoding, sep=separator, nrows=10)
                sheet_names = ['Sheet1']  # A CSV has only one "sheet".
            else:
                raise ValidationError(
                    parameter="file_extension",
                    value=file_info.extension,
                    expected="a supported Excel format (.xlsx, .xls, .csv)",
                )

            # Analyze the structure of each sampled column.
            columns_info = []
            for col in df_sample.columns:
                col_data = df_sample[col].dropna()
                if len(col_data) > 0:
                    dtype = str(col_data.dtype)
                    sample_values = col_data.head(3).tolist()
                    unique_count = col_data.nunique()
                    null_count = df_sample[col].isnull().sum()
                else:
                    dtype = 'object'
                    sample_values = []
                    unique_count = 0
                    null_count = len(df_sample)

                columns_info.append({
                    'name': col,
                    'dtype': dtype,
                    'sample_values': sample_values,
                    'unique_count': unique_count,
                    'null_count': null_count,
                    'null_percentage': (null_count / len(df_sample)) * 100 if len(df_sample) > 0 else 0,
                })

            metadata = ExcelMetadata(
                file_info=file_info,
                sheet_names=sheet_names,
                total_sheets=len(sheet_names),
                columns_info=columns_info,
                total_columns=len(columns_info),
                sample_rows=len(df_sample),
                encoding=encoding,
                separator=separator,
                detected_at=datetime.now(),
            )

            # Cache the result, keyed by path, mtime, and size.
            self._metadata_cache[cache_key] = metadata
            return metadata

        except (FileAccessError, SecurityError, ValidationError):
            raise
        except Exception as e:
            raise DataProcessingError(
                operation="Excel structure detection",
                data_info=str({'file_path': file_path}),
                error_details=f"Excel structure detection failed: {e}",
            )

    def smart_read_excel(self, file_path: str, **kwargs) -> pd.DataFrame:
        """Smart Excel reading with automatic parameter detection.

        Args:
            file_path: Path to Excel file
            **kwargs: Override parameters

        Returns:
            pandas DataFrame
        """
        # Use detected metadata to fill in sensible reading parameters.
        metadata = self.detect_excel_structure(file_path)

        read_params = {
            'sheet_name': kwargs.get('sheet_name', 0),
            'header': kwargs.get('header', 0),
            'index_col': kwargs.get('index_col', None),
            'usecols': kwargs.get('usecols', None),
            'nrows': kwargs.get('nrows', None),
            'skiprows': kwargs.get('skiprows', None),
        }

        if metadata.file_info.extension == '.csv':
            # pd.read_csv does not accept sheet_name, so drop it before
            # adding the detected encoding and separator.
            read_params.pop('sheet_name')
            read_params.update({
                'encoding': metadata.encoding,
                'sep': metadata.separator,
            })
            df = pd.read_csv(file_path, **read_params)
        else:
            df = pd.read_excel(file_path, **read_params)

        return df
    def get_smart_suggestions(self, file_path: str) -> Dict[str, Any]:
        """Get intelligent parameter suggestions for Excel processing.

        Args:
            file_path: Path to Excel file

        Returns:
            Dictionary with parameter suggestions
        """
        metadata = self.detect_excel_structure(file_path)

        suggestions = {
            'file_info': {
                'name': metadata.file_info.name,
                'size_mb': round(metadata.file_info.size / (1024 * 1024), 2),
                'format': metadata.file_info.extension,
                'sheets_available': metadata.sheet_names,
            },
            'reading_suggestions': {
                'recommended_sheet': metadata.sheet_names[0] if metadata.sheet_names else None,
                'has_header': True,  # Most Excel files have headers.
                'encoding': metadata.encoding if hasattr(metadata, 'encoding') else 'utf-8',
                'separator': metadata.separator if hasattr(metadata, 'separator') else None,
            },
            'data_insights': {
                'total_columns': metadata.total_columns,
                'sample_rows_analyzed': metadata.sample_rows,
                'columns_with_nulls': len([c for c in metadata.columns_info if c['null_count'] > 0]),
                'numeric_columns': len([c for c in metadata.columns_info
                                        if 'int' in c['dtype'] or 'float' in c['dtype']]),
                'text_columns': len([c for c in metadata.columns_info if 'object' in c['dtype']]),
            },
            'processing_recommendations': [],
        }

        # Add specific recommendations.
        high_null_cols = [c for c in metadata.columns_info if c['null_percentage'] > 50]
        if high_null_cols:
            suggestions['processing_recommendations'].append(
                f"Note: {len(high_null_cols)} column(s) are more than 50% null; consider cleaning the data"
            )

        if metadata.file_info.size > 10 * 1024 * 1024:  # 10 MB
            suggestions['processing_recommendations'].append(
                "Large file; consider chunked reading or selecting specific columns to improve performance"
            )

        if len(metadata.sheet_names) > 1:
            suggestions['processing_recommendations'].append(
                f"The file contains {len(metadata.sheet_names)} sheets; please specify which one to process"
            )

        return suggestions

    def _get_cache_key(self, file_path: str) -> str:
        """Generate a cache key from the file path, mtime, and size."""
        stat = os.stat(file_path)
        content = f"{file_path}_{stat.st_mtime}_{stat.st_size}"
        return hashlib.md5(content.encode()).hexdigest()

    def _detect_encoding(self, file_path: str) -> str:
        """Detect file encoding."""
        try:
            import chardet
            with open(file_path, 'rb') as f:
                raw_data = f.read(10000)  # Sample the first 10 KB.
            result = chardet.detect(raw_data)
            # chardet may report None for the encoding; fall back to utf-8.
            return result.get('encoding') or 'utf-8'
        except ImportError:
            # Without chardet, try common encodings in order.
            for encoding in ['utf-8', 'gbk', 'gb2312', 'latin1']:
                try:
                    with open(file_path, 'r', encoding=encoding) as f:
                        f.read(1000)
                    return encoding
                except UnicodeDecodeError:
                    continue
            return 'utf-8'

    def _detect_separator(self, file_path: str, encoding: str) -> str:
        """Detect CSV separator."""
        try:
            import csv
            with open(file_path, 'r', encoding=encoding) as f:
                sample = f.read(1024)
            return csv.Sniffer().sniff(sample).delimiter
        except Exception:
            # Fall back to checking common separators in the first line.
            with open(file_path, 'r', encoding=encoding) as f:
                first_line = f.readline()
            for sep in [',', ';', '\t', '|']:
                if sep in first_line:
                    return sep
            return ','

    def clear_cache(self):
        """Clear all caches."""
        self._file_cache.clear()
        self._metadata_cache.clear()
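
A minimal usage sketch for the service above. The import path excel_service and the file name data.xlsx are assumptions for illustration; the calls mirror the methods defined in this module.

# Usage sketch (hypothetical file and import path).
from excel_service import ExcelService

service = ExcelService()

# Inspect the detected structure and the suggested reading parameters.
suggestions = service.get_smart_suggestions("data.xlsx")  # hypothetical file
print(suggestions['reading_suggestions'])
for tip in suggestions['processing_recommendations']:
    print(tip)

# Read with auto-detected parameters, overriding the sheet and row count.
df = service.smart_read_excel("data.xlsx", sheet_name=0, nrows=100)
print(df.head())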
