mcp-pdf2md
by FutureUnreal
src/pdf2md
import os
import json
import time
import asyncio
import httpx
import re
import logging
from pathlib import Path
from dotenv import load_dotenv
from typing import Optional, List, Dict, Any
from mcp.server.fastmcp import FastMCP
# Set up logging - disable all log output
logging.basicConfig(
level=logging.CRITICAL, # Only record critical errors
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[] # Remove all handlers, no log output
)
logger = logging.getLogger("pdf2md")
logger.disabled = True # Completely disable logging
# Load environment variables
load_dotenv()
# API configuration
MINERU_API_BASE = os.environ.get("MINERU_API_BASE", "https://mineru.net/api/v4/extract/task")
MINERU_API_KEY = os.environ.get("MINERU_API_KEY", "")
MINERU_BATCH_API = os.environ.get("MINERU_BATCH_API", "https://mineru.net/api/v4/extract/task/batch")
MINERU_BATCH_RESULTS_API = os.environ.get("MINERU_BATCH_RESULTS_API", "https://mineru.net/api/v4/extract-results/batch")
MINERU_FILE_URLS_API = os.environ.get("MINERU_FILE_URLS_API", "https://mineru.net/api/v4/file-urls/batch")
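# Example .env file consumed by load_dotenv() above (a sketch: only MINERU_API_KEY is
# required; the endpoint variables fall back to the defaults shown here if omitted,
# and the token value below is a placeholder):
#
#   MINERU_API_KEY=eyJ0eXAi...your-mineru-token...
#   MINERU_API_BASE=https://mineru.net/api/v4/extract/task
#   MINERU_BATCH_API=https://mineru.net/api/v4/extract/task/batch
#   MINERU_BATCH_RESULTS_API=https://mineru.net/api/v4/extract-results/batch
#   MINERU_FILE_URLS_API=https://mineru.net/api/v4/file-urls/batch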
# Global variables
OUTPUT_DIR = "./downloads"
# API authentication headers
HEADERS = {
"Authorization": MINERU_API_KEY if MINERU_API_KEY.startswith("Bearer ") else f"Bearer {MINERU_API_KEY}",
"Content-Type": "application/json"
}
def set_output_dir(output_dir: str):
"""Set the output directory path"""
global OUTPUT_DIR
# Normalize path to handle Unicode characters properly
OUTPUT_DIR = os.path.normpath(output_dir)
def print_task_status(extract_results):
    """
    Summarize batch task status and check whether all tasks are completed
    Args:
        extract_results: List of task results
    Returns:
        tuple: (all tasks completed, any task completed)
    """
    all_done = True
    any_done = False
    for result in extract_results:
        if result.get("state", "") == "done":
            any_done = True
        else:
            all_done = False
    return all_done, any_done
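# Example of the summary above on illustrative task states ("running" stands in for
# any not-yet-finished state):
#   print_task_status([{"state": "done"}, {"state": "running"}])  ->  (False, True)
#   print_task_status([{"state": "done"}, {"state": "done"}])     ->  (True, True)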
async def check_task_status(client, batch_id, max_retries=60, sleep_seconds=5):
"""
Check batch task status
Args:
client: HTTP client
batch_id: Batch ID
max_retries: Maximum number of retries
sleep_seconds: Seconds between retries
Returns:
dict: Dictionary containing task status information, or error message if failed
"""
retry_count = 0
while retry_count < max_retries:
retry_count += 1
try:
status_response = await client.get(
f"{MINERU_BATCH_RESULTS_API}/{batch_id}",
headers=HEADERS,
timeout=60.0
)
            if status_response.status_code != 200:
                # The attempt was already counted at the top of the loop; back off and retry
                if retry_count < max_retries:
                    await asyncio.sleep(sleep_seconds)
                continue
            try:
                status_data = status_response.json()
            except Exception:
                # Malformed response body; back off and retry
                if retry_count < max_retries:
                    await asyncio.sleep(sleep_seconds)
                continue
task_data = status_data.get("data", {})
extract_results = task_data.get("extract_result", [])
all_done, any_done = print_task_status(extract_results)
if all_done:
return {
"success": True,
"extract_results": extract_results,
"task_data": task_data,
"status_data": status_data
}
await asyncio.sleep(sleep_seconds)
        except Exception:
            # Transient request error; the attempt is already counted, so just back off
            if retry_count < max_retries:
                await asyncio.sleep(sleep_seconds)
return {
"success": False,
"error": "Polling timeout, unable to get final results"
}
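# Shape of the polled batch-results payload implied by the parsing above (field values
# are illustrative; the real response may carry additional fields):
#   {"data": {"extract_result": [
#       {"file_name": "doc.pdf", "state": "done", "full_zip_url": "https://..."}]}}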
async def download_batch_results(client, extract_results):
"""
Download batch task results
Args:
client: HTTP client
extract_results: List of task results
Returns:
list: List of downloaded file information
"""
downloaded_files = []
for i, result in enumerate(extract_results):
if result.get("state") == "done":
try:
file_name = result.get("file_name", f"file_{i+1}")
zip_url = result.get("full_zip_url", "")
if not zip_url:
continue
downloaded_file = await download_zip_file(client, zip_url, file_name)
if downloaded_file:
downloaded_files.append(downloaded_file)
            except Exception:
                # Skip this result and continue with the remaining files
                pass
return downloaded_files
async def download_zip_file(client, zip_url, file_name, prefix="md", max_retries=3):
"""
Download and save ZIP file, then automatically unzip
Args:
client: HTTP client
zip_url: ZIP file URL
file_name: File name
prefix: File prefix
max_retries: Maximum number of retries
Returns:
dict: Dictionary containing file name and unzip directory, or None if failed
"""
retry_count = 0
while retry_count < max_retries:
try:
zip_response = await client.get(zip_url, follow_redirects=True, timeout=120.0)
if zip_response.status_code == 200:
current_date = time.strftime("%Y%m%d")
base_name = os.path.splitext(file_name)[0]
# Only remove control characters and chars that are invalid in filenames
safe_name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', base_name).strip()
# Replace spaces with underscores
safe_name = re.sub(r'\s+', '_', safe_name)
if safe_name.isdigit() or re.match(r'^\d+\.\d+$', safe_name):
safe_name = f"paper_{safe_name}"
zip_filename = f"{prefix}_{safe_name}_{current_date}.zip"
download_dir = Path(OUTPUT_DIR)
if not download_dir.exists():
try:
download_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating directory: {e}")
return None
save_path = download_dir / zip_filename
with open(save_path, "wb") as f:
f.write(zip_response.content)
extract_dir = download_dir / safe_name
if not extract_dir.exists():
extract_dir.mkdir(parents=True)
import zipfile
try:
with zipfile.ZipFile(save_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
os.remove(save_path)
return {
"file_name": file_name,
"extract_dir": str(extract_dir)
}
                except Exception:
                    # Extraction failed; count the attempt so the loop cannot retry forever
                    retry_count += 1
                    if retry_count < max_retries:
                        await asyncio.sleep(2)
                    continue
else:
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(2)
continue
except Exception as e:
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(2)
continue
return None
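# Example with an illustrative URL and file name:
#   await download_zip_file(client, "https://example.com/result.zip", "My Paper.pdf")
# saves the archive under OUTPUT_DIR, extracts it to OUTPUT_DIR/"My_Paper" (spaces become
# underscores), deletes the ZIP, and returns
#   {"file_name": "My Paper.pdf", "extract_dir": "<OUTPUT_DIR>/My_Paper"}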
def parse_url_string(url_string):
    """
    Parse a URL string separated by spaces, commas, or newlines
    Args:
        url_string: URL string
    Returns:
        list: List of URLs
    """
    if isinstance(url_string, str):
        # Strip a pair of quotes wrapping the whole string
        if (url_string.startswith('"') and url_string.endswith('"')) or \
           (url_string.startswith("'") and url_string.endswith("'")):
            url_string = url_string[1:-1]
    urls = []
    # str.split() already splits on spaces and newlines; split the fragments again on
    # commas and drop any empty pieces left by consecutive separators
    for part in url_string.split():
        urls.extend(p for p in part.split(',') if p)
    cleaned_urls = []
    for url in urls:
        # Strip quotes wrapping an individual URL
        if (url.startswith('"') and url.endswith('"')) or \
           (url.startswith("'") and url.endswith("'")):
            cleaned_urls.append(url[1:-1])
        else:
            cleaned_urls.append(url)
    return cleaned_urls
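# Example (illustrative URLs):
#   parse_url_string("https://example.com/a.pdf, https://example.com/b.pdf")
#   -> ["https://example.com/a.pdf", "https://example.com/b.pdf"]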
def parse_path_string(path_string):
    """
    Parse a file path string separated by spaces, commas, or newlines
    Args:
        path_string: File path string
    Returns:
        list: List of file paths
    """
    if isinstance(path_string, str):
        # Strip a pair of quotes wrapping the whole string
        if (path_string.startswith('"') and path_string.endswith('"')) or \
           (path_string.startswith("'") and path_string.endswith("'")):
            path_string = path_string[1:-1]
    paths = []
    # str.split() already splits on spaces and newlines; split the fragments again on
    # commas and drop any empty pieces left by consecutive separators
    for part in path_string.split():
        paths.extend(p for p in part.split(',') if p)
    cleaned_paths = []
    for path in paths:
        # Strip quotes wrapping an individual path
        if (path.startswith('"') and path.endswith('"')) or \
           (path.startswith("'") and path.endswith("'")):
            cleaned_paths.append(path[1:-1])
        else:
            cleaned_paths.append(path)
    return cleaned_paths
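# Example (illustrative paths):
#   parse_path_string("C:/Documents/a.pdf, C:/Documents/b.pdf")
#   -> ["C:/Documents/a.pdf", "C:/Documents/b.pdf"]
# Note: because splitting also happens on whitespace, a path that itself contains
# spaces will be broken apart by this scheme.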
# Create MCP server
mcp = FastMCP("PDF to Markdown Conversion Service")
@mcp.tool()
async def convert_pdf_url(url: str, enable_ocr: bool = True) -> Dict[str, Any]:
"""
Convert PDF URL to Markdown, supports single URL or URL list
Args:
url: PDF file URL or URL list, can be separated by spaces, commas, or newlines
enable_ocr: Whether to enable OCR (default: True)
Returns:
dict: Conversion result information
"""
if not MINERU_API_KEY:
return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
if isinstance(url, str):
urls = parse_url_string(url)
else:
urls = [url]
async with httpx.AsyncClient(timeout=300.0) as client:
try:
files = []
for i, url_item in enumerate(urls):
files.append({
"url": url_item,
"is_ocr": enable_ocr,
"data_id": f"url_convert_{i+1}_{int(time.time())}"
})
batch_data = {
"enable_formula": True,
"language": "auto",
"layout_model": "doclayout_yolo",
"enable_table": True,
"files": files
}
response = await client.post(
MINERU_BATCH_API,
headers=HEADERS,
json=batch_data,
timeout=300.0
)
if response.status_code != 200:
return {"success": False, "error": f"Request failed: {response.status_code}"}
try:
status_data = response.json()
if status_data.get("code") != 0 and status_data.get("code") != 200:
error_msg = status_data.get("msg", "Unknown error")
return {"success": False, "error": f"API returned error: {error_msg}"}
batch_id = status_data.get("data", {}).get("batch_id", "")
if not batch_id:
return {"success": False, "error": "Failed to get batch ID"}
task_status = await check_task_status(client, batch_id)
if not task_status.get("success"):
return task_status
downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
return {
"success": True,
"downloaded_files": downloaded_files,
"batch_id": batch_id,
"total_urls": len(urls),
"processed_urls": len(downloaded_files)
}
except json.JSONDecodeError as e:
return {"success": False, "error": f"Failed to parse JSON: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def convert_pdf_file(file_path: str, enable_ocr: bool = True) -> Dict[str, Any]:
"""
Convert local PDF file to Markdown, supports single file or file list
Args:
file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
enable_ocr: Whether to enable OCR (default: True)
Returns:
dict: Conversion result information
"""
if not MINERU_API_KEY:
return {"success": False, "error": "Missing API key, please set environment variable MINERU_API_KEY"}
if isinstance(file_path, str):
file_paths = parse_path_string(file_path)
else:
file_paths = [file_path]
    for path in file_paths:
        if not os.path.exists(path):
            return {"success": False, "error": f"File does not exist: {path}"}
        if not path.lower().endswith('.pdf'):
            return {"success": False, "error": f"File is not in PDF format: {path}"}
async with httpx.AsyncClient(timeout=300.0) as client:
try:
file_names = [os.path.basename(path) for path in file_paths]
files_data = []
for i, name in enumerate(file_names):
files_data.append({
"name": name,
"is_ocr": enable_ocr,
"data_id": f"file_convert_{i+1}_{int(time.time())}"
})
file_url_data = {
"enable_formula": True,
"language": "auto",
"layout_model": "doclayout_yolo",
"enable_table": True,
"files": files_data
}
file_url_response = await client.post(
MINERU_FILE_URLS_API,
headers=HEADERS,
json=file_url_data,
timeout=60.0
)
if file_url_response.status_code != 200:
return {"success": False, "error": f"Failed to get upload link: {file_url_response.status_code}"}
file_url_result = file_url_response.json()
if file_url_result.get("code") != 0 and file_url_result.get("code") != 200:
error_msg = file_url_result.get("msg", "Unknown error")
return {"success": False, "error": f"Failed to get upload link: {error_msg}"}
batch_id = file_url_result.get("data", {}).get("batch_id", "")
file_urls = file_url_result.get("data", {}).get("file_urls", [])
if not batch_id or not file_urls or len(file_urls) != len(file_paths):
return {"success": False, "error": "Failed to get upload link or batch ID"}
upload_results = []
            for i, (local_path, upload_url) in enumerate(zip(file_paths, file_urls)):
                try:
                    # Read the PDF bytes and PUT them to the presigned upload URL
                    with open(local_path, 'rb') as f:
                        file_content = f.read()
upload_response = await client.put(
upload_url,
content=file_content,
headers={},
timeout=300.0
)
if upload_response.status_code != 200:
upload_results.append({"file": file_names[i], "success": False})
else:
upload_results.append({"file": file_names[i], "success": True})
except Exception as e:
upload_results.append({"file": file_names[i], "success": False, "error": str(e)})
if not any(result["success"] for result in upload_results):
return {"success": False, "error": "All files failed to upload", "upload_results": upload_results}
task_status = await check_task_status(client, batch_id)
if not task_status.get("success"):
return task_status
downloaded_files = await download_batch_results(client, task_status.get("extract_results", []))
return {
"success": True,
"downloaded_files": downloaded_files,
"batch_id": batch_id,
"upload_results": upload_results,
"total_files": len(file_paths),
"processed_files": len(downloaded_files)
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.prompt()
def default_prompt() -> str:
"""Create default tool usage prompt"""
return """
PDF to Markdown Conversion Service provides two tools, each with different functions:
- convert_pdf_url: Specifically designed for handling single or multiple URL links
- convert_pdf_file: Specifically designed for handling single or multiple local file paths
Please choose the appropriate tool based on the input type:
- If the input is one or more URLs, use convert_pdf_url
- If the input is one or more local files, use convert_pdf_file
- If the input mixes URLs and local files, call both tools, giving each the inputs it handles
The converted Markdown files are saved in the configured output directory, and the temporary ZIP downloads are deleted automatically after extraction to save space.
"""
@mcp.prompt()
def pdf_prompt(path: str) -> str:
"""Create PDF processing prompt"""
return f"""
Please convert the following PDF to Markdown format:
{path}
Please choose the appropriate tool based on the input type:
- If the input is one or more URLs, use convert_pdf_url
- If the input is one or more local files, use convert_pdf_file
The converted Markdown files are saved in the configured output directory, and the temporary ZIP downloads are deleted automatically after extraction to save space.
"""
@mcp.resource("status://api")
def get_api_status() -> str:
"""Get API status information"""
if not MINERU_API_KEY:
return "API status: Not configured (missing API key)"
return f"API status: Configured\nAPI base URL: {MINERU_API_BASE}\nAPI key: {MINERU_API_KEY[:10]}..."
@mcp.resource("help://usage")
def get_usage_help() -> str:
"""Get tool usage help information"""
return """
# PDF to Markdown Conversion Service
## Available tools:
1. **convert_pdf_url** - Convert PDF URL to Markdown, supports single or multiple URLs
- Parameters:
- url: PDF file URL or URL list, can be separated by spaces, commas, or newlines
- enable_ocr: Whether to enable OCR (default: True)
2. **convert_pdf_file** - Convert local PDF file to Markdown, supports single or multiple file paths
- Parameters:
- file_path: PDF file local path or path list, can be separated by spaces, commas, or newlines
- enable_ocr: Whether to enable OCR (default: True)
## Tool functions:
- **convert_pdf_url**: Specifically designed for handling URL links, suitable for single or multiple URL inputs
- **convert_pdf_file**: Specifically designed for handling local files, suitable for single or multiple file path inputs
## Mixed input handling:
When the input mixes URLs and local files, call both tools, giving each the inputs it handles.
## Usage examples:
```python
# Convert single URL
result = await convert_pdf_url("https://example.com/document.pdf")
# Convert multiple URLs (batch processing)
result = await convert_pdf_url('''
https://example.com/document1.pdf
https://example.com/document2.pdf
https://example.com/document3.pdf
''')
# Convert multiple URLs with comma separation
result = await convert_pdf_url("https://example.com/doc1.pdf, https://example.com/doc2.pdf")
# Convert single local file
result = await convert_pdf_file("C:/Documents/document.pdf")
# Convert multiple local files (batch processing)
result = await convert_pdf_file('''
C:/Documents/document1.pdf
C:/Documents/document2.pdf
C:/Documents/document3.pdf
''')
# Mixed input handling (URLs and local files)
url_result = await convert_pdf_url('''
https://example.com/doc1.pdf
https://example.com/doc2.pdf
''')
file_result = await convert_pdf_file('''
C:/Documents/doc1.pdf
C:/Documents/doc2.pdf
''')
```
## Conversion results:
A successful conversion returns a dictionary describing the result. The converted Markdown files are saved in the configured output directory, and the temporary ZIP downloads are deleted automatically after extraction to save space.
"""
if __name__ == "__main__":
if not MINERU_API_KEY:
print("Warning: API key not set, please set environment variable MINERU_API_KEY")
mcp.run()
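# Typical startup (a sketch; the module/file name is assumed from the repo layout, and
# MINERU_API_KEY must be available, e.g. from a .env file in the working directory):
#   $ export MINERU_API_KEY="eyJ0eXAi...your-mineru-token..."
#   $ python -m pdf2md          # or: python path/to/this/file.py
# mcp.run() then exposes the convert_pdf_url / convert_pdf_file tools to the connected
# MCP client.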