
MCP Unified Server

by getfounded
excel.py (62 kB)
#!/usr/bin/env python3 import os import json import logging import io from enum import Enum from typing import List, Dict, Optional, Any, Union, Tuple import pandas as pd # Ensure compatibility with mcp server from mcp.server.fastmcp import FastMCP, Context # External MCP reference for tool registration external_mcp = None def set_external_mcp(mcp): """Set the external MCP reference for tool registration""" global external_mcp external_mcp = mcp logging.info("XlsxWriter tools MCP reference set") class XlsxWriterTools(str, Enum): """Enum of XlsxWriter tool names""" # Existing tools CREATE_WORKBOOK = "xlsx_create_workbook" ADD_WORKSHEET = "xlsx_add_worksheet" WRITE_DATA = "xlsx_write_data" WRITE_MATRIX = "xlsx_write_matrix" ADD_FORMAT = "xlsx_add_format" ADD_CHART = "xlsx_add_chart" ADD_IMAGE = "xlsx_add_image" ADD_FORMULA = "xlsx_add_formula" ADD_TABLE = "xlsx_add_table" CLOSE_WORKBOOK = "xlsx_close_workbook" # New reading tools READ_EXCEL = "xlsx_read_excel" READ_CSV = "xlsx_read_csv" GET_SHEET_NAMES = "xlsx_get_sheet_names" # New manipulation tools FILTER_DATAFRAME = "xlsx_filter_dataframe" SORT_DATAFRAME = "xlsx_sort_dataframe" GROUP_DATAFRAME = "xlsx_group_dataframe" MODIFY_COLUMNS = "xlsx_modify_columns" DESCRIBE_DATAFRAME = "xlsx_describe_dataframe" # New export tools DATAFRAME_TO_EXCEL = "xlsx_dataframe_to_excel" DATAFRAME_TO_CSV = "xlsx_dataframe_to_csv" # Storage for DataFrames in memory _dataframes = {} def _store_dataframe(dataframe_id, df): """Store DataFrame in memory for future operations""" _dataframes[dataframe_id] = df return dataframe_id def _get_dataframe(dataframe_id): """Retrieve DataFrame from memory""" return _dataframes.get(dataframe_id) def _list_dataframes(): """List all available DataFrames in memory""" return list(_dataframes.keys()) def _clear_dataframe(dataframe_id): """Remove DataFrame from memory""" if dataframe_id in _dataframes: del _dataframes[dataframe_id] return True return False class XlsxWriterService: """Service to handle Excel file creation, reading and manipulation""" def __init__(self): """Initialize the XlsxWriter service""" try: import xlsxwriter self.xlsxwriter = xlsxwriter self.initialized = True except ImportError: logging.error( "xlsxwriter library not installed. Please install with 'pip install XlsxWriter'") self.initialized = False self.xlsxwriter = None # Dictionary to store active workbooks and worksheets self.workbooks = {} # Check for pandas, openpyxl, and xlrd (for reading Excel) try: import pandas as pd self.pandas = pd self.pandas_available = True except ImportError: logging.error( "pandas library not installed. Please install with 'pip install pandas'") self.pandas_available = False self.pandas = None try: import openpyxl self.openpyxl_available = True except ImportError: logging.warning( "openpyxl library not installed. Some Excel reading features may be limited. Install with 'pip install openpyxl'") self.openpyxl_available = False try: import xlrd self.xlrd_available = True except ImportError: logging.warning( "xlrd library not installed. Reading older Excel formats may be limited. Install with 'pip install xlrd'") self.xlrd_available = False def _is_initialized(self): """Check if the service is properly initialized""" if not self.initialized: raise ValueError( "XlsxWriter service not properly initialized. 
Check if xlsxwriter library is installed.") return True def _check_pandas_available(self): """Check if pandas is available for DataFrame operations""" if not self.pandas_available: raise ValueError( "pandas library not installed. Please install with 'pip install pandas'") return True async def create_workbook(self, filename): """Create a new Excel workbook""" try: self._is_initialized() # Create the workbook workbook = self.xlsxwriter.Workbook(filename) # Store in our dictionary self.workbooks[filename] = { "workbook": workbook, "worksheets": {}, "formats": {}, "charts": {} } return {"filename": filename, "status": "created"} except Exception as e: return {"error": f"Error creating workbook: {str(e)}"} async def add_worksheet(self, filename, name=None): """Add a worksheet to the workbook""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} workbook = self.workbooks[filename]["workbook"] # Add a worksheet worksheet = workbook.add_worksheet(name) # Store the worksheet worksheet_name = name if name else f"Sheet{len(self.workbooks[filename]['worksheets'])+1}" self.workbooks[filename]["worksheets"][worksheet_name] = worksheet return {"filename": filename, "worksheet": worksheet_name, "status": "added"} except Exception as e: return {"error": f"Error adding worksheet: {str(e)}"} async def write_data(self, filename, worksheet_name, row, col, data, format_name=None): """Write data to a cell""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} # Check if worksheet exists if worksheet_name not in self.workbooks[filename]["worksheets"]: return {"error": f"Worksheet {worksheet_name} not found in {filename}"} worksheet = self.workbooks[filename]["worksheets"][worksheet_name] # Get format if specified format_obj = None if format_name and format_name in self.workbooks[filename]["formats"]: format_obj = self.workbooks[filename]["formats"][format_name] # Write the data worksheet.write(row, col, data, format_obj) return {"filename": filename, "worksheet": worksheet_name, "row": row, "col": col, "data": data, "status": "written"} except Exception as e: return {"error": f"Error writing data: {str(e)}"} async def write_matrix(self, filename, worksheet_name, start_row, start_col, data, formats=None): """Write a matrix of data to a worksheet""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} # Check if worksheet exists if worksheet_name not in self.workbooks[filename]["worksheets"]: return {"error": f"Worksheet {worksheet_name} not found in {filename}"} worksheet = self.workbooks[filename]["worksheets"][worksheet_name] # Process formats if provided format_matrix = None if formats: format_matrix = [] for row in formats: format_row = [] for format_name in row: if format_name and format_name in self.workbooks[filename]["formats"]: format_row.append( self.workbooks[filename]["formats"][format_name]) else: format_row.append(None) format_matrix.append(format_row) # Write the data matrix for i, row_data in enumerate(data): for j, cell_data in enumerate(row_data): format_obj = None if format_matrix and i < len(format_matrix) and j < len(format_matrix[i]): format_obj = format_matrix[i][j] worksheet.write(start_row + i, start_col + j, cell_data, format_obj) return {"filename": filename, "worksheet": worksheet_name, "rows": len(data), "cols": 
len(data[0]) if data else 0, "status": "written"} except Exception as e: return {"error": f"Error writing matrix: {str(e)}"} async def add_format(self, filename, format_name, format_props): """Create a cell format""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} workbook = self.workbooks[filename]["workbook"] # Create the format format_obj = workbook.add_format(format_props) # Store the format self.workbooks[filename]["formats"][format_name] = format_obj return {"filename": filename, "format": format_name, "properties": format_props, "status": "added"} except Exception as e: return {"error": f"Error adding format: {str(e)}"} async def add_chart(self, filename, worksheet_name, chart_type, data_range, position, options=None): """Add a chart to a worksheet""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} # Check if worksheet exists if worksheet_name not in self.workbooks[filename]["worksheets"]: return {"error": f"Worksheet {worksheet_name} not found in {filename}"} workbook = self.workbooks[filename]["workbook"] worksheet = self.workbooks[filename]["worksheets"][worksheet_name] # Create the chart chart = workbook.add_chart({'type': chart_type}) # Add the data series for series in data_range: chart.add_series(series) # Set chart title and other options if provided if options: if 'title' in options: chart.set_title({'name': options['title']}) if 'x_axis' in options: chart.set_x_axis(options['x_axis']) if 'y_axis' in options: chart.set_y_axis(options['y_axis']) if 'style' in options: chart.set_style(options['style']) # Insert the chart into the worksheet worksheet.insert_chart(position['row'], position['col'], chart) # Store the chart chart_name = f"Chart{len(self.workbooks[filename]['charts'])+1}" self.workbooks[filename]["charts"][chart_name] = chart return {"filename": filename, "worksheet": worksheet_name, "chart": chart_name, "status": "added"} except Exception as e: return {"error": f"Error adding chart: {str(e)}"} async def add_image(self, filename, worksheet_name, image_path, position, options=None): """Add an image to a worksheet""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} # Check if worksheet exists if worksheet_name not in self.workbooks[filename]["worksheets"]: return {"error": f"Worksheet {worksheet_name} not found in {filename}"} # Check if image file exists if not os.path.exists(image_path): return {"error": f"Image file {image_path} not found"} worksheet = self.workbooks[filename]["worksheets"][worksheet_name] # Insert the image worksheet.insert_image( position['row'], position['col'], image_path, options) return {"filename": filename, "worksheet": worksheet_name, "image": image_path, "status": "added"} except Exception as e: return {"error": f"Error adding image: {str(e)}"} async def add_formula(self, filename, worksheet_name, row, col, formula, format_name=None): """Add a formula to a cell""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} # Check if worksheet exists if worksheet_name not in self.workbooks[filename]["worksheets"]: return {"error": f"Worksheet {worksheet_name} not found in {filename}"} worksheet = self.workbooks[filename]["worksheets"][worksheet_name] # Get format if specified 
format_obj = None if format_name and format_name in self.workbooks[filename]["formats"]: format_obj = self.workbooks[filename]["formats"][format_name] # Write the formula worksheet.write_formula(row, col, formula, format_obj) return {"filename": filename, "worksheet": worksheet_name, "row": row, "col": col, "formula": formula, "status": "added"} except Exception as e: return {"error": f"Error adding formula: {str(e)}"} async def add_table(self, filename, worksheet_name, start_row, start_col, end_row, end_col, options=None): """Add a table to a worksheet""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} # Check if worksheet exists if worksheet_name not in self.workbooks[filename]["worksheets"]: return {"error": f"Worksheet {worksheet_name} not found in {filename}"} worksheet = self.workbooks[filename]["worksheets"][worksheet_name] # Add the table table_options = options or {} worksheet.add_table(start_row, start_col, end_row, end_col, table_options) return {"filename": filename, "worksheet": worksheet_name, "table_range": f"{start_row}:{start_col}:{end_row}:{end_col}", "status": "added"} except Exception as e: return {"error": f"Error adding table: {str(e)}"} async def close_workbook(self, filename): """Close and save the workbook""" try: self._is_initialized() # Check if workbook exists if filename not in self.workbooks: return {"error": f"Workbook {filename} not found"} workbook = self.workbooks[filename]["workbook"] # Close the workbook (which saves it) workbook.close() # Remove from our dictionary del self.workbooks[filename] return {"filename": filename, "status": "closed"} except Exception as e: return {"error": f"Error closing workbook: {str(e)}"} # # New methods for reading Excel and CSV files # async def read_excel(self, filename, sheet_name=0, **kwargs): """Read Excel file into DataFrame""" try: self._check_pandas_available() # Check if file exists if not os.path.exists(filename): return {"error": f"File {filename} not found"} # Read Excel file df = self.pandas.read_excel( filename, sheet_name=sheet_name, **kwargs) # If it returned a dict of DataFrames (multiple sheets) if isinstance(df, dict): return {"sheets": list(df.keys()), "dataframes": df} return df except Exception as e: return {"error": f"Error reading Excel file: {str(e)}"} async def read_csv(self, filename, **kwargs): """Read CSV file into DataFrame""" try: self._check_pandas_available() # Check if file exists if not os.path.exists(filename): return {"error": f"File {filename} not found"} # Read CSV file df = self.pandas.read_csv(filename, **kwargs) return df except Exception as e: return {"error": f"Error reading CSV file: {str(e)}"} async def get_excel_sheet_names(self, filename): """Get sheet names from Excel file""" try: self._check_pandas_available() # Check if file exists if not os.path.exists(filename): return {"error": f"File {filename} not found"} # Get sheet names excel_file = self.pandas.ExcelFile(filename) sheet_names = excel_file.sheet_names return sheet_names except Exception as e: return {"error": f"Error getting sheet names: {str(e)}"} async def dataframe_info(self, df): """Get information about DataFrame""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Create a string buffer to capture DataFrame.info() output buffer = io.StringIO() df.info(buf=buffer) info_output = buffer.getvalue() # Basic info about the 
DataFrame result = { "shape": df.shape, "columns": df.columns.tolist(), # Convert to strings for JSON serialization "dtypes": {str(k): str(v) for k, v in df.dtypes.to_dict().items()}, "info": info_output, "memory_usage": df.memory_usage(deep=True).sum(), "has_nulls": df.isnull().any().any() } # Add sample data (first 5 rows) try: result["head"] = json.loads( df.head().to_json(orient="records")) except: # Fallback if to_json fails for some reason result["head"] = str(df.head()) return result except Exception as e: return {"error": f"Error getting DataFrame info: {str(e)}"} async def dataframe_to_excel(self, df, filename, sheet_name="Sheet1", index=True, **kwargs): """Export DataFrame to Excel file""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Export to Excel df.to_excel(filename, sheet_name=sheet_name, index=index, **kwargs) return {"filename": filename, "sheet_name": sheet_name, "rows": len(df), "columns": len(df.columns), "status": "exported"} except Exception as e: return {"error": f"Error exporting to Excel: {str(e)}"} async def dataframe_to_csv(self, df, filename, index=True, **kwargs): """Export DataFrame to CSV file""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Export to CSV df.to_csv(filename, index=index, **kwargs) return {"filename": filename, "rows": len(df), "columns": len(df.columns), "status": "exported"} except Exception as e: return {"error": f"Error exporting to CSV: {str(e)}"} # # Data manipulation methods # async def filter_dataframe(self, df, query=None, column=None, value=None, operator="=="): """Filter DataFrame by query or column condition""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Filter by query string if query: try: filtered_df = df.query(query) return filtered_df except Exception as e: return {"error": f"Error in query: {str(e)}"} # Filter by column and value elif column and value is not None: if column not in df.columns: return {"error": f"Column '{column}' not found"} if operator == "==": filtered_df = df[df[column] == value] elif operator == "!=": filtered_df = df[df[column] != value] elif operator == ">": filtered_df = df[df[column] > value] elif operator == ">=": filtered_df = df[df[column] >= value] elif operator == "<": filtered_df = df[df[column] < value] elif operator == "<=": filtered_df = df[df[column] <= value] elif operator == "in": if not isinstance(value, list): return {"error": "Value must be a list when using 'in' operator"} filtered_df = df[df[column].isin(value)] elif operator == "contains": filtered_df = df[df[column].astype( str).str.contains(str(value))] else: return {"error": f"Unknown operator: {operator}"} return filtered_df else: return {"error": "Either query or column+value must be provided"} except Exception as e: return {"error": f"Error filtering DataFrame: {str(e)}"} async def sort_dataframe(self, df, by, ascending=True): """Sort DataFrame by columns""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Ensure 'by' is a list if isinstance(by, str): by = [by] # Check if all columns exist for col in by: if col not in df.columns: return {"error": f"Column '{col}' not found"} # Convert ascending to list if necessary if not 
isinstance(ascending, list): ascending = [ascending] * len(by) # Sort the DataFrame sorted_df = df.sort_values(by=by, ascending=ascending) return sorted_df except Exception as e: return {"error": f"Error sorting DataFrame: {str(e)}"} async def group_dataframe(self, df, by, agg_func=None): """Group DataFrame and apply aggregation""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Ensure 'by' is a list if isinstance(by, str): by = [by] # Check if all columns exist for col in by: if col not in df.columns: return {"error": f"Column '{col}' not found"} # Group by columns grouped = df.groupby(by) # Apply aggregation function if agg_func: if isinstance(agg_func, dict): # Complex aggregation with different functions per column result_df = grouped.agg(agg_func) else: # Simple aggregation with same function for all columns result_df = grouped.agg(agg_func) else: # Default aggregation (count) result_df = grouped.count() return result_df.reset_index() except Exception as e: return {"error": f"Error grouping DataFrame: {str(e)}"} async def describe_dataframe(self, df, include=None, exclude=None, percentiles=None): """Get statistical description of DataFrame""" try: self._check_pandas_available() # Check if valid DataFrame if not isinstance(df, self.pandas.DataFrame): return {"error": "Invalid DataFrame"} # Get description desc_df = df.describe( include=include, exclude=exclude, percentiles=percentiles) # Convert to dict for JSON serialization result = {} for col in desc_df.columns: result[str(col)] = {str(idx): float(val) if not pd.isna(val) else None for idx, val in desc_df[col].items()} return result except Exception as e: return {"error": f"Error describing DataFrame: {str(e)}"} # Tool function definitions that will be registered with MCP async def xlsx_create_workbook(filename: str, ctx: Context = None) -> str: """Create a new Excel workbook Parameters: - filename: Path to save the Excel file Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.create_workbook(filename) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_add_worksheet(filename: str, name: str = None, ctx: Context = None) -> str: """Add a worksheet to the workbook Parameters: - filename: Path to the Excel file - name: (Optional) Name for the worksheet Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.add_worksheet(filename, name) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_write_data(filename: str, worksheet: str, row: int, col: int, data: Any, format: str = None, ctx: Context = None) -> str: """Write data to a cell in a worksheet Parameters: - filename: Path to the Excel file - worksheet: Name of the worksheet - row: Row number (0-based) - col: Column number (0-based) - data: Data to write - format: (Optional) Name of a predefined format Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." 
try: result = await xlsx.write_data(filename, worksheet, row, col, data, format) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_write_matrix(filename: str, worksheet: str, start_row: int, start_col: int, data: List[List[Any]], formats: List[List[str]] = None, ctx: Context = None) -> str: """Write a matrix of data to a worksheet Parameters: - filename: Path to the Excel file - worksheet: Name of the worksheet - start_row: Starting row number (0-based) - start_col: Starting column number (0-based) - data: 2D list of data to write - formats: (Optional) 2D list of format names corresponding to data Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.write_matrix(filename, worksheet, start_row, start_col, data, formats) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_add_format(filename: str, format_name: str, properties: Dict[str, Any], ctx: Context = None) -> str: """Create a cell format Parameters: - filename: Path to the Excel file - format_name: Name to identify the format - properties: Dictionary of format properties (e.g., {'bold': True, 'font_color': 'red'}) Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.add_format(filename, format_name, properties) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_add_chart(filename: str, worksheet: str, chart_type: str, data_range: List[Dict[str, Any]], position: Dict[str, int], options: Dict[str, Any] = None, ctx: Context = None) -> str: """Add a chart to a worksheet Parameters: - filename: Path to the Excel file - worksheet: Name of the worksheet - chart_type: Type of chart (e.g., 'column', 'line', 'pie') - data_range: List of data series specifications - position: Dictionary with 'row' and 'col' keys specifying chart position - options: (Optional) Additional chart options Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.add_chart(filename, worksheet, chart_type, data_range, position, options) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_add_image(filename: str, worksheet: str, image_path: str, position: Dict[str, int], options: Dict[str, Any] = None, ctx: Context = None) -> str: """Add an image to a worksheet Parameters: - filename: Path to the Excel file - worksheet: Name of the worksheet - image_path: Path to the image file - position: Dictionary with 'row' and 'col' keys specifying image position - options: (Optional) Additional image options Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." 
try: result = await xlsx.add_image(filename, worksheet, image_path, position, options) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_add_formula(filename: str, worksheet: str, row: int, col: int, formula: str, format: str = None, ctx: Context = None) -> str: """Add a formula to a cell Parameters: - filename: Path to the Excel file - worksheet: Name of the worksheet - row: Row number (0-based) - col: Column number (0-based) - formula: Excel formula (e.g., '=SUM(A1:A10)') - format: (Optional) Name of a predefined format Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.add_formula(filename, worksheet, row, col, formula, format) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_add_table(filename: str, worksheet: str, start_row: int, start_col: int, end_row: int, end_col: int, options: Dict[str, Any] = None, ctx: Context = None) -> str: """Add a table to a worksheet Parameters: - filename: Path to the Excel file - worksheet: Name of the worksheet - start_row: Starting row number (0-based) - start_col: Starting column number (0-based) - end_row: Ending row number (0-based) - end_col: Ending column number (0-based) - options: (Optional) Table options (e.g., {'header_row': True, 'columns': [{'header': 'Name'}]}) Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.add_table(filename, worksheet, start_row, start_col, end_row, end_col, options) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) async def xlsx_close_workbook(filename: str, ctx: Context = None) -> str: """Close and save the workbook Parameters: - filename: Path to the Excel file Returns: - JSON string containing the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if xlsxwriter library is installed." try: result = await xlsx.close_workbook(filename) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # # New tool functions for reading Excel and CSV files # async def xlsx_read_excel(filename: str, sheet_name: Union[str, int] = 0, output_id: str = None, header: Union[int, List[int], None] = 0, names: List[str] = None, skiprows: Union[int, List[int], None] = None, ctx: Context = None) -> str: """Read an Excel file into a pandas DataFrame Parameters: - filename: Path to the Excel file - sheet_name: Sheet name or index (default: 0) - output_id: ID to store the DataFrame in memory (default: filename) - header: Row(s) to use as column names (default: 0) - names: List of custom column names (default: None) - skiprows: Row indices to skip or number of rows to skip (default: None) Returns: - JSON string with DataFrame information """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." 
try: # Set default output_id to filename without extension if not output_id: output_id = os.path.splitext(os.path.basename(filename))[0] # Read the Excel file result = await xlsx.read_excel(filename, sheet_name=sheet_name, header=header, names=names, skiprows=skiprows) # Handle both single sheet and multiple sheets if isinstance(result, dict) and "error" in result: return json.dumps(result, indent=2) elif isinstance(result, dict) and "sheets" in result: # Multiple sheets returned sheet_info = {} for sheet_name, df in result["dataframes"].items(): sheet_id = f"{output_id}_{sheet_name}" _store_dataframe(sheet_id, df) info = await xlsx.dataframe_info(df) sheet_info[sheet_name] = { "dataframe_id": sheet_id, "shape": info["shape"], "columns": info["columns"] } return json.dumps({ "filename": filename, "sheets": result["sheets"], "sheet_info": sheet_info, "status": "read", "message": f"Multiple sheets read. Access individual DataFrames using their IDs." }, indent=2) else: # Single sheet returned _store_dataframe(output_id, result) info = await xlsx.dataframe_info(result) response = { "filename": filename, "dataframe_id": output_id, "shape": info["shape"], "columns": info["columns"], "dtypes": info["dtypes"], "has_nulls": info["has_nulls"], "head": info["head"], "status": "read" } return json.dumps(response, indent=2) except Exception as e: return json.dumps({"error": f"Error reading Excel file: {str(e)}"}, indent=2) async def xlsx_read_csv(filename: str, output_id: str = None, delimiter: str = ",", header: Union[int, List[int], None] = 0, names: List[str] = None, skiprows: Union[int, List[int], None] = None, encoding: str = None, ctx: Context = None) -> str: """Read a CSV file into a pandas DataFrame Parameters: - filename: Path to the CSV file - output_id: ID to store the DataFrame in memory (default: filename) - delimiter: Delimiter to use (default: ",") - header: Row(s) to use as column names (default: 0) - names: List of custom column names (default: None) - skiprows: Row indices to skip or number of rows to skip (default: None) - encoding: File encoding (default: None, pandas will try to detect) Returns: - JSON string with DataFrame information """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." try: # Set default output_id to filename without extension if not output_id: output_id = os.path.splitext(os.path.basename(filename))[0] # Read the CSV file kwargs = { "delimiter": delimiter, "header": header, "skiprows": skiprows } if names: kwargs["names"] = names if encoding: kwargs["encoding"] = encoding result = await xlsx.read_csv(filename, **kwargs) if isinstance(result, dict) and "error" in result: return json.dumps(result, indent=2) # Store DataFrame in memory _store_dataframe(output_id, result) # Get DataFrame info info = await xlsx.dataframe_info(result) response = { "filename": filename, "dataframe_id": output_id, "shape": info["shape"], "columns": info["columns"], "dtypes": info["dtypes"], "has_nulls": info["has_nulls"], "head": info["head"], "status": "read" } return json.dumps(response, indent=2) except Exception as e: return json.dumps({"error": f"Error reading CSV file: {str(e)}"}, indent=2) async def xlsx_get_sheet_names(filename: str, ctx: Context = None) -> str: """Get sheet names from an Excel file Parameters: - filename: Path to the Excel file Returns: - JSON string with sheet names """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. 
Check if pandas and xlsxwriter libraries are installed." try: result = await xlsx.get_excel_sheet_names(filename) if isinstance(result, dict) and "error" in result: return json.dumps(result, indent=2) return json.dumps({ "filename": filename, "sheet_names": result, "count": len(result), "status": "success" }, indent=2) except Exception as e: return json.dumps({"error": f"Error getting sheet names: {str(e)}"}, indent=2) async def xlsx_dataframe_info(dataframe_id: str, ctx: Context = None) -> str: """Get information about a DataFrame Parameters: - dataframe_id: ID of the DataFrame in memory Returns: - JSON string with DataFrame information """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Get DataFrame info info = await xlsx.dataframe_info(df) if isinstance(info, dict) and "error" in info: return json.dumps(info, indent=2) response = { "dataframe_id": dataframe_id, "shape": info["shape"], "columns": info["columns"], "dtypes": info["dtypes"], "has_nulls": info["has_nulls"], "memory_usage": info["memory_usage"], "info": info["info"], "head": info["head"], "status": "success" } return json.dumps(response, indent=2) except Exception as e: return json.dumps({"error": f"Error getting DataFrame info: {str(e)}"}, indent=2) async def xlsx_list_dataframes(ctx: Context = None) -> str: """List all DataFrames currently in memory Returns: - JSON string with list of DataFrame IDs """ try: dataframe_ids = _list_dataframes() # Get basic info for each DataFrame dataframes_info = {} xlsx = _get_xlsx_service() for df_id in dataframe_ids: df = _get_dataframe(df_id) if df is not None: dataframes_info[df_id] = { "shape": df.shape, "columns": list(df.columns) } return json.dumps({ "dataframe_ids": dataframe_ids, "count": len(dataframe_ids), "dataframes_info": dataframes_info, "status": "success" }, indent=2) except Exception as e: return json.dumps({"error": f"Error listing DataFrames: {str(e)}"}, indent=2) async def xlsx_clear_dataframe(dataframe_id: str, ctx: Context = None) -> str: """Remove a DataFrame from memory Parameters: - dataframe_id: ID of the DataFrame to clear Returns: - JSON string with the result """ try: result = _clear_dataframe(dataframe_id) if result: return json.dumps({ "dataframe_id": dataframe_id, "status": "cleared", "message": f"DataFrame with ID '{dataframe_id}' has been removed from memory" }, indent=2) else: return json.dumps({ "error": f"DataFrame with ID '{dataframe_id}' not found", "status": "not_found" }, indent=2) except Exception as e: return json.dumps({"error": f"Error clearing DataFrame: {str(e)}"}, indent=2) async def xlsx_dataframe_to_excel(dataframe_id: str, filename: str, sheet_name: str = "Sheet1", index: bool = True, ctx: Context = None) -> str: """Export a DataFrame to an Excel file Parameters: - dataframe_id: ID of the DataFrame in memory - filename: Path to save the Excel file - sheet_name: Name of the sheet (default: "Sheet1") - index: Whether to include the DataFrame index (default: True) Returns: - JSON string with the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." 
try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Export to Excel result = await xlsx.dataframe_to_excel(df, filename, sheet_name=sheet_name, index=index) if isinstance(result, dict) and "error" in result: return json.dumps(result, indent=2) return json.dumps({ "dataframe_id": dataframe_id, "filename": filename, "sheet_name": sheet_name, "rows": result["rows"], "columns": result["columns"], "status": "exported" }, indent=2) except Exception as e: return json.dumps({"error": f"Error exporting DataFrame to Excel: {str(e)}"}, indent=2) async def xlsx_dataframe_to_csv(dataframe_id: str, filename: str, index: bool = True, encoding: str = "utf-8", sep: str = ",", ctx: Context = None) -> str: """Export a DataFrame to a CSV file Parameters: - dataframe_id: ID of the DataFrame in memory - filename: Path to save the CSV file - index: Whether to include the DataFrame index (default: True) - encoding: File encoding (default: "utf-8") - sep: Delimiter to use (default: ",") Returns: - JSON string with the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Export to CSV result = await xlsx.dataframe_to_csv(df, filename, index=index, encoding=encoding, sep=sep) if isinstance(result, dict) and "error" in result: return json.dumps(result, indent=2) return json.dumps({ "dataframe_id": dataframe_id, "filename": filename, "rows": result["rows"], "columns": result["columns"], "status": "exported" }, indent=2) except Exception as e: return json.dumps({"error": f"Error exporting DataFrame to CSV: {str(e)}"}, indent=2) # # Data Manipulation Tools # async def xlsx_filter_dataframe(dataframe_id: str, query: str = None, column: str = None, value: Any = None, operator: str = "==", output_id: str = None, ctx: Context = None) -> str: """Filter a DataFrame by query or column condition Parameters: - dataframe_id: ID of the DataFrame to filter - query: Query string for filtering (e.g., "column > 5 and column2 == 'value'") - column: Column name to filter by (alternative to query) - value: Value to compare with (used with column and operator) - operator: Comparison operator (used with column and value): ==, !=, >, >=, <, <=, in, contains - output_id: ID to store the filtered DataFrame (default: dataframe_id + "_filtered") Returns: - JSON string with the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." 
try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Set default output ID if not provided if not output_id: output_id = f"{dataframe_id}_filtered" # Filter the DataFrame if query: # Filter by query string filtered_df = await xlsx.filter_dataframe(df, query=query) elif column and value is not None: # Filter by column and value filtered_df = await xlsx.filter_dataframe(df, column=column, value=value, operator=operator) else: return json.dumps({"error": "Either query or column+value must be provided"}, indent=2) if isinstance(filtered_df, dict) and "error" in filtered_df: return json.dumps(filtered_df, indent=2) # Store filtered DataFrame _store_dataframe(output_id, filtered_df) # Get DataFrame info info = await xlsx.dataframe_info(filtered_df) return json.dumps({ "original_id": dataframe_id, "filtered_id": output_id, "original_rows": df.shape[0], "filtered_rows": filtered_df.shape[0], "columns": info["columns"], "head": info["head"], "status": "filtered" }, indent=2) except Exception as e: return json.dumps({"error": f"Error filtering DataFrame: {str(e)}"}, indent=2) async def xlsx_sort_dataframe(dataframe_id: str, by: Union[str, List[str]], ascending: Union[bool, List[bool]] = True, output_id: str = None, ctx: Context = None) -> str: """Sort a DataFrame by columns Parameters: - dataframe_id: ID of the DataFrame to sort - by: Column name(s) to sort by (string or list of strings) - ascending: Whether to sort in ascending order (boolean or list of booleans) - output_id: ID to store the sorted DataFrame (default: dataframe_id + "_sorted") Returns: - JSON string with the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Set default output ID if not provided if not output_id: output_id = f"{dataframe_id}_sorted" # Sort the DataFrame sorted_df = await xlsx.sort_dataframe(df, by=by, ascending=ascending) if isinstance(sorted_df, dict) and "error" in sorted_df: return json.dumps(sorted_df, indent=2) # Store sorted DataFrame _store_dataframe(output_id, sorted_df) # Get DataFrame info info = await xlsx.dataframe_info(sorted_df) return json.dumps({ "original_id": dataframe_id, "sorted_id": output_id, "sorted_by": by if isinstance(by, str) else list(by), "ascending": ascending if isinstance(ascending, bool) else list(ascending), "rows": sorted_df.shape[0], "head": info["head"], "status": "sorted" }, indent=2) except Exception as e: return json.dumps({"error": f"Error sorting DataFrame: {str(e)}"}, indent=2) async def xlsx_group_dataframe(dataframe_id: str, by: Union[str, List[str]], agg_func: Union[str, Dict[str, str]] = "mean", output_id: str = None, ctx: Context = None) -> str: """Group a DataFrame and apply aggregation Parameters: - dataframe_id: ID of the DataFrame to group - by: Column name(s) to group by (string or list of strings) - agg_func: Aggregation function(s) to apply (string or dict of column->function) - output_id: ID to store the grouped DataFrame (default: dataframe_id + "_grouped") Returns: - JSON string with the result """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." 
try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Set default output ID if not provided if not output_id: output_id = f"{dataframe_id}_grouped" # Group the DataFrame grouped_df = await xlsx.group_dataframe(df, by=by, agg_func=agg_func) if isinstance(grouped_df, dict) and "error" in grouped_df: return json.dumps(grouped_df, indent=2) # Store grouped DataFrame _store_dataframe(output_id, grouped_df) # Get DataFrame info info = await xlsx.dataframe_info(grouped_df) return json.dumps({ "original_id": dataframe_id, "grouped_id": output_id, "grouped_by": by if isinstance(by, str) else list(by), "agg_func": agg_func if isinstance(agg_func, str) else dict(agg_func), "rows": grouped_df.shape[0], "head": info["head"], "status": "grouped" }, indent=2) except Exception as e: return json.dumps({"error": f"Error grouping DataFrame: {str(e)}"}, indent=2) async def xlsx_describe_dataframe(dataframe_id: str, include: Union[str, List[str]] = None, exclude: Union[str, List[str]] = None, percentiles: List[float] = None, ctx: Context = None) -> str: """Get statistical description of a DataFrame Parameters: - dataframe_id: ID of the DataFrame to describe - include: Types of columns to include (None, 'all', or list of dtypes) - exclude: Types of columns to exclude (None or list of dtypes) - percentiles: List of percentiles to include in output (default: [0.25, 0.5, 0.75]) Returns: - JSON string with the statistical description """ xlsx = _get_xlsx_service() if not xlsx: return "XlsxWriter service not properly initialized. Check if pandas and xlsxwriter libraries are installed." try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Describe the DataFrame result = await xlsx.describe_dataframe(df, include=include, exclude=exclude, percentiles=percentiles) if isinstance(result, dict) and "error" in result: return json.dumps(result, indent=2) return json.dumps({ "dataframe_id": dataframe_id, "description": result, "status": "described" }, indent=2) except Exception as e: return json.dumps({"error": f"Error describing DataFrame: {str(e)}"}, indent=2) async def xlsx_get_column_values(dataframe_id: str, column: str, unique: bool = False, count: bool = False, ctx: Context = None) -> str: """Get values from a specific column in a DataFrame Parameters: - dataframe_id: ID of the DataFrame - column: Name of the column to get values from - unique: Whether to return only unique values (default: False) - count: Whether to count occurrences of each value (default: False) Returns: - JSON string with the column values """ try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Check if column exists if column not in df.columns: return json.dumps({"error": f"Column '{column}' not found in DataFrame '{dataframe_id}'"}, indent=2) if count: # Count occurrences of each value value_counts = df[column].value_counts().to_dict() return json.dumps({ "dataframe_id": dataframe_id, "column": column, "value_counts": value_counts, "total_values": len(df[column]), "unique_values": len(value_counts), "status": "success" }, indent=2) elif unique: # Get unique values unique_values = df[column].unique().tolist() # Convert non-serializable values to strings if necessary unique_values = [str(v) if not 
isinstance( v, (int, float, bool, str, type(None))) else v for v in unique_values] return json.dumps({ "dataframe_id": dataframe_id, "column": column, "unique_values": unique_values, "count": len(unique_values), "status": "success" }, indent=2) else: # Get all values values = df[column].tolist() # Convert non-serializable values to strings if necessary values = [str(v) if not isinstance( v, (int, float, bool, str, type(None))) else v for v in values] return json.dumps({ "dataframe_id": dataframe_id, "column": column, "values": values, "count": len(values), "status": "success" }, indent=2) except Exception as e: return json.dumps({"error": f"Error getting column values: {str(e)}"}, indent=2) async def xlsx_get_correlation(dataframe_id: str, method: str = "pearson", ctx: Context = None) -> str: """Get correlation matrix for a DataFrame Parameters: - dataframe_id: ID of the DataFrame - method: Correlation method ('pearson', 'kendall', or 'spearman') Returns: - JSON string with the correlation matrix """ try: # Get DataFrame from memory df = _get_dataframe(dataframe_id) if df is None: return json.dumps({"error": f"DataFrame with ID '{dataframe_id}' not found"}, indent=2) # Calculate correlation matrix corr = df.corr(method=method).round(4).to_dict() return json.dumps({ "dataframe_id": dataframe_id, "correlation_matrix": corr, "method": method, "status": "success" }, indent=2) except Exception as e: return json.dumps({"error": f"Error calculating correlation: {str(e)}"}, indent=2) # Tool registration and initialization _xlsx_service = None def initialize_xlsx_service(): """Initialize the XlsxWriter service""" global _xlsx_service _xlsx_service = XlsxWriterService() return _xlsx_service def _get_xlsx_service(): """Get or initialize the XlsxWriter service""" global _xlsx_service if _xlsx_service is None: _xlsx_service = initialize_xlsx_service() return _xlsx_service def get_xlsx_tools(): """Get a dictionary of all XlsxWriter tools for registration with MCP""" return { # Existing tools XlsxWriterTools.CREATE_WORKBOOK: xlsx_create_workbook, XlsxWriterTools.ADD_WORKSHEET: xlsx_add_worksheet, XlsxWriterTools.WRITE_DATA: xlsx_write_data, XlsxWriterTools.WRITE_MATRIX: xlsx_write_matrix, XlsxWriterTools.ADD_FORMAT: xlsx_add_format, XlsxWriterTools.ADD_CHART: xlsx_add_chart, XlsxWriterTools.ADD_IMAGE: xlsx_add_image, XlsxWriterTools.ADD_FORMULA: xlsx_add_formula, XlsxWriterTools.ADD_TABLE: xlsx_add_table, XlsxWriterTools.CLOSE_WORKBOOK: xlsx_close_workbook, # New reading tools XlsxWriterTools.READ_EXCEL: xlsx_read_excel, XlsxWriterTools.READ_CSV: xlsx_read_csv, XlsxWriterTools.GET_SHEET_NAMES: xlsx_get_sheet_names, # DataFrame management "xlsx_dataframe_info": xlsx_dataframe_info, "xlsx_list_dataframes": xlsx_list_dataframes, "xlsx_clear_dataframe": xlsx_clear_dataframe, "xlsx_get_column_values": xlsx_get_column_values, # Data manipulation XlsxWriterTools.FILTER_DATAFRAME: xlsx_filter_dataframe, XlsxWriterTools.SORT_DATAFRAME: xlsx_sort_dataframe, XlsxWriterTools.GROUP_DATAFRAME: xlsx_group_dataframe, XlsxWriterTools.DESCRIBE_DATAFRAME: xlsx_describe_dataframe, "xlsx_get_correlation": xlsx_get_correlation, # Export tools XlsxWriterTools.DATAFRAME_TO_EXCEL: xlsx_dataframe_to_excel, XlsxWriterTools.DATAFRAME_TO_CSV: xlsx_dataframe_to_csv } # This function will be called by the unified server to initialize the module def initialize(mcp=None): """Initialize the XlsxWriter module with MCP reference""" if mcp: set_external_mcp(mcp) # Initialize the service service = initialize_xlsx_service() # Check for 
required dependencies missing_deps = [] try: import xlsxwriter except ImportError: missing_deps.append("xlsxwriter") try: import pandas except ImportError: missing_deps.append("pandas") try: import openpyxl except ImportError: # Optional but recommended logging.warning( "openpyxl not installed. Some Excel reading features may be limited.") try: import xlrd except ImportError: # Optional but recommended for older Excel formats logging.warning( "xlrd not installed. Reading older Excel formats may be limited.") if missing_deps: logging.warning( f"Missing required dependencies: {', '.join(missing_deps)}. Please install them.") return False if service and service.initialized and service.pandas_available: logging.info( "XlsxWriter service initialized successfully with pandas support") return True else: logging.warning( "Failed to initialize XlsxWriter service. Please ensure xlsxwriter and pandas are installed.") return False if __name__ == "__main__": print("XlsxWriter service module - use with MCP Unified Server")
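
Below is a minimal end-to-end sketch (not part of the original file) showing how the workbook-writing tools defined above could be driven directly. It assumes the module is importable as excel, which may not match the actual package layout of the MCP Unified Server repository, and uses an illustrative report.xlsx with made-up sales data.

# Hedged usage sketch: create a workbook, add a sheet and a bold header
# format, write a small matrix, add a formula, then close (which saves).
# The import path `excel` is an assumption, not the repo's real layout.
import asyncio
import excel  # hypothetical import path

async def build_report():
    print(await excel.xlsx_create_workbook("report.xlsx"))
    print(await excel.xlsx_add_worksheet("report.xlsx", "Sales"))
    print(await excel.xlsx_add_format("report.xlsx", "header",
                                      {"bold": True, "bg_color": "#DDDDDD"}))
    print(await excel.xlsx_write_matrix(
        "report.xlsx", "Sales", 0, 0,
        [["Region", "Revenue"], ["EMEA", 1200], ["APAC", 950]],
        formats=[["header", "header"], [None, None], [None, None]],
    ))
    # Total row: Excel evaluates the formula when the file is opened.
    print(await excel.xlsx_add_formula("report.xlsx", "Sales", 3, 1, "=SUM(B2:B3)"))
    print(await excel.xlsx_close_workbook("report.xlsx"))

if __name__ == "__main__":
    asyncio.run(build_report())

In normal operation these functions are not called directly: the unified server calls initialize(mcp) and registers the mapping returned by get_xlsx_tools() as MCP tools.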
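
A second hedged sketch for the pandas-backed tools: read a spreadsheet into an in-memory DataFrame, filter and sort it by a hypothetical Revenue column, then export the result to CSV. The chained IDs (sales_filtered, sales_filtered_sorted) follow the default naming used by xlsx_filter_dataframe and xlsx_sort_dataframe above; the file and column names are illustrative.

# Hedged usage sketch for the DataFrame tools; `excel` is again an assumed
# import path, and sales.xlsx / the Revenue column are made-up examples.
import asyncio
import excel  # hypothetical import path

async def summarize():
    # Read sheet 0 of sales.xlsx and store it in memory under the ID "sales".
    print(await excel.xlsx_read_excel("sales.xlsx", sheet_name=0, output_id="sales"))
    # Keep rows where Revenue > 1000; stored as "sales_filtered" by default.
    print(await excel.xlsx_filter_dataframe("sales", column="Revenue",
                                            value=1000, operator=">"))
    # Sort the filtered frame descending by Revenue -> "sales_filtered_sorted".
    print(await excel.xlsx_sort_dataframe("sales_filtered", by="Revenue",
                                          ascending=False))
    # Export the sorted frame to CSV without the index column.
    print(await excel.xlsx_dataframe_to_csv("sales_filtered_sorted",
                                            "top_sales.csv", index=False))

if __name__ == "__main__":
    asyncio.run(summarize())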
