# convert_pdf_file
Convert PDF files to Markdown format for easier editing and content reuse, with optional OCR support for scanned documents.
## Instructions
Converts a local PDF file to Markdown; supports a single file or a list of files.

Args:

- `file_path`: PDF file local path, or a list of paths separated by spaces, commas, or newlines
- `enable_ocr`: Whether to enable OCR (default: `True`)

Returns:

- `dict`: Conversion result information
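Because `file_path` accepts several separators, the following inputs all name the same two files; a short sketch with hypothetical file names:

```python
# Each string below parses to ["report.pdf", "paper.pdf"]
# (see parse_path_string under Implementation Reference).
file_path = "report.pdf paper.pdf"     # space-separated
file_path = "report.pdf,paper.pdf"     # comma-separated
file_path = "report.pdf\npaper.pdf"    # newline-separated
```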
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| `file_path` | Yes | PDF file local path, or a list of paths separated by spaces, commas, or newlines | |
| `enable_ocr` | No | Whether to enable OCR | `True` |
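The generated JSON Schema is not reproduced here; reconstructed from the parameter names, requirements, and defaults above (not copied from the server), it should be close to:

```json
{
  "type": "object",
  "properties": {
    "file_path": { "type": "string" },
    "enable_ocr": { "type": "boolean", "default": true }
  },
  "required": ["file_path"]
}
```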
## Implementation Reference
- `src/pdf2md/server.py:394-504` (handler): The core handler function for the `convert_pdf_file` tool. It uploads local PDF files to the MinerU API via presigned URLs, polls until processing completes, and downloads the resulting Markdown ZIP files, which are extracted automatically.

```python
@mcp.tool()
async def convert_pdf_file(file_path: str, enable_ocr: bool = True) -> Dict[str, Any]:
    """
    Convert local PDF file to Markdown, supports single file or file list

    Args:
        file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
        enable_ocr: Whether to enable OCR (default: True)

    Returns:
        dict: Conversion result information
    """
    if not MINERU_API_KEY:
        return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}

    if isinstance(file_path, str):
        file_paths = parse_path_string(file_path)
    else:
        file_paths = [file_path]

    for path in file_paths:
        if not os.path.exists(path):
            return {"success": False, "error": f"File does not exist: {path}"}
        else:
            if not path.lower().endswith('.pdf'):
                return {"success": False, "error": f"File is not in PDF format: {path}"}

    async with httpx.AsyncClient(timeout=300.0) as client:
        try:
            file_names = [os.path.basename(path) for path in file_paths]
            files_data = []
            for i, name in enumerate(file_names):
                files_data.append({
                    "name": name,
                    "is_ocr": enable_ocr,
                    "data_id": f"file_convert_{i+1}_{int(time.time())}"
                })

            file_url_data = {
                "enable_formula": True,
                "language": "auto",
                "layout_model": "doclayout_yolo",
                "enable_table": True,
                "files": files_data
            }

            file_url_response = await client.post(
                MINERU_FILE_URLS_API,
                headers=HEADERS,
                json=file_url_data,
                timeout=60.0
            )

            if file_url_response.status_code != 200:
                return {"success": False, "error": f"Failed to get upload link: {file_url_response.status_code}"}

            file_url_result = file_url_response.json()
            if file_url_result.get("code") != 0 and file_url_result.get("code") != 200:
                error_msg = file_url_result.get("msg", "Unknown error")
                return {"success": False, "error": f"Failed to get upload link: {error_msg}"}

            batch_id = file_url_result.get("data", {}).get("batch_id", "")
            file_urls = file_url_result.get("data", {}).get("file_urls", [])

            if not batch_id or not file_urls or len(file_urls) != len(file_paths):
                return {"success": False, "error": "Failed to get upload link or batch ID"}

            upload_results = []
            for i, (file_path, upload_url) in enumerate(zip(file_paths, file_urls)):
                try:
                    with open(file_path, 'rb') as f:
                        file_content = f.read()
                    upload_response = await client.put(
                        upload_url,
                        content=file_content,
                        headers={},
                        timeout=300.0
                    )
                    if upload_response.status_code != 200:
                        upload_results.append({"file": file_names[i], "success": False})
                    else:
                        upload_results.append({"file": file_names[i], "success": True})
                except Exception as e:
                    upload_results.append({"file": file_names[i], "success": False, "error": str(e)})

            if not any(result["success"] for result in upload_results):
                return {"success": False, "error": "All files failed to upload", "upload_results": upload_results}

            task_status = await check_task_status(client, batch_id)
            if not task_status.get("success"):
                return task_status

            downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))

            return {
                "success": True,
                "downloaded_files": downloaded_files,
                "batch_id": batch_id,
                "upload_results": upload_results,
                "total_files": len(file_paths),
                "processed_files": len(downloaded_files)
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
```
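Because the handler is an ordinary coroutine, it can also be exercised outside an MCP client for a quick smoke test; a minimal sketch, assuming `MINERU_API_KEY` is exported and the hypothetical paths point at real PDFs:

```python
import asyncio

# Direct invocation for testing; the file names are hypothetical.
result = asyncio.run(convert_pdf_file("report.pdf,paper.pdf", enable_ocr=True))
if result["success"]:
    print(f"Processed {result['processed_files']}/{result['total_files']} files")
    for item in result["downloaded_files"]:
        print(item["file_name"], "->", item["extract_dir"])
else:
    print("Conversion failed:", result["error"])
```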
- `src/pdf2md/server.py:394` (registration): The `@mcp.tool()` decorator registers the `convert_pdf_file` function as an MCP tool.

```python
@mcp.tool()
```
- `src/pdf2md/server.py:277-309` (helper): Parses the `file_path` input string into a list of paths, handling quotes, spaces, commas, and newlines.

```python
def parse_path_string(path_string):
    """
    Parse file path string separated by spaces, commas, or newlines

    Args:
        path_string: File path string

    Returns:
        list: List of file paths
    """
    if isinstance(path_string, str):
        if (path_string.startswith('"') and path_string.endswith('"')) or \
           (path_string.startswith("'") and path_string.endswith("'")):
            path_string = path_string[1:-1]

        paths = []
        for part in path_string.split():
            if ',' in part:
                paths.extend(part.split(','))
            elif '\n' in part:
                paths.extend(part.split('\n'))
            else:
                paths.append(part)

        cleaned_paths = []
        for path in paths:
            if (path.startswith('"') and path.endswith('"')) or \
               (path.startswith("'") and path.endswith("'")):
                cleaned_paths.append(path[1:-1])
            else:
                cleaned_paths.append(path)

        return cleaned_paths
```
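For example (hypothetical names), quoting and comma-joined parts resolve as follows:

```python
# Surrounding quotes are stripped and comma-joined parts are split apart.
paths = parse_path_string('"report.pdf" a.pdf,b.pdf')
assert paths == ["report.pdf", "a.pdf", "b.pdf"]
```

Note that because the string is split on whitespace first, a comma followed by a space (e.g. `"a.pdf, b.pdf"`) leaves an empty entry in the result.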
- `src/pdf2md/server.py:73-504` (helper): Supporting helpers for task status polling, batch result downloading, ZIP extraction, and status printing, shared with `convert_pdf_url` but used by `convert_pdf_file`. (`print_task_status`, called during polling, falls within this range but is not captured in the excerpt below; the verbatim repeats of `parse_path_string` and the `convert_pdf_file` handler are shown in the references above and are not duplicated here.)

```python
async def check_task_status(client, batch_id, max_retries=60, sleep_seconds=5):
    """
    Check batch task status

    Args:
        client: HTTP client
        batch_id: Batch ID
        max_retries: Maximum number of retries
        sleep_seconds: Seconds between retries

    Returns:
        dict: Dictionary containing task status information, or error message if failed
    """
    retry_count = 0
    while retry_count < max_retries:
        retry_count += 1
        try:
            status_response = await client.get(
                f"{MINERU_BATCH_RESULTS_API}/{batch_id}",
                headers=HEADERS,
                timeout=60.0
            )
            if status_response.status_code != 200:
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(sleep_seconds)
                continue
            try:
                status_data = status_response.json()
            except Exception as e:
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(sleep_seconds)
                continue
            task_data = status_data.get("data", {})
            extract_results = task_data.get("extract_result", [])
            all_done, any_done = print_task_status(extract_results)
            if all_done:
                return {
                    "success": True,
                    "extract_results": extract_results,
                    "task_data": task_data,
                    "status_data": status_data
                }
            await asyncio.sleep(sleep_seconds)
        except Exception as e:
            retry_count += 1
            if retry_count < max_retries:
                await asyncio.sleep(sleep_seconds)
    return {
        "success": False,
        "error": "Polling timeout, unable to get final results"
    }


async def download_batch_results(client, extract_results):
    """
    Download batch task results

    Args:
        client: HTTP client
        extract_results: List of task results

    Returns:
        list: List of downloaded file information
    """
    downloaded_files = []
    for i, result in enumerate(extract_results):
        if result.get("state") == "done":
            try:
                file_name = result.get("file_name", f"file_{i+1}")
                zip_url = result.get("full_zip_url", "")
                if not zip_url:
                    continue
                downloaded_file = await download_zip_file(client, zip_url, file_name)
                if downloaded_file:
                    downloaded_files.append(downloaded_file)
            except Exception as e:
                pass
    return downloaded_files


async def download_zip_file(client, zip_url, file_name, prefix="md", max_retries=3):
    """
    Download and save ZIP file, then automatically unzip

    Args:
        client: HTTP client
        zip_url: ZIP file URL
        file_name: File name
        prefix: File prefix
        max_retries: Maximum number of retries

    Returns:
        dict: Dictionary containing file name and unzip directory, or None if failed
    """
    retry_count = 0
    while retry_count < max_retries:
        try:
            zip_response = await client.get(zip_url, follow_redirects=True, timeout=120.0)
            if zip_response.status_code == 200:
                current_date = time.strftime("%Y%m%d")
                base_name = os.path.splitext(file_name)[0]
                # Only remove control characters and chars that are invalid in filenames
                safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', base_name).strip()
                # Replace spaces with underscores
                safe_name = re.sub(r'\s+', '_', safe_name)
                if safe_name.isdigit() or re.match(r'^\d+\.\d+$', safe_name):
                    safe_name = f"paper_{safe_name}"
                zip_filename = f"{prefix}_{safe_name}_{current_date}.zip"
                download_dir = Path(OUTPUT_DIR)
                if not download_dir.exists():
                    try:
                        download_dir.mkdir(parents=True, exist_ok=True)
                    except Exception as e:
                        print(f"Error creating directory: {e}")
                        return None
                save_path = download_dir / zip_filename
                with open(save_path, "wb") as f:
                    f.write(zip_response.content)
                extract_dir = download_dir / safe_name
                if not extract_dir.exists():
                    extract_dir.mkdir(parents=True)
                import zipfile
                try:
                    with zipfile.ZipFile(save_path, 'r') as zip_ref:
                        zip_ref.extractall(extract_dir)
                    os.remove(save_path)
                    return {
                        "file_name": file_name,
                        "extract_dir": str(extract_dir)
                    }
                except Exception as e:
                    pass
            else:
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(2)
                    continue
        except Exception as e:
            retry_count += 1
            if retry_count < max_retries:
                await asyncio.sleep(2)
                continue
    return None


def parse_url_string(url_string):
    """
    Parse URL string separated by spaces, commas, or newlines

    Args:
        url_string: URL string

    Returns:
        list: List of URLs
    """
    if isinstance(url_string, str):
        if (url_string.startswith('"') and url_string.endswith('"')) or \
           (url_string.startswith("'") and url_string.endswith("'")):
            url_string = url_string[1:-1]

        urls = []
        for part in url_string.split():
            if ',' in part:
                urls.extend(part.split(','))
            elif '\n' in part:
                urls.extend(part.split('\n'))
            else:
                urls.append(part)

        cleaned_urls = []
        for url in urls:
            if (url.startswith('"') and url.endswith('"')) or \
               (url.startswith("'") and url.endswith("'")):
                cleaned_urls.append(url[1:-1])
            else:
                cleaned_urls.append(url)

        return cleaned_urls


# parse_path_string (shown in the previous reference) appears here in the source.

# Create MCP server
mcp = FastMCP("PDF to Markdown Conversion Service")


@mcp.tool()
async def convert_pdf_url(url: str, enable_ocr: bool = True) -> Dict[str, Any]:
    """
    Convert PDF URL to Markdown, supports single URL or URL list

    Args:
        url: PDF file URL or URL list, can be separated by spaces, commas, or newlines
        enable_ocr: Whether to enable OCR (default: True)

    Returns:
        dict: Conversion result information
    """
    if not MINERU_API_KEY:
        return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}

    if isinstance(url, str):
        urls = parse_url_string(url)
    else:
        urls = [url]

    async with httpx.AsyncClient(timeout=300.0) as client:
        try:
            files = []
            for i, url_item in enumerate(urls):
                files.append({
                    "url": url_item,
                    "is_ocr": enable_ocr,
                    "data_id": f"url_convert_{i+1}_{int(time.time())}"
                })

            batch_data = {
                "enable_formula": True,
                "language": "auto",
                "layout_model": "doclayout_yolo",
                "enable_table": True,
                "files": files
            }

            response = await client.post(
                MINERU_BATCH_API,
                headers=HEADERS,
                json=batch_data,
                timeout=300.0
            )

            if response.status_code != 200:
                return {"success": False, "error": f"Request failed: {response.status_code}"}

            try:
                status_data = response.json()
                if status_data.get("code") != 0 and status_data.get("code") != 200:
                    error_msg = status_data.get("msg", "Unknown error")
                    return {"success": False, "error": f"API returned error: {error_msg}"}

                batch_id = status_data.get("data", {}).get("batch_id", "")
                if not batch_id:
                    return {"success": False, "error": "Failed to get batch ID"}

                task_status = await check_task_status(client, batch_id)
                if not task_status.get("success"):
                    return task_status

                downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))

                return {
                    "success": True,
                    "downloaded_files": downloaded_files,
                    "batch_id": batch_id,
                    "total_urls": len(urls),
                    "processed_urls": len(downloaded_files)
                }
            except json.JSONDecodeError as e:
                return {"success": False, "error": f"Failed to parse JSON: {e}"}
        except Exception as e:
            return {"success": False, "error": str(e)}


# The convert_pdf_file handler (shown in the first reference) completes the range.
```
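The quoted code references module-level configuration and imports that fall outside the captured excerpt. A plausible sketch is below; the import list follows from the names used above, while the environment-variable handling, the `Bearer` auth scheme, and the endpoint URLs are assumptions (placeholders, not the actual values in server.py):

```python
import asyncio
import json
import os
import re
import time
from pathlib import Path
from typing import Any, Dict

import httpx
from mcp.server.fastmcp import FastMCP

MINERU_API_KEY = os.environ.get("MINERU_API_KEY", "")
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "./downloads")  # env var name is an assumption
HEADERS = {"Authorization": f"Bearer {MINERU_API_KEY}"}   # Bearer scheme is an assumption

# Placeholder endpoints; substitute the real MinerU API URLs.
MINERU_BATCH_API = "https://<mineru-host>/api/v4/extract/task/batch"
MINERU_FILE_URLS_API = "https://<mineru-host>/api/v4/file-urls/batch"
MINERU_BATCH_RESULTS_API = "https://<mineru-host>/api/v4/extract-results/batch"
```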