Skip to main content
Glama
CODE_WALKTHROUGH.md21.2 kB
# Code Walkthrough: LandingAI ADE MCP Server ## Architecture Overview This MCP (Model Context Protocol) server provides direct integration with LandingAI's ADE (Agentic Document Extraction) REST API. It acts as a bridge between Claude Desktop (or other MCP clients) and the LandingAI API. ### Key Design Principles 1. **Direct API Integration**: Uses `httpx` for HTTP requests instead of SDK dependencies 2. **Memory Management**: Automatically saves large files to disk to prevent context window overflow 3. **Transparent Error Handling**: Provides detailed error messages with fallback options 4. **Pre-signed URL Support**: Handles S3 URLs without authentication headers ### Global Configuration ```python API_BASE_URL = "https://api.va.landing.ai" API_KEY = os.environ.get("LANDINGAI_API_KEY") ``` - API key must be set via LANDINGAI_API_KEY environment variable - All API requests use Bearer token authentication - Base URL points to LandingAI's Vision Agent API ### Architecture: Internal Functions Pattern To solve the issue where MCP-decorated functions become FunctionTool objects that can't be called directly, the code uses an internal functions pattern: ```python # Internal function with actual API logic async def _parse_document_internal(...): # All the API logic here # MCP tool is a thin wrapper @mcp.tool() async def parse_document(...): return await _parse_document_internal(...) ``` This pattern allows: - `process_folder` to call internal functions directly - No code duplication between tools - Clean separation of API logic from MCP interface --- ## Tool Logic Explanations ### 1. `parse_document` - Document Parsing **Purpose**: Parse PDFs, images, and office documents to extract text, tables, and visual elements. #### Input Validation ```python if document_path: # Local file processing elif document_url: # URL processing else: return error # Must provide one source ``` #### Core Logic Flow 1. **File Size Check** (for local files): ```python file_size_mb = path.stat().st_size / (1024 * 1024) if file_size_mb > 50: logger.warning("Large file detected. Consider using create_parse_job") ``` - Files > 50MB should use async jobs for better performance 2. **Multipart Form Upload** (for local files): ```python with open(path, 'rb') as f: files = {'document': (path.name, f, 'application/octet-stream')} data = {} if model: data['model'] = model if split: data['split'] = split ``` - File uploaded as multipart form data - Optional parameters only included if specified 3. **URL Processing**: ```python data = {'document_url': document_url} if model: data['model'] = model if split: data['split'] = split ``` - URL sent as form data, not multipart #### Response Handling ```python if result.get("status") == "success": # Add helpful summaries result["chunks_count"] = len(result["chunks"]) result["page_count"] = result["metadata"]["page_count"] result["markdown_length"] = len(result["markdown"]) ``` - Adds summary information for easier consumption - Returns markdown, chunks, splits, grounding, and metadata #### Design Decisions - **Why check file size?** To warn users about potential timeouts - **Why multipart for files but not URLs?** API expects different content types - **Why add summaries?** Helps users quickly understand parse results --- ### 2. `extract_data` - Structured Data Extraction **Purpose**: Extract structured data from markdown using JSON schemas. #### Parameter Handling ```python # markdown parameter can be either: # 1. File path (string or Path object) # 2. Markdown content string ``` #### Core Logic Flow 1. **Schema Validation**: ```python if not isinstance(schema, dict): return error if "type" not in schema: schema = {"type": "object", "properties": schema} ``` - Ensures schema is valid JSON - Adds default type if missing 2. **Input Detection**: ```python # Try to interpret as file path try: path = Path(markdown) if path.exists() and path.is_file(): is_file = True except: is_file = False # Treat as content string ``` - Intelligently detects if markdown is a file path or content - Falls back to content if path check fails 3. **File Upload**: ```python if is_file: with open(path, 'rb') as f: files = { 'schema': (None, schema_json, 'application/json'), 'markdown': (path.name, f, 'text/markdown') } else: # Direct content files = { 'schema': (None, schema_json, 'application/json'), 'markdown': ('content.md', markdown.encode('utf-8'), 'text/markdown') } ``` #### Response Structure ```python # Expected response: { "extraction": {...}, # Extracted key-value pairs "extraction_metadata": {...}, # Extraction details "metadata": {...} # Processing metadata } ``` #### Design Decisions - **Why auto-detect file vs content?** Better user experience - **Why validate schema?** Prevents API errors - **Why use multipart?** API expects schema and markdown as separate parts --- ### 3. `create_parse_job` - Async Job Creation **Purpose**: Handle large documents (>50MB) with background processing. #### Core Logic Flow 1. **File Size Logging**: ```python file_size_mb = path.stat().st_size / (1024 * 1024) logger.info(f"File size: {file_size_mb:.2f} MB") ``` - Helps users understand why async is needed 2. **Optional Parameters**: ```python data = {} if model: data['model'] = model if split: data['split'] = split if output_save_url: data['output_save_url'] = output_save_url ``` - Zero Data Retention support via output_save_url - Only sends parameters that are provided 3. **202 Accepted Response**: ```python if response.status_code == 202: result = response.json() job_id = result.get('job_id') return { "status": "success", "job_id": job_id, "message": "Parse job created. Use get_parse_job_status to check progress." } ``` #### Design Decisions - **Why log file size?** Transparency about processing requirements - **Why 202 status?** Standard HTTP code for accepted async operations - **Why suggest next step?** Guides users through async workflow --- ### 4. `get_parse_job_status` - Job Status & Results Retrieval **Purpose**: Check job progress and retrieve results, handling large files specially. #### Core Logic Flow 1. **Status Check**: ```python job_status = result.get("status", "unknown") progress = result.get("progress", 0) ``` 2. **Three Result Scenarios**: **A. Small Files (< 1MB) - Inline Data**: ```python if "data" in result and result["data"]: # Data included directly in response result["_message"] = "Job completed. Results available in 'data' field." ``` **B. Large Files (> 1MB) - Output URL**: ```python elif "output_url" in result and result["output_url"]: output_url = result["output_url"] print(f"📎 Output URL: {output_url}") # Visibility # Auto-fetch from S3 async with httpx.AsyncClient(timeout=60.0) as fetch_client: fetch_response = await fetch_client.get(output_url) # No auth headers! if fetch_response.status_code == 200: fetched_data = fetch_response.json() # Save to file (it's large!) output_file = f"/tmp/{job_id}_{timestamp}_output.md" with open(output_file, 'w') as f: f.write(fetched_data["markdown"]) # Return path, not content result["data_file"] = output_file result["preview"] = markdown_content[:1000] + "..." ``` **C. No Data**: ```python else: result["_message"] = "Job completed but no data available." ``` 3. **Progress Updates**: ```python elif job_status == "processing": result["_message"] = f"Job in progress: {progress*100:.1f}% complete" elif job_status == "pending": result["_message"] = "Job is queued and waiting to be processed" elif job_status == "failed": result["_message"] = f"Job failed: {failure_reason}" ``` #### Key Design Decisions 1. **Why print output_url?** - Transparency - user sees what's happening - Debugging - URL available if auto-fetch fails 2. **Why save large files to disk?** - Files > 1MB would overflow Claude's context window - Disk storage prevents memory issues - User gets file path for further processing 3. **Why no auth headers for S3?** - Pre-signed URLs include auth in the URL itself - Adding headers causes authentication errors - Matches standard S3 client behavior 4. **Why include preview?** - Shows first 1000 chars for verification - Doesn't overflow context - Helps identify content --- ### 5. `list_parse_jobs` - Job Listing **Purpose**: List all parse jobs with filtering and pagination. #### Parameter Validation ```python # Defaults and limits actual_page = page if page is not None else 0 actual_page_size = pageSize if pageSize is not None else 10 # Validation if actual_page < 0: return error("Page must be >= 0") if actual_page_size < 1: return error("Page size must be >= 1") if actual_page_size > 100: actual_page_size = 100 # Cap at API maximum ``` #### Response Enhancement ```python # Add pagination info result["pagination"] = { "current_page": actual_page, "page_size": actual_page_size, "items_on_page": len(jobs), "has_more": result.get("has_more", False) } # Status summary status_counts = {} for job in jobs: job_status = job.get("status", "unknown") status_counts[job_status] = status_counts.get(job_status, 0) + 1 result["status_summary"] = status_counts ``` #### Design Decisions - **Why validate pagination?** Prevents API errors - **Why add summaries?** Helps users understand job distribution - **Why convert timestamps?** Human-readable dates --- ### 6. `process_folder` - Batch Document Processing **Purpose**: Process all supported files in a folder for parsing or structured data extraction. **Important**: This tool calls the internal functions (`_parse_document_internal`, `_extract_data_internal`, `_create_parse_job_internal`) directly to avoid FunctionTool wrapper issues. #### Input Validation ```python # Validate operation mode if operation not in ["parse", "extract"]: return error # Extract mode requires schema if operation == "extract" and not schema: return error("Schema required for extract") # Check folder exists folder = Path(folder_path) if not folder.exists() or not folder.is_dir(): return error ``` #### Core Logic Flow 1. **File Discovery**: ```python SUPPORTED_EXTENSIONS = { # Images '.apng', '.bmp', '.dcx', '.dds', '.dib', '.gd', '.gif', '.icns', '.jp2', '.jpeg', '.jpg', '.pcx', '.png', '.ppm', '.psd', '.tga', '.tiff', '.tif', '.webp', # Documents '.pdf', '.doc', '.docx', '.ppt', '.pptx', '.odp', '.odt' } # Filter by requested types if file_types: # e.g., "pdf,jpg" requested_exts = {f".{ext.lower()}" for ext in file_types.split(",")} allowed_exts = requested_exts & SUPPORTED_EXTENSIONS # Find all matching files for ext in allowed_exts: files.extend(folder.glob(f"*{ext}")) files.extend(folder.glob(f"*{ext.upper()}")) ``` 2. **Size-Based Grouping**: ```python small_files = [] # < 50MB - direct processing large_files = [] # >= 50MB - use jobs for file in all_files: size_mb = file.stat().st_size / (1024 * 1024) if size_mb < 50: small_files.append(file) else: large_files.append(file) ``` 3. **Batch Processing Small Files**: ```python # Process in batches to respect rate limits for i in range(0, len(small_files), max_concurrent): batch = small_files[i:i + max_concurrent] # Create tasks based on operation - using internal functions! if operation == "parse": tasks = [_parse_document_internal(str(f)) for f in batch] else: # extract # Custom async function for parse + extract async def parse_and_extract(fp=file_path, s=schema): parse_result = await _parse_document_internal(str(fp)) extract_result = await _extract_data_internal(s, parse_result["markdown"]) return combine_results(parse_result, extract_result) tasks.append(parse_and_extract()) # Execute concurrently results = await asyncio.gather(*tasks, return_exceptions=True) ``` 4. **Large File Processing with Jobs**: ```python # Create jobs for large files - using internal function! jobs = [] for file in large_files: job_result = await _create_parse_job_internal(str(file)) if job_result["status"] == "success": jobs.append({"file": file, "job_id": job_result["job_id"]}) # Monitor jobs while pending_jobs: await asyncio.sleep(5) for job in pending_jobs[:]: status = await get_parse_job_status(job["job_id"]) if status["status"] == "completed": pending_jobs.remove(job) # Process results based on operation ``` 5. **Extract Mode Processing**: ```python async def process_file_for_extraction(file_path, schema): # Step 1: Parse document parse_result = await parse_document(file_path) # Step 2: Extract structured data from markdown extract_result = await extract_data( schema=schema, markdown=parse_result["markdown"] ) # Combine results return { "extraction": extract_result["extraction"], "markdown": parse_result["markdown"], "metadata": parse_result["metadata"] } ``` 6. **Result Organization**: ```python # Create output directory output_dir = folder / "ade_results" output_dir.mkdir(exist_ok=True) # Save results for each file for file in processed_files: file_dir = output_dir / f"{file.stem}_{operation}" file_dir.mkdir(exist_ok=True) if operation == "parse": # Save markdown, metadata, chunks (file_dir / "content.md").write_text(markdown) (file_dir / "metadata.json").write_text(json.dumps(metadata)) else: # extract # Save extracted data and source (file_dir / "data.json").write_text(json.dumps(extraction)) (file_dir / "source.md").write_text(markdown) # Save summary (output_dir / "summary.json").write_text(json.dumps({ "operation": operation, "total_files": len(all_files), "processed": len(processed_files), "failed": len(failed_files), "processing_time_s": processing_time })) ``` #### Aggregation for Extract Mode ```python aggregated_data = [] # Collect all extracted data for result in extraction_results: aggregated_data.append({ "source_file": file.name, **result["extraction"] # Spread extracted fields }) # Save aggregated data (output_dir / "extracted_data.json").write_text( json.dumps(aggregated_data, indent=2) ) ``` #### Design Decisions 1. **Why two operation modes?** - Parse: For general document processing, OCR, text extraction - Extract: For structured data extraction using schemas - Covers both exploratory and production use cases 2. **Why size-based grouping?** - Small files (<50MB): Fast enough for direct processing - Large files: Would timeout, need async jobs - Optimizes throughput vs resource usage 3. **Why max_concurrent parameter?** - Prevents API rate limit errors - Default of 15 balances speed and safety - User can adjust based on their API tier 4. **Why save to ade_results folder?** - Keeps original files untouched - Organized structure for downstream processing - Easy to find and delete results 5. **Why continue on errors?** - One corrupt file shouldn't stop batch processing - Failed files tracked in summary - Maximizes successful extractions 6. **Why aggregated_data for extract?** - Common need to combine extracted data - Ready for analysis or database import - Maintains source file tracking #### Error Handling ```python # Individual file errors don't stop batch try: result = await parse_document(file) except Exception as e: failed_files.append({ "filename": file.name, "error": str(e) }) continue # Process next file # Job failures tracked separately if status["status"] == "failed": failed_files.append({ "filename": job["file"].name, "error": status.get("failure_reason") }) ``` #### Usage Patterns **Bulk Invoice Processing**: ```python # Extract data from hundreds of invoices result = await process_folder( folder_path="/invoices/2024", operation="extract", schema=invoice_schema, file_types="pdf,jpg" # PDFs and scanned images ) # Access aggregated data df = pd.DataFrame(result["aggregated_data"]) total_amount = df["total"].sum() ``` **Document Migration**: ```python # Convert all documents to markdown result = await process_folder( folder_path="/legacy_docs", operation="parse", model="dpt-2-latest" ) # All markdown files now in ade_results/ ``` **Mixed Document Analysis**: ```python # Process everything, let API determine best approach result = await process_folder( folder_path="/research_papers", save_results=True # Keep all outputs ) ``` --- ### 7. `health_check` - Server Status **Purpose**: Verify server health and API connectivity. #### Logic Flow ```python result = { "status": "healthy", "server": "LandingAI ADE MCP Server", "api_key_configured": bool(API_KEY), "available_tools": [list of tools] } # Test API connectivity if API_KEY: try: response = await client.get( f"{API_BASE_URL}/v1/ade/parse/jobs", params={'pageSize': 1} ) if response.status_code == 200: result["api_connectivity"] = "connected" elif response.status_code == 401: result["api_connectivity"] = "invalid_api_key" except Exception as e: result["api_connectivity"] = f"connection_failed: {str(e)}" ``` #### Design Decisions - **Why test with list jobs?** Lightweight endpoint for connectivity check - **Why include tool list?** Helps users discover available functionality --- ## Common Patterns ### 1. Error Handling Pattern ```python try: # Main logic response = await client.post(...) return handle_api_response(response) except httpx.TimeoutException: return {"status": "error", "error": "Request timeout"} except Exception as e: return {"status": "error", "error": str(e)} ``` ### 2. Optional Parameter Pattern ```python data = {} if param1: data['param1'] = param1 if param2: data['param2'] = param2 # Only send data if not empty response = await client.post(..., data=data if data else None) ``` ### 3. File Size Management Pattern ```python if len(content) > 500000: # 500KB threshold # Save to file with open(output_file, 'w') as f: f.write(content) return {"data_file": output_file} else: # Include in response return {"data": content} ``` ### 4. Pre-signed URL Pattern ```python # Create new client without auth headers async with httpx.AsyncClient(timeout=60.0) as fetch_client: # No headers parameter! response = await fetch_client.get(pre_signed_url) ``` --- ## Internal Functions Architecture ### Why Internal Functions? The FastMCP framework replaces MCP-decorated functions with FunctionTool objects that cannot be called directly from within Python code. To solve this, the codebase uses a pattern where: 1. **Internal functions** (`_parse_document_internal`, `_extract_data_internal`, `_create_parse_job_internal`) contain all the actual API logic 2. **MCP tools** are thin wrappers that just call the internal functions 3. **process_folder** calls the internal functions directly, avoiding FunctionTool issues ### Internal Function List - `_parse_document_internal()` - Document parsing logic - `_extract_data_internal()` - Data extraction logic - `_create_parse_job_internal()` - Job creation logic ### Benefits - **No code duplication** - Logic exists in one place - **Testability** - Internal functions can be tested independently - **Flexibility** - Any function can call internal functions - **Clean separation** - API logic separated from MCP interface ## Key Insights 1. **Context Window Management**: The server automatically detects large responses and saves them to files, preventing Claude's context window from being overwhelmed. 2. **Progressive Disclosure**: Tools provide summaries and previews before full data, letting users decide what to access. 3. **Fail-Safe Design**: When auto-fetch fails, the tool still returns the URL so users can access results manually. 4. **Transparency**: URLs and file paths are printed for visibility, making the process debuggable. 5. **Smart Detection**: The server intelligently detects whether inputs are file paths or content strings, improving usability. 6. **S3 Authentication**: Understanding that pre-signed URLs include auth in the URL itself is crucial for successful downloads. 7. **Internal Functions Pattern**: Solves the FunctionTool wrapper issue by separating API logic from MCP decorators. This architecture ensures reliable document processing while managing memory efficiently and providing clear feedback throughout the process.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/avaxia8/landingai-ade-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server