Skip to main content
Glama

Unstructured API MCP Server

Official
firecrawl.py20 kB
import asyncio import hashlib import os import tempfile import time from typing import Any, Dict, Literal import boto3 # Import the FirecrawlApp client from firecrawl import FirecrawlApp # Define job types Firecrawl_JobType = Literal["crawlhtml", "llmfulltxt"] def _prepare_firecrawl_config() -> Dict[str, str]: """Prepare the Firecrawl configuration by retrieving and validating the API key. Returns: A dictionary containing either an API key or an error message """ api_key = os.getenv("FIRECRAWL_API_KEY") if not api_key: return { "error": "Firecrawl API key is required. Set FIRECRAWL_API_KEY environment variable.", } return {"api_key": api_key} def _ensure_valid_s3_uri(s3_uri: str) -> str: """Ensure S3 URI is properly formatted. Args: s3_uri: S3 URI to validate Returns: Properly formatted S3 URI Raises: ValueError: If S3 URI doesn't start with 's3://' """ if not s3_uri: raise ValueError("S3 URI is required") if not s3_uri.startswith("s3://"): raise ValueError("S3 URI must start with 's3://'") # Ensure URI ends with a slash if not s3_uri.endswith("/"): s3_uri += "/" return s3_uri async def invoke_firecrawl_crawlhtml( url: str, s3_uri: str, limit: int = 100, ) -> Dict[str, Any]: """Start an asynchronous web crawl job using Firecrawl to retrieve HTML content. Args: url: URL to crawl s3_uri: S3 URI where results will be uploaded limit: Maximum number of pages to crawl (default: 100) Returns: Dictionary with crawl job information including the job ID """ # Call the generic invoke function with crawl-specific parameters params = { "limit": limit, "scrapeOptions": { "formats": ["html"], # Only use HTML format TODO: Bring in other features of this API }, } return await _invoke_firecrawl_job( url=url, s3_uri=s3_uri, job_type="crawlhtml", job_params=params, ) async def invoke_firecrawl_llmtxt( url: str, s3_uri: str, max_urls: int = 10, ) -> Dict[str, Any]: """Start an asynchronous llmfull.txt generation job using Firecrawl. This file is a standardized markdown file containing information to help LLMs use a website at inference time. The llmstxt endpoint leverages Firecrawl to crawl your website and extracts data using gpt-4o-mini Args: url: URL to crawl s3_uri: S3 URI where results will be uploaded max_urls: Maximum number of pages to crawl (1-100, default: 10) Returns: Dictionary with job information including the job ID """ # Call the generic invoke function with llmfull.txt-specific parameters params = {"maxUrls": max_urls, "showFullText": False} return await _invoke_firecrawl_job( url=url, s3_uri=s3_uri, job_type="llmfulltxt", job_params=params, ) async def _invoke_firecrawl_job( url: str, s3_uri: str, job_type: Firecrawl_JobType, job_params: Dict[str, Any], ) -> Dict[str, Any]: """Generic function to start a Firecrawl job (either HTML crawl or llmfull.txt generation). Args: url: URL to process s3_uri: S3 URI where results will be uploaded job_type: Type of job ('crawlhtml' or 'llmtxt') job_params: Parameters specific to the job type Returns: Dictionary with job information including the job ID """ # Get configuration with API key config = _prepare_firecrawl_config() # Check if config contains an error if "error" in config: return {"error": config["error"]} # Validate and normalize S3 URI first - # doing this outside the try block to handle validation errors specifically try: validated_s3_uri = _ensure_valid_s3_uri(s3_uri) except ValueError as ve: return {"error": f"Invalid S3 URI: {str(ve)}"} try: # Initialize the Firecrawl client firecrawl = FirecrawlApp(api_key=config["api_key"]) # Start the job based on job_type if job_type == "crawlhtml": job_status = firecrawl.async_crawl_url(url, params=job_params) elif job_type == "llmfulltxt": job_status = firecrawl.async_generate_llms_text(url, params=job_params) else: return {"error": f"Unknown job type: {job_type}"} # Handle the response if "id" in job_status: job_id = job_status["id"] # Start background task without waiting for it asyncio.create_task(wait_for_job_completion(job_id, validated_s3_uri, job_type)) # Prepare and return the response response = { "id": job_id, "status": job_status.get("status", "started"), "s3_uri": f"{validated_s3_uri}{job_id}/", "message": f"Firecrawl {job_type} job started " f"and will be auto-processed when complete", } return response else: return {"error": f"Failed to start Firecrawl {job_type} job", "details": job_status} except Exception as e: return {"error": f"Error starting Firecrawl {job_type} job: {str(e)}"} async def check_crawlhtml_status( crawl_id: str, ) -> Dict[str, Any]: """Check the status of an existing Firecrawl HTML crawl job. Args: crawl_id: ID of the crawl job to check Returns: Dictionary containing the current status of the crawl job """ return await _check_job_status(crawl_id, "crawlhtml") async def check_llmtxt_status( job_id: str, ) -> Dict[str, Any]: """Check the status of an existing llmfull.txt generation job. Args: job_id: ID of the llmfull.txt generation job to check Returns: Dictionary containing the current status of the job and text content if completed """ return await _check_job_status(job_id, "llmfulltxt") async def _check_job_status( job_id: str, job_type: Firecrawl_JobType, ) -> Dict[str, Any]: """Generic function to check the status of a Firecrawl job. Args: job_id: ID of the job to check job_type: Type of job ('crawlhtml' or 'llmtxt') Returns: Dictionary containing the current status of the job """ # Get configuration with API key config = _prepare_firecrawl_config() # Check if config contains an error if "error" in config: return {"error": config["error"]} try: # Initialize the Firecrawl client firecrawl = FirecrawlApp(api_key=config["api_key"]) # Check status based on job type if job_type == "crawlhtml": result = firecrawl.check_crawl_status(job_id) # Return a more user-friendly response for crawl jobs status_info = { "id": job_id, "status": result.get("status", "unknown"), "completed_urls": result.get("completed", 0), "total_urls": result.get("total", 0), } elif job_type == "llmfulltxt": result = firecrawl.check_generate_llms_text_status(job_id) # Return a more user-friendly response for llmfull.txt jobs status_info = { "id": job_id, "status": result.get("status", "unknown"), } # Add llmfull.txt content if job is completed if result.get("status") == "completed" and "data" in result: status_info["llmfulltxt"] = result["data"].get("llmsfulltxt", "") else: return {"error": f"Unknown job type: {job_type}"} return status_info except Exception as e: return {"error": f"Error checking {job_type} status: {str(e)}"} def _upload_directory_to_s3(local_dir: str, s3_uri: str) -> Dict[str, Any]: """Upload a directory to S3. Args: local_dir: Local directory to upload s3_uri: S3 URI to upload to (already validated) Returns: Dict with upload stats """ # Parse the S3 URI to get bucket and prefix (assume already validated) # Remove s3:// prefix and split by first / uri_parts = s3_uri[5:].split("/", 1) bucket_name = uri_parts[0] prefix = uri_parts[1] if len(uri_parts) > 1 else "" # Initialize boto3 S3 client s3_client = boto3.client( "s3", aws_access_key_id=os.environ.get("AWS_KEY"), aws_secret_access_key=os.environ.get("AWS_SECRET"), ) # Track upload stats stats = {"uploaded_files": 0, "failed_files": 0, "total_bytes": 0} # Walk through the directory for root, _, files in os.walk(local_dir): for file in files: local_path = os.path.join(root, file) # Determine the S3 key (path within the bucket) # Remove the local_dir prefix from the file path to get relative path relative_path = os.path.relpath(local_path, local_dir) # Create the S3 key by joining the prefix with the relative path # Replace backslashes with forward slashes for S3 s3_key = os.path.join(prefix, relative_path).replace("\\", "/") try: # Upload the file s3_client.upload_file(local_path, bucket_name, s3_key) # Update stats stats["uploaded_files"] += 1 stats["total_bytes"] += os.path.getsize(local_path) except Exception as e: print(f"Error uploading {local_path}: {str(e)}") stats["failed_files"] += 1 return stats async def wait_for_crawlhtml_completion( crawl_id: str, s3_uri: str, poll_interval: int = 30, timeout: int = 3600, ) -> Dict[str, Any]: """Poll a Firecrawl HTML crawl job until completion and upload results to S3. Args: crawl_id: ID of the crawl job to monitor s3_uri: S3 URI where results will be uploaded (already validated) poll_interval: How often to check job status in seconds (default: 30) timeout: Maximum time to wait in seconds (default: 1 hour) Returns: Dictionary with information about the completed job and S3 URI """ return await wait_for_job_completion(crawl_id, s3_uri, "crawlhtml", poll_interval, timeout) async def wait_for_job_completion( job_id: str, s3_uri: str, job_type: Firecrawl_JobType, poll_interval: int = 30, timeout: int = 3600, ) -> Dict[str, Any]: """Poll a Firecrawl job until completion and upload results to S3. Args: job_id: ID of the job to monitor s3_uri: S3 URI where results will be uploaded (already validated) job_type: Type of job ('crawlhtml' or 'llmtxt') poll_interval: How often to check job status in seconds (default: 30) timeout: Maximum time to wait in seconds (default: 1 hour) Returns: Dictionary with information about the completed job and S3 URI """ # Get configuration with API key config = _prepare_firecrawl_config() # Check if config contains an error if "error" in config: return {"error": config["error"]} try: # Initialize the Firecrawl client firecrawl = FirecrawlApp(api_key=config["api_key"]) start_time = time.time() # Poll until completion or timeout while True: # Check status based on job type if job_type == "crawlhtml": result = firecrawl.check_crawl_status(job_id) elif job_type == "llmfulltxt": result = firecrawl.check_generate_llms_text_status(job_id) else: return {"error": f"Unknown job type: {job_type}", "id": job_id} # Check if job is completed if result.get("status") == "completed": break # Check for timeout if time.time() - start_time > timeout: return { "id": job_id, "status": "timeout", "error": f"Timeout waiting for {job_type} job {job_id} to complete", "elapsed_time": time.time() - start_time, } # Wait before polling again await asyncio.sleep(poll_interval) # Job completed - process results based on job type with tempfile.TemporaryDirectory() as temp_dir: # Create a job-specific subdirectory job_dir = os.path.join(temp_dir, job_id) os.makedirs(job_dir, exist_ok=True) # Process results based on job type if job_type == "crawlhtml": file_count = await _process_crawlhtml_results(result, job_dir) elif job_type == "llmfulltxt": file_count = _process_llmtxt_results(result, job_dir) else: return {"error": f"Unknown job type: {job_type}", "id": job_id} # Upload to S3 final_s3_uri = f"{s3_uri}{job_id}/" upload_stats = _upload_directory_to_s3(job_dir, final_s3_uri) # Return combined results response = { "id": job_id, "status": "completed", "s3_uri": final_s3_uri, "file_count": file_count, "uploaded_files": upload_stats["uploaded_files"], "failed_uploads": upload_stats["failed_files"], "upload_size_bytes": upload_stats["total_bytes"], "elapsed_time": time.time() - start_time, } # Add job-type specific information if job_type == "crawlhtml": response.update( { "completed_urls": result.get("completed", 0), "total_urls": result.get("total", 0), }, ) elif job_type == "llmfulltxt" and "data" in result: response.update( { "processed_urls_count": len(result["data"].get("processedUrls", [])), }, ) return response except Exception as e: return {"error": f"Error in wait_for_{job_type}_completion: {str(e)}", "id": job_id} async def _process_crawlhtml_results(crawl_result: Dict[str, Any], output_dir: str) -> int: """Process HTML crawl results by saving HTML files. Args: crawl_result: The result from the completed crawl output_dir: Directory where to save the files Returns: Number of files created """ file_paths = [] # Process crawl_result['data'], which is a list of dicts, each with an 'html' key if "data" in crawl_result and isinstance(crawl_result["data"], list): for i, page_data in enumerate(crawl_result["data"]): # Skip if no HTML content if "html" not in page_data: continue # Get the URL from metadata if available, otherwise use index url = page_data.get("metadata", {}).get("url", f"page-{i}") content = page_data.get("html", f"<html><body>Content for {url}</body></html>") # Clean the URL to create a valid filename filename = _clean_url_to_filename(url) file_path = os.path.join(output_dir, filename) # Write the HTML content to file with open(file_path, "w", encoding="utf-8") as f: f.write(content) file_paths.append(file_path) return len(file_paths) def _clean_url_to_filename(url: str) -> str: """Convert a URL to a valid filename. Args: url: The URL to convert Returns: A valid filename derived from the URL """ # Remove protocol prefixes filename = url.replace("https://", "").replace("http://", "") # Replace special characters with underscores filename = filename.replace("/", "_").replace("?", "_").replace("&", "_") filename = filename.replace(":", "_") # Additional character cleaning # Ensure the filename isn't too long if len(filename) > 200: # Use the domain and a hash of the full URL if too long domain = filename.split("_")[0] filename_hash = hashlib.md5(url.encode()).hexdigest() return f"{domain}_{filename_hash}.html" else: return f"{filename}.html" def _process_llmtxt_results(result: Dict[str, Any], output_dir: str) -> int: """Process llmfull.txt generation results by saving text files. Args: result: The result from the completed job output_dir: Directory where to save the files Returns: Number of files created """ file_count = 0 # Save llmfull.txt to be accurate w firecrawl documentation if "data" in result and "llmsfulltxt" in result["data"]: llmtxt_path = os.path.join(output_dir, "llmfull.txt") with open(llmtxt_path, "w", encoding="utf-8") as f: f.write(result["data"]["llmsfulltxt"]) file_count += 1 return file_count async def cancel_crawlhtml_job( crawl_id: str, ) -> Dict[str, Any]: """Cancel an in-progress Firecrawl HTML crawl job. Args: crawl_id: ID of the crawl job to cancel Returns: Dictionary containing the result of the cancellation """ return await _cancel_job(crawl_id, "crawlhtml") async def cancel_llmtxt_job( job_id: str, ) -> Dict[str, Any]: """Function to cancel an in-progress Firecrawl LLM text generation job. WARNING: This function is NOT SUPPORTED by the underlying Firecrawl API for LLM text generation jobs. It is provided for API consistency only but will fail when called. LLM text generation jobs cannot be cancelled once started and must run to completion. Args: job_id: ID of the LLM text generation job to cancel Returns: Dictionary containing an error message indicating the operation is not supported """ return await _cancel_job(job_id, "llmfulltxt") async def _cancel_job( job_id: str, job_type: Firecrawl_JobType, ) -> Dict[str, Any]: """Generic function to cancel a Firecrawl job. Args: job_id: ID of the job to cancel job_type: Type of job ('crawlhtml' or 'llmtxt') Returns: Dictionary containing the result of the cancellation """ # Get configuration with API key config = _prepare_firecrawl_config() # Check if config contains an error if "error" in config: return {"error": config["error"]} # Special case for LLM text generation jobs - not supported if job_type == "llmfulltxt": return { "id": job_id, "status": "error", "message": ( "Cancelling LLM text generation jobs is not supported." " The job must complete." ), "details": {"status": "error", "reason": "unsupported_operation"}, } else: try: # Initialize the Firecrawl client firecrawl = FirecrawlApp(api_key=config["api_key"]) # Cancel the job result = firecrawl.cancel_crawl(job_id) # Check if the cancellation was successful (result has 'status': 'cancelled') is_successful = result.get("status") == "cancelled" # Return a user-friendly response return { "id": job_id, "status": "cancelled" if is_successful else "error", "message": f"Firecrawl {job_type} job cancelled successfully" if is_successful else "Failed to cancel job", "details": result, } except Exception as e: return {"error": f"Error cancelling {job_type} job: {str(e)}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Unstructured-IO/UNS-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server