Skip to main content
Glama

convert_pdf_file

Convert PDF files to Markdown format for easier editing and content reuse, with optional OCR support for scanned documents.

Instructions

Convert local PDF file to Markdown, supports single file or file list

Args:
    file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
    enable_ocr: Whether to enable OCR (default: True)

Returns:
    dict: Conversion result information

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
file_pathYes
enable_ocrNo

Implementation Reference

  • The core handler function for the 'convert_pdf_file' tool. It handles uploading local PDF files to the MinerU API using presigned URLs, polling for processing completion, and downloading the resulting Markdown ZIP files which are automatically extracted.
    @mcp.tool()  
    async def convert_pdf_file(file_path: str, enable_ocr: bool = True) -> Dict[str, Any]:
        """
        Convert local PDF file to Markdown, supports single file or file list
        
        Args:
            file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
            enable_ocr: Whether to enable OCR (default: True)
    
        Returns:
            dict: Conversion result information
        """
        if not MINERU_API_KEY:
            return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
        
        if isinstance(file_path, str):
            file_paths = parse_path_string(file_path)
        else:
            file_paths = [file_path]  
        
        for path in file_paths:
            if not os.path.exists(path):
                return {"success": False, "error": f"File does not exist: {path}"}
            else:
                if not path.lower().endswith('.pdf'):
                    return {"success": False, "error": f"File is not in PDF format: {path}"}
        
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                file_names = [os.path.basename(path) for path in file_paths]
                
                files_data = []
                for i, name in enumerate(file_names):
                    files_data.append({
                        "name": name,
                        "is_ocr": enable_ocr,
                        "data_id": f"file_convert_{i+1}_{int(time.time())}"
                    })
                
                file_url_data = {
                    "enable_formula": True,
                    "language": "auto",
                    "layout_model": "doclayout_yolo",
                    "enable_table": True,
                    "files": files_data
                }
                
                file_url_response = await client.post(
                    MINERU_FILE_URLS_API,
                    headers=HEADERS,
                    json=file_url_data,
                    timeout=60.0
                )
                
                if file_url_response.status_code != 200:
                    return {"success": False, "error": f"Failed to get upload link: {file_url_response.status_code}"}
                
                file_url_result = file_url_response.json()
                
                if file_url_result.get("code") != 0 and file_url_result.get("code") != 200:
                    error_msg = file_url_result.get("msg", "Unknown error")
                    return {"success": False, "error": f"Failed to get upload link: {error_msg}"}
                
                batch_id = file_url_result.get("data", {}).get("batch_id", "")
                file_urls = file_url_result.get("data", {}).get("file_urls", [])
                
                if not batch_id or not file_urls or len(file_urls) != len(file_paths):
                    return {"success": False, "error": "Failed to get upload link or batch ID"}
                
                upload_results = []
                for i, (file_path, upload_url) in enumerate(zip(file_paths, file_urls)):
                    try:
                        with open(file_path, 'rb') as f:
                            file_content = f.read()
                            
                            upload_response = await client.put(
                                upload_url,
                                content=file_content,
                                headers={},  
                                timeout=300.0
                            )
                        
                        if upload_response.status_code != 200:
                            upload_results.append({"file": file_names[i], "success": False})
                        else:
                            upload_results.append({"file": file_names[i], "success": True})
                    except Exception as e:
                        upload_results.append({"file": file_names[i], "success": False, "error": str(e)})
                
                if not any(result["success"] for result in upload_results):
                    return {"success": False, "error": "All files failed to upload", "upload_results": upload_results}
                
                task_status = await check_task_status(client, batch_id)
                
                if not task_status.get("success"):
                    return task_status
                
                downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
                
                return {
                    "success": True, 
                    "downloaded_files": downloaded_files,
                    "batch_id": batch_id,
                    "upload_results": upload_results,
                    "total_files": len(file_paths),
                    "processed_files": len(downloaded_files)
                }
                    
            except Exception as e:
                return {"success": False, "error": str(e)}
  • The @mcp.tool() decorator registers the convert_pdf_file function as an MCP tool.
    @mcp.tool()  
  • Helper function to parse input file_path string into a list of paths, handling quotes, spaces, commas, newlines.
    def parse_path_string(path_string):
        """
        Parse file path string separated by spaces, commas, or newlines
        
        Args:
            path_string: File path string
            
        Returns:
            list: List of file paths
        """
        if isinstance(path_string, str):
            if (path_string.startswith('"') and path_string.endswith('"')) or \
               (path_string.startswith("'") and path_string.endswith("'")):
                path_string = path_string[1:-1]
        
        paths = []
        for part in path_string.split():
            if ',' in part:
                paths.extend(part.split(','))
            elif '\n' in part:
                paths.extend(part.split('\n'))
            else:
                paths.append(part)
        
        cleaned_paths = []
        for path in paths:
            if (path.startswith('"') and path.endswith('"')) or \
               (path.startswith("'") and path.endswith("'")):
                cleaned_paths.append(path[1:-1])
            else:
                cleaned_paths.append(path)
        
        return cleaned_paths
  • Supporting helper functions for task status polling, batch result downloading, ZIP extraction, and status printing, shared with convert_pdf_url but used by convert_pdf_file.
    async def check_task_status(client, batch_id, max_retries=60, sleep_seconds=5):
        """
        Check batch task status
        
        Args:
            client: HTTP client
            batch_id: Batch ID
            max_retries: Maximum number of retries
            sleep_seconds: Seconds between retries
            
        Returns:
            dict: Dictionary containing task status information, or error message if failed
        """
        retry_count = 0
        
        while retry_count < max_retries:
            retry_count += 1
            
            try:
                status_response = await client.get(
                    f"{MINERU_BATCH_RESULTS_API}/{batch_id}",
                    headers=HEADERS,
                    timeout=60.0  
                )
                
                if status_response.status_code != 200:
                    retry_count += 1
                    if retry_count < max_retries:
                        await asyncio.sleep(sleep_seconds)
                    continue
                
                try:
                    status_data = status_response.json()
                except Exception as e:
                    retry_count += 1
                    if retry_count < max_retries:
                        await asyncio.sleep(sleep_seconds)
                    continue
                
                task_data = status_data.get("data", {})
                extract_results = task_data.get("extract_result", [])
                
                all_done, any_done = print_task_status(extract_results)
                
                if all_done:
                    return {
                        "success": True,
                        "extract_results": extract_results,
                        "task_data": task_data,
                        "status_data": status_data
                    }
                
                await asyncio.sleep(sleep_seconds)
                
            except Exception as e:
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(sleep_seconds)
        
        return {
            "success": False,
            "error": "Polling timeout, unable to get final results"
        }
    
    async def download_batch_results(client, extract_results):
        """
        Download batch task results
        
        Args:
            client: HTTP client
            extract_results: List of task results
            
        Returns:
            list: List of downloaded file information
        """
        downloaded_files = []
        
        for i, result in enumerate(extract_results):
            if result.get("state") == "done":
                try:
                    file_name = result.get("file_name", f"file_{i+1}")
                    zip_url = result.get("full_zip_url", "")
                    
                    if not zip_url:
                        continue
                    
                    downloaded_file = await download_zip_file(client, zip_url, file_name)
                    if downloaded_file:
                        downloaded_files.append(downloaded_file)
                except Exception as e:
                    pass
        
        return downloaded_files
    
    async def download_zip_file(client, zip_url, file_name, prefix="md", max_retries=3):
        """
        Download and save ZIP file, then automatically unzip
        
        Args:
            client: HTTP client
            zip_url: ZIP file URL
            file_name: File name
            prefix: File prefix
            max_retries: Maximum number of retries
            
        Returns:
            dict: Dictionary containing file name and unzip directory, or None if failed
        """
        retry_count = 0
        
        while retry_count < max_retries:
            try:
                zip_response = await client.get(zip_url, follow_redirects=True, timeout=120.0)
                
                if zip_response.status_code == 200:
                    current_date = time.strftime("%Y%m%d")
                    
                    base_name = os.path.splitext(file_name)[0]
                    # Only remove control characters and chars that are invalid in filenames
                    safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', base_name).strip()
                    # Replace spaces with underscores
                    safe_name = re.sub(r'\s+', '_', safe_name)
                    
                    if safe_name.isdigit() or re.match(r'^\d+\.\d+$', safe_name):
                        safe_name = f"paper_{safe_name}"
                        
                    zip_filename = f"{prefix}_{safe_name}_{current_date}.zip"
                    
                    download_dir = Path(OUTPUT_DIR)
                    if not download_dir.exists():
                        try:
                            download_dir.mkdir(parents=True, exist_ok=True)
                        except Exception as e:
                            print(f"Error creating directory: {e}")
                            return None
                    
                    save_path = download_dir / zip_filename
                    
                    with open(save_path, "wb") as f:
                        f.write(zip_response.content)
                    
                    extract_dir = download_dir / safe_name
                    if not extract_dir.exists():
                        extract_dir.mkdir(parents=True)
                    
                    import zipfile
                    try:
                        with zipfile.ZipFile(save_path, 'r') as zip_ref:
                            zip_ref.extractall(extract_dir)
                        os.remove(save_path)
                        
                        return {
                            "file_name": file_name,
                            "extract_dir": str(extract_dir)
                        }
                    except Exception as e:
                        pass
                else:
                    retry_count += 1
                    if retry_count < max_retries:
                        await asyncio.sleep(2)
                    continue
            except Exception as e:
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(2)
                continue
        
        return None
    
    def parse_url_string(url_string):
        """
        Parse URL string separated by spaces, commas, or newlines
        
        Args:
            url_string: URL string
            
        Returns:
            list: List of URLs
        """
        if isinstance(url_string, str):
            if (url_string.startswith('"') and url_string.endswith('"')) or \
               (url_string.startswith("'") and url_string.endswith("'")):
                url_string = url_string[1:-1]
        
        urls = []
        for part in url_string.split():
            if ',' in part:
                urls.extend(part.split(','))
            elif '\n' in part:
                urls.extend(part.split('\n'))
            else:
                urls.append(part)
        
        cleaned_urls = []
        for url in urls:
            if (url.startswith('"') and url.endswith('"')) or \
               (url.startswith("'") and url.endswith("'")):
                cleaned_urls.append(url[1:-1])
            else:
                cleaned_urls.append(url)
        
        return cleaned_urls
    
    def parse_path_string(path_string):
        """
        Parse file path string separated by spaces, commas, or newlines
        
        Args:
            path_string: File path string
            
        Returns:
            list: List of file paths
        """
        if isinstance(path_string, str):
            if (path_string.startswith('"') and path_string.endswith('"')) or \
               (path_string.startswith("'") and path_string.endswith("'")):
                path_string = path_string[1:-1]
        
        paths = []
        for part in path_string.split():
            if ',' in part:
                paths.extend(part.split(','))
            elif '\n' in part:
                paths.extend(part.split('\n'))
            else:
                paths.append(part)
        
        cleaned_paths = []
        for path in paths:
            if (path.startswith('"') and path.endswith('"')) or \
               (path.startswith("'") and path.endswith("'")):
                cleaned_paths.append(path[1:-1])
            else:
                cleaned_paths.append(path)
        
        return cleaned_paths
    
    # Create MCP server
    mcp = FastMCP("PDF to Markdown Conversion Service")
    
    @mcp.tool()  
    async def convert_pdf_url(url: str, enable_ocr: bool = True) -> Dict[str, Any]:
        """
        Convert PDF URL to Markdown, supports single URL or URL list
        
        Args:
            url: PDF file URL or URL list, can be separated by spaces, commas, or newlines
            enable_ocr: Whether to enable OCR (default: True)
    
        Returns:
            dict: Conversion result information
        """
        if not MINERU_API_KEY:
            return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
        
        if isinstance(url, str):
            urls = parse_url_string(url)
        else:
            urls = [url]  
        
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                files = []
                for i, url_item in enumerate(urls):
                    files.append({
                        "url": url_item, 
                        "is_ocr": enable_ocr, 
                        "data_id": f"url_convert_{i+1}_{int(time.time())}"
                    })
                
                batch_data = {
                    "enable_formula": True,
                    "language": "auto",
                    "layout_model": "doclayout_yolo",
                    "enable_table": True,
                    "files": files
                }
                
                response = await client.post(
                    MINERU_BATCH_API,
                    headers=HEADERS,
                    json=batch_data,
                    timeout=300.0
                )
                
                if response.status_code != 200:
                    return {"success": False, "error": f"Request failed: {response.status_code}"}
                
                try:
                    status_data = response.json()
                    
                    if status_data.get("code") != 0 and status_data.get("code") != 200:
                        error_msg = status_data.get("msg", "Unknown error")
                        return {"success": False, "error": f"API returned error: {error_msg}"}
                        
                    batch_id = status_data.get("data", {}).get("batch_id", "")
                    if not batch_id:
                        return {"success": False, "error": "Failed to get batch ID"}
                    
                    task_status = await check_task_status(client, batch_id)
                    
                    if not task_status.get("success"):
                        return task_status
                    
                    downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
                    
                    return {
                        "success": True, 
                        "downloaded_files": downloaded_files,
                        "batch_id": batch_id,
                        "total_urls": len(urls),
                        "processed_urls": len(downloaded_files)
                    }
                    
                except json.JSONDecodeError as e:
                    return {"success": False, "error": f"Failed to parse JSON: {e}"}
                    
            except Exception as e:
                return {"success": False, "error": str(e)}
    
    @mcp.tool()  
    async def convert_pdf_file(file_path: str, enable_ocr: bool = True) -> Dict[str, Any]:
        """
        Convert local PDF file to Markdown, supports single file or file list
        
        Args:
            file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
            enable_ocr: Whether to enable OCR (default: True)
    
        Returns:
            dict: Conversion result information
        """
        if not MINERU_API_KEY:
            return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
        
        if isinstance(file_path, str):
            file_paths = parse_path_string(file_path)
        else:
            file_paths = [file_path]  
        
        for path in file_paths:
            if not os.path.exists(path):
                return {"success": False, "error": f"File does not exist: {path}"}
            else:
                if not path.lower().endswith('.pdf'):
                    return {"success": False, "error": f"File is not in PDF format: {path}"}
        
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                file_names = [os.path.basename(path) for path in file_paths]
                
                files_data = []
                for i, name in enumerate(file_names):
                    files_data.append({
                        "name": name,
                        "is_ocr": enable_ocr,
                        "data_id": f"file_convert_{i+1}_{int(time.time())}"
                    })
                
                file_url_data = {
                    "enable_formula": True,
                    "language": "auto",
                    "layout_model": "doclayout_yolo",
                    "enable_table": True,
                    "files": files_data
                }
                
                file_url_response = await client.post(
                    MINERU_FILE_URLS_API,
                    headers=HEADERS,
                    json=file_url_data,
                    timeout=60.0
                )
                
                if file_url_response.status_code != 200:
                    return {"success": False, "error": f"Failed to get upload link: {file_url_response.status_code}"}
                
                file_url_result = file_url_response.json()
                
                if file_url_result.get("code") != 0 and file_url_result.get("code") != 200:
                    error_msg = file_url_result.get("msg", "Unknown error")
                    return {"success": False, "error": f"Failed to get upload link: {error_msg}"}
                
                batch_id = file_url_result.get("data", {}).get("batch_id", "")
                file_urls = file_url_result.get("data", {}).get("file_urls", [])
                
                if not batch_id or not file_urls or len(file_urls) != len(file_paths):
                    return {"success": False, "error": "Failed to get upload link or batch ID"}
                
                upload_results = []
                for i, (file_path, upload_url) in enumerate(zip(file_paths, file_urls)):
                    try:
                        with open(file_path, 'rb') as f:
                            file_content = f.read()
                            
                            upload_response = await client.put(
                                upload_url,
                                content=file_content,
                                headers={},  
                                timeout=300.0
                            )
                        
                        if upload_response.status_code != 200:
                            upload_results.append({"file": file_names[i], "success": False})
                        else:
                            upload_results.append({"file": file_names[i], "success": True})
                    except Exception as e:
                        upload_results.append({"file": file_names[i], "success": False, "error": str(e)})
                
                if not any(result["success"] for result in upload_results):
                    return {"success": False, "error": "All files failed to upload", "upload_results": upload_results}
                
                task_status = await check_task_status(client, batch_id)
                
                if not task_status.get("success"):
                    return task_status
                
                downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
                
                return {
                    "success": True, 
                    "downloaded_files": downloaded_files,
                    "batch_id": batch_id,
                    "upload_results": upload_results,
                    "total_files": len(file_paths),
                    "processed_files": len(downloaded_files)
                }
                    
            except Exception as e:
                return {"success": False, "error": str(e)}
Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FutureUnreal/mcp-pdf2md'

If you have feedback or need assistance with the MCP directory API, please join our Discord server