MCP Development Framework

""" PDF解析工具,用于解析PDF文件内容,支持快速预览和完整解析两种模式 """ import os import tempfile import shutil import fitz # PyMuPDF import PyPDF2 import pymupdf4llm import traceback from typing import Dict, List, Any import mcp.types as types from . import BaseTool, ToolRegistry from PIL import Image import io import pytesseract import base64 import imghdr @ToolRegistry.register class PdfTool(BaseTool): """ PDF解析工具,支持两种模式: 1. 快速预览模式:仅提取文本内容,适用于大型PDF文件 2. 完整解析模式:提取文本和图片内容,提供更详细的文档分析 """ name = "pdf" description = "解析PDF文件内容,支持快速预览和完整解析两种模式" input_schema = { "type": "object", "required": ["file_path"], "properties": { "file_path": { "type": "string", "description": "PDF文件的本地路径,例如'/path/to/document.pdf'", }, "mode": { "type": "string", "description": "解析模式:'quick'(仅文本)或'full'(文本和图片),默认为'full'", "enum": ["quick", "full"], "default": "full" } }, } async def execute(self, arguments: Dict[str, Any]) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ 解析PDF文件 Args: arguments: 参数字典,必须包含'file_path'键,可选'mode'键 Returns: PDF内容列表 """ if "file_path" not in arguments: return [types.TextContent( type="text", text="错误: 缺少必要参数 'file_path'" )] file_path = arguments["file_path"] mode = arguments.get("mode", "full") # 检查文件是否存在 if not os.path.exists(file_path): return [types.TextContent( type="text", text=f"错误: 文件不存在: {file_path}" )] # 检查文件扩展名 if not file_path.lower().endswith('.pdf'): return [types.TextContent( type="text", text=f"错误: 文件不是PDF格式: {file_path}" )] try: if mode == "quick": return await self._quick_preview_pdf(file_path) else: return await self._full_parse_pdf(file_path) except Exception as e: error_details = traceback.format_exc() return [types.TextContent( type="text", text=f"错误: 处理PDF文件时发生错误: {str(e)}\n{error_details}" )] async def _quick_preview_pdf(self, file_path: str) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ 快速预览PDF文件,仅提取文本内容 """ try: # 使用PyMuPDF提取文本 doc = fitz.open(file_path) text_content = [] # 添加文件信息 text_content.append(f"文件名: {os.path.basename(file_path)}") text_content.append(f"页数: {doc.page_count}") text_content.append("---") # 提取每页文本 for page_num in range(doc.page_count): page = doc[page_num] text = page.get_text() if text.strip(): text_content.append(f"第{page_num + 1}页:") text_content.append(text) text_content.append("---") doc.close() return [types.TextContent( type="text", text="\n".join(text_content) )] except Exception as e: error_details = traceback.format_exc() return [types.TextContent( type="text", text=f"错误: 快速预览PDF时发生错误: {str(e)}\n{error_details}" )] def _get_image_mime_type(self, image_bytes: bytes) -> str: """ 获取图片的MIME类型 """ image_type = imghdr.what(None, image_bytes) if image_type: return f"image/{image_type}" return "image/png" # 默认返回PNG类型 async def _analyze_image(self, image_bytes: bytes, lang: str = 'chi_sim+eng') -> str: """ 分析图片内容,识别文字和场景 Args: image_bytes: 图片的二进制数据 lang: OCR语言,默认中文简体+英文 Returns: str: 图片分析结果 """ try: # 将二进制数据转换为PIL Image对象 image = Image.open(io.BytesIO(image_bytes)) # 进行OCR文字识别 text = pytesseract.image_to_string(image, lang=lang) # 如果识别出文字,返回结果 if text.strip(): return f"图片中识别出的文字:\n{text.strip()}" else: return "未在图片中识别出文字" except Exception as e: return f"图片分析失败: {str(e)}" async def _full_parse_pdf(self, file_path: str) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ 完整解析PDF文件,提取文本和图片内容 """ results = [] try: # 使用PyMuPDF提取文本和图片 doc = fitz.open(file_path) # 添加文件信息 results.append(types.TextContent( type="text", text=f"文件名: {os.path.basename(file_path)}\n页数: {doc.page_count}\n---" )) # 处理每一页 for page_num in range(doc.page_count): page = doc[page_num] # 提取文本 text = page.get_text() if text.strip(): results.append(types.TextContent( type="text", text=f"第{page_num + 1}页:\n{text}\n---" )) # 提取图片并进行OCR识别 image_list = page.get_images() if image_list: results.append(types.TextContent( type="text", text=f"第{page_num + 1}页包含{len(image_list)}张图片" )) for img_idx, img_info in enumerate(image_list): try: xref = img_info[0] base_image = doc.extract_image(xref) image_bytes = base_image["image"] # 分析图片内容 image_analysis = await self._analyze_image(image_bytes) # 只添加OCR识别结果 results.append(types.TextContent( type="text", text=f"第{page_num + 1}页 图片{img_idx + 1}分析结果:\n{image_analysis}\n---" )) except Exception as img_error: results.append(types.TextContent( type="text", text=f"警告: 处理第{page_num + 1}页图片{img_idx + 1}时出错: {str(img_error)}" )) doc.close() return results except Exception as e: error_details = traceback.format_exc() return [types.TextContent( type="text", text=f"错误: 完整解析PDF时发生错误: {str(e)}\n{error_details}" )]