# load_csv_from_url
Load CSV data from a URL into a DataBeak session for analysis. The tool downloads and parses the file with security validation, then returns a session ID and a data preview.
## Instructions
Load CSV file from URL into DataBeak session.
Downloads and parses CSV data with security validation. Returns session ID and data preview for further operations.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| url | Yes | URL of the CSV file to download and load | |
| encoding | No | Text encoding for file reading (utf-8, latin1, cp1252, etc.) | utf-8 |
| delimiter | No | Column delimiter character (comma, tab, semicolon, pipe) | , |
| header_config | No | Header detection configuration | auto-detect |
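For illustration, here are argument payloads exercising each `header_config` mode. The URLs and variable names are placeholders; the `mode` discriminator and its fields come from the models under Implementation Reference below.

```python
# Hypothetical tool-call arguments; URLs are placeholders.
args_auto = {
    "url": "https://example.com/data.csv",
    # header_config omitted -> auto-detection ({"mode": "auto"})
}

args_no_header = {
    "url": "https://example.com/data.tsv",
    "delimiter": "\t",
    "header_config": {"mode": "none"},  # generates Column_0, Column_1, ...
}

args_explicit = {
    "url": "https://example.com/report.csv",
    "header_config": {"mode": "row", "row_number": 2},  # 0-based: third row
}
```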
## Implementation Reference
- The main handler function that implements the `load_csv_from_url` tool. It downloads the CSV from the provided URL using httpx, parses it with `pandas.read_csv` (handling encoding, delimiter, and header config), validates the DataFrame size against settings, loads it into the session, and returns a `LoadResult` with preview and metadata.

```python
async def load_csv_from_url(
    ctx: Annotated[Context, Field(description="FastMCP context for session access")],
    url: Annotated[str, Field(description="URL of the CSV file to download and load")],
    encoding: Annotated[
        str, Field(description="Text encoding for file reading (utf-8, latin1, cp1252, etc.)")
    ] = "utf-8",
    delimiter: Annotated[
        str, Field(description="Column delimiter character (comma, tab, semicolon, pipe)")
    ] = ",",
    header_config: Annotated[
        HeaderConfigUnion | None,
        Field(default=None, description="Header detection configuration"),
    ] = None,
) -> LoadResult:
    """Load CSV file from URL into DataBeak session.

    Downloads and parses CSV data with security validation. Returns session ID
    and data preview for further operations.
    """
    # Get session_id from FastMCP context
    session_id = ctx.session_id
    settings = get_settings()

    # Handle default header configuration
    if header_config is None:
        header_config = AutoDetectHeader()

    # Validate URL
    is_valid, validated_url = validate_url(url)
    if not is_valid:
        msg = f"Invalid URL: {validated_url}"
        raise ToolError(msg)

    await ctx.info(f"Loading CSV from URL: {url}")
    await ctx.report_progress(0.1)

    # Download with timeout and content-type verification
    try:
        # Pre-download validation with timeout and content-type checking
        await ctx.info("Verifying URL and downloading content...")

        # Use async HTTP client for non-blocking download
        async with httpx.AsyncClient(timeout=settings.url_timeout_seconds) as client:
            # HEAD request first to check content-type and size
            head_response = await client.head(url, follow_redirects=True)
            head_response.raise_for_status()

            # Verify content-type
            content_type = head_response.headers.get("content-type", "").lower()
            content_length = head_response.headers.get("content-length")

            # Check content type
            valid_content_types = [
                "text/csv",
                "text/plain",
                "application/csv",
                "application/octet-stream",  # Some servers use generic type
                "text/tab-separated-values",
            ]
            if content_type and not any(ct in content_type for ct in valid_content_types):
                logger.warning("Unexpected content-type: %s. Proceeding anyway.", content_type)
                await ctx.info(f"Warning: Content-type is {content_type}, expected CSV format")

            # Check content length
            if content_length:
                download_size_mb = int(content_length) / (1024 * 1024)
                if download_size_mb > settings.max_download_size_mb:
                    msg = f"Download too large: {download_size_mb:.1f} MB exceeds limit of {settings.max_download_size_mb} MB"
                    raise ToolError(msg)

            await ctx.info(f"Download validated. Content-type: {content_type or 'unknown'}")
            await ctx.report_progress(0.3)

            # Download CSV content with size enforcement
            max_bytes = settings.max_download_size_mb * 1024 * 1024
            downloaded_bytes = 0
            chunks = []
            async with client.stream("GET", url, follow_redirects=True) as response:
                response.raise_for_status()
                async for chunk in response.aiter_bytes(chunk_size=8192):
                    downloaded_bytes += len(chunk)
                    if downloaded_bytes > max_bytes:
                        msg = f"Download exceeded size limit of {settings.max_download_size_mb} MB during transfer"
                        raise ToolError(msg)
                    chunks.append(chunk)

        # Decode downloaded content
        csv_bytes = b"".join(chunks)
        csv_content = csv_bytes.decode("utf-8", errors="replace")

        # Parse CSV from downloaded content
        df = pd.read_csv(
            StringIO(csv_content),
            encoding=encoding,
            delimiter=delimiter,
            header=resolve_header_param(header_config),
        )
        validate_dataframe_size(df)

    except (httpx.TimeoutException, httpx.HTTPError, httpx.RequestError) as e:
        logger.exception("Network error downloading URL")
        await ctx.error(f"Network error: {e}")
        msg = f"Network error: {e}"
        raise ToolError(msg) from e
    except UnicodeDecodeError as e:
        # CSV parsing succeeded but encoding specified doesn't match content
        # This shouldn't happen with httpx.response.text (auto-detects encoding)
        # but keeping fallback for edge cases
        msg = f"Encoding error: {e}. The downloaded content encoding doesn't match '{encoding}'."
        raise ToolError(msg) from e

    await ctx.report_progress(0.8)

    # Get or create session
    session_manager = get_session_manager()
    session = session_manager.get_or_create_session(session_id)

    if df is None:
        msg = "Failed to load data from URL"
        raise ToolError(msg)

    session.load_data(df, url)

    await ctx.report_progress(1.0)
    await ctx.info(f"Loaded {len(df)} rows and {len(df.columns)} columns from URL")

    return create_load_result(df)
```
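The handler delegates header resolution to `resolve_header_param`, which is not shown in this reference. A minimal sketch, assuming it simply dispatches to the config's `get_pandas_param()` method (the `SupportsPandasHeader` protocol is introduced here for self-containment and is not part of DataBeak):

```python
from typing import Literal, Protocol

class SupportsPandasHeader(Protocol):
    # Matches the HeaderConfig interface shown below.
    def get_pandas_param(self) -> int | None | Literal["infer"]: ...

# Hypothetical reconstruction of resolve_header_param; the real helper
# lives elsewhere in DataBeak and may differ.
def resolve_header_param(config: SupportsPandasHeader) -> int | None | Literal["infer"]:
    """Map a header configuration onto pandas.read_csv's `header` argument."""
    return config.get_pandas_param()  # "infer", None, or a 0-based row number
```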
- Registration (src/databeak/servers/io_server.py:471): registers the `load_csv_from_url` handler function as an MCP tool on the `io_server` FastMCP instance.

```python
io_server.tool(name="load_csv_from_url")(load_csv_from_url)
```
- Pydantic models defining the discriminated union for the `header_config` input parameter: `AutoDetectHeader`, `NoHeader`, and `ExplicitHeaderRow`, all subclasses of `HeaderConfig`.

```python
class HeaderConfig(BaseModel, ABC):
    """Abstract base class for header configuration."""

    mode: str = Field(description="Header detection mode")

    @abstractmethod
    def get_pandas_param(self) -> int | None | Literal["infer"]:
        """Convert to pandas read_csv header parameter."""
        ...


class AutoDetectHeader(HeaderConfig):
    """Auto-detect whether file has headers using pandas inference."""

    mode: Literal["auto"] = "auto"

    def get_pandas_param(self) -> Literal["infer"]:
        """Return pandas parameter for auto-detection."""
        return "infer"


class NoHeader(HeaderConfig):
    """File has no headers - generate default column names (Column_0, Column_1, etc.)."""

    mode: Literal["none"] = "none"

    def get_pandas_param(self) -> None:
        """Return pandas parameter for no headers."""
        return None


class ExplicitHeaderRow(HeaderConfig):
    """Use specific row number as header."""

    mode: Literal["row"] = "row"
    row_number: NonNegativeInt = Field(description="Row number to use as header (0-based)")

    def get_pandas_param(self) -> int:
        """Return pandas parameter for explicit header row."""
        return self.row_number


# Discriminated union type
HeaderConfigUnion = Annotated[
    AutoDetectHeader | NoHeader | ExplicitHeaderRow,
    Discriminator("mode"),
]
```
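A minimal demonstration (not from the source) of how this union dispatches on the `mode` discriminator at validation time, using Pydantic v2's `TypeAdapter` against the models defined above:

```python
from pydantic import TypeAdapter

adapter = TypeAdapter(HeaderConfigUnion)  # HeaderConfigUnion as defined above

cfg = adapter.validate_python({"mode": "row", "row_number": 2})
assert isinstance(cfg, ExplicitHeaderRow)
assert cfg.get_pandas_param() == 2  # becomes pandas.read_csv(header=2)

cfg = adapter.validate_python({"mode": "auto"})
assert cfg.get_pandas_param() == "infer"
```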
- Pydantic model for the tool's return type, covering rows, columns, data preview, and memory usage.

```python
class LoadResult(BaseToolResponse):
    """Response model for data loading operations."""

    rows_affected: int = Field(description="Number of rows loaded")
    columns_affected: list[str] = Field(description="List of column names detected")
    data: DataPreview | None = Field(None, description="Sample of loaded data")
    memory_usage_mb: float | None = Field(None, description="Memory usage in megabytes")
```
- Helper function called by the handler to build the `LoadResult` from the loaded DataFrame, generating a data preview.

```python
def create_load_result(df: pd.DataFrame) -> LoadResult:
    """Create LoadResult from a DataFrame.

    Args:
        df: Loaded DataFrame

    Returns:
        LoadResult with data preview and metadata
    """
    # Create data preview with indices
    preview_data = create_data_preview_with_indices(df, 5)
    data_preview = DataPreview(
        rows=preview_data["records"],
        row_count=preview_data["total_rows"],
        column_count=preview_data["total_columns"],
        truncated=preview_data["preview_rows"] < preview_data["total_rows"],
    )

    return LoadResult(
        rows_affected=len(df),
        columns_affected=[str(col) for col in df.columns],
        data=data_preview,
        memory_usage_mb=df.memory_usage(deep=True).sum() / (1024 * 1024),
    )
```
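As a sanity check on the two computed fields, a toy example using only pandas (the 5-row preview cap mirrors the `create_data_preview_with_indices(df, 5)` call above; the DataFrame is made up):

```python
import pandas as pd

df = pd.DataFrame({"a": range(10), "b": ["x"] * 10})

# Mirrors the memory_usage_mb computation in create_load_result
memory_usage_mb = df.memory_usage(deep=True).sum() / (1024 * 1024)

# The preview shows at most 5 rows, so truncated is True for this 10-row frame
truncated = min(5, len(df)) < len(df)
print(f"{memory_usage_mb:.4f} MB, truncated={truncated}")
```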