Skip to main content
Glama
io_operations.py14.9 kB
"""I/O operations tools for CSV Editor MCP Server.""" import pandas as pd from typing import Dict, Any, Optional, List, Literal from pathlib import Path import tempfile import json from datetime import datetime from fastmcp import Context from ..models import ( get_session_manager, OperationResult, OperationType, ExportFormat, SessionInfo ) from ..utils.validators import validate_file_path, validate_url, sanitize_filename async def load_csv( file_path: str, encoding: str = "utf-8", delimiter: str = ",", session_id: Optional[str] = None, header: Optional[int] = 0, na_values: Optional[List[str]] = None, parse_dates: Optional[List[str]] = None, ctx: Context = None ) -> Dict[str, Any]: """Load a CSV file into a session. Args: file_path: Path to the CSV file encoding: File encoding (default: utf-8) delimiter: Column delimiter (default: comma) session_id: Optional existing session ID to use header: Row number to use as header (default: 0) na_values: Additional strings to recognize as NA/NaN parse_dates: Columns to parse as dates ctx: FastMCP context Returns: Operation result with session ID and data info """ try: # Validate file path is_valid, validated_path = validate_file_path(file_path) if not is_valid: return OperationResult( success=False, message=f"Invalid file path: {validated_path}", error=validated_path ).model_dump() if ctx: await ctx.info(f"Loading CSV file: {validated_path}") await ctx.report_progress(0.1, "Validating file...") # Get or create session session_manager = get_session_manager() session = session_manager.get_or_create_session(session_id) if ctx: await ctx.report_progress(0.3, "Reading file...") # Read CSV with pandas read_params = { "filepath_or_buffer": validated_path, "encoding": encoding, "delimiter": delimiter, "header": header, } if na_values: read_params["na_values"] = na_values if parse_dates: read_params["parse_dates"] = parse_dates df = pd.read_csv(**read_params) if ctx: await ctx.report_progress(0.8, "Processing data...") # Load into session session.load_data(df, validated_path) if ctx: await ctx.report_progress(1.0, "Complete!") await ctx.info(f"Loaded {len(df)} rows and {len(df.columns)} columns") return OperationResult( success=True, message=f"Successfully loaded CSV file", session_id=session.session_id, rows_affected=len(df), columns_affected=df.columns.tolist(), data={ "shape": df.shape, "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}, "memory_usage_mb": df.memory_usage(deep=True).sum() / (1024 * 1024), "preview": df.to_dict('records') } ).model_dump() except Exception as e: if ctx: await ctx.error(f"Failed to load CSV: {str(e)}") return OperationResult( success=False, message="Failed to load CSV file", error=str(e) ).model_dump() async def load_csv_from_url( url: str, encoding: str = "utf-8", delimiter: str = ",", session_id: Optional[str] = None, ctx: Context = None ) -> Dict[str, Any]: """Load a CSV file from a URL. Args: url: URL of the CSV file encoding: File encoding delimiter: Column delimiter session_id: Optional existing session ID ctx: FastMCP context Returns: Operation result with session ID and data info """ try: # Validate URL is_valid, validated_url = validate_url(url) if not is_valid: return OperationResult( success=False, message=f"Invalid URL: {validated_url}", error=validated_url ).model_dump() if ctx: await ctx.info(f"Loading CSV from URL: {url}") await ctx.report_progress(0.1, "Downloading file...") # Download CSV using pandas (it handles URLs directly) df = pd.read_csv(url, encoding=encoding, delimiter=delimiter) if ctx: await ctx.report_progress(0.8, "Processing data...") # Get or create session session_manager = get_session_manager() session = session_manager.get_or_create_session(session_id) session.load_data(df, url) if ctx: await ctx.report_progress(1.0, "Complete!") await ctx.info(f"Loaded {len(df)} rows and {len(df.columns)} columns") return OperationResult( success=True, message=f"Successfully loaded CSV from URL", session_id=session.session_id, rows_affected=len(df), columns_affected=df.columns.tolist(), data={ "shape": df.shape, "source_url": url, "preview": df.head(5).to_dict('records') } ).model_dump() except Exception as e: if ctx: await ctx.error(f"Failed to load CSV from URL: {str(e)}") return OperationResult( success=False, message="Failed to load CSV from URL", error=str(e) ).model_dump() async def load_csv_from_content( content: str, delimiter: str = ",", session_id: Optional[str] = None, has_header: bool = True, ctx: Context = None ) -> Dict[str, Any]: """Load CSV data from a string content. Args: content: CSV content as string delimiter: Column delimiter session_id: Optional existing session ID has_header: Whether first row is header ctx: FastMCP context Returns: Operation result with session ID and data info """ try: if ctx: await ctx.info("Loading CSV from content string") # Parse CSV from string from io import StringIO df = pd.read_csv( StringIO(content), delimiter=delimiter, header=0 if has_header else None ) # Get or create session session_manager = get_session_manager() session = session_manager.get_or_create_session(session_id) session.load_data(df, None) if ctx: await ctx.info(f"Loaded {len(df)} rows and {len(df.columns)} columns") return OperationResult( success=True, message=f"Successfully loaded CSV from content", session_id=session.session_id, rows_affected=len(df), columns_affected=df.columns.tolist(), data={ "shape": df.shape, "preview": df.head(5).to_dict('records') } ).model_dump() except Exception as e: if ctx: await ctx.error(f"Failed to parse CSV content: {str(e)}") return OperationResult( success=False, message="Failed to parse CSV content", error=str(e) ).model_dump() async def export_csv( session_id: str, file_path: Optional[str] = None, format: ExportFormat = ExportFormat.CSV, encoding: str = "utf-8", index: bool = False, ctx: Context = None ) -> Dict[str, Any]: """Export session data to various formats. Args: session_id: Session ID to export file_path: Optional output file path (auto-generated if not provided) format: Export format (csv, tsv, json, excel, parquet, html, markdown) encoding: Output encoding index: Whether to include index in output ctx: FastMCP context Returns: Operation result with file path """ try: # Get session session_manager = get_session_manager() session = session_manager.get_session(session_id) if not session or session.df is None: return OperationResult( success=False, message="Session not found or no data loaded", error="Invalid session ID" ).model_dump() if ctx: await ctx.info(f"Exporting data in {format.value} format") await ctx.report_progress(0.1, "Preparing export...") # Generate file path if not provided if not file_path: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"export_{session_id[:8]}_{timestamp}" # Determine extension based on format extensions = { ExportFormat.CSV: ".csv", ExportFormat.TSV: ".tsv", ExportFormat.JSON: ".json", ExportFormat.EXCEL: ".xlsx", ExportFormat.PARQUET: ".parquet", ExportFormat.HTML: ".html", ExportFormat.MARKDOWN: ".md" } file_path = tempfile.gettempdir() + "/" + filename + extensions[format] file_path = Path(file_path) df = session.df if ctx: await ctx.report_progress(0.5, f"Writing {format.value} file...") # Export based on format if format == ExportFormat.CSV: df.to_csv(file_path, encoding=encoding, index=index) elif format == ExportFormat.TSV: df.to_csv(file_path, sep='\t', encoding=encoding, index=index) elif format == ExportFormat.JSON: df.to_json(file_path, orient='records', indent=2) elif format == ExportFormat.EXCEL: df.to_excel(file_path, index=index, engine='openpyxl') elif format == ExportFormat.PARQUET: df.to_parquet(file_path, index=index) elif format == ExportFormat.HTML: df.to_html(file_path, index=index) elif format == ExportFormat.MARKDOWN: df.to_markdown(file_path, index=index) else: return OperationResult( success=False, message=f"Unsupported format: {format}", error="Invalid export format" ).model_dump() # Record operation session.record_operation( OperationType.EXPORT, {"format": format.value, "file_path": str(file_path)} ) if ctx: await ctx.report_progress(1.0, "Export complete!") await ctx.info(f"Exported to {file_path}") return OperationResult( success=True, message=f"Successfully exported data to {format.value}", session_id=session_id, data={ "file_path": str(file_path), "format": format.value, "rows_exported": len(df), "file_size_bytes": file_path.stat().st_size } ).model_dump() except Exception as e: if ctx: await ctx.error(f"Failed to export data: {str(e)}") return OperationResult( success=False, message="Failed to export data", error=str(e) ).model_dump() async def get_session_info( session_id: str, ctx: Context = None ) -> Dict[str, Any]: """Get information about a specific session. Args: session_id: Session ID ctx: FastMCP context Returns: Session information """ try: session_manager = get_session_manager() session = session_manager.get_session(session_id) if not session: return OperationResult( success=False, message="Session not found", error="Invalid session ID" ).model_dump() if ctx: await ctx.info(f"Retrieved info for session {session_id}") info = session.get_info() return OperationResult( success=True, message="Session info retrieved", session_id=session_id, data=info.model_dump() ).model_dump() except Exception as e: if ctx: await ctx.error(f"Failed to get session info: {str(e)}") return OperationResult( success=False, message="Failed to get session info", error=str(e) ).model_dump() async def list_sessions(ctx: Context = None) -> Dict[str, Any]: """List all active sessions. Args: ctx: FastMCP context Returns: List of active sessions """ try: session_manager = get_session_manager() sessions = session_manager.list_sessions() if ctx: await ctx.info(f"Found {len(sessions)} active sessions") return { "success": True, "message": f"Found {len(sessions)} active sessions", "sessions": [s.model_dump() for s in sessions] } except Exception as e: if ctx: await ctx.error(f"Failed to list sessions: {str(e)}") return { "success": False, "message": "Failed to list sessions", "error": str(e) } async def close_session( session_id: str, ctx: Context = None ) -> Dict[str, Any]: """Close and clean up a session. Args: session_id: Session ID to close ctx: FastMCP context Returns: Operation result """ try: session_manager = get_session_manager() removed = await session_manager.remove_session(session_id) if not removed: return OperationResult( success=False, message="Session not found", error="Invalid session ID" ).model_dump() if ctx: await ctx.info(f"Closed session {session_id}") return OperationResult( success=True, message=f"Session {session_id} closed successfully", session_id=session_id ).model_dump() except Exception as e: if ctx: await ctx.error(f"Failed to close session: {str(e)}") return OperationResult( success=False, message="Failed to close session", error=str(e) ).model_dump()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server