Skip to main content
Glama
server.py59.8 kB
""" LandingAI ADE MCP Server ===================================================== A Model Context Protocol server that directly integrates with LandingAI's ADE REST API. """ import os import json import logging import httpx import asyncio from typing import Dict, Any, Optional, List, Union from pathlib import Path from fastmcp import FastMCP from pydantic import BaseModel, Field from datetime import datetime import hashlib from collections import defaultdict # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Initialize the MCP Server mcp = FastMCP( "landingai-ade-mcp", instructions="Model Context Protocol server for LandingAI's Agentic Document Extraction API. Direct REST API integration for document parsing, data extraction, and async job management.", version="2.0.0" ) # Configuration API_BASE_URL = "https://api.va.landing.ai" API_BASE_URL_EU = "https://api.va.eu-west-1.landing.ai" # Get API key from environment API_KEY = os.environ.get("LANDINGAI_API_KEY") if not API_KEY: logger.warning("No API key found. Set LANDINGAI_API_KEY environment variable.") # ============= Helper Functions ============= def get_headers() -> Dict[str, str]: """Get authorization headers for API requests""" if not API_KEY: raise ValueError("API key not configured") return { "Authorization": f"Bearer {API_KEY}" } async def handle_api_response(response: httpx.Response, add_status: bool = True) -> Dict[str, Any]: """Handle API response and errors""" try: if response.status_code == 200: result = response.json() if add_status: return {"status": "success", **result} return result elif response.status_code == 202: # Parse job created result = response.json() if add_status: return {"status": "accepted", **result} return result elif response.status_code == 401: return { "status": "error", "error": "Authentication failed. Check your API key.", "status_code": 401 } elif response.status_code == 403: return { "status": "error", "error": "Access forbidden. Check your permissions.", "status_code": 403 } elif response.status_code == 404: return { "status": "error", "error": "Resource not found.", "status_code": 404 } elif response.status_code == 413: return { "status": "error", "error": "File too large. Use parse job for files over 50MB.", "status_code": 413 } elif response.status_code == 429: return { "status": "error", "error": "Rate limit exceeded. Please wait and try again.", "status_code": 429 } elif response.status_code == 422: error_detail = response.json() if response.text else "Validation error" return { "status": "error", "error": f"Validation error: {error_detail}", "status_code": 422 } else: return { "status": "error", "error": f"API error: {response.status_code} - {response.text}", "status_code": response.status_code } except json.JSONDecodeError: return { "status": "error", "error": f"Invalid JSON response: {response.text}", "status_code": response.status_code } # ============= Internal API Functions ============= # These functions contain the actual API logic and can be called by both MCP tools and process_folder async def _parse_document_internal( document_path: Optional[str] = None, document_url: Optional[str] = None, model: Optional[str] = None, split: Optional[str] = None ) -> Dict[str, Any]: """Internal function for document parsing - contains the actual API logic""" if document_path: logger.info(f"Parsing document: {document_path}") elif document_url: logger.info(f"Parsing document from URL: {document_url}") else: return { "status": "error", "error": "Must provide either document_path or document_url" } if not API_KEY: return { "status": "error", "error": "API key not configured. Set LANDINGAI_API_KEY environment variable." } try: async with httpx.AsyncClient(timeout=60.0) as client: if document_path: # Check if file exists path = Path(document_path) if not path.exists(): return { "status": "error", "error": f"File not found: {document_path}" } # Check file size file_size_mb = path.stat().st_size / (1024 * 1024) if file_size_mb > 50: logger.warning(f"Large file detected ({file_size_mb:.2f} MB). Consider using create_parse_job for better performance.") # Prepare multipart form data with open(path, 'rb') as f: files = {'document': (path.name, f, 'application/octet-stream')} data = {} if model: data['model'] = model if split: data['split'] = split response = await client.post( f"{API_BASE_URL}/v1/ade/parse", headers=get_headers(), files=files, data=data if data else None ) else: # document_url # Prepare form data with URL data = {'document_url': document_url} if model: data['model'] = model if split: data['split'] = split response = await client.post( f"{API_BASE_URL}/v1/ade/parse", headers=get_headers(), data=data ) result = await handle_api_response(response) if result.get("status") == "success": logger.info("Document parsed successfully") # Always save parsed markdown to local directory if "markdown" in result: # Create persistent output directory output_dir = Path.home() / "landingai_ade_output" output_dir.mkdir(exist_ok=True) # Extract filename for naming if document_path: source_filename = Path(document_path).stem elif document_url: source_filename = document_url.split('/')[-1].split('?')[0].replace('.', '_') else: source_filename = "unknown" # Create output filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = str(output_dir / f"parse_{source_filename}_{timestamp}.md") # Write markdown to file with open(output_file, 'w', encoding='utf-8') as f: f.write(result["markdown"]) logger.info(f"Parse results saved to: {output_file}") result["saved_to"] = output_file result["markdown_length"] = len(result["markdown"]) # Add helpful summary info if "chunks" in result: result["chunks_count"] = len(result["chunks"]) if "metadata" in result and "page_count" in result["metadata"]: result["page_count"] = result["metadata"]["page_count"] return result except FileNotFoundError: return { "status": "error", "error": f"File not found: {document_path}" } except PermissionError: return { "status": "error", "error": f"Permission denied accessing file: {document_path}" } except httpx.TimeoutException: return { "status": "error", "error": "Request timeout. File may be too large - consider using async job." } except Exception as e: logger.error(f"Error parsing document: {e}") return { "status": "error", "error": str(e), "error_type": type(e).__name__ } async def _extract_data_internal( schema: Union[Dict[str, Any], str], markdown: Optional[str] = None, markdown_url: Optional[str] = None, model: Optional[str] = None ) -> Dict[str, Any]: """Internal function for data extraction - contains the actual API logic""" logger.info("Extracting data with provided schema") # Parse schema if it's a string if isinstance(schema, str): try: schema = json.loads(schema) except json.JSONDecodeError as e: return { "status": "error", "error": f"Invalid JSON schema: {str(e)}" } if not schema: return { "status": "error", "error": "schema is required" } if not API_KEY: return { "status": "error", "error": "API key not configured. Set LANDINGAI_API_KEY environment variable." } try: # Validate schema if not isinstance(schema, dict): return { "status": "error", "error": "Schema must be a valid JSON object (dict)" } # Ensure schema is properly formatted if "type" not in schema: logger.warning("Schema missing 'type' field, adding 'object' as default") schema = {"type": "object", "properties": schema} # Convert schema to JSON string schema_json = json.dumps(schema) # Validate input sources if markdown is None and markdown_url is None: return { "status": "error", "error": "Must provide either markdown (file path or content) or markdown_url" } if markdown is not None and markdown_url is not None: return { "status": "error", "error": "Provide only ONE of: markdown or markdown_url" } # Prepare and send request based on input type async with httpx.AsyncClient(timeout=60.0) as client: if markdown is not None: # Check if markdown is a file path or content string is_file = False path = None # Try to interpret as file path try: # Handle both Path objects and strings if isinstance(markdown, Path): path = markdown else: path = Path(markdown) # Check if it's an existing file if path.exists() and path.is_file(): is_file = True except: # Not a valid path, treat as content is_file = False if is_file: # Markdown file path provided logger.info(f"Extracting from markdown file: {path}") with open(path, 'rb') as f: files = { 'schema': (None, schema_json, 'application/json'), 'markdown': (path.name, f, 'text/markdown') } data = {} if model: data['model'] = model response = await client.post( f"{API_BASE_URL}/v1/ade/extract", headers=get_headers(), files=files, data=data if data else None ) else: # Markdown content provided as string logger.info("Extracting from markdown content string") files = { 'schema': (None, schema_json, 'application/json'), 'markdown': ('content.md', str(markdown).encode('utf-8'), 'text/markdown') } data = {} if model: data['model'] = model response = await client.post( f"{API_BASE_URL}/v1/ade/extract", headers=get_headers(), files=files, data=data if data else None ) else: # markdown_url is not None # Markdown URL provided logger.info(f"Extracting from markdown URL: {markdown_url}") # When using URL, send as form data (not multipart) data = { 'schema': schema_json, 'markdown_url': markdown_url } if model: data['model'] = model response = await client.post( f"{API_BASE_URL}/v1/ade/extract", headers=get_headers(), data=data ) # Handle response if response.status_code == 200: result = response.json() # Validate expected response structure if all(key in result for key in ['extraction', 'extraction_metadata', 'metadata']): logger.info("Data extraction successful") # Always save extraction results to local directory # Create persistent output directory output_dir = Path.home() / "landingai_ade_output" output_dir.mkdir(exist_ok=True) # Extract filename for naming source_filename = "data" if is_file: source_filename = Path(path).stem elif markdown and len(markdown) < 100: # Short markdown might be a filename source_filename = markdown[:20].replace('/', '_').replace(' ', '_') # Create output filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = str(output_dir / f"extract_{source_filename}_{timestamp}.json") # Write extraction results to file with open(output_file, 'w', encoding='utf-8') as f: json.dump(result.get('extraction', {}), f, indent=2) logger.info(f"Extraction results saved to: {output_file}") result["saved_to"] = output_file # Add helpful summary extraction = result.get('extraction', {}) metadata = result.get('metadata', {}) # Add summary info prefixed with underscore result['_summary'] = { 'fields_extracted': len(extraction), 'duration_ms': metadata.get('duration_ms'), 'credit_usage': metadata.get('credit_usage'), 'extracted_keys': list(extraction.keys()) } # Add success status for consistency result['status'] = 'success' else: logger.warning("Response missing expected fields") result['status'] = 'success' # Still successful API call return result elif response.status_code == 206: # Partial success - some schema validation issues result = response.json() result['status'] = 'partial' result['_message'] = 'Extraction partially successful with schema validation warnings' return result else: # Handle error responses result = await handle_api_response(response) return result except json.JSONDecodeError as e: logger.error(f"Invalid JSON in schema: {e}") return { "status": "error", "error": f"Invalid JSON schema: {str(e)}" } except Exception as e: logger.error(f"Error extracting data: {e}") return { "status": "error", "error": str(e), "error_type": type(e).__name__ } async def _create_parse_job_internal( document_path: Optional[str] = None, document_url: Optional[str] = None, model: Optional[str] = None, split: Optional[str] = None, output_save_url: Optional[str] = None ) -> Dict[str, Any]: """Internal function for creating parse jobs - contains the actual API logic""" if document_path: logger.info(f"Creating parse job for file: {document_path}") elif document_url: logger.info(f"Creating parse job for URL: {document_url}") else: return { "status": "error", "error": "Must provide either document_path or document_url" } if not API_KEY: return { "status": "error", "error": "API key not configured. Set LANDINGAI_API_KEY environment variable." } try: async with httpx.AsyncClient(timeout=30.0) as client: if document_path: path = Path(document_path) if not path.exists(): return { "status": "error", "error": f"File not found: {document_path}" } # Log file size for user awareness file_size_mb = path.stat().st_size / (1024 * 1024) logger.info(f"File size: {file_size_mb:.2f} MB") with open(path, 'rb') as f: files = {'document': (path.name, f, 'application/octet-stream')} # Build data dict with only provided parameters data = {} if model: data['model'] = model if split: data['split'] = split if output_save_url: data['output_save_url'] = output_save_url response = await client.post( f"{API_BASE_URL}/v1/ade/parse/jobs", headers=get_headers(), files=files, data=data if data else None ) else: # document_url # Build data dict with URL and optional parameters data = {'document_url': document_url} if model: data['model'] = model if split: data['split'] = split if output_save_url: data['output_save_url'] = output_save_url response = await client.post( f"{API_BASE_URL}/v1/ade/parse/jobs", headers=get_headers(), data=data ) # Handle 202 Accepted response if response.status_code == 202: result = response.json() job_id = result.get('job_id') logger.info(f"Parse job created successfully: {job_id}") return { "status": "success", "job_id": job_id, "message": "Parse job created. Use get_parse_job_status to check progress.", "note": "For Zero Data Retention, results will be saved to output_save_url if provided." } else: # Handle error responses result = await handle_api_response(response) return result except Exception as e: logger.error(f"Error creating parse job: {e}") return { "status": "error", "error": str(e) } # ============= MCP Tools ============= @mcp.tool() async def parse_document( document_path: Optional[str] = None, document_url: Optional[str] = None, model: Optional[str] = None, split: Optional[str] = None ) -> Dict[str, Any]: """ Parse a document (PDF and images) and extract its content. Supported formats: APNG, BMP, DCX, DDS, DIB, DOC, DOCX, GD, GIF, ICNS, JP2 (JP2000), JPEG, JPG, ODP, ODT, PCX, PDF, PNG, PPT, PPTX, PPM, PSD, TGA, TIFF, WEBP See full list: https://docs.landing.ai/ade/ade-file-types Args: document_path: Path to local document file (provide this OR document_url) document_url: URL of document to parse (provide this OR document_path) model: Model version to use for parsing (optional, e.g., "dpt-2-latest") split: Set to "page" to split document by pages (optional) Returns: Response containing: - markdown: Full document as markdown text - chunks: Array of document chunks with markdown, type, id, and grounding - splits: Page/section splits if requested (with page numbers and content) - grounding: Location information mapping text to coordinates - metadata: Processing details (filename, page_count, duration_ms, credit_usage, etc.) """ return await _parse_document_internal(document_path, document_url, model, split) @mcp.tool() async def extract_data( schema: Union[Dict[str, Any], str], markdown: Optional[str] = None, markdown_url: Optional[str] = None, model: Optional[str] = None ) -> Dict[str, Any]: """ Extract structured data from markdown content using a JSON schema. The schema determines what key-value pairs are extracted from the markdown. Args: schema: JSON schema dict or string for field extraction (required). Defines what to extract. Must be a valid JSON object with type definitions. markdown: The markdown file path or markdown content string (provide this OR markdown_url) markdown_url: URL to markdown file (provide this OR markdown) model: Model version for extraction (optional, e.g., "extract-latest" for latest version) Returns: Response with exactly these fields: - extraction: Object containing the extracted key-value pairs matching your schema - extraction_metadata: Object with extracted pairs and chunk references for each value - metadata: Object containing: - filename: Name of processed file - org_id: Organization ID - duration_ms: Processing time in milliseconds - credit_usage: Credits consumed - job_id: Unique job identifier - version: API version used """ return await _extract_data_internal(schema, markdown, markdown_url, model) @mcp.tool() async def create_parse_job( document_path: Optional[str] = None, document_url: Optional[str] = None, model: Optional[str] = None, split: Optional[str] = None, output_save_url: Optional[str] = None ) -> Dict[str, Any]: """ Create a parse job for large documents (asynchronous processing). Use this for documents over 50MB or when you need non-blocking processing. Provide either document_path (local file) or document_url. Supported formats: APNG, BMP, DCX, DDS, DIB, DOC, DOCX, GD, GIF, ICNS, JP2 (JP2000), JPEG, JPG, ODP, ODT, PCX, PDF, PNG, PPT, PPTX, PPM, PSD, TGA, TIFF, WEBP See full list: https://docs.landing.ai/ade/ade-file-types Args: document_path: Path to local document file (provide this OR document_url) document_url: URL of document to parse (provide this OR document_url) model: Model version to use for parsing (optional, e.g., "dpt-2-latest") split: Set to "page" to split document by pages (optional) output_save_url: URL to save output for Zero Data Retention (optional) Returns: Response with: - job_id: Unique identifier for tracking the parse job - status: "success" if job created - message: Helpful status message """ return await _create_parse_job_internal(document_path, document_url, model, split, output_save_url) @mcp.tool() async def get_parse_job_status(job_id: str) -> Dict[str, Any]: """ Get the status and results of a parse job. Args: job_id: The unique job identifier to check Returns: Complete job information including: - job_id: The job identifier - status: Current status (pending, processing, completed, failed, cancelled) - received_at: Timestamp when job was received - progress: Progress from 0.0 to 1.0 - org_id: Organization ID (optional) - version: API version (optional) - data: Parse results when completed (null if not complete or using output_url) - markdown: Full document as markdown text - chunks: Array with markdown, type, id, and grounding per chunk - splits: Page/section splits if requested - grounding: Location information - metadata: Processing details - output_url: URL to download results if >1MB or output_save_url was used (optional) - metadata: Job processing metadata (optional) - failure_reason: Error message if job failed (optional) """ logger.info(f"Getting status for job: {job_id}") if not job_id: return { "status": "error", "error": "job_id is required" } if not API_KEY: return { "status": "error", "error": "API key not configured. Set LANDINGAI_API_KEY environment variable." } try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{API_BASE_URL}/v1/ade/parse/jobs/{job_id}", headers=get_headers() ) if response.status_code == 200: result = response.json() # Add helper information for better user experience job_status = result.get("status", "unknown") progress = result.get("progress", 0) # Add human-readable message based on status if job_status == "completed": if "data" in result and result["data"]: # Data is included in response result["_message"] = "Job completed successfully. Results available in 'data' field." if "markdown" in result["data"]: result["_summary"] = { "markdown_length": len(result["data"]["markdown"]), "chunks_count": len(result["data"].get("chunks", [])), "has_splits": "splits" in result["data"] and bool(result["data"]["splits"]) } elif "output_url" in result and result["output_url"]: # Results available via URL (>1MB or Zero Data Retention) output_url = result["output_url"] logger.info(f"Output URL found: {output_url}") result["_message"] = f"Job completed. Output URL: {output_url}" # Try to fetch the results from the pre-signed URL try: logger.info(f"Fetching results from output URL...") # Use a new client without auth headers for the pre-signed URL async with httpx.AsyncClient(timeout=60.0) as fetch_client: fetch_response = await fetch_client.get(output_url) if fetch_response.status_code == 200: # Parse the fetched data fetched_data = fetch_response.json() # Since we're fetching from output_url, this is a large file # Save it to disk to avoid context window issues if isinstance(fetched_data, dict) and "markdown" in fetched_data: markdown_content = fetched_data["markdown"] markdown_length = len(markdown_content) # Create persistent output directory output_dir = Path.home() / "landingai_ade_output" output_dir.mkdir(exist_ok=True) # Extract original filename from metadata if available source_filename = "document" if "metadata" in fetched_data: filename_meta = fetched_data["metadata"].get("filename", "") if filename_meta: source_filename = Path(filename_meta).stem # Create output filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = str(output_dir / f"{source_filename}_{job_id}_{timestamp}.md") # Write markdown to file with open(output_file, 'w', encoding='utf-8') as f: f.write(markdown_content) file_size_mb = markdown_length / (1024 * 1024) logger.info(f"Large result saved to: {output_file} ({file_size_mb:.2f} MB)") # Return file info instead of full content result["data_file"] = output_file result["_message"] = f"Job completed. Large results ({file_size_mb:.2f} MB) saved to: {output_file}" result["_fetch_status"] = "success_saved_to_file" result["_summary"] = { "markdown_length": markdown_length, "chunks_count": len(fetched_data.get("chunks", [])), "has_splits": "splits" in fetched_data and bool(fetched_data.get("splits")), "file_path": output_file, "file_size_mb": file_size_mb } # Include only metadata and structure info, not the large content result["metadata"] = fetched_data.get("metadata", {}) # Include a preview of the content (first 1000 chars) result["preview"] = markdown_content[:1000] + "..." if markdown_length > 1000 else markdown_content else: # No markdown or not a dict, include as-is result["data"] = fetched_data result["_message"] = "Job completed. Results fetched from output URL successfully." result["_fetch_status"] = "success" else: logger.warning(f"Failed to fetch from output URL: {fetch_response.status_code}") result["_message"] = f"Job completed. Results at output_url (fetch failed: {fetch_response.status_code})" result["_fetch_status"] = f"failed_{fetch_response.status_code}" result["_fetch_note"] = "Results are available at the output_url" except Exception as e: logger.error(f"Error fetching from output URL: {e}") result["_message"] = "Job completed. Results at output_url (auto-fetch failed)" result["_fetch_status"] = "error" result["_fetch_error"] = str(e) result["_fetch_note"] = "Use download_from_url tool with the output_url to retry" else: result["_message"] = "Job completed but no data available." elif job_status == "processing": result["_message"] = f"Job in progress: {progress*100:.1f}% complete" result["_estimated_wait"] = "Check again in 5-10 seconds" elif job_status == "pending": result["_message"] = "Job is queued and waiting to be processed" result["_estimated_wait"] = "Processing should start soon" elif job_status == "failed": failure_reason = result.get("failure_reason", "Unknown error") result["_message"] = f"Job failed: {failure_reason}" elif job_status == "cancelled": result["_message"] = "Job was cancelled" # Add timestamp conversion for readability if "received_at" in result and isinstance(result["received_at"], (int, float)): try: from datetime import datetime dt = datetime.fromtimestamp(result["received_at"] / 1000) # Assuming milliseconds result["_received_at_iso"] = dt.isoformat() except: pass # Keep original format if conversion fails logger.info(f"Job {job_id} status: {job_status} ({progress*100:.1f}% complete)") return result elif response.status_code == 404: return { "status": "error", "error": f"Job not found: {job_id}", "status_code": 404 } else: # Handle other error responses result = await handle_api_response(response, add_status=False) return result except Exception as e: logger.error(f"Error getting job status: {e}") return { "status": "error", "error": str(e), "error_type": type(e).__name__ } @mcp.tool() async def list_parse_jobs( page: Optional[int] = None, pageSize: Optional[int] = None, status: Optional[str] = None ) -> Dict[str, Any]: """ List all parse jobs with optional filtering. Args: page: Page number, 0-indexed (optional, default: 0, min: 0) pageSize: Number of items per page (optional, default: 10, range: 1-100) status: Filter by job status - one of: cancelled, completed, failed, pending, processing (optional) Returns: Response containing: - jobs: Array of job objects, each with: - job_id: Unique job identifier - status: Current job status - received_at: Timestamp when job was received - progress: Progress from 0.0 to 1.0 - failure_reason: Error message if job failed (optional) - org_id: Organization ID (optional) - has_more: Boolean indicating if more pages are available """ # Set defaults and validate ranges actual_page = page if page is not None else 0 actual_page_size = pageSize if pageSize is not None else 10 # Validate page is non-negative if actual_page < 0: return { "status": "error", "error": "Page must be >= 0" } # Validate and cap pageSize if actual_page_size < 1: return { "status": "error", "error": "Page size must be >= 1" } if actual_page_size > 100: actual_page_size = 100 logger.info(f"Page size capped at maximum of 100") # Validate status filter if provided valid_statuses = ["cancelled", "completed", "failed", "pending", "processing"] if status and status not in valid_statuses: return { "status": "error", "error": f"Invalid status filter. Must be one of: {', '.join(valid_statuses)}" } logger.info(f"Listing parse jobs (page: {actual_page}, pageSize: {actual_page_size}, status: {status})") if not API_KEY: return { "status": "error", "error": "API key not configured. Set LANDINGAI_API_KEY environment variable." } try: async with httpx.AsyncClient(timeout=30.0) as client: # Build query parameters params = {} if page is not None: params['page'] = actual_page if pageSize is not None: params['pageSize'] = actual_page_size if status: params['status'] = status response = await client.get( f"{API_BASE_URL}/v1/ade/parse/jobs", headers=get_headers(), params=params if params else None ) if response.status_code == 200: result = response.json() # Add status to indicate success result["status"] = "success" # Add helpful summary information jobs = result.get("jobs", []) logger.info(f"Found {len(jobs)} jobs on page {actual_page}") # Add pagination info result["pagination"] = { "current_page": actual_page, "page_size": actual_page_size, "items_on_page": len(jobs), "has_more": result.get("has_more", False) } # Count jobs by status for convenience if jobs: status_counts = {} for job in jobs: job_status = job.get("status", "unknown") status_counts[job_status] = status_counts.get(job_status, 0) + 1 result["status_summary"] = status_counts # Add human-readable timestamps if available for job in jobs: if "received_at" in job and isinstance(job["received_at"], (int, float)): # Convert timestamp to ISO format for readability try: from datetime import datetime dt = datetime.fromtimestamp(job["received_at"] / 1000) # Assuming milliseconds job["received_at_iso"] = dt.isoformat() except: pass # Keep original format if conversion fails return result else: # Handle error responses result = await handle_api_response(response) return result except Exception as e: logger.error(f"Error listing parse jobs: {e}") return { "status": "error", "error": str(e) } # Note: process_folder will call the existing parse_document and extract_data functions directly # since they are just async functions, not MCP tool decorators that prevent direct calls @mcp.tool() async def process_folder( folder_path: str, operation: str = "parse", schema: Optional[Union[Dict[str, Any], str]] = None, file_types: Optional[str] = None, model: Optional[str] = None, split: Optional[str] = None, max_concurrent: int = 15, save_results: bool = True ) -> Dict[str, Any]: """ Process all supported files in a folder - parse documents or extract structured data. Supported formats: - Images: APNG, BMP, DCX, DDS, DIB, GD, GIF, ICNS, JP2, JPEG, JPG, PCX, PNG, PPM, PSD, TGA, TIFF, WEBP - Documents: PDF, DOC, DOCX, PPT, PPTX, ODP, ODT Args: folder_path: Path to folder containing documents operation: "parse" (extract text/tables) or "extract" (extract structured data) schema: JSON schema dict or string for extraction (required if operation="extract") file_types: Comma-separated file extensions to process (e.g., "pdf,jpg"). None = all supported model: Model version to use (optional, e.g., "dpt-2-latest") split: Set to "page" to split documents by pages (optional) max_concurrent: Maximum number of concurrent operations (default: 3) save_results: Save results to files in ade_results folder (default: True) Returns: Processing summary with file results, paths, and aggregated data (for extract) Examples: # Parse all PDFs await process_folder("/path/to/docs", operation="parse", file_types="pdf") # Extract invoice data from all documents schema = {"type": "object", "properties": {"invoice_no": {"type": "string"}}} await process_folder("/path/to/invoices", operation="extract", schema=schema) """ # Validate inputs if operation not in ["parse", "extract"]: return { "status": "error", "error": "Operation must be 'parse' or 'extract'" } if operation == "extract" and not schema: return { "status": "error", "error": "Schema is required for extract operation" } # Parse schema if it's a string if schema and isinstance(schema, str): try: schema = json.loads(schema) except json.JSONDecodeError as e: return { "status": "error", "error": f"Invalid JSON schema: {str(e)}" } folder = Path(folder_path) if not folder.exists() or not folder.is_dir(): return { "status": "error", "error": f"Folder not found or not a directory: {folder_path}" } if not API_KEY: return { "status": "error", "error": "API key not configured. Set LANDINGAI_API_KEY environment variable." } # Define supported extensions SUPPORTED_EXTENSIONS = { # Images '.apng', '.bmp', '.dcx', '.dds', '.dib', '.gd', '.gif', '.icns', '.jp2', '.jpeg', '.jpg', '.pcx', '.png', '.ppm', '.psd', '.tga', '.tiff', '.tif', '.webp', # Documents '.pdf', '.doc', '.docx', '.ppt', '.pptx', '.odp', '.odt' } # Filter extensions if specified if file_types: requested_exts = {f".{ext.lower().strip()}" for ext in file_types.split(",")} allowed_exts = requested_exts & SUPPORTED_EXTENSIONS else: allowed_exts = SUPPORTED_EXTENSIONS # Find all matching files all_files = [] for ext in allowed_exts: all_files.extend(folder.glob(f"*{ext}")) all_files.extend(folder.glob(f"*{ext.upper()}")) # Remove duplicates and sort all_files = sorted(set(all_files)) if not all_files: return { "status": "success", "message": f"No supported files found in {folder_path}", "file_types_searched": list(allowed_exts), "total_files": 0 } logger.info(f"Found {len(all_files)} files to process in {folder_path}") # Prepare output directory if saving results output_dir = None if save_results: output_dir = folder / "ade_results" output_dir.mkdir(exist_ok=True) logger.info(f"Results will be saved to: {output_dir}") # Track processing results processed_files = [] failed_files = [] aggregated_data = [] if operation == "extract" else None start_time = datetime.now() # Group files by size for optimal processing small_files = [] # < 50MB large_files = [] # >= 50MB for file_path in all_files: size_mb = file_path.stat().st_size / (1024 * 1024) if size_mb < 50: small_files.append(file_path) else: large_files.append(file_path) logger.info(f"Processing {len(small_files)} small files (<50MB) and {len(large_files)} large files (>=50MB)") # Process small files directly in batches for i in range(0, len(small_files), max_concurrent): batch = small_files[i:i + max_concurrent] tasks = [] for file_path in batch: if operation == "parse": tasks.append(_parse_document_internal(document_path=str(file_path), model=model, split=split)) else: # extract # For extraction, we need to parse first then extract async def parse_and_extract(fp=file_path, s=schema, m=model, sp=split): parse_result = await _parse_document_internal(document_path=str(fp), model=m, split=sp) if parse_result.get("status") == "error": return parse_result markdown = parse_result.get("markdown", "") if not markdown: return {"status": "error", "error": "No markdown content to extract from"} extract_result = await _extract_data_internal(schema=s, markdown=markdown) if extract_result.get("status") == "success": return { "status": "success", "extraction": extract_result.get("extraction"), "markdown": markdown, "metadata": parse_result.get("metadata") } return extract_result tasks.append(parse_and_extract()) # Execute batch concurrently results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for file_path, result in zip(batch, results): if isinstance(result, Exception): failed_files.append({ "filename": file_path.name, "error": str(result) }) logger.error(f"Failed to process {file_path.name}: {result}") elif result.get("status") == "error": failed_files.append({ "filename": file_path.name, "error": result.get("error", "Unknown error") }) else: # Save results if requested output_path = None if save_results and output_dir: file_output_dir = output_dir / f"{file_path.stem}_{operation}" file_output_dir.mkdir(exist_ok=True) output_path = str(file_output_dir) if operation == "parse": # Save markdown content if "markdown" in result: (file_output_dir / "content.md").write_text( result["markdown"], encoding="utf-8" ) # Save metadata if "metadata" in result: (file_output_dir / "metadata.json").write_text( json.dumps(result["metadata"], indent=2), encoding="utf-8" ) # Save chunks if present if "chunks" in result and result["chunks"]: (file_output_dir / "chunks.json").write_text( json.dumps(result["chunks"], indent=2), encoding="utf-8" ) else: # extract # Save extracted data if "extraction" in result: (file_output_dir / "data.json").write_text( json.dumps(result["extraction"], indent=2), encoding="utf-8" ) # Add to aggregated data aggregated_entry = { "source_file": file_path.name, **result["extraction"] } aggregated_data.append(aggregated_entry) # Save source markdown if "markdown" in result: (file_output_dir / "source.md").write_text( result["markdown"], encoding="utf-8" ) processed_files.append({ "filename": file_path.name, "status": "success", "pages": result.get("metadata", {}).get("page_count", 0) if operation == "parse" else None, "has_data": bool(result.get("extraction")) if operation == "extract" else None, "output_path": output_path }) # Process large files using parse jobs if large_files: logger.info(f"Creating parse jobs for {len(large_files)} large files") jobs = [] # Create jobs for large files for file_path in large_files: try: job_result = await _create_parse_job_internal( document_path=str(file_path), model=model, split=split ) if job_result.get("status") == "success": jobs.append({ "file_path": file_path, "job_id": job_result["job_id"] }) else: failed_files.append({ "filename": file_path.name, "error": job_result.get("error", "Failed to create parse job") }) except Exception as e: failed_files.append({ "filename": file_path.name, "error": str(e) }) # Monitor jobs until completion if jobs: logger.info(f"Monitoring {len(jobs)} parse jobs...") pending_jobs = jobs.copy() while pending_jobs: await asyncio.sleep(5) # Check every 5 seconds for job in pending_jobs[:]: try: status_result = await get_parse_job_status(job["job_id"]) if status_result.get("status") == "completed": pending_jobs.remove(job) # Process based on operation if operation == "extract" and status_result.get("data"): # Extract from parsed markdown markdown = status_result["data"].get("markdown", "") extract_result = await _extract_data_internal( schema=schema, markdown=markdown ) if extract_result.get("status") == "success": result = { "extraction": extract_result.get("extraction"), "markdown": markdown, "metadata": status_result["data"].get("metadata") } else: result = extract_result else: result = status_result.get("data", status_result) # Save results output_path = None if save_results and output_dir: file_output_dir = output_dir / f"{job['file_path'].stem}_{operation}" file_output_dir.mkdir(exist_ok=True) output_path = str(file_output_dir) if operation == "parse" and "data_file" in status_result: # Large file saved to disk import shutil shutil.copy(status_result["data_file"], file_output_dir / "content.md") elif operation == "parse" and result.get("markdown"): (file_output_dir / "content.md").write_text( result["markdown"], encoding="utf-8" ) elif operation == "extract" and result.get("extraction"): (file_output_dir / "data.json").write_text( json.dumps(result["extraction"], indent=2), encoding="utf-8" ) aggregated_data.append({ "source_file": job["file_path"].name, **result["extraction"] }) processed_files.append({ "filename": job["file_path"].name, "status": "success", "job_id": job["job_id"], "output_path": output_path }) elif status_result.get("status") == "failed": pending_jobs.remove(job) failed_files.append({ "filename": job["file_path"].name, "error": status_result.get("failure_reason", "Job failed") }) except Exception as e: logger.error(f"Error checking job {job['job_id']}: {e}") if pending_jobs: logger.info(f"Still waiting for {len(pending_jobs)} jobs to complete...") # Calculate summary processing_time = (datetime.now() - start_time).total_seconds() # Save summary if requested if save_results and output_dir: summary = { "operation": operation, "timestamp": datetime.now().isoformat(), "folder_path": str(folder_path), "processing_time_s": processing_time, "total_files": len(all_files), "processed": len(processed_files), "failed": len(failed_files), "processed_files": processed_files, "failed_files": failed_files } if aggregated_data: summary["aggregated_data"] = aggregated_data # Also save aggregated data separately (output_dir / "extracted_data.json").write_text( json.dumps(aggregated_data, indent=2), encoding="utf-8" ) (output_dir / "summary.json").write_text( json.dumps(summary, indent=2), encoding="utf-8" ) # Prepare response result = { "status": "success", "operation": operation, "summary": { "total_files": len(all_files), "processed": len(processed_files), "failed": len(failed_files), "processing_time_s": round(processing_time, 1) }, "processed_files": processed_files, "failed_files": failed_files } if save_results and output_dir: result["results_path"] = str(output_dir) if aggregated_data: result["aggregated_data"] = aggregated_data return result @mcp.tool() async def health_check() -> Dict[str, Any]: """ Check the health status of the MCP server and API connectivity. Returns: Server status, configuration details, and API connectivity test """ result = { "status": "healthy", "server": "LandingAI ADE MCP Server (Direct API)", "version": "2.0.0", "api_key_configured": bool(API_KEY), "api_base_url": API_BASE_URL, "available_tools": [ "parse_document", "extract_data", "create_parse_job", "get_parse_job_status", "list_parse_jobs", "process_folder", "health_check" ], "timestamp": datetime.now().isoformat() } # Test API connectivity if key is configured if API_KEY: try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( f"{API_BASE_URL}/v1/ade/parse/jobs", headers=get_headers(), params={'pageSize': 1} ) if response.status_code == 200: result["api_connectivity"] = "connected" elif response.status_code == 401: result["api_connectivity"] = "invalid_api_key" else: result["api_connectivity"] = f"error_{response.status_code}" except Exception as e: result["api_connectivity"] = f"connection_failed: {str(e)}" else: result["api_connectivity"] = "no_api_key" return result # ============= Main Entry Point ============= def main(): """Main entry point for the MCP server""" logger.info("Starting LandingAI ADE MCP Server (Direct API)") logger.info("API Key configured: " + ("Yes" if API_KEY else "No")) # Use FastMCP's built-in run method mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/avaxia8/landingai-ade-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server