export_to_csv
Execute SQL queries and export results directly to CSV files for data analysis, sharing, or backup purposes.
Instructions
Export query results to a CSV file.
Args:
query: SQL SELECT query to execute
filename: Output filename (relative or absolute path)
delimiter: Field delimiter (default: comma)
Returns:
Dictionary with:
- status: 'success' (on failure, the dictionary contains an 'error' key instead)
- path: Absolute path to created file
- row_count: Number of rows exported
- file_size: Size of created file in bytes
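In the handler listed under Implementation Reference, a failed call returns a dictionary with an `error` key (and sometimes the offending `query`) rather than a `status` field, so a caller may want to branch on that. A minimal sketch, assuming the result arrives as a plain Python dict; `describe_export_result` is a hypothetical helper for illustration, not part of the server:

```python
from typing import Any


def describe_export_result(result: dict[str, Any]) -> str:
    """Turn an export_to_csv result dictionary into a one-line summary."""
    # Failures carry an "error" key instead of a "status" field.
    if "error" in result:
        return f"export failed: {result['error']}"
    # Successful calls report where the file went and how large it is.
    return (
        f"exported {result['row_count']} rows "
        f"to {result['path']} ({result['file_size']} bytes)"
    )


# Illustrative result dictionaries (made-up values, shaped like the documented output).
ok = {"status": "success", "path": "/tmp/out.csv", "row_count": 3, "file_size": 42}
bad = {"error": "Only SELECT queries are allowed for export", "query": "DROP TABLE t"}

print(describe_export_result(ok))
print(describe_export_result(bad))
```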
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | SQL SELECT query to execute | |
| filename | Yes | Output filename (relative or absolute path) | |
| delimiter | No | Field delimiter | , |
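To illustrate what the `delimiter` parameter changes, the sketch below renders the same rows with two delimiters using Python's standard `csv.DictWriter`, the same writer class the handler uses. The helper `render_csv` and the sample rows are assumptions made for illustration; the tool itself writes to a file rather than a string.

```python
import csv
import io


def render_csv(rows: list[dict], delimiter: str = ",") -> str:
    """Render rows with csv.DictWriter into a string instead of a file."""
    buffer = io.StringIO(newline="")  # newline="" keeps the writer's own line endings
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()), delimiter=delimiter)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Hypothetical rows; the second name contains a comma on purpose.
rows = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Lin, Grace"}]

print(render_csv(rows))                 # the comma inside the name forces quoting
print(render_csv(rows, delimiter=";"))  # with ';' the same field needs no quotes
```

Choosing a delimiter that does not occur in the data keeps the output free of quoted fields, which some downstream tools handle more gracefully.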
Implementation Reference
- src/mssql_mcp/tools/export.py:81-161 (handler): The @mcp.tool()-decorated export_to_csv function that implements the tool. It checks that the query is SELECT-only, validates it against the blocked commands, executes it, writes the results to a CSV file with the specified delimiter, and handles empty results and errors.

      @mcp.tool()
      def export_to_csv(
          query: str,
          filename: str,
          delimiter: str = ",",
      ) -> dict[str, Any]:
          """Export query results to a CSV file.

          Args:
              query: SQL SELECT query to execute
              filename: Output filename (relative or absolute path)
              delimiter: Field delimiter (default: comma)

          Returns:
              Dictionary with:
              - status: 'success' or error
              - path: Absolute path to created file
              - row_count: Number of rows exported
              - file_size: Size of created file in bytes
          """
          try:
              manager = get_connection_manager()
              config = manager.config

              # Create validator
              validator = SQLValidator(
                  blocked_commands=config.blocked_commands,
                  read_only=True,
                  allowed_schemas=config.allowed_schemas if config.allowed_schemas else None,
              )

              # Validate query is SELECT-only
              if not validator.is_select_only(query):
                  return {
                      "error": "Only SELECT queries are allowed for export",
                      "query": query,
                  }

              # Check blocked commands
              is_valid, error = validator.validate(query)
              if not is_valid:
                  return {"error": error, "query": query}

              # Execute query (no row limit for export)
              rows = manager.execute_query(query)

              # Prepare output path
              path = Path(filename)
              if not path.is_absolute():
                  path = Path.cwd() / path

              # Ensure parent directory exists
              path.parent.mkdir(parents=True, exist_ok=True)

              # Handle empty results
              if not rows:
                  # Create an empty (zero-byte) file
                  path.write_text("")
                  return {
                      "status": "success",
                      "path": str(path.absolute()),
                      "row_count": 0,
                      "file_size": 0,
                  }

              # Write CSV file
              with open(path, "w", newline="", encoding="utf-8") as f:
                  writer = csv.DictWriter(f, fieldnames=rows[0].keys(), delimiter=delimiter)
                  writer.writeheader()
                  writer.writerows(rows)

              return {
                  "status": "success",
                  "path": str(path.absolute()),
                  "row_count": len(rows),
                  "file_size": path.stat().st_size,
              }

          except Exception as e:
              logger.error(f"Error exporting to CSV: {e}")
              return {"error": str(e)}
- src/mssql_mcp/server.py:203-203 (registration): Import statement that loads the tool modules (including export.py, which contains export_to_csv), registering all tools with the MCP server instance.

      from .tools import crud, databases, export, query, stored_procs, tables  # noqa: E402, F401
- src/mssql_mcp/tools/__init__.py:4-4 (registration): Import of export.py (and the other tool modules) within the tools package, so that the tools are registered when the package is imported.

      from . import crud, databases, export, query, stored_procs, tables
- src/mssql_mcp/tools/export.py:82-100 (schema): Function signature with type annotations and docstring that define the input parameters (query, filename, delimiter) and the output format for the export_to_csv tool schema.

      def export_to_csv(
          query: str,
          filename: str,
          delimiter: str = ",",
      ) -> dict[str, Any]:
          """Export query results to a CSV file.

          Args:
              query: SQL SELECT query to execute
              filename: Output filename (relative or absolute path)
              delimiter: Field delimiter (default: comma)

          Returns:
              Dictionary with:
              - status: 'success' or error
              - path: Absolute path to created file
              - row_count: Number of rows exported
              - file_size: Size of created file in bytes
          """
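The registration entries above rely on an import side effect: importing a module runs its decorators, and the decorator records the function with the server. The sketch below shows that general pattern only. `ToolRegistry` is a hypothetical stand-in written for this example, not the MCP SDK's actual class, and the placeholder tool body does no real work.

```python
from typing import Any, Callable


class ToolRegistry:
    """Hypothetical stand-in for an MCP server instance (not the real SDK class)."""

    def __init__(self) -> None:
        self.tools: dict[str, Callable[..., Any]] = {}

    def tool(self) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        # Calling tool() returns the actual decorator, mirroring the @mcp.tool() syntax.
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Registration happens when the function is defined, i.e. at import time.
            self.tools[func.__name__] = func
            return func

        return decorator


mcp = ToolRegistry()


@mcp.tool()
def export_to_csv(query: str, filename: str, delimiter: str = ",") -> dict[str, Any]:
    # Placeholder body; the real handler runs the query and writes the file.
    return {"status": "success", "path": filename, "row_count": 0, "file_size": 0}


print(sorted(mcp.tools))
```

Under this pattern, a module that is never imported never registers its tools, which is why the imports carry `noqa` markers instead of being removed as unused.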