Skip to main content
Glama
document_conversion_and_processing_demo.py86.5 kB
#!/usr/bin/env python """ DETAILED Demonstration script for the STANDALONE Document Processing functions in Ultimate MCP Server, showcasing integrated OCR, analysis, conversion, and batch capabilities with extensive examples. """ import asyncio import base64 import datetime as dt import json import os import sys import traceback # Added for more detailed error printing if needed import warnings # Added for warning control from pathlib import Path from typing import Any, Awaitable, Callable, Dict, Optional, Tuple import httpx # Filter Docling-related deprecation warnings warnings.filterwarnings("ignore", category=DeprecationWarning, module="docling") warnings.filterwarnings("ignore", category=DeprecationWarning, module="docling_core") warnings.filterwarnings("ignore", message="Could not parse formula with MathML") # Add project root to path for imports when running as script # Adjust this relative path if your script structure is different _PROJECT_ROOT = Path(__file__).resolve().parent.parent if str(_PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(_PROJECT_ROOT)) print(f"INFO: Added {_PROJECT_ROOT} to sys.path") # Rich imports for enhanced terminal UI from rich import box, get_console # noqa: E402 from rich.console import Group # noqa: E402 from rich.layout import Layout # noqa: E402 from rich.markdown import Markdown # noqa: E402 from rich.markup import escape # noqa: E402 from rich.panel import Panel # noqa: E402 from rich.progress import ( # noqa: E402 BarColumn, FileSizeColumn, Progress, TextColumn, TimeRemainingColumn, TransferSpeedColumn, ) from rich.rule import Rule # noqa: E402 from rich.syntax import Syntax # noqa: E402 from rich.table import Table # noqa: E402 from rich.text import Text # noqa: E402 from rich.traceback import install as install_rich_traceback # noqa: E402 # --- Global Constants --- # Maximum number of lines to display for any content MAX_DISPLAY_LINES = 50 # Used to truncate all displayed content # --- Attempt to import required MCP Server components --- try: # Assuming standard MCP Server structure from ultimate_mcp_server.core.server import Gateway from ultimate_mcp_server.exceptions import ToolError, ToolInputError # Import the standalone functions and availability flags from ultimate_mcp_server.tools.document_conversion_and_processing import ( # Import availability flags _DOCLING_AVAILABLE, _PANDAS_AVAILABLE, _TIKTOKEN_AVAILABLE, analyze_pdf_structure, canonicalise_entities, chunk_document, clean_and_format_text_as_markdown, convert_document, detect_content_type, enhance_ocr_text, extract_entities, extract_metrics, extract_tables, flag_risks, generate_qa_pairs, identify_sections, ocr_image, optimize_markdown_formatting, process_document_batch, summarize_document, ) from ultimate_mcp_server.utils import get_logger from ultimate_mcp_server.utils.display import CostTracker # Import CostTracker MCP_COMPONENTS_LOADED = True except ImportError as e: MCP_COMPONENTS_LOADED = False _IMPORT_ERROR_MSG = str(e) # Handle this error gracefully in the main function print(f"\n[ERROR] Failed to import required MCP components: {_IMPORT_ERROR_MSG}") print("Please ensure:") print("1. You are running this script from the correct directory structure.") print("2. The MCP Server environment is activated.") print("3. All dependencies (including optional ones used in the demo) are installed.") sys.exit(1) # Initialize Rich console and logger console = get_console() logger = get_logger("demo.doc_proc_standalone") # Updated logger name # Install rich tracebacks for better error display install_rich_traceback(show_locals=True, width=console.width, extra_lines=2) # --- Configuration --- SCRIPT_DIR = Path(__file__).resolve().parent DEFAULT_SAMPLE_DIR = SCRIPT_DIR / "sample_docs" # Changed dir name slightly DEFAULT_SAMPLE_PDF_URL = "https://arxiv.org/pdf/1706.03762.pdf" # Attention is All You Need DEFAULT_SAMPLE_IMAGE_URL = "https://raw.githubusercontent.com/tesseract-ocr/tesseract/main/testing/phototest.tif" # Use Tesseract sample TIFF SAMPLE_HTML_URL = "https://en.wikipedia.org/wiki/Transformer_(machine_learning_model)" # Additional sample PDFs for testing diversity BUFFETT_SHAREHOLDER_LETTER_URL = "https://www.berkshirehathaway.com/letters/2022ltr.pdf" # Likely digital PDF, good for text/layout BACKPROPAGATION_PAPER_URL = "https://www.iro.umontreal.ca/~vincentp/ift3395/lectures/backprop_old.pdf" # Older, might be scanned/need OCR DOWNLOADED_FILES_DIR = DEFAULT_SAMPLE_DIR / "downloaded" # Config from environment variables USE_GPU = os.environ.get("USE_GPU", "true").lower() == "true" MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "3")) ACCELERATOR_DEVICE = "cuda" if USE_GPU else "cpu" SKIP_DOWNLOADS = os.environ.get("SKIP_DOWNLOADS", "false").lower() == "true" LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() # Define result types for type hints ResultData = Dict[str, Any] OperationResult = Tuple[bool, ResultData] FileResult = Optional[Path] # --- Demo Helper Functions (Mostly unchanged, minor adjustments for clarity) --- def create_demo_layout() -> Layout: """Create a Rich layout for the demo UI.""" layout = Layout(name="root") layout.split( Layout(name="header", size=5), Layout(name="body", ratio=1), Layout(name="footer", size=1), ) layout["footer"].update("[dim]Standalone Document Processing Demo Footer[/]") return layout def timestamp_str(short: bool = False) -> str: """Return a formatted timestamp string.""" now = dt.datetime.now() if short: return f"[dim]{now.strftime('%H:%M:%S')}[/]" return f"[dim]{now.strftime('%Y-%m-%d %H:%M:%S')}[/]" def truncate_text_by_lines(text: str, max_lines: int = MAX_DISPLAY_LINES) -> str: """Truncates text to show first/last lines with indicator.""" if not text or not isinstance(text, str): return "" lines = text.splitlines() if len(lines) <= max_lines: return text half_lines = max_lines // 2 return "\n".join(lines[:half_lines] + ["[dim][...TRUNCATED...]"] + lines[-half_lines:]) def format_value_for_display(key: str, value: Any, detail_level: int = 1) -> Any: """Format specific values for better display.""" if value is None: return "[dim]None[/]" if isinstance(value, bool): return "[green]Yes[/]" if value else "[red]No[/]" if isinstance(value, float): # Specific formatting for processing_time if "time" in key.lower() and not key.lower().startswith("creation"): return f"[green]{value:.3f}s[/]" return f"{value:.3f}" # Standard float formatting if isinstance(value, list): if not value: return "[dim]Empty List[/]" list_len = len(value) preview_count = 3 if detail_level < 2 else 5 suffix = f" [dim]... ({list_len} items total)[/]" if list_len > preview_count else "" if detail_level >= 1: previews = [] for item in value[:preview_count]: item_preview = format_value_for_display(f"{key}_item", item, detail_level=0) previews.append(str(item_preview)) return f"[{', '.join(previews)}]{suffix}" else: return f"[List with {list_len} items]" if isinstance(value, dict): if not value: return "[dim]Empty Dict[/]" dict_len = len(value) preview_count = 4 if detail_level < 2 else 8 preview_keys = list(value.keys())[:preview_count] suffix = f" [dim]... ({dict_len} keys total)[/]" if dict_len > preview_count else "" if detail_level >= 1: items_preview = [ f"{repr(k)}: {format_value_for_display(k, value[k], detail_level=0)}" for k in preview_keys ] return f"{{{'; '.join(items_preview)}}}{suffix}" else: return f"[Dict with {dict_len} keys]" if isinstance(value, str): str_len = len(value) # Always truncate by lines first for display consistency truncated_by_lines = truncate_text_by_lines(value, MAX_DISPLAY_LINES) # Then apply character limit if still too long preview_len = 300 if detail_level < 2 else 600 if len(truncated_by_lines) > preview_len: return escape(truncated_by_lines[:preview_len]) + f"[dim]... ({str_len} chars total)[/]" return escape(truncated_by_lines) return escape(str(value)) def display_result(title: str, result: ResultData, display_options: Optional[Dict] = None) -> None: """Display operation result with enhanced formatting using Rich.""" display_options = display_options or {} start_time = dt.datetime.now() title_display = Text.from_markup(escape(title)) if not isinstance(title, Text) else title console.print(Rule(f"[bold cyan]{title_display}[/] {timestamp_str()}", style="cyan")) success = result.get("success", False) detail_level = display_options.get("detail_level", 1) hide_keys_set = set( display_options.get("hide_keys", ["success", "raw_llm_response", "raw_text"]) ) display_keys = display_options.get("display_keys") # --- Summary Panel --- summary_panel_content = Text() summary_panel_content.append( Text.from_markup( f"Status: {'[bold green]Success[/]' if success else '[bold red]Failed[/]'}\n" ) ) if not success: error_code = result.get("error_code", "N/A") error_msg = result.get("error", "Unknown error") summary_panel_content.append( Text.from_markup(f"Error Code: [yellow]{escape(str(error_code))}[/]\n") ) summary_panel_content.append( Text.from_markup(f"Message: [red]{escape(str(error_msg))}[/]\n") ) console.print( Panel( summary_panel_content, title="Operation Status", border_style="red", padding=(1, 2) ) ) return # Stop display if failed top_level_info = { "processing_time": "Processing Time", "extraction_strategy_used": "Strategy Used", "output_format": "Output Format", "was_html": "Input Detected as HTML", # Relevant for clean_and_format... "file_path": "Output File Path", } for key, display_name in top_level_info.items(): if key in result and key not in hide_keys_set: value_str = format_value_for_display(key, result[key], detail_level=0) summary_panel_content.append( Text.from_markup(f"{display_name}: [blue]{value_str}[/]\n") ) console.print( Panel( summary_panel_content, title="Operation Summary", border_style="green", padding=(1, 2) ) ) # --- Details Section --- details_to_display = {} for key, value in result.items(): if ( key in hide_keys_set or key in top_level_info or key.startswith("_") ): # Skip internal keys continue if display_keys and key not in display_keys: continue details_to_display[key] = value if not details_to_display: console.print(Text.from_markup("[dim]No further details requested or available.[/]")) console.print() return console.print(Rule("Details", style="dim")) for key, value in details_to_display.items(): key_title = key.replace("_", " ").title() panel_border = "blue" panel_content: Any = None format_type = "text" # Determine format for content-like keys is_content_key = key.lower() in [ "content", "markdown_text", "optimized_markdown", "summary", "first_table_preview", "tables", ] if is_content_key: if "markdown" in key.lower() or result.get("output_format") == "markdown": format_type = "markdown" elif result.get("output_format") == "html": format_type = "html" elif ( result.get("output_format") == "json" or key == "tables" and result.get("tables") and isinstance(result.get("tables")[0], list) ): format_type = "json" elif ( key == "tables" and result.get("tables") and isinstance(result.get("tables")[0], str) ): # Assuming CSV string format_type = "csv" else: format_type = "text" format_type = display_options.get("format_key", {}).get( key, format_type ) # Allow override if is_content_key and isinstance(value, str): if not value: panel_content = "[dim]Empty Content[/]" else: truncated_value = truncate_text_by_lines(value, MAX_DISPLAY_LINES) if format_type == "markdown": panel_content = Markdown(truncated_value) elif format_type == "csv": panel_content = Syntax( truncated_value, "csv", theme="paraiso-dark", line_numbers=False, word_wrap=True, ) else: panel_content = Syntax( truncated_value, format_type, theme="paraiso-dark", line_numbers=False, word_wrap=True, ) panel_border = "green" if format_type == "markdown" else "white" console.print( Panel( panel_content, title=key_title, border_style=panel_border, padding=(1, 2), expand=False, ) ) elif key.lower() == "chunks" and isinstance(value, list): chunk_table = Table( title=f"Chunk Preview (Total: {len(value)})", box=box.MINIMAL, show_header=True ) chunk_table.add_column("#", style="cyan") chunk_table.add_column("Preview (First 80 chars)", style="white") chunk_table.add_column("Length", style="green") limit = 5 if detail_level < 2 else 10 for i, chunk in enumerate(value[:limit], 1): chunk_str = str(chunk) chunk_preview = truncate_text_by_lines( chunk_str[:80] + ("..." if len(chunk_str) > 80 else ""), 5 ) chunk_table.add_row(str(i), escape(chunk_preview), str(len(chunk_str))) if len(value) > limit: chunk_table.add_row("...", f"[dim]{len(value) - limit} more...[/]", "") console.print(Panel(chunk_table, title=key_title, border_style="blue")) elif key.lower() == "qa_pairs" and isinstance(value, list): qa_text = Text() limit = 3 if detail_level < 2 else 5 for i, qa in enumerate(value[:limit], 1): q_text = truncate_text_by_lines(qa.get("question", ""), 5) a_text = truncate_text_by_lines(qa.get("answer", ""), 10) qa_text.append(f"{i}. Q: ", style="bold cyan") qa_text.append(escape(q_text) + "\n") qa_text.append(" A: ", style="green") qa_text.append(escape(a_text) + "\n\n") if len(value) > limit: qa_text.append(f"[dim]... {len(value) - limit} more ...[/]") console.print(Panel(qa_text, title=key_title, border_style="blue")) elif ( key.lower() == "tables" and isinstance(value, list) and value ): # Handle table list (JSON/Pandas) first_table = value[0] if isinstance(first_table, list): # JSON format panel_content = Syntax( json.dumps(first_table[:5], indent=2), "json", theme="paraiso-dark", line_numbers=False, word_wrap=True, ) panel_title = f"{key_title} (First Table JSON Preview, {len(value)} total)" console.print( Panel(panel_content, title=panel_title, border_style="yellow", padding=(1, 1)) ) elif hasattr(first_table, "to_string"): # Pandas DataFrame panel_content = escape(first_table.head(5).to_string()) panel_title = f"{key_title} (First Table Pandas Preview, {len(value)} total)" console.print( Panel(panel_content, title=panel_title, border_style="yellow", padding=(1, 1)) ) else: # Fallback if format unknown console.print( Panel( f"First table type: {type(first_table).__name__}. Preview:\n{str(first_table)[:500]}...", title=key_title, border_style="yellow", ) ) elif isinstance(value, dict): # General Dict Handling (metadata, metrics, risks, etc.) dict_table = Table(title="Contents", box=box.MINIMAL, show_header=False, expand=False) dict_table.add_column("SubKey", style="magenta", justify="right", no_wrap=True) dict_table.add_column("SubValue", style="white") item_count = 0 max_items = 5 if detail_level == 0 else 20 for k, v in value.items(): dict_table.add_row( escape(str(k)), format_value_for_display(k, v, detail_level=detail_level) ) item_count += 1 if item_count >= max_items: dict_table.add_row("[dim]...[/]", f"[dim]({len(value)} total items)[/]") break panel_border = ( "magenta" if "quality" in key.lower() or "metrics" in key.lower() else "blue" ) console.print( Panel(dict_table, title=key_title, border_style=panel_border, padding=(1, 1)) ) elif isinstance(value, list): # General List Handling list_panel_content = [Text.from_markup(f"[cyan]Total Items:[/] {len(value)}")] limit = 5 if detail_level < 2 else 10 for i, item in enumerate(value[:limit]): item_display = format_value_for_display( f"{key}[{i}]", item, detail_level=detail_level - 1 ) list_panel_content.append(f"[magenta]{i + 1}.[/] {item_display}") if len(value) > limit: list_panel_content.append( Text.from_markup(f"[dim]... {len(value) - limit} more ...[/]") ) console.print(Panel(Group(*list_panel_content), title=key_title, border_style="blue")) else: # Fallback for simple types value_display = format_value_for_display(key, value, detail_level=detail_level) console.print(f"[bold cyan]{key_title}:[/] {value_display}") end_time = dt.datetime.now() elapsed = (end_time - start_time).total_seconds() console.print(Text.from_markup(f"[dim]Result details displayed in {elapsed:.3f}s[/]")) console.print() # Add spacing async def download_file_with_progress( url: str, output_path: Path, description: str, progress: Optional[Progress] = None ) -> FileResult: """Download a file with a detailed progress bar.""" if output_path.exists() and output_path.stat().st_size > 1000: logger.info(f"Using existing file: {output_path}") console.print( Text.from_markup(f"[dim]Using existing file: [blue underline]{output_path.name}[/][/]") ) return output_path if SKIP_DOWNLOADS: console.print( f"[yellow]Skipping download for {description} due to SKIP_DOWNLOADS setting.[/]" ) return None console.print(f"Attempting to download [bold]{description}[/] from [underline]{url}[/]...") output_path.parent.mkdir(parents=True, exist_ok=True) try: async with httpx.AsyncClient(follow_redirects=True, timeout=60.0) as client: async with client.stream("GET", url) as response: if response.status_code == 404: logger.error(f"File not found (404) at {url}") console.print(f"[red]Error: File not found (404) for {description}.[/]") return None response.raise_for_status() total_size = int(response.headers.get("content-length", 0)) task_description = f"Downloading {description}..." local_progress = progress is None if local_progress: progress = Progress( # type: ignore TextColumn("[bold blue]{task.description}", justify="right"), BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.1f}%", "•", TransferSpeedColumn(), "•", FileSizeColumn(), "•", TimeRemainingColumn(), console=console, transient=True, ) progress.start() # type: ignore download_task = progress.add_task(task_description, total=total_size) # type: ignore bytes_downloaded = 0 try: with open(output_path, "wb") as f: async for chunk in response.aiter_bytes(): f.write(chunk) bytes_written = len(chunk) bytes_downloaded += bytes_written progress.update(download_task, advance=bytes_written) # type: ignore progress.update( download_task, completed=max(bytes_downloaded, total_size), description=f"Downloaded {description}", ) # type: ignore finally: if local_progress: progress.stop() # type: ignore logger.info(f"Successfully downloaded {description} to {output_path}") console.print( Text.from_markup( f"[green]✓ Downloaded {description} to [blue underline]{output_path.name}[/][/]" ) ) return output_path except httpx.RequestError as e: logger.error(f"Network error downloading {description} from {url}: {e}") console.print( Text.from_markup( f"[red]Network Error downloading {description}: {type(e).__name__}. Check connection or URL.[/]" ) ) return None except Exception as e: logger.error(f"Failed to download {description} from {url}: {e}", exc_info=True) console.print( Text.from_markup(f"[red]Error downloading {description}: {type(e).__name__} - {e}[/]") ) if output_path.exists(): try: output_path.unlink() except OSError: pass return None async def safe_tool_call( operation_name: str, tool_func: Callable[..., Awaitable[Dict]], *args, tracker: Optional[CostTracker] = None, **kwargs, ) -> OperationResult: """Safely call a standalone tool function, handling exceptions and logging.""" console.print( Text.from_markup( f"\n[cyan]Calling Tool:[/][bold] {escape(operation_name)}[/] {timestamp_str(short=True)}" ) ) display_options = kwargs.pop("display_options", {}) # Extract display options # Log arguments carefully log_args_repr = {} MAX_ARG_LEN = 100 for k, v in kwargs.items(): if k == "image_data" and isinstance(v, str): # Don't log full base64 log_args_repr[k] = f"str(len={len(v)}, starting_chars='{v[:10]}...')" elif isinstance(v, (str, bytes)) and len(v) > MAX_ARG_LEN: log_args_repr[k] = f"{type(v).__name__}(len={len(v)})" elif isinstance(v, (list, dict)) and len(v) > 10: log_args_repr[k] = f"{type(v).__name__}(len={len(v)})" else: log_args_repr[k] = repr(v) logger.debug(f"Executing {operation_name} with kwargs: {log_args_repr}") try: # Directly call the standalone function result = await tool_func(*args, **kwargs) if not isinstance(result, dict): logger.error( f"Tool '{operation_name}' returned non-dict type: {type(result)}. Value: {str(result)[:150]}" ) return False, { "success": False, "error": f"Tool returned unexpected type: {type(result).__name__}", "error_code": "INTERNAL_ERROR", "_display_options": display_options, } # Cost tracking (if applicable) if tracker is not None and result.get("success", False): # The standalone functions might not directly return cost info in the same way. # If LLM calls happen internally, cost tracking might need to be done within # the `_standalone_llm_call` or rely on the global tracker if `generate_completion` updates it. # For now, assume cost is tracked elsewhere or add specific fields if needed. if "llm_cost" in result or "cost" in result: # Attempt to track cost if relevant fields exist cost = result.get("cost", result.get("llm_cost", 0.0)) input_tokens = result.get("input_tokens", 0) output_tokens = result.get("output_tokens", 0) provider = result.get("provider", "unknown") model = result.get("model", operation_name) # Use op name as fallback model processing_time = result.get("processing_time", 0.0) tracker.add_call_data( cost, input_tokens, output_tokens, provider, model, processing_time ) result["_display_options"] = display_options # Pass options for display func logger.debug(f"Tool '{operation_name}' completed successfully.") return True, result except ToolInputError as e: logger.warning(f"Input error for {operation_name}: {e}") return False, { "success": False, "error": str(e), "error_code": e.error_code, "_display_options": display_options, } except ToolError as e: logger.error(f"Tool error during {operation_name}: {e}") return False, { "success": False, "error": str(e), "error_code": e.error_code, "_display_options": display_options, } except Exception as e: logger.error(f"Unexpected error during {operation_name}: {e}", exc_info=True) tb_str = traceback.format_exc(limit=1) return False, { "success": False, "error": f"{type(e).__name__}: {e}\n{tb_str}", "error_type": type(e).__name__, "error_code": "UNEXPECTED_ERROR", "_display_options": display_options, } # --- Demo Sections (Updated to call standalone functions) --- async def demo_section_1_conversion_ocr( sample_files: Dict[str, Path], tracker: CostTracker ) -> None: """Demonstrate convert_document with various strategies and OCR.""" console.print(Rule("[bold green]Demo 1: Document Conversion & OCR[/]", style="green")) logger.info("Starting Demo Section 1: Conversion & OCR") pdf_digital = sample_files.get("pdf_digital") buffett_pdf = sample_files.get("buffett_pdf") backprop_pdf = sample_files.get("backprop_pdf") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") pdf_files_to_process = [pdf for pdf in [pdf_digital, buffett_pdf, backprop_pdf] if pdf] if not pdf_files_to_process: console.print("[yellow]Skipping Demo 1: Need at least one sample PDF.[/]") return def get_output_path( input_file: Path, format_name: str, strategy: str, output_format: str ) -> str: base_name = input_file.stem return str(conversion_outputs_dir / f"{base_name}_{strategy}_{format_name}.{output_format}") for pdf_file in pdf_files_to_process: console.print( Panel( Text.from_markup(f"Processing PDF: [cyan]{pdf_file.name}[/]"), border_style="blue" ) ) # 1a: Direct Text Strategy (Raw Text) output_path = get_output_path(pdf_file, "direct", "raw_text", "txt") success, result = await safe_tool_call( f"{pdf_file.name} -> Text (Direct Text)", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="text", extraction_strategy="direct_text", enhance_with_llm=False, save_to_file=True, output_path=output_path, ) if success: display_result( f"{pdf_file.name} -> Text (Direct Text)", result, {"format_key": {"content": "text"}}, ) # 1b: Direct Text Strategy (Markdown Output + Enhance) output_path = get_output_path(pdf_file, "direct", "enhanced_md", "md") success, result = await safe_tool_call( f"{pdf_file.name} -> MD (Direct Text + Enhance)", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="markdown", extraction_strategy="direct_text", enhance_with_llm=True, save_to_file=True, output_path=output_path, ) if success: display_result( f"{pdf_file.name} -> MD (Direct + Enhance)", result, {"format_key": {"content": "markdown"}}, ) # 1c: Docling Strategy (Markdown Output) - Check availability if _DOCLING_AVAILABLE: output_path = get_output_path(pdf_file, "docling", "md", "md") success, result = await safe_tool_call( f"{pdf_file.name} -> MD (Docling)", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="markdown", extraction_strategy="docling", accelerator_device=ACCELERATOR_DEVICE, save_to_file=True, output_path=output_path, ) if success: display_result( f"{pdf_file.name} -> MD (Docling)", result, {"format_key": {"content": "markdown"}}, ) else: console.print("[yellow]Docling unavailable, skipping Docling conversions.[/]") # --- OCR on PDF --- console.print( Panel( f"Processing PDF with OCR Strategy: [cyan]{pdf_file.name}[/]", border_style="blue" ) ) # 1d: OCR Strategy (Raw Text) output_path = get_output_path(pdf_file, "ocr", "raw_text", "txt") success, result = await safe_tool_call( f"{pdf_file.name} -> Text (OCR Raw)", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="text", extraction_strategy="ocr", enhance_with_llm=False, ocr_options={"language": "eng", "dpi": 150}, save_to_file=True, output_path=output_path, ) if success: display_result( f"{pdf_file.name} -> Text (OCR Raw)", result, {"format_key": {"content": "text"}, "detail_level": 0}, ) # 1e: OCR Strategy (Markdown, Enhanced, Quality Assess) output_path = get_output_path(pdf_file, "ocr", "enhanced_md", "md") success, result = await safe_tool_call( f"{pdf_file.name} -> MD (OCR + Enhance + Quality)", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="markdown", extraction_strategy="ocr", enhance_with_llm=True, ocr_options={ "language": "eng", "assess_quality": True, "remove_headers": True, "dpi": 200, }, # Try header removal save_to_file=True, output_path=output_path, ) if success: display_result( f"{pdf_file.name} -> MD (OCR + Enhance + Quality)", result, {"format_key": {"content": "markdown"}}, ) # 1f: Hybrid Strategy output_path = get_output_path(pdf_file, "hybrid", "text", "txt") success, result = await safe_tool_call( f"{pdf_file.name} -> Text (Hybrid + Enhance)", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="text", extraction_strategy="hybrid_direct_ocr", enhance_with_llm=True, save_to_file=True, output_path=output_path, ) if success: display_result( f"{pdf_file.name} -> Text (Hybrid + Enhance)", result, {"format_key": {"content": "text"}}, ) # --- Image Conversion (Using convert_document) --- image_file = sample_files.get("image") if image_file: console.print( Panel( f"Processing Image via convert_document: [cyan]{image_file.name}[/]", border_style="blue", ) ) output_path = get_output_path(image_file, "convert_doc", "md", "md") success, result = await safe_tool_call( f"{image_file.name} -> MD (Convert Doc)", convert_document, # Call standalone function tracker=tracker, document_path=str(image_file), output_format="markdown", # Strategy inferred save_to_file=True, output_path=output_path, ) if success: display_result( f"{image_file.name} -> MD (via convert_document)", result, {"format_key": {"content": "markdown"}}, ) # --- Conversion from Bytes --- if pdf_digital: console.print(Panel("Processing PDF from Bytes Data using OCR", border_style="blue")) try: pdf_bytes = pdf_digital.read_bytes() output_path = get_output_path(pdf_digital, "bytes", "ocr_text", "txt") success, result = await safe_tool_call( "PDF Bytes -> Text (OCR)", convert_document, # Call standalone function tracker=tracker, document_data=pdf_bytes, output_format="text", extraction_strategy="ocr", enhance_with_llm=False, ocr_options={"dpi": 150}, save_to_file=True, output_path=output_path, ) if success: display_result( "PDF Bytes -> Text (OCR Raw)", result, {"format_key": {"content": "text"}, "detail_level": 0}, ) except Exception as e: console.print(f"[red]Error processing PDF bytes: {e}[/]") async def demo_section_2_dedicated_ocr(sample_files: Dict[str, Path], tracker: CostTracker) -> None: """Demonstrate the dedicated ocr_image tool.""" console.print(Rule("[bold green]Demo 2: Dedicated Image OCR Tool[/]", style="green")) logger.info("Starting Demo Section 2: Dedicated Image OCR Tool") image_file = sample_files.get("image") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") if not image_file: console.print("[yellow]Skipping Demo 2: Sample image not available.[/]") return def get_output_path(base_name: str, method: str, output_format: str) -> str: return str(conversion_outputs_dir / f"{base_name}_ocr_{method}.{output_format}") console.print( Panel( f"Processing Image with ocr_image Tool: [cyan]{image_file.name}[/]", border_style="blue" ) ) # 2a: OCR Image from Path (Default: Enhance=True, Output=Markdown) output_path = get_output_path(image_file.stem, "default", "md") success, result = await safe_tool_call( "OCR Image (Path, Defaults)", ocr_image, # Call standalone function tracker=tracker, image_path=str(image_file), ) if success: try: Path(output_path).write_text(result.get("content", ""), encoding="utf-8") console.print(f"[green]✓ Saved OCR output to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving OCR output: {e}[/]") display_result( "OCR Image (Path, Defaults)", result, {"format_key": {"content": "markdown"}} ) # 2b: OCR Image from Path (Raw Text, Specific Preprocessing) output_path = get_output_path(image_file.stem, "raw_preprocessing", "txt") success, result = await safe_tool_call( "OCR Image (Path, Raw Text, Preprocessing)", ocr_image, # Call standalone function tracker=tracker, image_path=str(image_file), output_format="text", enhance_with_llm=False, ocr_options={ "language": "eng", "preprocessing": {"threshold": "adaptive", "denoise": True, "deskew": False}, }, ) if success: try: Path(output_path).write_text(result.get("content", ""), encoding="utf-8") console.print(f"[green]✓ Saved OCR output to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving OCR output: {e}[/]") display_result( "OCR Image (Raw Text, Preprocessing)", result, {"format_key": {"content": "text"}} ) # 2c: OCR Image from Base64 Data (Enhance=True, Quality Assess) try: console.print(Panel("Processing Image from Base64 Data", border_style="blue")) img_bytes = image_file.read_bytes() img_base64 = base64.b64encode(img_bytes).decode("utf-8") output_path = get_output_path(image_file.stem, "base64_enhanced", "md") success, result = await safe_tool_call( "OCR Image (Base64, Enhance, Quality)", ocr_image, # Call standalone function tracker=tracker, image_data=img_base64, output_format="markdown", enhance_with_llm=True, ocr_options={"assess_quality": True}, ) if success: try: Path(output_path).write_text(result.get("content", ""), encoding="utf-8") console.print(f"[green]✓ Saved OCR output to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving OCR output: {e}[/]") display_result( "OCR Image (Base64, Enhance, Quality)", result, {"format_key": {"content": "markdown"}}, ) except Exception as e: console.print(f"[red]Failed to process image from Base64: {type(e).__name__} - {e}[/]") async def demo_section_3_enhance_text(sample_files: Dict[str, Path], tracker: CostTracker) -> None: """Demonstrate enhancing existing noisy text.""" console.print(Rule("[bold green]Demo 3: Enhance Existing OCR Text[/]", style="green")) logger.info("Starting Demo Section 3: Enhance OCR Text") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") noisy_text = """ INVOlCE # 12345 - ACME C0rp. Date: Octobor 25, 2O23 Billed To: Example Inc. , 123 Main St . Anytown USA Itemm Descriptiom Quantlty Unlt Price Tota1 ----------------------------------------------------------------- Wldget Modell A lO $ I5.0O $l5O.OO Gadgett Type B 5 $ 25.5O $l27.5O Assembly Srvlce 2 hrs $ 75.OO $l5O.OO ----------------------------------------------------------------- Subtota1 : $427.5O Tax (8%) : $ 34.2O TOTAL : $461.7O Notes: Payment due ln 3O days. Thank you for yuor buslness! Page I / l - Confidential Document""" console.print(Panel("Original Noisy Text:", border_style="yellow")) console.print( Syntax(truncate_text_by_lines(noisy_text), "text", theme="default", line_numbers=True) ) def get_output_path(base_name: str, format_name: str) -> str: return str(conversion_outputs_dir / f"{base_name}.{format_name}") # 3a: Enhance to Markdown (Remove Headers, Assess Quality) output_path = get_output_path("enhanced_noisy_text_markdown", "md") success, result = await safe_tool_call( "Enhance -> MD (Rm Headers, Quality)", enhance_ocr_text, # Call standalone function tracker=tracker, text=noisy_text, output_format="markdown", enhancement_options={"remove_headers": True, "assess_quality": True}, ) if success: try: Path(output_path).write_text(result.get("content", ""), encoding="utf-8") console.print(f"[green]✓ Saved enhanced markdown to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving enhanced markdown: {e}[/]") display_result( "Enhance -> MD (Rm Headers, Quality)", result, {"format_key": {"content": "markdown"}} ) # 3b: Enhance to Plain Text (Keep Headers) output_path = get_output_path("enhanced_noisy_text_plain", "txt") success, result = await safe_tool_call( "Enhance -> Text (Keep Headers)", enhance_ocr_text, # Call standalone function tracker=tracker, text=noisy_text, output_format="text", enhancement_options={"remove_headers": False}, ) if success: try: Path(output_path).write_text(result.get("content", ""), encoding="utf-8") console.print(f"[green]✓ Saved enhanced text to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving enhanced text: {e}[/]") display_result( "Enhance -> Text (Keep Headers)", result, {"format_key": {"content": "text"}} ) async def demo_section_4_html_markdown(sample_files: Dict[str, Path], tracker: CostTracker) -> None: """Demonstrate HTML processing and Markdown utilities.""" console.print(Rule("[bold green]Demo 4: HTML & Markdown Processing[/]", style="green")) logger.info("Starting Demo Section 4: HTML & Markdown Processing") html_file = sample_files.get("html") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") if not html_file: console.print("[yellow]Skipping Demo 4: Sample HTML not available.[/]") return def get_output_path(base_name: str, method: str, format_name: str) -> str: return str(conversion_outputs_dir / f"{base_name}_{method}.{format_name}") console.print(Panel(f"Processing HTML File: [cyan]{html_file.name}[/]", border_style="blue")) try: html_content = html_file.read_text(encoding="utf-8", errors="replace") except Exception as e: console.print(f"[red]Error reading HTML file {html_file}: {e}[/]") return # --- clean_and_format_text_as_markdown --- console.print(Rule("HTML to Markdown Conversion", style="dim")) # 4a: Auto Extraction (Default) output_path = get_output_path(html_file.stem, "auto_extract", "md") success, result_auto = await safe_tool_call( "HTML -> MD (Auto Extract)", clean_and_format_text_as_markdown, # Call standalone function tracker=tracker, text=html_content, extraction_method="auto", preserve_tables=True, ) if success: try: Path(output_path).write_text(result_auto.get("markdown_text", ""), encoding="utf-8") except Exception as e: console.print(f"[red]Error saving markdown: {e}[/]") else: console.print( f"[green]✓ Saved auto-extracted markdown to: [blue underline]{output_path}[/]" ) display_result( "HTML -> MD (Auto Extract)", result_auto, {"format_key": {"markdown_text": "markdown"}} ) # 4b: Readability Extraction (No Tables) output_path = get_output_path(html_file.stem, "readability_no_tables", "md") success, result_read = await safe_tool_call( "HTML -> MD (Readability, No Tables)", clean_and_format_text_as_markdown, # Call standalone function tracker=tracker, text=html_content, extraction_method="readability", preserve_tables=False, ) if success: try: Path(output_path).write_text(result_read.get("markdown_text", ""), encoding="utf-8") except Exception as e: console.print(f"[red]Error saving markdown: {e}[/]") else: console.print( f"[green]✓ Saved readability markdown to: [blue underline]{output_path}[/]" ) display_result( "HTML -> MD (Readability, No Tables)", result_read, {"format_key": {"markdown_text": "markdown"}}, ) # --- optimize_markdown_formatting --- console.print(Rule("Markdown Optimization", style="dim")) markdown_to_optimize = ( result_auto.get("markdown_text") if success else "## Default MD\n* Item 1\n* Item 2\n" ) if markdown_to_optimize: console.print(Panel("Original Markdown for Optimization:", border_style="yellow")) console.print( Syntax(truncate_text_by_lines(markdown_to_optimize), "markdown", theme="default") ) # 4c: Optimize with fixes and wrapping output_path = get_output_path(html_file.stem, "optimized_normalized", "md") success, result_opt1 = await safe_tool_call( "Optimize MD (Normalize, Fix, Wrap)", optimize_markdown_formatting, # Call standalone function tracker=tracker, markdown=markdown_to_optimize, normalize_headings=True, fix_lists=True, fix_links=True, add_line_breaks=True, max_line_length=80, ) if success: try: Path(output_path).write_text( result_opt1.get("optimized_markdown", ""), encoding="utf-8" ) except Exception as e: console.print(f"[red]Error saving markdown: {e}[/]") else: console.print( f"[green]✓ Saved optimized markdown to: [blue underline]{output_path}[/]" ) display_result( "Optimize MD (Normalize, Fix, Wrap)", result_opt1, {"format_key": {"optimized_markdown": "markdown"}}, ) # 4d: Optimize in Compact Mode output_path = get_output_path(html_file.stem, "optimized_compact", "md") success, result_opt2 = await safe_tool_call( "Optimize MD (Compact Mode)", optimize_markdown_formatting, # Call standalone function tracker=tracker, markdown=markdown_to_optimize, compact_mode=True, ) if success: try: Path(output_path).write_text( result_opt2.get("optimized_markdown", ""), encoding="utf-8" ) except Exception as e: console.print(f"[red]Error saving markdown: {e}[/]") else: console.print( f"[green]✓ Saved compact markdown to: [blue underline]{output_path}[/]" ) display_result( "Optimize MD (Compact Mode)", result_opt2, {"format_key": {"optimized_markdown": "markdown"}}, ) else: console.print("[yellow]Skipping optimization as initial conversion failed.[/]") # --- detect_content_type --- console.print(Rule("Content Type Detection", style="dim")) success, result_detect = await safe_tool_call( "Detect Type (HTML)", detect_content_type, text=html_content[:6000], tracker=tracker ) if success: display_result("Detect Type (HTML)", result_detect) md_for_detect = ( result_auto.get("markdown_text", "## Sample\nText") if result_auto else "## Sample\nText" ) success, result_detect = await safe_tool_call( "Detect Type (Markdown)", detect_content_type, text=md_for_detect[:6000], tracker=tracker ) if success: display_result("Detect Type (Markdown)", result_detect) async def demo_section_5_analyze_structure( sample_files: Dict[str, Path], tracker: CostTracker ) -> None: """Demonstrate the dedicated PDF structure analysis tool.""" console.print(Rule("[bold green]Demo 5: Analyze PDF Structure Tool[/]", style="green")) logger.info("Starting Demo Section 5: Analyze PDF Structure") pdf_digital = sample_files.get("pdf_digital") buffett_pdf = sample_files.get("buffett_pdf") backprop_pdf = sample_files.get("backprop_pdf") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") pdf_files_to_process = [pdf for pdf in [pdf_digital, buffett_pdf, backprop_pdf] if pdf] if not pdf_files_to_process: console.print("[yellow]Skipping Demo 5: No PDF files available.[/]") return def get_output_path(file_name: str, analysis_type: str) -> str: return str(conversion_outputs_dir / f"{file_name}_analysis_{analysis_type}.json") for pdf_file in pdf_files_to_process: console.print( Panel(f"Analyzing PDF Structure: [cyan]{pdf_file.name}[/]", border_style="blue") ) # 5a: Analyze Structure (Default options) output_path = get_output_path(pdf_file.stem, "default") success, result = await safe_tool_call( f"Analyze {pdf_file.name} Structure (Defaults)", analyze_pdf_structure, # Call standalone function tracker=tracker, file_path=str(pdf_file), ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print(f"[green]✓ Saved PDF analysis to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving PDF analysis: {e}[/]") display_result(f"Analyze {pdf_file.name} Structure (Defaults)", result) # 5b: Analyze Structure (All options enabled) output_path = get_output_path(pdf_file.stem, "all_options") success, result_all = await safe_tool_call( f"Analyze {pdf_file.name} Structure (All Options)", analyze_pdf_structure, # Call standalone function tracker=tracker, file_path=str(pdf_file), extract_metadata=True, extract_outline=True, extract_fonts=True, extract_images=True, estimate_ocr_needs=True, ) if success: try: result_to_save = {k: v for k, v in result_all.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print( f"[green]✓ Saved detailed PDF analysis to: [blue underline]{output_path}[/]" ) except Exception as e: console.print(f"[red]Error saving PDF analysis: {e}[/]") display_result(f"Analyze {pdf_file.name} Structure (All Options)", result_all) async def demo_section_6_chunking_tables( sample_files: Dict[str, Path], tracker: CostTracker ) -> None: """Demonstrate Document Chunking and Table Extraction tools.""" console.print(Rule("[bold green]Demo 6: Chunking & Table Extraction[/]", style="green")) logger.info("Starting Demo Section 6: Chunking & Table Extraction") pdf_digital = sample_files.get("pdf_digital") buffett_pdf = sample_files.get("buffett_pdf") backprop_pdf = sample_files.get("backprop_pdf") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") pdf_files_to_process = [pdf for pdf in [pdf_digital, buffett_pdf, backprop_pdf] if pdf] if not pdf_files_to_process: console.print("[yellow]Skipping Demo 6: No PDF files available.[/]") return def get_output_path(base_name: str, process_type: str, format_name: str) -> str: return str(conversion_outputs_dir / f"{base_name}_{process_type}.{format_name}") for pdf_file in pdf_files_to_process: try: console.print( Panel( f"Preparing Content for Chunking/Tables from: [cyan]{pdf_file.name}[/]", border_style="dim", ) ) success, conv_result = await safe_tool_call( f"Get MD for {pdf_file.name}", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="markdown", extraction_strategy="direct_text", enhance_with_llm=False, # Use raw for speed ) if not success or not conv_result.get("content"): console.print( f"[red]Failed to get content for {pdf_file.name}. Skipping chunk/table demo for this file.[/]" ) continue markdown_content = conv_result["content"] console.print("[green]✓ Content prepared.[/]") # --- Chunking Demonstrations --- console.print(Rule(f"Document Chunking for {pdf_file.name}", style="dim")) chunking_configs = [ {"method": "paragraph", "size": 500, "overlap": 50}, {"method": "character", "size": 800, "overlap": 100}, {"method": "token", "size": 200, "overlap": 20}, {"method": "section", "size": 1000, "overlap": 0}, ] for config in chunking_configs: method, size, overlap = config["method"], config["size"], config["overlap"] if method == "token" and not _TIKTOKEN_AVAILABLE: console.print( f"[yellow]Skipping chunking method '{method}': Tiktoken not available.[/]" ) continue output_path = get_output_path(pdf_file.stem, f"chunks_{method}", "json") success, result = await safe_tool_call( f"Chunking {pdf_file.name} ({method.capitalize()})", chunk_document, # Call standalone function tracker=tracker, document=markdown_content, chunk_method=method, chunk_size=size, chunk_overlap=overlap, ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text( json.dumps(result_to_save, indent=2), encoding="utf-8" ) console.print(f"[green]✓ Saved chunks to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving chunks: {e}[/]") display_result(f"Chunking {pdf_file.name} ({method}, size={size})", result) # --- Table Extraction (Requires Docling) --- console.print( Rule(f"Table Extraction for {pdf_file.name} (Requires Docling)", style="dim") ) if _DOCLING_AVAILABLE: tables_dir = conversion_outputs_dir / f"{pdf_file.stem}_tables" tables_dir.mkdir(exist_ok=True) # 6a: Extract as CSV success, result_csv = await safe_tool_call( f"Extract {pdf_file.name} Tables (CSV)", extract_tables, # Call standalone function tracker=tracker, document_path=str(pdf_file), table_mode="csv", output_dir=str(tables_dir / "csv"), ) if success and result_csv.get("tables"): display_result( f"Extract {pdf_file.name} Tables (CSV)", result_csv, {"display_keys": ["tables", "saved_files"], "detail_level": 0}, ) if result_csv["tables"]: console.print( Panel( escape(result_csv["tables"][0][:500]) + "...", title="First Table Preview (CSV)", ) ) elif success: console.print(f"[yellow]No tables found by Docling in {pdf_file.name}.[/]") # 6b: Extract as JSON success, result_json = await safe_tool_call( f"Extract {pdf_file.name} Tables (JSON)", extract_tables, # Call standalone function tracker=tracker, document_path=str(pdf_file), table_mode="json", output_dir=str(tables_dir / "json"), ) if success and result_json.get("tables"): display_result( f"Extract {pdf_file.name} Tables (JSON)", result_json, {"display_keys": ["tables"], "detail_level": 1}, ) # 6c: Extract as Pandas DataFrame (if available) if _PANDAS_AVAILABLE: success, result_pd = await safe_tool_call( f"Extract {pdf_file.name} Tables (Pandas)", extract_tables, # Call standalone function tracker=tracker, document_path=str(pdf_file), table_mode="pandas", output_dir=str(tables_dir / "pandas_csv"), # Save as csv ) if success and result_pd.get("tables"): display_result( f"Extract {pdf_file.name} Tables (Pandas)", result_pd, {"display_keys": ["tables"], "detail_level": 0}, ) if result_pd["tables"]: first_df = result_pd["tables"][0] if hasattr(first_df, "shape") and hasattr( first_df, "columns" ): # Check if it looks like a DataFrame console.print( Panel( f"First DataFrame Info:\nShape: {first_df.shape}\nColumns: {list(first_df.columns)}", title="First DataFrame Preview", ) ) else: console.print( f"[yellow]Pandas result format unexpected: {type(first_df)}[/]" ) else: console.print( "[yellow]Pandas unavailable, skipping Pandas table extraction.[/]" ) else: console.print("[yellow]Docling unavailable, skipping table extraction demo.[/]") except Exception as e: logger.error(f"Error processing {pdf_file.name} in Sec 6: {e}", exc_info=True) console.print(f"[bold red]Error processing {pdf_file.name}:[/] {e}") async def demo_section_7_analysis(sample_files: Dict[str, Path], tracker: CostTracker) -> None: """Demonstrate the document analysis tools.""" console.print(Rule("[bold green]Demo 7: Document Analysis Suite[/]", style="green")) logger.info("Starting Demo Section 7: Document Analysis Suite") pdf_digital = sample_files.get("pdf_digital") buffett_pdf = sample_files.get("buffett_pdf") backprop_pdf = sample_files.get("backprop_pdf") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") pdf_files_to_process = [pdf for pdf in [pdf_digital, buffett_pdf, backprop_pdf] if pdf] if not pdf_files_to_process: console.print("[yellow]Skipping Demo 7: No PDF files available.[/]") return def get_output_path(base_name: str, analysis_type: str, format_name: str = "json") -> str: return str(conversion_outputs_dir / f"{base_name}_analysis_{analysis_type}.{format_name}") for pdf_file in pdf_files_to_process: console.print( Panel(f"Preparing Text for Analysis from: [cyan]{pdf_file.name}[/]", border_style="dim") ) success, conv_result = await safe_tool_call( f"Get Text for {pdf_file.name}", convert_document, # Call standalone function tracker=tracker, document_path=str(pdf_file), output_format="markdown", extraction_strategy="direct_text", enhance_with_llm=False, ) if not success or not conv_result.get("content"): console.print(f"[red]Failed to get text for analysis of {pdf_file.name}.[/]") continue analysis_text = conv_result["content"] console.print("[green]✓ Content prepared.[/]") console.print( Panel( escape(truncate_text_by_lines(analysis_text[:600])), title=f"Text Preview for {pdf_file.name}", border_style="dim", ) ) entities_result_for_canon = None # 7.1 Identify Sections output_path = get_output_path(pdf_file.stem, "sections") success, result = await safe_tool_call( f"Identify Sections in {pdf_file.name}", identify_sections, document=analysis_text, tracker=tracker, ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print( f"[green]✓ Saved sections analysis to: [blue underline]{output_path}[/]" ) except Exception as e: console.print(f"[red]Error saving analysis: {e}[/]") display_result(f"Identify Sections ({pdf_file.name})", result) # 7.2 Extract Entities output_path = get_output_path(pdf_file.stem, "entities") success, result = await safe_tool_call( f"Extract Entities from {pdf_file.name}", extract_entities, document=analysis_text, tracker=tracker, ) if success: entities_result_for_canon = result # Save for next step try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print( f"[green]✓ Saved entities analysis to: [blue underline]{output_path}[/]" ) except Exception as e: console.print(f"[red]Error saving analysis: {e}[/]") display_result(f"Extract Entities ({pdf_file.name})", result) # 7.3 Canonicalise Entities if entities_result_for_canon and entities_result_for_canon.get("entities"): output_path = get_output_path(pdf_file.stem, "canon_entities") success, result = await safe_tool_call( f"Canonicalise Entities for {pdf_file.name}", canonicalise_entities, entities_input=entities_result_for_canon, tracker=tracker, ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text( json.dumps(result_to_save, indent=2), encoding="utf-8" ) console.print( f"[green]✓ Saved canonicalized entities to: [blue underline]{output_path}[/]" ) except Exception as e: console.print(f"[red]Error saving analysis: {e}[/]") display_result(f"Canonicalise Entities ({pdf_file.name})", result) else: console.print( f"[yellow]Skipping canonicalization for {pdf_file.name} (no entities).[/]" ) # 7.4 Generate QA Pairs output_path = get_output_path(pdf_file.stem, "qa_pairs") success, result = await safe_tool_call( f"Generate QA Pairs for {pdf_file.name}", generate_qa_pairs, document=analysis_text, num_questions=4, tracker=tracker, ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print(f"[green]✓ Saved QA pairs to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving QA pairs: {e}[/]") display_result(f"Generate QA Pairs ({pdf_file.name})", result) # 7.5 Summarize Document output_path = get_output_path(pdf_file.stem, "summary", "md") success, result = await safe_tool_call( f"Summarize {pdf_file.name}", summarize_document, document=analysis_text, max_length=100, tracker=tracker, ) if success: try: Path(output_path).write_text(result.get("summary", ""), encoding="utf-8") except Exception as e: console.print(f"[red]Error saving summary: {e}[/]") else: console.print(f"[green]✓ Saved summary to: [blue underline]{output_path}[/]") display_result( f"Summarize {pdf_file.name}", result, {"format_key": {"summary": "text"}} ) # 7.6 Extract Metrics (Domain specific) output_path = get_output_path(pdf_file.stem, "metrics") success, result = await safe_tool_call( f"Extract Metrics from {pdf_file.name}", extract_metrics, document=analysis_text, tracker=tracker, ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print(f"[green]✓ Saved metrics to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving metrics: {e}[/]") display_result(f"Extract Metrics ({pdf_file.name})", result) if not result.get("metrics"): console.print(f"[yellow]Note: No pre-defined metrics found in {pdf_file.name}.[/]") # 7.7 Flag Risks (Domain specific) output_path = get_output_path(pdf_file.stem, "risks") success, result = await safe_tool_call( f"Flag Risks in {pdf_file.name}", flag_risks, document=analysis_text, tracker=tracker ) if success: try: result_to_save = {k: v for k, v in result.items() if not k.startswith("_")} Path(output_path).write_text(json.dumps(result_to_save, indent=2), encoding="utf-8") console.print(f"[green]✓ Saved risks analysis to: [blue underline]{output_path}[/]") except Exception as e: console.print(f"[red]Error saving risks analysis: {e}[/]") display_result(f"Flag Risks ({pdf_file.name})", result) if not result.get("risks"): console.print(f"[yellow]Note: No pre-defined risks found in {pdf_file.name}.[/]") async def demo_section_8_batch_processing( sample_files: Dict[str, Path], tracker: CostTracker ) -> None: """Demonstrate the standalone batch processing pipeline.""" console.print(Rule("[bold green]Demo 8: Advanced Batch Processing[/]", style="green")) logger.info("Starting Demo Section 8: Batch Processing") pdf_digital = sample_files.get("pdf_digital") buffett_pdf = sample_files.get("buffett_pdf") image_file = sample_files.get("image") conversion_outputs_dir = sample_files.get("conversion_outputs_dir") # noqa: F841 # --- Prepare Batch Inputs --- batch_inputs = [] if pdf_digital: batch_inputs.append({"document_path": str(pdf_digital), "item_id": "pdf1"}) if buffett_pdf: batch_inputs.append({"document_path": str(buffett_pdf), "item_id": "pdf2"}) if image_file: batch_inputs.append({"image_path": str(image_file), "item_id": "img1"}) # Use image_path if not batch_inputs: console.print("[yellow]Skipping batch demo: No suitable input files found.[/]") return console.print(f"Prepared {len(batch_inputs)} items for batch processing.") # --- Define Batch Operations Pipeline --- # NOTE: We access nested results using input_keys_map pointing to the # output_key of a previous step (e.g., "conversion_result") # and then assume the batch processor can handle accessing the nested 'content' field. # If the batch processor *only* supports top-level keys in input_keys_map, # this structure would need further adjustment (e.g., adding intermediate steps # to explicitly pull nested data to the top level if promotion isn't flexible enough). # Let's proceed assuming the worker logic can handle `item_state[state_key]` # where `state_key` refers to a previous output key, and we'll access `.content` inside the worker if needed. # ***Correction***: The worker does NOT handle nested access via dot notation in the map value. # The map value MUST be a key present in the top-level item_state. # WORKAROUND: Do not promote output from Step 1. Have subsequent steps map their # input argument to the desired nested key within the state using `input_keys_map`. # The batch worker needs modification to support this. Let's try the workaround. batch_operations = [ # Step 1: Convert PDF/OCR Image to Markdown { "operation": "convert_document", "output_key": "conversion_result", # Result stored here "params": { "output_format": "markdown", "extraction_strategy": "hybrid_direct_ocr", "enhance_with_llm": True, "ocr_options": {"dpi": 200}, "accelerator_device": ACCELERATOR_DEVICE, }, # REMOVED "promote_output": "content" }, # Step 2: Chunk the resulting markdown content from Step 1 { "operation": "chunk_document", # The worker needs to know the input arg name ('document') and the state key to get it from. "input_keys_map": { "document": "conversion_result" }, # Map 'document' arg to the dict from step 1 "output_key": "chunking_result", "params": {"chunk_method": "paragraph", "chunk_size": 750, "chunk_overlap": 75}, # If we wanted chunks available later, we could promote here: # "promote_output": "chunks" }, # Step 3: Generate QA pairs using the *original* markdown from Step 1 { "operation": "generate_qa_pairs", "input_keys_map": { "document": "conversion_result" }, # Map 'document' arg to the dict from step 1 "output_key": "qa_result", "params": {"num_questions": 3}, }, # Step 4: Summarize the original converted content from Step 1 { "operation": "summarize_document", "input_keys_map": { "document": "conversion_result" }, # Map 'document' arg to the dict from step 1 "output_key": "summary_result", "params": {"max_length": 80}, }, ] # --- Adjusting the worker function to handle dictionary input via input_keys_map --- # The batch processor's worker (_apply_op_to_item_worker) needs a slight modification # to handle the case where input_keys_map points to a dictionary result from a previous step, # and we need to extract a specific field (like 'content') from it. # Let's modify the worker logic conceptually (assuming this change is made in the actual tool file): # Inside _apply_op_to_item_worker, when processing input_keys_map: # ```python # # ... inside worker ... # if isinstance(op_input_map, dict): # for param_name, state_key in op_input_map.items(): # if state_key not in item_state: # raise ToolInputError(...) # # mapped_value = item_state[state_key] # # # *** ADDED LOGIC *** # # If mapped value is a dict from a previous step, and the param_name suggests content ('document', 'text', etc.) # # try to extract the 'content' field from that dictionary. # if isinstance(mapped_value, dict) and param_name in ["document", "text", "content"]: # content_value = mapped_value.get("content") # if content_value is not None: # mapped_value = content_value # else: # # Maybe try other common keys or raise error if 'content' expected but missing # logger.warning(f"Mapped input '{state_key}' is dict, but key 'content' not found for param '{param_name}'") # # Fallback to using the whole dict? Or fail? Let's use whole dict as fallback for now. # # *** END ADDED LOGIC *** # # # Assign the potentially extracted value # if param_name != primary_input_arg_name: # call_kwargs[param_name] = mapped_value # elif call_kwargs.get(primary_input_arg_name) != mapped_value: # Use .get() for safety # logger.warning(...) # call_kwargs[primary_input_arg_name] = mapped_value # # ... rest of worker ... # ``` # **Assuming this modification is made in the `process_document_batch`'s internal worker**, # the pipeline definition above should now work correctly. console.print( Panel("Defined Batch Pipeline (Corrected Input Mapping):", border_style="magenta") ) console.print(Syntax(json.dumps(batch_operations, indent=2), "json", theme="default")) # --- Execute Batch Processing --- console.print(f"\nExecuting batch pipeline with concurrency {MAX_CONCURRENT_TASKS}...") try: # Call the standalone batch processing function batch_results = await process_document_batch( inputs=batch_inputs, operations=batch_operations, max_concurrency=MAX_CONCURRENT_TASKS ) console.print(Rule("[bold]Batch Processing Results[/]", style="blue")) # --- Display Batch Results --- if not batch_results: console.print("[yellow]Batch processing returned no results.[/]") else: console.print(f"Processed {len(batch_results)} items.") for i, item_result in enumerate(batch_results): item_id = item_result.get("item_id", f"Item {i}") status = item_result.get("_status", "unknown") color = ( "green" if status == "processed" else "red" if status == "failed" else "yellow" ) console.print( Rule(f"Result for: [bold {color}]{item_id}[/] (Status: {status})", style=color) ) outputs_table = Table(title="Generated Outputs", box=box.MINIMAL, show_header=False) outputs_table.add_column("Step", style="cyan") outputs_table.add_column("Output Key", style="magenta") outputs_table.add_column("Preview / Status", style="white") for op_spec in batch_operations: key = op_spec["output_key"] step_result = item_result.get(key) preview = "[dim]Not generated[/]" if step_result and isinstance(step_result, dict): step_success = step_result.get("success", False) preview = ( "[green]Success[/]" if step_success else f"[red]Failed: {step_result.get('error_code', 'ERROR')}[/]" ) if step_success: if "content" in step_result and isinstance(step_result["content"], str): preview += f" (Content len: {len(step_result['content'])})" elif "chunks" in step_result and isinstance( step_result["chunks"], list ): preview += f" ({len(step_result['chunks'])} chunks)" elif "summary" in step_result and isinstance( step_result.get("summary"), str ): preview += f" (Summary len: {len(step_result['summary'])})" elif "qa_pairs" in step_result and isinstance( step_result.get("qa_pairs"), list ): preview += f" ({len(step_result['qa_pairs'])} pairs)" elif "metrics" in step_result and isinstance( step_result.get("metrics"), dict ): preview += f" ({len(step_result['metrics'])} metrics)" elif "risks" in step_result and isinstance( step_result.get("risks"), dict ): preview += f" ({len(step_result['risks'])} risks)" # Add other previews as needed outputs_table.add_row(op_spec["operation"], key, preview) console.print(outputs_table) if item_result.get("_error_log"): error_panel_content = Text() for err in item_result["_error_log"]: error_panel_content.append( Text.from_markup(f"- [yellow]{escape(err)}[/]\n") ) console.print( Panel(error_panel_content, title="Error Log", border_style="yellow") ) console.print("-" * 30) # Separator except Exception as e: logger.error(f"Batch processing demo failed: {e}", exc_info=True) console.print(f"[bold red]Error during batch processing execution:[/]\n{e}") async def main(): """Main function to run the DocumentProcessingTool demo.""" console.print(Rule("[bold] Document Processing Standalone Functions Demo [/bold]", style="blue")) if not MCP_COMPONENTS_LOADED: # Error already printed during import attempt sys.exit(1) # Set logger level based on environment variable console.print(f"Docling Available: {_DOCLING_AVAILABLE}") console.print(f"Pandas Available: {_PANDAS_AVAILABLE}") console.print(f"Tiktoken Available: {_TIKTOKEN_AVAILABLE}") console.print(f"Using Accelerator: {ACCELERATOR_DEVICE}") try: # Create a CostTracker instance tracker = CostTracker() # Create gateway - still useful for initializing providers if needed by underlying tools like generate_completion gateway = Gateway("doc-proc-standalone-demo", register_tools=False) # Don't register the old tool logger.info("Initializing gateway and providers (needed for potential LLM calls)...", emoji_key="provider") try: await gateway._initialize_providers() logger.info("Providers initialized.") except Exception as init_e: logger.error(f"Failed to initialize providers: {init_e}", exc_info=True) console.print("[red]Error initializing providers. LLM-dependent operations might fail.[/]") # --- Prepare sample files --- logger.info("Setting up sample files and directories...", emoji_key="setup") DEFAULT_SAMPLE_DIR.mkdir(parents=True, exist_ok=True) DOWNLOADED_FILES_DIR.mkdir(parents=True, exist_ok=True) conversion_outputs_dir = DEFAULT_SAMPLE_DIR / "conversion_outputs" conversion_outputs_dir.mkdir(exist_ok=True) logger.info(f"Outputs will be saved in: {conversion_outputs_dir}") sample_files: Dict[str, Any] = {"conversion_outputs_dir": conversion_outputs_dir} # --- Download Files Concurrently (No shared progress bar) --- # The download_file_with_progress function will create its own transient progress bar # if no 'progress' object is passed. console.print(Rule("Downloading Sample Files", style="blue")) download_tasks = [ download_file_with_progress(DEFAULT_SAMPLE_PDF_URL, DOWNLOADED_FILES_DIR / "attention_is_all_you_need.pdf", "Transformer Paper (PDF)"), # No progress obj passed download_file_with_progress(DEFAULT_SAMPLE_IMAGE_URL, DOWNLOADED_FILES_DIR / "sample_ocr_image.tif", "Sample OCR Image (TIFF)"), # No progress obj passed download_file_with_progress(SAMPLE_HTML_URL, DOWNLOADED_FILES_DIR / "transformer_wiki.html", "Transformer Wiki (HTML)"), # No progress obj passed download_file_with_progress(BUFFETT_SHAREHOLDER_LETTER_URL, DOWNLOADED_FILES_DIR / "buffett_letter_2022.pdf", "Buffett Letter (PDF)"), # No progress obj passed download_file_with_progress(BACKPROPAGATION_PAPER_URL, DOWNLOADED_FILES_DIR / "backprop_paper.pdf", "Backprop Paper (PDF)"), # No progress obj passed ] download_results = await asyncio.gather(*download_tasks) console.print(Rule("Downloads Complete", style="blue")) sample_files["pdf_digital"] = download_results[0] sample_files["image"] = download_results[1] sample_files["html"] = download_results[2] sample_files["buffett_pdf"] = download_results[3] sample_files["backprop_pdf"] = download_results[4] # --- Run Demo Sections --- # Pass the necessary sample_files dict and the tracker await demo_section_1_conversion_ocr(sample_files, tracker) await demo_section_2_dedicated_ocr(sample_files, tracker) await demo_section_3_enhance_text(sample_files, tracker) await demo_section_4_html_markdown(sample_files, tracker) await demo_section_5_analyze_structure(sample_files, tracker) await demo_section_6_chunking_tables(sample_files, tracker) await demo_section_7_analysis(sample_files, tracker) await demo_section_8_batch_processing(sample_files, tracker) # --- Display Final Cost Summary --- console.print(Rule("[bold]Demo Complete - Cost Summary[/]", style="blue")) tracker.display_summary(console) except Exception as e: logger.critical(f"Demo execution failed critically: {str(e)}", exc_info=True) console.print_exception(show_locals=True) # Use Rich's exception printing return 1 logger.info("Demo finished successfully.") return 0 if __name__ == "__main__": # Run the demo exit_code = asyncio.run(main()) sys.exit(exit_code)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server