td_read_project_file
Extract and read specific files from Treasure Data project archives to examine SQL queries, workflow definitions, and other content without fully extracting the archive.
Instructions
Read the contents of a specific file from a project archive.
This tool extracts and reads a specific file from a project archive,
returning its contents. This allows examining SQL queries, workflow
definitions, and other files without fully extracting the archive.
Args:
archive_path: The path to the downloaded project archive (.tar.gz file)
file_path: The path of the file within the archive to read
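Reading one member without unpacking the whole archive can be sketched with the standard library's `tarfile` module. This is a minimal illustration, not the server's code: the archive is built in memory, and the member name `queries/daily.sql`, its contents, and the function names `build_demo_archive` and `read_member` are hypothetical.

```python
import io
import tarfile


def build_demo_archive() -> bytes:
    """Build a small .tar.gz in memory (hypothetical content, for illustration only)."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        payload = b"SELECT COUNT(1) FROM events;\n"
        info = tarfile.TarInfo(name="queries/daily.sql")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    return buffer.getvalue()


def read_member(archive_bytes: bytes, member_name: str) -> str:
    """Read one member's text without extracting anything to disk."""
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tar:
        member = tar.getmember(member_name)  # raises KeyError if the member is absent
        handle = tar.extractfile(member)     # None for members that are not regular files
        if handle is None:
            raise ValueError(f"{member_name} is not a regular file")
        return handle.read().decode("utf-8")


archive = build_demo_archive()
text = read_member(archive, "queries/daily.sql")
```

Only the requested member is decompressed and read here; nothing is written to the filesystem.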
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
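| archive_path | Yes | The path to the downloaded project archive (.tar.gz file) | |
| file_path | Yes | The path of the file within the archive to read | |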
Implementation Reference
- td_mcp_server/mcp_impl.py:512-589 (handler) The core handler function for the 'td_read_project_file' tool. It validates the archive and file paths, opens the tar.gz archive securely, extracts the specified file (skipping directories and oversized files), reads and decodes the content (UTF-8 preferred, fallback to Latin-1), and returns the file content along with metadata. Includes comprehensive error handling and security measures against path traversal and zip bombs.

      @mcp.tool()
      async def td_read_project_file(archive_path: str, file_path: str) -> dict[str, Any]:
          """Read the contents of a specific file from a project archive.

          This tool extracts and reads a specific file from a project archive,
          returning its contents. This allows examining SQL queries, workflow
          definitions, and other files without fully extracting the archive.

          Args:
              archive_path: The path to the downloaded project archive (.tar.gz file)
              file_path: The path of the file within the archive to read
          """
          # Input validation - prevent path traversal
          if not _validate_archive_path(archive_path):
              return _format_error_response("Invalid archive path")

          if not _validate_file_path(file_path):
              return _format_error_response("Invalid file path")

          try:
              if not os.path.exists(archive_path):
                  return _format_error_response("Archive file not found")

              try:
                  with tarfile.open(archive_path, "r:gz") as tar:
                      try:
                          file_info = tar.getmember(file_path)

                          # Security check for the member
                          if not _safe_extract_member(file_info, "/tmp/validation"):
                              return _format_error_response(
                                  "File access denied for security reasons"
                              )

                          # Don't try to read directories
                          if file_info.isdir():
                              return _format_error_response("Cannot read directory contents")

                          # Extract and read the file
                          f = tar.extractfile(file_info)
                          if f is None:
                              return _format_error_response("Failed to extract file")

                          # Read with size limit
                          if file_info.size > MAX_READ_SIZE:
                              return _format_error_response("File too large to read")

                          content_bytes = f.read()

                          # Try to decode as text
                          try:
                              content = content_bytes.decode("utf-8")
                          except UnicodeDecodeError:
                              try:
                                  content = content_bytes.decode("latin-1")
                              except UnicodeDecodeError:
                                  return _format_error_response(
                                      "File is not readable as text"
                                  )

                          extension = Path(file_path).suffix.lower()

                          return {
                              "success": True,
                              "file_path": file_path,
                              "content": content,
                              "size": file_info.size,
                              "extension": extension,
                          }
                      except KeyError:
                          return _format_error_response("File not found in archive")
              except tarfile.ReadError:
                  return _format_error_response("Invalid or corrupted archive file")
          except (OSError, UnicodeDecodeError) as e:
              return _format_error_response(f"Failed to read file: {str(e)}")
          except Exception as e:
              return _format_error_response(f"Unexpected error while reading file: {str(e)}")
- td_mcp_server/mcp_impl.py:512-512 (registration) The @mcp.tool() decorator registers the td_read_project_file function as an MCP tool in the FastMCP server.

      @mcp.tool()
- td_mcp_server/mcp_impl.py:61-82 (helper) Helper function to validate the archive_path input parameter, ensuring it's in a temporary directory, ends with .tar.gz, and prevents path traversal.

      def _validate_archive_path(archive_path: str) -> bool:
          """Validate archive path to ensure it's in allowed temporary directories."""
          if not archive_path:
              return False

          # Normalize the path to prevent tricks
          normalized_path = os.path.normpath(archive_path)

          # Allow paths in temp directories or test paths
          temp_prefix = tempfile.gettempdir()
          allowed_prefixes = [temp_prefix, "/tmp"]

          if not any(normalized_path.startswith(prefix) for prefix in allowed_prefixes):
              return False

          # Prevent path traversal
          if ".." in normalized_path:
              return False

          if not archive_path.endswith(".tar.gz"):
              return False

          return True
- td_mcp_server/mcp_impl.py:49-58 (helper) Helper function to validate the file_path input parameter against path traversal attacks.

      def _validate_file_path(file_path: str) -> bool:
          """Validate file path to prevent path traversal attacks."""
          if not file_path:
              return False
          # Normalize path and check for traversal attempts
          normalized = os.path.normpath(file_path)
          # Prevent absolute paths and traversal
          if normalized.startswith("/") or normalized.startswith("\\") or ".." in normalized:
              return False
          return True
- td_mcp_server/mcp_impl.py:85-107 (helper) Helper function used to safely validate tar members before extraction, preventing path traversal, absolute paths, and oversized files (zip bombs). Called within the handler for security.

      def _safe_extract_member(member, extract_path: str) -> bool:
          """Safely extract a tar member, preventing path traversal and other attacks."""
          # Normalize the member name
          member_path = os.path.normpath(member.name)

          # Prevent absolute paths
          if member_path.startswith("/") or member_path.startswith("\\"):
              return False

          # Prevent path traversal
          if ".." in member_path:
              return False

          # Check final extracted path
          final_path = os.path.join(extract_path, member_path)
          if not final_path.startswith(extract_path):
              return False

          # Check file size (prevent zip bombs)
          if hasattr(member, "size") and member.size > MAX_FILE_SIZE:
              return False

          return True
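
The path checks in the helpers combine path normalization with substring and prefix tests. The sketch below is not the server's code: it uses `posixpath` so the results are the same on any platform, and the function names `looks_safe` and `is_inside` are hypothetical. It shows how a check of that shape classifies a few sample inputs, and how a component-wise comparison with `posixpath.commonpath` (an alternative technique, not what the helpers do) differs from a plain `startswith` prefix test.

```python
import posixpath


def looks_safe(relative_path: str) -> bool:
    """Mirror the shape of the helpers' checks: no empty, absolute, or '..' paths."""
    if not relative_path:
        return False
    normalized = posixpath.normpath(relative_path)
    if normalized.startswith("/") or normalized.startswith("\\"):
        return False
    # Substring test: rejects traversal, but also any name that contains "..".
    return ".." not in normalized


def is_inside(path: str, root: str) -> bool:
    """Component-wise containment check (an alternative to str.startswith).

    Both arguments are expected to be absolute; commonpath raises ValueError
    when absolute and relative paths are mixed.
    """
    path = posixpath.normpath(path)
    root = posixpath.normpath(root)
    return posixpath.commonpath([path, root]) == root


samples = ["queries/daily.sql", "a/../b.sql", "../etc/passwd", "/etc/passwd", "notes..txt"]
results = {name: looks_safe(name) for name in samples}

# A string prefix test accepts a sibling directory that merely starts with "/tmp".
prefix_match = "/tmpfoo/project.tar.gz".startswith("/tmp")
component_match = is_inside("/tmpfoo/project.tar.gz", "/tmp")
```

In this sketch `a/../b.sql` is accepted because normalization collapses it to `b.sql`, while `notes..txt` is rejected even though it contains no traversal, since the test looks for the substring `..` rather than a `..` path component.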