convert_pdf_file

Convert PDF files to Markdown format for easier editing and content reuse, with optional OCR support for scanned documents.

Instructions

Convert a local PDF file to Markdown; supports a single file or a list of files

Args:
    file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
    enable_ocr: Whether to enable OCR (default: True)

Returns:
    dict: Conversion result information

Input Schema

Name        Required  Description  Default
file_path   Yes
enable_ocr  No

Implementation Reference

  • The core handler function for the 'convert_pdf_file' tool. It handles uploading local PDF files to the MinerU API using presigned URLs, polling for processing completion, and downloading the resulting Markdown ZIP files which are automatically extracted.
    @mcp.tool()  
    async def convert_pdf_file(file_path: str, enable_ocr: bool = True) -> Dict[str, Any]:
        """
        Convert local PDF file to Markdown, supports single file or file list
        
        Args:
            file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
            enable_ocr: Whether to enable OCR (default: True)
    
        Returns:
            dict: Conversion result information
        """
        if not MINERU_API_KEY:
            return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
        
        if isinstance(file_path, str):
            file_paths = parse_path_string(file_path)
        else:
            file_paths = [file_path]  
        
        for path in file_paths:
            if not os.path.exists(path):
                return {"success": False, "error": f"File does not exist: {path}"}
            else:
                if not path.lower().endswith('.pdf'):
                    return {"success": False, "error": f"File is not in PDF format: {path}"}
        
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                file_names = [os.path.basename(path) for path in file_paths]
                
                files_data = []
                for i, name in enumerate(file_names):
                    files_data.append({
                        "name": name,
                        "is_ocr": enable_ocr,
                        "data_id": f"file_convert_{i+1}_{int(time.time())}"
                    })
                
                file_url_data = {
                    "enable_formula": True,
                    "language": "auto",
                    "layout_model": "doclayout_yolo",
                    "enable_table": True,
                    "files": files_data
                }
                
                file_url_response = await client.post(
                    MINERU_FILE_URLS_API,
                    headers=HEADERS,
                    json=file_url_data,
                    timeout=60.0
                )
                
                if file_url_response.status_code != 200:
                    return {"success": False, "error": f"Failed to get upload link: {file_url_response.status_code}"}
                
                file_url_result = file_url_response.json()
                
                if file_url_result.get("code") != 0 and file_url_result.get("code") != 200:
                    error_msg = file_url_result.get("msg", "Unknown error")
                    return {"success": False, "error": f"Failed to get upload link: {error_msg}"}
                
                batch_id = file_url_result.get("data", {}).get("batch_id", "")
                file_urls = file_url_result.get("data", {}).get("file_urls", [])
                
                if not batch_id or not file_urls or len(file_urls) != len(file_paths):
                    return {"success": False, "error": "Failed to get upload link or batch ID"}
                
                upload_results = []
                for i, (path, upload_url) in enumerate(zip(file_paths, file_urls)):
                    try:
                        # Use a separate loop variable so the file_path parameter is not
                        # shadowed, and close the file before the network call
                        with open(path, 'rb') as f:
                            file_content = f.read()
                        
                        upload_response = await client.put(
                            upload_url,
                            content=file_content,
                            headers={},
                            timeout=300.0
                        )
                        
                        if upload_response.status_code != 200:
                            upload_results.append({"file": file_names[i], "success": False})
                        else:
                            upload_results.append({"file": file_names[i], "success": True})
                    except Exception as e:
                        upload_results.append({"file": file_names[i], "success": False, "error": str(e)})
                
                if not any(result["success"] for result in upload_results):
                    return {"success": False, "error": "All files failed to upload", "upload_results": upload_results}
                
                task_status = await check_task_status(client, batch_id)
                
                if not task_status.get("success"):
                    return task_status
                
                downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
                
                return {
                    "success": True, 
                    "downloaded_files": downloaded_files,
                    "batch_id": batch_id,
                    "upload_results": upload_results,
                    "total_files": len(file_paths),
                    "processed_files": len(downloaded_files)
                }
                    
            except Exception as e:
                return {"success": False, "error": str(e)}
  • The @mcp.tool() decorator registers the convert_pdf_file function as an MCP tool.
    @mcp.tool()  
  • Helper function that parses the input file_path string into a list of paths, handling quotes, spaces, commas, and newlines.
    def parse_path_string(path_string):
        """
        Parse file path string separated by spaces, commas, or newlines
        
        Args:
            path_string: File path string
            
        Returns:
            list: List of file paths
        """
        if isinstance(path_string, str):
            if (path_string.startswith('"') and path_string.endswith('"')) or \
               (path_string.startswith("'") and path_string.endswith("'")):
                path_string = path_string[1:-1]
        
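        # str.split() already splits on spaces and newlines, so the comma is the
        # only separator left to handle. Filtering drops the empty strings that
        # input such as "a.pdf, b.pdf" would otherwise leave behind.
        paths = []
        for part in path_string.split():
            paths.extend(p for p in part.split(',') if p)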
        
        cleaned_paths = []
        for path in paths:
            if (path.startswith('"') and path.endswith('"')) or \
               (path.startswith("'") and path.endswith("'")):
                cleaned_paths.append(path[1:-1])
            else:
                cleaned_paths.append(path)
        
        return cleaned_paths
  • Supporting helper functions for task status polling, batch result downloading, ZIP extraction, and status printing. They are shared by convert_pdf_url and convert_pdf_file.
    async def check_task_status(client, batch_id, max_retries=60, sleep_seconds=5):
        """
        Check batch task status
        
        Args:
            client: HTTP client
            batch_id: Batch ID
            max_retries: Maximum number of retries
            sleep_seconds: Seconds between retries
            
        Returns:
            dict: Dictionary containing task status information, or error message if failed
        """
        retry_count = 0
        
        while retry_count < max_retries:
            retry_count += 1
            
            try:
                status_response = await client.get(
                    f"{MINERU_BATCH_RESULTS_API}/{batch_id}",
                    headers=HEADERS,
                    timeout=60.0  
                )
                
                if status_response.status_code != 200:
                    # This attempt was already counted at the top of the loop
                    if retry_count < max_retries:
                        await asyncio.sleep(sleep_seconds)
                    continue
                
                try:
                    status_data = status_response.json()
                except Exception as e:
                    # Unparseable body: wait and try again without double counting
                    if retry_count < max_retries:
                        await asyncio.sleep(sleep_seconds)
                    continue
                
                task_data = status_data.get("data", {})
                extract_results = task_data.get("extract_result", [])
                
                all_done, any_done = print_task_status(extract_results)
                
                if all_done:
                    return {
                        "success": True,
                        "extract_results": extract_results,
                        "task_data": task_data,
                        "status_data": status_data
                    }
                
                await asyncio.sleep(sleep_seconds)
                
            except Exception as e:
                # This attempt was already counted at the top of the loop
                if retry_count < max_retries:
                    await asyncio.sleep(sleep_seconds)
        
        return {
            "success": False,
            "error": "Polling timeout, unable to get final results"
        }
    
    async def download_batch_results(client, extract_results):
        """
        Download batch task results
        
        Args:
            client: HTTP client
            extract_results: List of task results
            
        Returns:
            list: List of downloaded file information
        """
        downloaded_files = []
        
        for i, result in enumerate(extract_results):
            if result.get("state") == "done":
                try:
                    file_name = result.get("file_name", f"file_{i+1}")
                    zip_url = result.get("full_zip_url", "")
                    
                    if not zip_url:
                        continue
                    
                    downloaded_file = await download_zip_file(client, zip_url, file_name)
                    if downloaded_file:
                        downloaded_files.append(downloaded_file)
                except Exception as e:
                    pass
        
        return downloaded_files
    
    async def download_zip_file(client, zip_url, file_name, prefix="md", max_retries=3):
        """
        Download and save ZIP file, then automatically unzip
        
        Args:
            client: HTTP client
            zip_url: ZIP file URL
            file_name: File name
            prefix: File prefix
            max_retries: Maximum number of retries
            
        Returns:
            dict: Dictionary containing file name and unzip directory, or None if failed
        """
        retry_count = 0
        
        while retry_count < max_retries:
            try:
                zip_response = await client.get(zip_url, follow_redirects=True, timeout=120.0)
                
                if zip_response.status_code == 200:
                    current_date = time.strftime("%Y%m%d")
                    
                    base_name = os.path.splitext(file_name)[0]
                    # Only remove control characters and chars that are invalid in filenames
                    safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', base_name).strip()
                    # Replace spaces with underscores
                    safe_name = re.sub(r'\s+', '_', safe_name)
                    
                    if safe_name.isdigit() or re.match(r'^\d+\.\d+$', safe_name):
                        safe_name = f"paper_{safe_name}"
                        
                    zip_filename = f"{prefix}_{safe_name}_{current_date}.zip"
                    
                    download_dir = Path(OUTPUT_DIR)
                    if not download_dir.exists():
                        try:
                            download_dir.mkdir(parents=True, exist_ok=True)
                        except Exception as e:
                            print(f"Error creating directory: {e}")
                            return None
                    
                    save_path = download_dir / zip_filename
                    
                    with open(save_path, "wb") as f:
                        f.write(zip_response.content)
                    
                    extract_dir = download_dir / safe_name
                    if not extract_dir.exists():
                        extract_dir.mkdir(parents=True)
                    
                    import zipfile
                    try:
                        with zipfile.ZipFile(save_path, 'r') as zip_ref:
                            zip_ref.extractall(extract_dir)
                        os.remove(save_path)
                        
                        return {
                            "file_name": file_name,
                            "extract_dir": str(extract_dir)
                        }
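                    except Exception as e:
                        # Count a failed extraction as an attempt; otherwise a
                        # corrupt archive would be re-downloaded indefinitely
                        retry_count += 1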
                else:
                    retry_count += 1
                    if retry_count < max_retries:
                        await asyncio.sleep(2)
                    continue
            except Exception as e:
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(2)
                continue
        
        return None
    
    def parse_url_string(url_string):
        """
        Parse URL string separated by spaces, commas, or newlines
        
        Args:
            url_string: URL string
            
        Returns:
            list: List of URLs
        """
        if isinstance(url_string, str):
            if (url_string.startswith('"') and url_string.endswith('"')) or \
               (url_string.startswith("'") and url_string.endswith("'")):
                url_string = url_string[1:-1]
        
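        # str.split() already splits on spaces and newlines, so the comma is the
        # only separator left to handle. Filtering drops the empty strings that
        # input such as "url1, url2" would otherwise leave behind.
        urls = []
        for part in url_string.split():
            urls.extend(u for u in part.split(',') if u)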
        
        cleaned_urls = []
        for url in urls:
            if (url.startswith('"') and url.endswith('"')) or \
               (url.startswith("'") and url.endswith("'")):
                cleaned_urls.append(url[1:-1])
            else:
                cleaned_urls.append(url)
        
        return cleaned_urls
    
    # Create MCP server
    mcp = FastMCP("PDF to Markdown Conversion Service")
    
    @mcp.tool()  
    async def convert_pdf_url(url: str, enable_ocr: bool = True) -> Dict[str, Any]:
        """
        Convert PDF URL to Markdown, supports single URL or URL list
        
        Args:
            url: PDF file URL or URL list, can be separated by spaces, commas, or newlines
            enable_ocr: Whether to enable OCR (default: True)
    
        Returns:
            dict: Conversion result information
        """
        if not MINERU_API_KEY:
            return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
        
        if isinstance(url, str):
            urls = parse_url_string(url)
        else:
            urls = [url]  
        
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                files = []
                for i, url_item in enumerate(urls):
                    files.append({
                        "url": url_item, 
                        "is_ocr": enable_ocr, 
                        "data_id": f"url_convert_{i+1}_{int(time.time())}"
                    })
                
                batch_data = {
                    "enable_formula": True,
                    "language": "auto",
                    "layout_model": "doclayout_yolo",
                    "enable_table": True,
                    "files": files
                }
                
                response = await client.post(
                    MINERU_BATCH_API,
                    headers=HEADERS,
                    json=batch_data,
                    timeout=300.0
                )
                
                if response.status_code != 200:
                    return {"success": False, "error": f"Request failed: {response.status_code}"}
                
                try:
                    status_data = response.json()
                    
                    if status_data.get("code") != 0 and status_data.get("code") != 200:
                        error_msg = status_data.get("msg", "Unknown error")
                        return {"success": False, "error": f"API returned error: {error_msg}"}
                        
                    batch_id = status_data.get("data", {}).get("batch_id", "")
                    if not batch_id:
                        return {"success": False, "error": "Failed to get batch ID"}
                    
                    task_status = await check_task_status(client, batch_id)
                    
                    if not task_status.get("success"):
                        return task_status
                    
                    downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
                    
                    return {
                        "success": True, 
                        "downloaded_files": downloaded_files,
                        "batch_id": batch_id,
                        "total_urls": len(urls),
                        "processed_urls": len(downloaded_files)
                    }
                    
                except json.JSONDecodeError as e:
                    return {"success": False, "error": f"Failed to parse JSON: {e}"}
                    
            except Exception as e:
                return {"success": False, "error": str(e)}
    
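
The output directory name produced by download_zip_file depends on how the source file name is cleaned up. The following is a minimal standalone sketch of those clean-up steps, lifted from the listing above; the function name safe_output_name is hypothetical and does not appear in the original code.

```python
import os
import re


def safe_output_name(file_name: str) -> str:
    """Sketch of the name clean-up performed in download_zip_file."""
    base_name = os.path.splitext(file_name)[0]
    # Remove control characters and characters that are invalid in file names.
    safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', base_name).strip()
    # Replace runs of whitespace with a single underscore.
    safe_name = re.sub(r'\s+', '_', safe_name)
    # Purely numeric names (for example "12345" or "2301.12345") get a prefix.
    if safe_name.isdigit() or re.match(r'^\d+\.\d+$', safe_name):
        safe_name = f"paper_{safe_name}"
    return safe_name


print(safe_output_name("My Report: v2.pdf"))
print(safe_output_name("2301.12345.pdf"))
```

Under these rules, two inputs that differ only in removed characters map to the same directory name, so later conversions may write into an existing directory.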
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden for behavioral disclosure. It mentions the OCR capability and return format (dict with conversion result information), but lacks critical details: whether this is a read-only operation, what happens with invalid files, if there are size/time limitations, what specific information the result dict contains, or error handling behavior.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
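
One behavioral detail that can be read off the implementation is the time budget: check_task_status defaults to max_retries=60 and sleep_seconds=5, so a call may wait on the order of five minutes before reporting a polling timeout. The sketch below illustrates that kind of bounded polling loop in isolation. It is not the server's code: poll_until_done, fetch_states, and worst_case_wait are hypothetical names, and treating a batch as finished when every state equals "done" is an assumption, since print_task_status is not shown in the listing.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def poll_until_done(
    fetch_states: Callable[[], Awaitable[List[str]]],
    max_retries: int = 60,
    sleep_seconds: float = 5.0,
) -> Dict[str, Any]:
    """Poll until every reported state is "done" or the attempts run out."""
    for attempt in range(1, max_retries + 1):
        try:
            states = await fetch_states()
        except Exception:
            states = []  # treat a failed request as "not finished yet"
        # Assumption: a batch is complete when all states equal "done".
        if states and all(state == "done" for state in states):
            return {"success": True, "attempts": attempt}
        if attempt < max_retries:
            await asyncio.sleep(sleep_seconds)
    return {"success": False, "error": "Polling timeout, unable to get final results"}


def worst_case_wait(max_retries: int = 60, sleep_seconds: float = 5.0) -> float:
    """Upper bound on time this sketch spends sleeping, in seconds."""
    return (max_retries - 1) * sleep_seconds
```

Each attempt is counted exactly once here, whether it fails or simply reports unfinished work, which keeps the time budget predictable.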

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is reasonably concise with clear sections (Args, Returns) and front-loaded purpose statement. However, the 'Args' and 'Returns' labels add some redundancy since this information is partially available in the schema, and some sentences could be more efficiently worded.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a file conversion tool with 2 parameters, no annotations, and no output schema, the description is insufficient. It lacks information about file format requirements, conversion quality, error conditions, output structure details, performance characteristics, or how the tool differs from its sibling. The return value description ('dict: Conversion result information') is particularly vague given no output schema exists.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.
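
The keys of the returned dict can be inferred from the return statements in the handler. The sketch below writes them down as TypedDict classes; the class names and the summarize helper are hypothetical, and the field comments reflect a reading of the listing rather than any published output schema.

```python
from typing import List, TypedDict


class DownloadedFile(TypedDict):
    file_name: str    # file name reported in the extract result
    extract_dir: str  # directory into which the ZIP was unpacked


class UploadResult(TypedDict, total=False):
    file: str
    success: bool
    error: str        # present only when the upload raised an exception


class ConvertPdfFileResult(TypedDict, total=False):
    success: bool
    error: str                             # set on failure
    downloaded_files: List[DownloadedFile]
    batch_id: str
    upload_results: List[UploadResult]
    total_files: int
    processed_files: int


def summarize(result: ConvertPdfFileResult) -> str:
    """Turn a result dict into a one-line status message."""
    if not result.get("success"):
        return f"failed: {result.get('error', 'unknown error')}"
    return f"{result.get('processed_files', 0)}/{result.get('total_files', 0)} files converted"
```

A caller can compare processed_files with total_files to detect partial success, since the handler returns success as True as long as at least one upload succeeded and polling completed.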

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description provides basic parameter information in the Args section, explaining that file_path accepts local paths or lists with various separators, and enable_ocr defaults to True. However, with 0% schema description coverage, it doesn't fully compensate by explaining path format requirements, file accessibility constraints, or what OCR actually does in this context beyond the boolean toggle.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
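
To make the file_path syntax concrete, here is a small sketch of separator-based splitting in the spirit of parse_path_string. The function split_paths is hypothetical and simplified (it splits on whitespace and commas with a single regular expression), so it illustrates the accepted input shapes rather than reproducing the server's exact behavior.

```python
import re
from typing import List


def split_paths(path_string: str) -> List[str]:
    """Split on whitespace and commas, then strip one pair of matching quotes."""
    parts = [p for p in re.split(r'[\s,]+', path_string.strip()) if p]
    cleaned = []
    for part in parts:
        if len(part) >= 2 and part[0] == part[-1] and part[0] in "\"'":
            part = part[1:-1]
        cleaned.append(part)
    return cleaned


print(split_paths("a.pdf, b.pdf\nc.pdf"))
print(split_paths("/tmp/my file.pdf"))
```

The second call shows a practical constraint of this scheme: because spaces act as separators, a path that contains a space is split into two entries, so such files would need to be renamed or moved first.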

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: converting PDF files to Markdown format, with support for single files or lists. It specifies the resource (PDF files) and action (convert to Markdown), though it doesn't explicitly differentiate from the sibling tool 'convert_pdf_url' which likely handles URL-based PDFs rather than local files.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. While it mentions support for single files or lists, it doesn't explain when to choose this over 'convert_pdf_url' or other potential conversion tools. There's no mention of prerequisites, limitations, or typical use cases.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
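
As an illustration of the kind of rule such guidance could encode, the sketch below picks between the two sibling tools based on whether the input looks like an HTTP(S) URL. The helper choose_tool is hypothetical and the rule is an assumption drawn from the tool names and signatures, not something the server's description states.

```python
from urllib.parse import urlparse


def choose_tool(source: str) -> str:
    """Return the tool name that matches the kind of PDF source given."""
    scheme = urlparse(source.strip()).scheme.lower()
    # Remote documents go to convert_pdf_url; everything else is treated
    # as a local path for convert_pdf_file.
    return "convert_pdf_url" if scheme in ("http", "https") else "convert_pdf_file"


print(choose_tool("https://example.com/paper.pdf"))
print(choose_tool("/tmp/paper.pdf"))
```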

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FutureUnreal/mcp-pdf2md'
