Skip to main content
Glama

Mineru MCP Server

by shanshili
main.py (12.2 kB)
from __future__ import annotations import io import os import shutil import time import uuid import zipfile from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional, Tuple import requests from fastmcp import FastMCP BASE_URL = "https://mineru.net" API_TOKEN_ENV = "MINERU_API_TOKEN" DEFAULT_LANGUAGE = "ch" DEFAULT_POLL_INTERVAL = 3.0 DEFAULT_MAX_WAIT = 1800.0 DEFAULT_EXTRA_FORMATS = ["html"] mcp = FastMCP("MineruServer") @dataclass class MineruConfig: api_token: str pdf_folder: Path output_folder: Path language: str enable_table: bool extra_formats: List[str] poll_interval: float max_wait: float rename_assets: bool is_ocr: bool def list_pdf_files(pdf_folder: Path) -> List[Path]: return sorted(path for path in pdf_folder.glob("*.pdf") if path.is_file()) def request_upload_urls(pdf_files: List[Path], config: MineruConfig) -> Tuple[str, Dict[str, str]]: payload_files = [{"name": file.name, "is_ocr": config.is_ocr} for file in pdf_files] request_body = { "enable_table": config.enable_table, "language": config.language, "extra_formats": config.extra_formats, "files": payload_files, } headers = { "Content-Type": "application/json", "Authorization": f"Bearer {config.api_token}", } url = f"{BASE_URL}/api/v4/file-urls/batch" response = requests.post(url, headers=headers, json=request_body, timeout=60) response.raise_for_status() body = response.json() if body.get("code") != 0: raise RuntimeError(f"Mineru API 返回错误: {body.get('msg')}") data = body.get("data", {}) batch_id = data.get("batch_id") if not batch_id: raise RuntimeError("Mineru API 未返回 batch_id") file_urls: List[str] = data.get("file_urls") or [] if len(file_urls) != len(pdf_files): raise RuntimeError("获取的上传链接数量与文件数量不匹配") upload_map = {file.name: url for file, url in zip(pdf_files, file_urls)} return batch_id, upload_map def upload_files(upload_map: Dict[str, str], pdf_files: List[Path]) -> List[Dict[str, str]]: details: List[Dict[str, str]] = [] for file_path in pdf_files: 
upload_url = upload_map.get(file_path.name) if not upload_url: details.append( { "file": file_path.name, "stage": "upload", "status": "error", "message": "未找到上传链接", } ) continue try: with file_path.open("rb") as handle: response = requests.put(upload_url, data=handle, timeout=600) response.raise_for_status() details.append( { "file": file_path.name, "stage": "upload", "status": "success", "message": "上传成功", } ) except requests.exceptions.RequestException as exc: details.append( { "file": file_path.name, "stage": "upload", "status": "error", "message": f"上传失败: {exc}", } ) return details def download_and_extract(zip_url: str, file_name: str, config: MineruConfig) -> Dict[str, str]: output_folder = config.output_folder base_name = Path(file_name).stem temp_dir = output_folder / f"__mineru_tmp_{uuid.uuid4().hex}" try: response = requests.get(zip_url, timeout=600) response.raise_for_status() with zipfile.ZipFile(io.BytesIO(response.content)) as archive: archive.extractall(temp_dir) html_file = None asset_dirs: List[Path] = [] for path in temp_dir.rglob("*"): if path.is_file() and path.suffix.lower() == ".html": html_file = path if path.is_dir() and path.name.lower() in {"figure", "images", "assets"}: asset_dirs.append(path) if not html_file: return { "file": file_name, "stage": "download", "status": "error", "message": "结果包中未找到 HTML 文件", } if config.rename_assets: rename_assets(asset_dirs, html_file, base_name) final_html_path = output_folder / f"{base_name}.html" final_html_path.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(html_file), final_html_path) for asset_dir in asset_dirs: if not asset_dir.exists(): continue target_dir = output_folder / asset_dir.name if target_dir.exists(): shutil.copytree(asset_dir, target_dir, dirs_exist_ok=True) shutil.rmtree(asset_dir, ignore_errors=True) else: shutil.move(str(asset_dir), target_dir) return { "file": file_name, "stage": "download", "status": "success", "message": f"已保存 {final_html_path.name}", } except 
requests.exceptions.RequestException as exc: return { "file": file_name, "stage": "download", "status": "error", "message": f"下载失败: {exc}", } except Exception as exc: # pylint: disable=broad-except return { "file": file_name, "stage": "download", "status": "error", "message": f"处理结果包失败: {exc}", } finally: shutil.rmtree(temp_dir, ignore_errors=True) def rename_assets(asset_dirs: List[Path], html_file: Path, base_name: str) -> None: if not asset_dirs: return rename_map: Dict[str, str] = {} for asset_dir in asset_dirs: files = sorted( file for file in asset_dir.iterdir() if file.is_file() ) counter = 1 for file in files: extension = file.suffix.lower() if extension not in {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".svg"}: continue new_name = f"{base_name}_{counter}{extension}" counter += 1 new_path = file.with_name(new_name) file.rename(new_path) relative_old = f"{asset_dir.name}/{file.name}" relative_new = f"{asset_dir.name}/{new_name}" rename_map[relative_old] = relative_new if not rename_map: return html_content = html_file.read_text(encoding="utf-8") for old, new in rename_map.items(): html_content = html_content.replace(f'"{old}"', f'"{new}"') html_content = html_content.replace(f"'{old}'", f"'{new}'") html_file.write_text(html_content, encoding="utf-8") def poll_results(batch_id: str, config: MineruConfig, expected_files: List[str]) -> List[Dict[str, str]]: details: List[Dict[str, str]] = [] completed: Dict[str, bool] = {name: False for name in expected_files} headers = {"Authorization": f"Bearer {config.api_token}"} url = f"{BASE_URL}/api/v4/extract-results/batch/{batch_id}" deadline = time.monotonic() + config.max_wait while not all(completed.values()) and time.monotonic() < deadline: response = requests.get(url, headers=headers, timeout=60) response.raise_for_status() body = response.json() if body.get("code") != 0: raise RuntimeError(f"Mineru 状态接口返回错误: {body.get('msg')}") extract_results = body.get("data", {}).get("extract_result", []) for result in 
extract_results: file_name = result.get("file_name") if not file_name or file_name not in completed or completed[file_name]: continue state = result.get("state") if state == "done": zip_url = result.get("full_zip_url") if not zip_url: details.append( { "file": file_name, "stage": "download", "status": "error", "message": "状态为 done 但未提供下载链接", } ) else: details.append(download_and_extract(zip_url, file_name, config)) completed[file_name] = True elif state == "failed": message = result.get("err_msg") or "未知错误" details.append( { "file": file_name, "stage": "convert", "status": "error", "message": message, } ) completed[file_name] = True if not all(completed.values()): time.sleep(config.poll_interval) for file_name, is_done in completed.items(): if not is_done: details.append( { "file": file_name, "stage": "convert", "status": "error", "message": "超时未完成", } ) return details @mcp.tool def convert_pdfs_with_mineru( pdf_folder: str, output_folder: str, api_token: Optional[str] = None, language: str = DEFAULT_LANGUAGE, enable_table: bool = True, extra_formats: Optional[List[str]] = None, poll_interval: float = DEFAULT_POLL_INTERVAL, max_wait: float = DEFAULT_MAX_WAIT, rename_assets_flag: bool = True, is_ocr: bool = True, ) -> Dict[str, object]: """批量调用 Mineru API 将 PDF 转为 HTML。""" pdf_path = Path(pdf_folder).expanduser() output_path = Path(output_folder).expanduser() output_path.mkdir(parents=True, exist_ok=True) if not pdf_path.exists() or not pdf_path.is_dir(): raise NotADirectoryError(f"PDF 目录不存在: {pdf_path}") token = api_token or os.getenv(API_TOKEN_ENV) if not token: raise ValueError("缺少 Mineru API Token,请通过参数 api_token 或环境变量 MINERU_API_TOKEN 提供") pdf_files = list_pdf_files(pdf_path) if not pdf_files: return { "pdf_total": 0, "uploaded": 0, "completed": 0, "details": [], "message": "指定目录下未找到 PDF 文件", } formats = list(extra_formats) if extra_formats else list(DEFAULT_EXTRA_FORMATS) if not any(fmt.lower() == "html" for fmt in formats): formats.append("html") config = 
MineruConfig( api_token=token, pdf_folder=pdf_path, output_folder=output_path, language=language, enable_table=enable_table, extra_formats=formats, poll_interval=max(1.0, poll_interval), max_wait=max(60.0, max_wait), rename_assets=rename_assets_flag, is_ocr=is_ocr, ) batch_id, upload_map = request_upload_urls(pdf_files, config) upload_details = upload_files(upload_map, pdf_files) successful_files = [item["file"] for item in upload_details if item["status"] == "success"] if not successful_files: return { "pdf_total": len(pdf_files), "uploaded": 0, "completed": 0, "details": upload_details, "message": "所有文件上传失败,请检查日志", } poll_details = poll_results(batch_id, config, successful_files) all_details = upload_details + poll_details completed_count = sum(1 for item in poll_details if item["status"] == "success") return { "pdf_total": len(pdf_files), "uploaded": len(successful_files), "completed": completed_count, "details": all_details, "output_directory": str(output_path), } def start_server() -> None: mcp.run( transport="http", host="127.0.0.1", port=4399, log_level="DEBUG", ) if __name__ == "__main__": start_server()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shanshili/mineru_mcp_project_demo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.