Treasure Data MCP Server

by knishioka

td_read_project_file

Extract and read specific files from Treasure Data project archives to examine SQL queries, workflow definitions, and other content without fully extracting the archive.

Instructions

Read the contents of a specific file from a project archive.

This tool extracts and reads a specific file from a project archive,
returning its contents. This allows examining SQL queries, workflow
definitions, and other files without fully extracting the archive.

Args:
    archive_path: The path to the downloaded project archive (.tar.gz file)
    file_path: The path of the file within the archive to read
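The idea of reading one file without unpacking the whole archive can be sketched with the standard-library `tarfile` module. This is a minimal illustration, not the server's implementation: the helper names `build_sample_archive` and `read_member`, the archive name, and the sample SQL content are all assumptions made for the example.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def build_sample_archive(directory: str) -> str:
    """Create a tiny .tar.gz holding one SQL file (illustrative data only)."""
    archive_path = str(Path(directory) / "sample_project.tar.gz")
    payload = b"SELECT 1;\n"
    with tarfile.open(archive_path, "w:gz") as tar:
        info = tarfile.TarInfo(name="queries/example.sql")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    return archive_path


def read_member(archive_path: str, file_path: str) -> str:
    """Read one member's text without unpacking the rest of the archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        member = tar.getmember(file_path)  # raises KeyError if the name is absent
        handle = tar.extractfile(member)   # None for members that are not files or links
        if handle is None:
            raise ValueError(f"{file_path} is not a regular file")
        with handle:
            return handle.read().decode("utf-8")


with tempfile.TemporaryDirectory() as tmp:
    archive = build_sample_archive(tmp)
    text = read_member(archive, "queries/example.sql")
    print(text)
```

Here `archive_path` and `file_path` play the same roles as the tool's two arguments: the first locates the `.tar.gz` on disk, the second names a member inside it.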

Input Schema

Name          Required  Description  Default
archive_path  Yes
file_path     Yes

Implementation Reference

  • The core handler function for the 'td_read_project_file' tool. It validates the archive path and the file path, opens the .tar.gz archive, looks up the requested member, and rejects it if it fails the security check, is a directory, or exceeds MAX_READ_SIZE. Otherwise it reads the content, decodes it as UTF-8 (falling back to Latin-1), and returns the text together with the file's path, size, and extension. Every failure path returns a formatted error response instead of raising.
    @mcp.tool()
    async def td_read_project_file(archive_path: str, file_path: str) -> dict[str, Any]:
        """Read the contents of a specific file from a project archive.
    
        This tool extracts and reads a specific file from a project archive,
        returning its contents. This allows examining SQL queries, workflow
        definitions, and other files without fully extracting the archive.
    
        Args:
            archive_path: The path to the downloaded project archive (.tar.gz file)
            file_path: The path of the file within the archive to read
        """
        # Input validation - prevent path traversal
        if not _validate_archive_path(archive_path):
            return _format_error_response("Invalid archive path")
    
        if not _validate_file_path(file_path):
            return _format_error_response("Invalid file path")
    
        try:
            if not os.path.exists(archive_path):
                return _format_error_response("Archive file not found")
    
            try:
                with tarfile.open(archive_path, "r:gz") as tar:
                    try:
                        file_info = tar.getmember(file_path)
    
                        # Security check for the member
                        if not _safe_extract_member(file_info, "/tmp/validation"):
                            return _format_error_response(
                                "File access denied for security reasons"
                            )
    
                        # Don't try to read directories
                        if file_info.isdir():
                            return _format_error_response("Cannot read directory contents")
    
                        # Extract and read the file
                        f = tar.extractfile(file_info)
                        if f is None:
                            return _format_error_response("Failed to extract file")
    
                        # Read with size limit
                        if file_info.size > MAX_READ_SIZE:
                            return _format_error_response("File too large to read")
    
                        content_bytes = f.read()
    
                        # Try to decode as text
                        try:
                            content = content_bytes.decode("utf-8")
                        except UnicodeDecodeError:
                            # Latin-1 maps every byte value to a character, so this
                            # fallback cannot raise UnicodeDecodeError.
                            content = content_bytes.decode("latin-1")
    
                        extension = Path(file_path).suffix.lower()
    
                        return {
                            "success": True,
                            "file_path": file_path,
                            "content": content,
                            "size": file_info.size,
                            "extension": extension,
                        }
                    except KeyError:
                        return _format_error_response("File not found in archive")
            except tarfile.ReadError:
                return _format_error_response("Invalid or corrupted archive file")
        except (OSError, UnicodeDecodeError) as e:
            return _format_error_response(f"Failed to read file: {str(e)}")
        except Exception as e:
            return _format_error_response(f"Unexpected error while reading file: {str(e)}")
  • The @mcp.tool() decorator registers the td_read_project_file function as an MCP tool in the FastMCP server.
    @mcp.tool()
  • Helper function that validates the archive_path input parameter: the path must lie in a temporary directory, end with .tar.gz, and contain no path traversal.
    def _validate_archive_path(archive_path: str) -> bool:
        """Validate archive path to ensure it's in allowed temporary directories."""
        if not archive_path:
            return False
    
        # Normalize the path to prevent tricks
        normalized_path = os.path.normpath(archive_path)
    
        # Allow paths in temp directories or test paths
        temp_prefix = tempfile.gettempdir()
        allowed_prefixes = [temp_prefix, "/tmp"]
    
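        # Compare against "<prefix>/" so that e.g. "/tmpfoo" does not match "/tmp"
        if not any(
            normalized_path.startswith(prefix.rstrip(os.sep) + os.sep)
            for prefix in allowed_prefixes
        ):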
            return False
    
        # Prevent path traversal
        if ".." in normalized_path:
            return False
    
        if not archive_path.endswith(".tar.gz"):
            return False
        return True
  • Helper function to validate the file_path input parameter against path traversal attacks.
    def _validate_file_path(file_path: str) -> bool:
        """Validate file path to prevent path traversal attacks."""
        if not file_path:
            return False
        # Normalize path and check for traversal attempts
        normalized = os.path.normpath(file_path)
        # Prevent absolute paths and traversal
        if normalized.startswith("/") or normalized.startswith("\\") or ".." in normalized:
            return False
        return True
  • Helper function that validates a tar member before it is read, rejecting absolute paths, path traversal, and oversized files (a guard against decompression bombs). Called within the handler for security.
    def _safe_extract_member(member, extract_path: str) -> bool:
        """Check that a tar member is safe to extract, preventing path traversal and other attacks."""
        # Normalize the member name
        member_path = os.path.normpath(member.name)
    
        # Prevent absolute paths
        if member_path.startswith("/") or member_path.startswith("\\"):
            return False
    
        # Prevent path traversal
        if ".." in member_path:
            return False
    
        # Check final extracted path
        final_path = os.path.join(extract_path, member_path)
        if not final_path.startswith(extract_path):
            return False
    
        # Check file size (prevent zip bombs)
        if hasattr(member, "size") and member.size > MAX_FILE_SIZE:
            return False
    
        return True
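The handler's decoding step tries UTF-8 first and then Latin-1. The sketch below isolates that fallback to show one consequence: Latin-1 assigns a character to every byte value, so the second attempt always succeeds and binary data comes back as text rather than as an error. The function name `decode_text` and the returned encoding label are assumptions for illustration only.

```python
def decode_text(data: bytes) -> tuple[str, str]:
    """Return (text, encoding_used), trying UTF-8 first and then Latin-1."""
    try:
        return data.decode("utf-8"), "utf-8"
    except UnicodeDecodeError:
        # Latin-1 maps all 256 byte values to characters, so this cannot fail.
        return data.decode("latin-1"), "latin-1"


utf8_result = decode_text("café".encode("utf-8"))
latin1_result = decode_text(b"caf\xe9")          # 0xE9 alone is not valid UTF-8
binary_result = decode_text(bytes(range(256)))   # arbitrary bytes still decode

print(utf8_result)
print(latin1_result)
print(binary_result[1])
```

A caller that needs to distinguish text from binary content would have to add its own check, for example by looking for NUL bytes before decoding.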
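The path validators above rely on string tests. A plain string-prefix test such as `startswith("/tmp")` also accepts a sibling directory like `/tmpfoo`, and a substring test for `".."` also rejects harmless names such as `a..b.sql`. One possible alternative, sketched here under the assumption that comparing whole path components is acceptable, uses `os.path.commonpath` and `PurePosixPath.parts`. The names `is_within` and `is_safe_member_path` are hypothetical and do not appear in the server's code.

```python
import os
import posixpath
from pathlib import PurePosixPath


def is_within(path: str, base: str) -> bool:
    """True if `path` is `base` or lies inside it, compared component by component."""
    path = os.path.abspath(path)
    base = os.path.abspath(base)
    try:
        return os.path.commonpath([path, base]) == base
    except ValueError:  # e.g. paths on different drives on Windows
        return False


def is_safe_member_path(file_path: str) -> bool:
    """True if `file_path` is relative and has no ".." component after normalization."""
    if not file_path:
        return False
    normalized = posixpath.normpath(file_path)
    if normalized.startswith("\\"):
        return False
    path = PurePosixPath(normalized)
    return not path.is_absolute() and ".." not in path.parts


print(is_within("/tmp/project.tar.gz", "/tmp"))
print(is_within("/tmpfoo/project.tar.gz", "/tmp"))
print(is_safe_member_path("queries/a..b.sql"))
print(is_safe_member_path("../secret.txt"))
```

`posixpath` is used for the member path because names inside a tar archive use forward slashes regardless of the host operating system.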
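To see how a member check of this kind behaves on concrete input, the following sketch builds a small archive in memory and reports which members would be refused. The value of `MAX_FILE_SIZE` is not shown in the source, so the limit of 1024 bytes here is purely an assumption, as are the helper names `member_problem` and `make_archive` and the sample member names.

```python
import io
import posixpath
import tarfile

MAX_FILE_SIZE = 1024  # hypothetical limit for this sketch; the real value is not shown


def member_problem(member: tarfile.TarInfo, max_size: int = MAX_FILE_SIZE):
    """Return a short reason if the member looks unsafe, else None."""
    name = posixpath.normpath(member.name)
    if name.startswith("/"):
        return "absolute path"
    if ".." in name.split("/"):
        return "path traversal"
    if member.size > max_size:
        return "too large"
    return None


def make_archive(entries: dict[str, bytes]) -> io.BytesIO:
    """Build a gzip-compressed tar archive in memory from name -> content pairs."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for name, data in entries.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    buffer.seek(0)
    return buffer


archive = make_archive({
    "queries/example.sql": b"SELECT 1;\n",
    "../escape.txt": b"x",
    "big.bin": b"0" * 2048,
})
with tarfile.open(fileobj=archive, mode="r:gz") as tar:
    report = {m.name: member_problem(m) for m in tar.getmembers()}
print(report)
```

The check uses only the metadata in each member's header, so nothing is written to disk while the archive is inspected.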

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/knishioka/td-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.