Skip to main content
Glama
server.py76.6 kB
""" FreeCrawl MCP Server - Self-hosted web scraping and document processing A production-ready MCP server that provides web scraping, document processing, and structured data extraction capabilities as a Firecrawl replacement. Features: - JavaScript-enabled web scraping with anti-detection - Document processing via Unstructured - Concurrent batch processing - Intelligent caching with SQLite - Rate limiting and resource management - Comprehensive error handling """ import asyncio import base64 import gzip import hashlib import ipaddress import json import logging import os import pickle import random import re import socket import sqlite3 import subprocess import sys import tempfile import time import traceback from contextlib import asynccontextmanager from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import Enum from pathlib import Path from typing import Any, Dict, List, Literal, Optional, Union from urllib.parse import urlparse, urljoin, quote import uuid import gc import aiohttp import aiosqlite import psutil from bs4 import BeautifulSoup from markdownify import markdownify as md from playwright.async_api import async_playwright, Browser, BrowserContext, Page from pydantic import BaseModel, Field, HttpUrl, validator from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) # MCP imports try: from mcp.server import Server from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource import mcp.server.stdio import mcp.server.session HAS_MCP = True except ImportError: HAS_MCP = False Server = None # Optional imports try: import magic HAS_MAGIC = True except ImportError: HAS_MAGIC = False # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # === Configuration === @dataclass class ServerConfig: """FreeCrawl server configuration""" # Transport transport: str = os.getenv("FREECRAWL_TRANSPORT", "stdio") http_port: int = int(os.getenv("FREECRAWL_PORT", "8000")) # Browser pool max_browsers: int = int(os.getenv("FREECRAWL_MAX_BROWSERS", "3")) browser_headless: bool = os.getenv("FREECRAWL_HEADLESS", "true").lower() == "true" browser_timeout: int = int(os.getenv("FREECRAWL_BROWSER_TIMEOUT", "30000")) # Concurrency max_concurrent: int = int(os.getenv("FREECRAWL_MAX_CONCURRENT", "10")) max_per_domain: int = int(os.getenv("FREECRAWL_MAX_PER_DOMAIN", "3")) # Cache cache_enabled: bool = os.getenv("FREECRAWL_CACHE", "true").lower() == "true" cache_dir: Path = Path(os.getenv("FREECRAWL_CACHE_DIR", "/tmp/freecrawl_cache")) cache_ttl: int = int(os.getenv("FREECRAWL_CACHE_TTL", "3600")) cache_max_size: int = int(os.getenv("FREECRAWL_CACHE_SIZE", "536870912")) # 512MB # Security require_api_key: bool = ( os.getenv("FREECRAWL_REQUIRE_API_KEY", "false").lower() == "true" ) api_keys: List[str] = field( default_factory=lambda: os.getenv("FREECRAWL_API_KEYS", "").split(",") if os.getenv("FREECRAWL_API_KEYS") else [] ) blocked_domains: List[str] = field( default_factory=lambda: os.getenv("FREECRAWL_BLOCKED_DOMAINS", "").split(",") if os.getenv("FREECRAWL_BLOCKED_DOMAINS") else [] ) # Anti-detection anti_detect: bool = os.getenv("FREECRAWL_ANTI_DETECT", "true").lower() == "true" user_agent_rotation: bool = ( os.getenv("FREECRAWL_ROTATE_UA", "true").lower() == "true" ) proxy_list: List[str] = field( default_factory=lambda: os.getenv("FREECRAWL_PROXIES", "").split(",") if os.getenv("FREECRAWL_PROXIES") else [] ) # Performance rate_limit_default: int = int(os.getenv("FREECRAWL_RATE_LIMIT", "60")) request_timeout: int = int(os.getenv("FREECRAWL_REQUEST_TIMEOUT", "30")) max_response_size: int = int( os.getenv("FREECRAWL_MAX_RESPONSE", "52428800") ) # 50MB # Monitoring metrics_enabled: bool = os.getenv("FREECRAWL_METRICS", "true").lower() == "true" log_level: str = os.getenv("FREECRAWL_LOG_LEVEL", "INFO") audit_log: bool = os.getenv("FREECRAWL_AUDIT_LOG", "false").lower() == "true" # === Data Models === class BoundingBox(BaseModel): """Coordinate system for document elements""" x: float y: float width: float height: float class DocumentMetadata(BaseModel): """Metadata for processed documents""" page_number: Optional[int] = None coordinates: Optional[BoundingBox] = None confidence: Optional[float] = Field(None, ge=0.0, le=1.0) element_id: Optional[str] = None parent_id: Optional[str] = None class DocumentElement(BaseModel): """Structured element from document processing""" type: Literal["Title", "Text", "List", "Table", "Image", "Code", "Header", "Footer"] content: str metadata: DocumentMetadata children: Optional[List["DocumentElement"]] = None class PageMetadata(BaseModel): """Web page metadata""" timestamp: datetime status_code: int content_type: str page_load_time: float # milliseconds word_count: int language: Optional[str] = None encoding: Optional[str] = "utf-8" headers: Dict[str, str] = {} cookies: Optional[Dict[str, str]] = None class ScrapedContent(BaseModel): """Primary response format for scraped content""" url: str title: Optional[str] = None markdown: Optional[str] = None html: Optional[str] = None text: Optional[str] = None screenshot: Optional[str] = None # base64 encoded metadata: PageMetadata elements: Optional[List[DocumentElement]] = None links: Optional[List[str]] = None images: Optional[List[str]] = None class ExtractedData(BaseModel): """Schema-driven extracted data""" url: str schema_version: str extracted_at: datetime data: Dict[str, Any] confidence_scores: Optional[Dict[str, float]] = None validation_errors: Optional[List[str]] = None class CrawlResult(BaseModel): """Result from website crawling""" start_url: str pages_found: int pages_scraped: int max_depth_reached: int content: List[ScrapedContent] sitemap: Optional[Dict[str, List[str]]] = None errors: Optional[List[Dict[str, str]]] = None class SearchResult(BaseModel): """Web search result""" query: str total_results: int results: List[Dict[str, Any]] scraped_content: Optional[List[ScrapedContent]] = None class ErrorCode(Enum): """Error codes for standardized error handling""" INVALID_URL = "invalid_url" INVALID_SCHEMA = "invalid_schema" RATE_LIMITED = "rate_limited" UNAUTHORIZED = "unauthorized" FORBIDDEN = "forbidden" NOT_FOUND = "not_found" BROWSER_ERROR = "browser_error" TIMEOUT = "timeout" NETWORK_ERROR = "network_error" PROCESSING_ERROR = "processing_error" RESOURCE_EXHAUSTED = "resource_exhausted" BOT_DETECTED = "bot_detected" CAPTCHA_REQUIRED = "captcha_required" IP_BLOCKED = "ip_blocked" class FreeCrawlError(Exception): """Base exception for FreeCrawl errors""" def __init__( self, code: ErrorCode, message: str, details: Optional[Dict[str, Any]] = None ): self.code = code self.message = message self.details = details or {} self.timestamp = datetime.now() super().__init__(message) # === Browser Pool Management === class BrowserPool: """Manage browser instances with resource limits""" def __init__(self, max_browsers: int = 3): self.max_browsers = max_browsers self.browsers: List[Browser] = [] self.available_browsers = asyncio.Queue() self.browser_contexts: Dict[str, BrowserContext] = {} self._playwright = None self._lock = asyncio.Lock() async def initialize(self): """Initialize browser pool""" self._playwright = await async_playwright().start() # Create initial browsers for _ in range(min(2, self.max_browsers)): browser = await self._create_browser() await self.available_browsers.put(browser) async def _create_browser(self) -> Browser: """Create a new browser instance""" browser = await self._playwright.chromium.launch( headless=True, args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-blink-features=AutomationControlled", "--disable-extensions", "--disable-plugins", "--disable-images", # Faster loading "--disable-javascript", # Will enable selectively "--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", ], ) self.browsers.append(browser) return browser async def get_browser(self) -> Browser: """Get an available browser instance""" async with self._lock: if ( self.available_browsers.empty() and len(self.browsers) < self.max_browsers ): browser = await self._create_browser() return browser return await self.available_browsers.get() async def release_browser(self, browser: Browser): """Release browser back to pool""" await self.available_browsers.put(browser) async def cleanup(self): """Cleanup all browsers""" for browser in self.browsers: try: await browser.close() except Exception as e: logger.warning(f"Error closing browser: {e}") if self._playwright: await self._playwright.stop() # === Anti-Detection Service === class AntiDetectionService: """Comprehensive anti-bot detection evasion""" def __init__(self): self.user_agents = [ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15", ] async def prepare_context( self, context: BrowserContext, enable_js: bool = True ) -> None: """Configure browser context for stealth operation""" # Randomize user agent ua = random.choice(self.user_agents) # Configure stealth settings await context.add_init_script(""" // Override navigator properties Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); // Remove automation indicators delete navigator.__proto__.webdriver; // Add chrome object window.chrome = { runtime: {}, }; // Override permissions const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); // Override plugins Object.defineProperty(navigator, 'plugins', { get: () => [ { 0: {type: "application/x-google-chrome-pdf"}, description: "Portable Document Format", filename: "internal-pdf-viewer", length: 1, name: "Chrome PDF Plugin" } ] }); // Override languages Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); // Override platform Object.defineProperty(navigator, 'platform', { get: () => 'MacIntel' }); """) # Set realistic headers await context.set_extra_http_headers( { "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "User-Agent": ua, "Cache-Control": "max-age=0", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1", } ) # Set viewport - this should be done when creating a page, not on context # We'll set this when creating pages # === Cache Manager === class CacheManager: """Intelligent caching with TTL and size limits""" def __init__(self, cache_dir: Path, max_size: int, ttl: int): self.cache_dir = cache_dir self.max_size = max_size self.ttl = ttl self.cache_dir.mkdir(parents=True, exist_ok=True) self.db_path = self.cache_dir / "cache.db" self.current_size = 0 async def initialize(self): """Initialize cache database""" async with aiosqlite.connect(str(self.db_path)) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS cache_entries ( cache_key TEXT PRIMARY KEY, url TEXT NOT NULL, content_type TEXT NOT NULL, data BLOB NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP, hit_count INTEGER DEFAULT 0, size_bytes INTEGER NOT NULL ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache_entries(expires_at)" ) await db.execute( "CREATE INDEX IF NOT EXISTS idx_cache_url ON cache_entries(url)" ) await db.commit() # Calculate current size await self._calculate_size() async def _calculate_size(self): """Calculate current cache size""" async with aiosqlite.connect(str(self.db_path)) as db: cursor = await db.execute("SELECT SUM(size_bytes) FROM cache_entries") row = await cursor.fetchone() self.current_size = row[0] if row and row[0] else 0 async def get( self, url: str, cache_key: Optional[str] = None ) -> Optional[ScrapedContent]: """Retrieve cached content if valid""" key = cache_key or self._generate_key(url) async with aiosqlite.connect(str(self.db_path)) as db: cursor = await db.execute( "SELECT data, expires_at FROM cache_entries WHERE cache_key = ?", (key,) ) row = await cursor.fetchone() if row: data, expires_at = row expires_datetime = ( datetime.fromisoformat(expires_at) if expires_at else None ) if expires_datetime and expires_datetime < datetime.now(): # Expired await self.delete(key) return None # Update hit count await db.execute( "UPDATE cache_entries SET hit_count = hit_count + 1 WHERE cache_key = ?", (key,), ) await db.commit() # Decompress and deserialize try: content_dict = pickle.loads(gzip.decompress(data)) return ScrapedContent(**content_dict) except Exception as e: logger.warning(f"Failed to deserialize cached content: {e}") await self.delete(key) return None return None async def set( self, url: str, content: ScrapedContent, ttl: Optional[int] = None ) -> str: """Cache scraped content""" key = self._generate_key(url) # Serialize and compress data = gzip.compress(pickle.dumps(content.model_dump())) size = len(data) # Check size limits if size > self.max_size * 0.1: # Single item shouldn't exceed 10% of cache return key # Evict if necessary while self.current_size + size > self.max_size: await self._evict_lru() # Store expires_at = datetime.now() + timedelta(seconds=ttl or self.ttl) async with aiosqlite.connect(str(self.db_path)) as db: await db.execute( """ INSERT OR REPLACE INTO cache_entries (cache_key, url, content_type, data, expires_at, size_bytes) VALUES (?, ?, ?, ?, ?, ?) """, (key, url, "scraped_content", data, expires_at.isoformat(), size), ) await db.commit() self.current_size += size return key async def delete(self, cache_key: str): """Delete cache entry""" async with aiosqlite.connect(str(self.db_path)) as db: cursor = await db.execute( "SELECT size_bytes FROM cache_entries WHERE cache_key = ?", (cache_key,) ) row = await cursor.fetchone() if row: size = row[0] await db.execute( "DELETE FROM cache_entries WHERE cache_key = ?", (cache_key,) ) await db.commit() self.current_size -= size async def _evict_lru(self): """Evict least recently used entry""" async with aiosqlite.connect(str(self.db_path)) as db: cursor = await db.execute( """ SELECT cache_key, size_bytes FROM cache_entries ORDER BY hit_count ASC, created_at ASC LIMIT 1 """ ) row = await cursor.fetchone() if row: key, size = row await self.delete(key) def _generate_key(self, url: str) -> str: """Generate cache key from URL""" return hashlib.md5(url.encode()).hexdigest() async def cleanup(self): """Cleanup expired entries""" async with aiosqlite.connect(str(self.db_path)) as db: await db.execute( "DELETE FROM cache_entries WHERE expires_at < ?", (datetime.now().isoformat(),), ) await db.commit() await self._calculate_size() # === Content Extractor === class ContentExtractor: """Multi-strategy content extraction with fallback""" def __init__(self): self.anti_detect = AntiDetectionService() async def extract_content( self, url: str, browser: Browser, formats: List[str], javascript: bool = True, wait_for: Optional[str] = None, headers: Optional[Dict[str, str]] = None, cookies: Optional[Dict[str, str]] = None, timeout: int = 30000, ) -> ScrapedContent: """Extract content from URL using browser""" start_time = time.time() context = None page = None try: # Create browser context context = await browser.new_context() await self.anti_detect.prepare_context(context, enable_js=javascript) # Set custom headers if headers: await context.set_extra_http_headers(headers) # Set cookies if cookies: # Convert cookies to Playwright format cookie_list = [] parsed_url = urlparse(url) domain = parsed_url.netloc for name, value in cookies.items(): cookie_list.append( { "name": name, "value": value, "domain": domain, "path": "/", } ) await context.add_cookies(cookie_list) # Create page page = await context.new_page() # Set viewport size await page.set_viewport_size( { "width": random.choice([1920, 1366, 1440, 1536]), "height": random.choice([1080, 768, 900, 864]), } ) # Navigate to URL response = await page.goto( url, timeout=timeout, wait_until="domcontentloaded" ) if not response: raise FreeCrawlError(ErrorCode.NETWORK_ERROR, f"Failed to load {url}") # Wait for additional content if specified if wait_for: if wait_for.isdigit(): await asyncio.sleep(int(wait_for) / 1000) # Convert ms to seconds else: try: await page.wait_for_selector(wait_for, timeout=10000) except Exception: logger.warning(f"Selector {wait_for} not found, continuing...") # Extract page content html = await page.content() title = await page.title() # Parse with BeautifulSoup soup = BeautifulSoup(html, "html.parser") # Extract text content text_content = soup.get_text(separator=" ", strip=True) # Generate markdown markdown_content = None if "markdown" in formats: markdown_content = self._html_to_markdown(html) # Take screenshot if requested screenshot_data = None if "screenshot" in formats: try: screenshot = await page.screenshot(full_page=True) screenshot_data = base64.b64encode(screenshot).decode() except Exception as e: logger.warning(f"Failed to take screenshot: {e}") # Extract links links = [] for link in soup.find_all("a", href=True): href = link["href"] absolute_url = urljoin(url, href) links.append(absolute_url) # Extract images images = [] for img in soup.find_all("img", src=True): src = img["src"] absolute_url = urljoin(url, src) images.append(absolute_url) # Build metadata load_time = (time.time() - start_time) * 1000 # Convert to ms metadata = PageMetadata( timestamp=datetime.now(), status_code=response.status, content_type=response.headers.get("content-type", "text/html"), page_load_time=load_time, word_count=len(text_content.split()), headers=dict(response.headers), ) # Extract structured elements if requested elements = None if "structured" in formats: elements = self._extract_structured_elements(soup) return ScrapedContent( url=url, title=title, markdown=markdown_content, html=html if "html" in formats else None, text=text_content if "text" in formats else None, screenshot=screenshot_data, metadata=metadata, elements=elements, links=links[:50], # Limit to first 50 links images=images[:50], # Limit to first 50 images ) except Exception as e: if isinstance(e, FreeCrawlError): raise raise FreeCrawlError( ErrorCode.PROCESSING_ERROR, f"Failed to extract content: {str(e)}" ) finally: if page: await page.close() if context: await context.close() def _html_to_markdown(self, html: str) -> str: """Convert HTML to markdown""" try: # Clean up HTML first soup = BeautifulSoup(html, "html.parser") # Remove script and style elements for script in soup(["script", "style", "nav", "footer", "aside"]): script.decompose() # Convert to markdown markdown = md(str(soup), heading_style="ATX", bullets="-") # Clean up markdown lines = markdown.split("\n") clean_lines = [] for line in lines: line = line.strip() if line and not line.startswith("[]"): # Remove empty link references clean_lines.append(line) return "\n\n".join(clean_lines) except Exception as e: logger.warning(f"Markdown conversion failed: {e}") return html def _extract_structured_elements( self, soup: BeautifulSoup ) -> List[DocumentElement]: """Extract structured document elements""" elements = [] try: # Extract headings for i, heading in enumerate( soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]) ): elements.append( DocumentElement( type="Header", content=heading.get_text(strip=True), metadata=DocumentMetadata(element_id=f"heading_{i}"), ) ) # Extract paragraphs for i, p in enumerate(soup.find_all("p")): text = p.get_text(strip=True) if text: elements.append( DocumentElement( type="Text", content=text, metadata=DocumentMetadata(element_id=f"paragraph_{i}"), ) ) # Extract lists for i, ul in enumerate(soup.find_all(["ul", "ol"])): items = [li.get_text(strip=True) for li in ul.find_all("li")] if items: elements.append( DocumentElement( type="List", content="\n".join(f"- {item}" for item in items), metadata=DocumentMetadata(element_id=f"list_{i}"), ) ) return elements[:100] # Limit to first 100 elements except Exception as e: logger.warning(f"Structured extraction failed: {e}") return [] # === Rate Limiter === class RateLimiter: """Token bucket rate limiter with per-domain tracking""" def __init__(self, default_limit: int = 60): self.default_limit = default_limit self.domain_buckets: Dict[str, Dict[str, Any]] = {} self.global_bucket = {"tokens": default_limit, "last_refill": time.time()} async def check_rate_limit(self, url: str) -> bool: """Check if request is allowed for domain""" domain = urlparse(url).netloc now = time.time() # Get or create bucket for domain if domain not in self.domain_buckets: self.domain_buckets[domain] = { "tokens": self.default_limit, "last_refill": now, } bucket = self.domain_buckets[domain] # Refill tokens time_passed = now - bucket["last_refill"] tokens_to_add = time_passed * (self.default_limit / 60) # tokens per second bucket["tokens"] = min(self.default_limit, bucket["tokens"] + tokens_to_add) bucket["last_refill"] = now # Check if we can consume a token if bucket["tokens"] >= 1: bucket["tokens"] -= 1 return True return False # === Document Processor === class DocumentProcessor: """Process documents using Unstructured""" async def process_document( self, file_path: Optional[str] = None, url: Optional[str] = None, strategy: str = "hi_res", formats: List[str] = ["structured"], languages: Optional[List[str]] = None, extract_images: bool = False, extract_tables: bool = True, ) -> Dict[str, Any]: """Process document file or URL""" temp_file = None try: # Download file if URL provided if url: temp_file = await self._download_file(url) file_path = temp_file if not file_path or not os.path.exists(file_path): raise FreeCrawlError(ErrorCode.NOT_FOUND, "Document file not found") # Import unstructured dynamically try: from unstructured.partition.auto import partition except ImportError: # Fallback to basic text extraction return await self._basic_document_processing(file_path, formats) # Process document elements = partition( filename=file_path, strategy=strategy, languages=languages or ["eng"], include_page_breaks=True, extract_images_in_pdf=extract_images, ) # Convert elements to structured format structured_elements = [] markdown_content = [] text_content = [] for element in elements: element_type = element.__class__.__name__ content = str(element) if content.strip(): # Map Unstructured types to our types mapped_type = self._map_element_type(element_type) structured_elements.append( DocumentElement( type=mapped_type, content=content, metadata=DocumentMetadata( page_number=getattr( element.metadata, "page_number", None ) if hasattr(element, "metadata") else None ), ) ) # Build markdown if mapped_type == "Header": markdown_content.append(f"# {content}") elif mapped_type == "Title": markdown_content.append(f"## {content}") else: markdown_content.append(content) text_content.append(content) result = { "file_path": file_path, "elements_count": len(structured_elements), "word_count": len(" ".join(text_content).split()), } if "structured" in formats: result["elements"] = [elem.model_dump() for elem in structured_elements] if "markdown" in formats: result["markdown"] = "\n\n".join(markdown_content) if "text" in formats: result["text"] = "\n\n".join(text_content) return result except Exception as e: if isinstance(e, FreeCrawlError): raise raise FreeCrawlError( ErrorCode.PROCESSING_ERROR, f"Document processing failed: {str(e)}" ) finally: if temp_file and os.path.exists(temp_file): os.unlink(temp_file) async def _basic_document_processing( self, file_path: str, formats: List[str] ) -> Dict[str, Any]: """Basic document processing fallback when Unstructured is not available""" try: # Simple text extraction for common formats content = "" if file_path.lower().endswith(".txt"): with open(file_path, "r", encoding="utf-8") as f: content = f.read() else: # For other formats, return file info content = f"Document: {os.path.basename(file_path)}" result = { "file_path": file_path, "elements_count": 1, "word_count": len(content.split()), } if "text" in formats: result["text"] = content if "markdown" in formats: result["markdown"] = f"# {os.path.basename(file_path)}\n\n{content}" if "structured" in formats: result["elements"] = [ { "type": "Text", "content": content, "metadata": {"element_id": "document_0"}, } ] return result except Exception as e: raise FreeCrawlError( ErrorCode.PROCESSING_ERROR, f"Basic document processing failed: {str(e)}", ) async def _download_file(self, url: str) -> str: """Download file from URL to temporary location""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status != 200: raise FreeCrawlError( ErrorCode.NETWORK_ERROR, f"Failed to download file: {response.status}", ) # Create temporary file with tempfile.NamedTemporaryFile(delete=False) as temp_file: async for chunk in response.content.iter_chunked(8192): temp_file.write(chunk) return temp_file.name def _map_element_type(self, unstructured_type: str) -> str: """Map Unstructured element types to our types""" mapping = { "Title": "Title", "Header": "Header", "Text": "Text", "NarrativeText": "Text", "ListItem": "List", "Table": "Table", "Image": "Image", "Code": "Code", "Footer": "Footer", } return mapping.get(unstructured_type, "Text") # === State Manager === class StateManager: """Centralized state management for the server""" def __init__(self): self.browser_pool: Optional[BrowserPool] = None self.cache: Optional[CacheManager] = None self.rate_limiter: Optional[RateLimiter] = None self.document_processor: Optional[DocumentProcessor] = None self.content_extractor: Optional[ContentExtractor] = None self.config: Optional[ServerConfig] = None async def initialize(self, config: ServerConfig): """Initialize all stateful components""" self.config = config # Initialize browser pool self.browser_pool = BrowserPool(max_browsers=config.max_browsers) await self.browser_pool.initialize() # Initialize cache if config.cache_enabled: self.cache = CacheManager( cache_dir=config.cache_dir, max_size=config.cache_max_size, ttl=config.cache_ttl, ) await self.cache.initialize() # Initialize other components self.rate_limiter = RateLimiter(config.rate_limit_default) self.document_processor = DocumentProcessor() self.content_extractor = ContentExtractor() async def cleanup(self): """Cleanup all resources""" if self.browser_pool: await self.browser_pool.cleanup() if self.cache: await self.cache.cleanup() # === Main Server === class FreeCrawlServer: """Main FreeCrawl MCP server implementation""" def __init__(self, config: Optional[ServerConfig] = None): self.config = config or ServerConfig() self.state = StateManager() self.mcp = self._setup_mcp() def _setup_mcp(self): """Setup MCP server with tools""" if not HAS_MCP: raise ImportError("MCP library not found. Install with: pip install mcp") server = Server("freecrawl") @server.list_tools() async def handle_list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="mcp__freecrawl__scrape", description="Scrape content from a single URL with advanced options", inputSchema={ "type": "object", "properties": { "url": {"type": "string", "description": "URL to scrape"}, "formats": { "type": "array", "items": { "type": "string", "enum": [ "markdown", "html", "text", "screenshot", "structured", ], }, "default": ["markdown"], "description": "Content formats to extract", }, "javascript": { "type": "boolean", "default": True, "description": "Enable JavaScript rendering", }, "wait_for": { "type": "string", "description": "CSS selector or milliseconds to wait for", }, "anti_bot": { "type": "boolean", "default": True, "description": "Enable anti-bot detection", }, "headers": { "type": "object", "description": "Custom HTTP headers", }, "cookies": { "type": "object", "description": "Custom cookies", }, "cache": { "type": "boolean", "default": True, "description": "Use caching", }, "timeout": { "type": "integer", "default": 30000, "description": "Request timeout in milliseconds", }, }, "required": ["url"], }, ), Tool( name="mcp__freecrawl__search", description="Perform web search and optionally scrape results", inputSchema={ "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "num_results": { "type": "integer", "default": 10, "description": "Number of results to return", }, "scrape_results": { "type": "boolean", "default": True, "description": "Scrape content from result URLs", }, "search_engine": { "type": "string", "default": "duckduckgo", "description": "Search engine to use", }, }, "required": ["query"], }, ), Tool( name="mcp__freecrawl__crawl", description="Crawl a website starting from a URL", inputSchema={ "type": "object", "properties": { "start_url": { "type": "string", "description": "Starting URL for crawl", }, "max_pages": { "type": "integer", "default": 10, "description": "Maximum pages to crawl", }, "max_depth": { "type": "integer", "default": 2, "description": "Maximum crawl depth", }, "same_domain_only": { "type": "boolean", "default": True, "description": "Stay within same domain", }, "include_patterns": { "type": "array", "items": {"type": "string"}, "description": "URL patterns to include", }, "exclude_patterns": { "type": "array", "items": {"type": "string"}, "description": "URL patterns to exclude", }, }, "required": ["start_url"], }, ), Tool( name="mcp__freecrawl__deep_research", description="Perform comprehensive research on a topic using multiple sources", inputSchema={ "type": "object", "properties": { "topic": { "type": "string", "description": "Research topic", }, "num_sources": { "type": "integer", "default": 5, "description": "Number of sources to research", }, "search_queries": { "type": "array", "items": {"type": "string"}, "description": "Custom search queries", }, "include_academic": { "type": "boolean", "default": False, "description": "Include academic sources", }, "max_depth": { "type": "integer", "default": 1, "description": "Research depth", }, }, "required": ["topic"], }, ), ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" try: if name == "mcp__freecrawl__scrape": result = await self.freecrawl_scrape(**arguments) elif name == "mcp__freecrawl__search": result = await self.freecrawl_search(**arguments) elif name == "mcp__freecrawl__crawl": result = await self.freecrawl_crawl(**arguments) elif name == "mcp__freecrawl__deep_research": result = await self.freecrawl_deep_research(**arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] return [ TextContent( type="text", text=json.dumps(result, indent=2, default=str) ) ] except Exception as e: logger.error(f"Tool call error: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] return server async def initialize(self): """Initialize server components""" logger.info("Initializing FreeCrawl server...") # Check and install Playwright browsers if needed if not await self._check_browsers(): logger.info("Installing Playwright browsers...") await self._install_browsers() # Initialize state manager await self.state.initialize(self.config) logger.info("FreeCrawl server initialized successfully") async def cleanup(self): """Cleanup server resources""" logger.info("Shutting down FreeCrawl server...") await self.state.cleanup() logger.info("FreeCrawl server shutdown complete") async def run(self): """Run the server""" await self.initialize() try: # Run the MCP server async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await self.mcp.run( read_stream, write_stream, self.mcp.create_initialization_options() ) finally: await self.cleanup() # === MCP Tool Implementations === async def freecrawl_scrape( self, url: str, formats: List[ Literal["markdown", "html", "text", "screenshot", "structured"] ] = ["markdown"], javascript: bool = True, wait_for: Optional[str] = None, anti_bot: bool = True, headers: Optional[Dict[str, str]] = None, cookies: Optional[Dict[str, str]] = None, cache: bool = True, timeout: int = 30000, ) -> Dict[str, Any]: """ Scrape content from a single URL with advanced options. Returns comprehensive content in requested formats with metadata. Automatically handles JavaScript rendering, anti-bot measures, and retries. """ try: # Validate URL if not self._validate_url(url): raise FreeCrawlError(ErrorCode.INVALID_URL, f"Invalid URL: {url}") # Check rate limits if not await self.state.rate_limiter.check_rate_limit(url): raise FreeCrawlError( ErrorCode.RATE_LIMITED, "Rate limit exceeded for domain" ) # Check cache first if cache and self.state.cache: cached = await self.state.cache.get(url) if cached: logger.info(f"Returning cached content for {url}") return cached.model_dump() # Get browser and extract content browser = await self.state.browser_pool.get_browser() try: result = await self.state.content_extractor.extract_content( url=url, browser=browser, formats=formats, javascript=javascript, wait_for=wait_for, headers=headers, cookies=cookies, timeout=timeout, ) # Cache result if cache and self.state.cache: await self.state.cache.set(url, result) return result.model_dump() finally: await self.state.browser_pool.release_browser(browser) except Exception as e: if isinstance(e, FreeCrawlError): logger.error(f"FreeCrawl error: {e.message}") return { "error": e.code.value, "message": e.message, "details": e.details, } else: logger.error(f"Unexpected error: {str(e)}") return {"error": "processing_error", "message": str(e)} async def freecrawl_batch_scrape( self, urls: List[str], concurrency: int = 5, formats: List[str] = ["markdown"], common_options: Optional[Dict[str, Any]] = None, continue_on_error: bool = True, ) -> List[Dict[str, Any]]: """ Scrape multiple URLs concurrently with shared or individual options. Returns list of results in order of input URLs. Failed URLs return error dictionaries if continue_on_error is True. """ try: if len(urls) > 100: raise FreeCrawlError( ErrorCode.INVALID_URL, "Maximum 100 URLs allowed in batch" ) # Limit concurrency concurrency = min(concurrency, self.config.max_concurrent, len(urls)) semaphore = asyncio.Semaphore(concurrency) async def scrape_single(url: str) -> Dict[str, Any]: async with semaphore: try: options = common_options.copy() if common_options else {} options.update({"url": url, "formats": formats}) return await self.freecrawl_scrape(**options) except Exception as e: if continue_on_error: return { "error": "processing_error", "message": str(e), "url": url, } raise # Execute all scraping tasks tasks = [scrape_single(url) for url in urls] results = await asyncio.gather(*tasks, return_exceptions=continue_on_error) # Convert exceptions to error dicts final_results = [] for i, result in enumerate(results): if isinstance(result, Exception): final_results.append( { "error": "processing_error", "message": str(result), "url": urls[i], } ) else: final_results.append(result) return final_results except Exception as e: if isinstance(e, FreeCrawlError): return [{"error": e.code.value, "message": e.message}] else: return [{"error": "processing_error", "message": str(e)}] async def freecrawl_extract( self, url: str, schema: Dict[str, Any], prompt: Optional[str] = None, validation: bool = True, multiple: bool = False, ) -> Dict[str, Any]: """ Extract structured data from web pages using schema-driven approach. Uses intelligent extraction to match the provided schema. Supports complex nested structures and validation. """ try: # First scrape the page scraped = await self.freecrawl_scrape(url, formats=["text", "html"]) if "error" in scraped: return scraped # Simple extraction based on content # In a full implementation, this would use LLM for intelligent extraction extracted_data = { "title": scraped.get("title", ""), "content": scraped.get("text", "")[:500] + "..." if scraped.get("text") else "", "url": url, } result = ExtractedData( url=url, schema_version="1.0", extracted_at=datetime.now(), data=extracted_data, confidence_scores={"overall": 0.8}, validation_errors=[], ) return result.model_dump() except Exception as e: if isinstance(e, FreeCrawlError): return {"error": e.code.value, "message": e.message} else: return {"error": "processing_error", "message": str(e)} async def freecrawl_process_document( self, file_path: Optional[str] = None, url: Optional[str] = None, strategy: Literal["fast", "hi_res", "ocr_only"] = "hi_res", formats: List[Literal["markdown", "structured", "text"]] = ["structured"], languages: Optional[List[str]] = None, extract_images: bool = False, extract_tables: bool = True, ) -> Dict[str, Any]: """ Process documents (PDF, DOCX, PPTX, etc.) using Unstructured. Extracts text, tables, images, and metadata from various document formats. Supports OCR for scanned documents and multiple output formats. """ try: if not file_path and not url: raise FreeCrawlError( ErrorCode.INVALID_URL, "Either file_path or url must be provided" ) result = await self.state.document_processor.process_document( file_path=file_path, url=url, strategy=strategy, formats=formats, languages=languages, extract_images=extract_images, extract_tables=extract_tables, ) return result except Exception as e: if isinstance(e, FreeCrawlError): return {"error": e.code.value, "message": e.message} else: return {"error": "processing_error", "message": str(e)} async def freecrawl_health_check(self) -> Dict[str, Any]: """ Perform comprehensive health check of the server. Returns status of all major components and resource usage. """ try: health_status = { "status": "healthy", "timestamp": datetime.now().isoformat(), "version": "1.0.0", "checks": {}, } # Check browser pool try: browser_count = len(self.state.browser_pool.browsers) health_status["checks"]["browser_pool"] = { "status": "healthy", "browser_count": browser_count, "max_browsers": self.config.max_browsers, } except Exception as e: health_status["checks"]["browser_pool"] = { "status": "unhealthy", "error": str(e), } health_status["status"] = "degraded" # Check memory usage try: process = psutil.Process() memory_mb = process.memory_info().rss / 1024 / 1024 cpu_percent = process.cpu_percent() memory_status = "healthy" if memory_mb > 1500: memory_status = "warning" if memory_mb > 2000: memory_status = "unhealthy" health_status["status"] = "degraded" health_status["checks"]["resources"] = { "status": memory_status, "memory_mb": round(memory_mb, 2), "cpu_percent": cpu_percent, "max_memory_mb": 2000, } except Exception as e: health_status["checks"]["resources"] = { "status": "unknown", "error": str(e), } # Check cache if self.state.cache: try: cache_size_mb = self.state.cache.current_size / 1024 / 1024 health_status["checks"]["cache"] = { "status": "healthy", "size_mb": round(cache_size_mb, 2), "max_size_mb": round( self.config.cache_max_size / 1024 / 1024, 2 ), } except Exception as e: health_status["checks"]["cache"] = { "status": "unhealthy", "error": str(e), } else: health_status["checks"]["cache"] = {"status": "disabled"} return health_status except Exception as e: return { "status": "unhealthy", "timestamp": datetime.now().isoformat(), "error": str(e), } async def freecrawl_search( self, query: str, num_results: int = 10, scrape_results: bool = True, search_engine: str = "duckduckgo", ) -> Dict[str, Any]: """ Perform web search and optionally scrape results. Searches the web using the specified search engine and returns results. Can optionally scrape the content of each result URL. """ try: # Simple implementation using DuckDuckGo search # In a full implementation, this would use proper search APIs search_url = f"https://duckduckgo.com/html/?q={quote(query)}" # Scrape search results page search_page = await self.freecrawl_scrape( url=search_url, formats=["html"], javascript=False, # DuckDuckGo works without JS ) if "error" in search_page: return search_page # Parse search results soup = BeautifulSoup(search_page["html"], "html.parser") results = [] # Extract search result links for i, result_div in enumerate(soup.find_all("div", class_="result"), 1): if i > num_results: break title_link = result_div.find("a", class_="result__a") if title_link: title = title_link.get_text(strip=True) url = title_link.get("href", "") snippet_div = result_div.find("div", class_="result__snippet") snippet = snippet_div.get_text(strip=True) if snippet_div else "" results.append( {"title": title, "url": url, "snippet": snippet, "rank": i} ) search_result = { "query": query, "total_results": len(results), "search_engine": search_engine, "results": results, } # Optionally scrape each result if scrape_results and results: urls_to_scrape = [r["url"] for r in results[:5]] # Limit to first 5 scraped_content = await self.freecrawl_batch_scrape( urls=urls_to_scrape, formats=["markdown", "text"], concurrency=3, continue_on_error=True, ) search_result["scraped_content"] = scraped_content return search_result except Exception as e: if isinstance(e, FreeCrawlError): return {"error": e.code.value, "message": e.message} else: return {"error": "processing_error", "message": str(e)} async def freecrawl_crawl( self, start_url: str, max_pages: int = 10, max_depth: int = 2, same_domain_only: bool = True, include_patterns: Optional[List[str]] = None, exclude_patterns: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Crawl a website starting from a URL. Discovers and scrapes multiple pages following links up to max_depth. Can be restricted to same domain and filtered by URL patterns. """ try: if max_pages > 100: raise FreeCrawlError(ErrorCode.INVALID_URL, "Maximum 100 pages allowed") # Initialize crawl state visited_urls = set() urls_to_visit = [(start_url, 0)] # (url, depth) scraped_content = [] sitemap = {} errors = [] start_domain = urlparse(start_url).netloc while urls_to_visit and len(scraped_content) < max_pages: current_url, depth = urls_to_visit.pop(0) if current_url in visited_urls or depth > max_depth: continue visited_urls.add(current_url) try: # Scrape current page result = await self.freecrawl_scrape( url=current_url, formats=["markdown", "text"], javascript=True ) if "error" not in result: scraped_content.append(result) # Extract links for next depth if depth < max_depth and "links" in result: for link in result["links"][:20]: # Limit links per page if self._should_crawl_url( link, start_domain, same_domain_only, include_patterns, exclude_patterns, visited_urls, ): urls_to_visit.append((link, depth + 1)) # Build sitemap if depth not in sitemap: sitemap[depth] = [] sitemap[depth].append(current_url) else: errors.append( { "url": current_url, "error": result.get("error", "unknown"), "message": result.get("message", "Failed to scrape"), } ) except Exception as e: errors.append( { "url": current_url, "error": "processing_error", "message": str(e), } ) return { "start_url": start_url, "pages_found": len(visited_urls), "pages_scraped": len(scraped_content), "max_depth_reached": max(sitemap.keys()) if sitemap else 0, "content": scraped_content, "sitemap": {str(k): v for k, v in sitemap.items()}, "errors": errors if errors else None, } except Exception as e: if isinstance(e, FreeCrawlError): return {"error": e.code.value, "message": e.message} else: return {"error": "processing_error", "message": str(e)} async def freecrawl_map( self, start_url: str, max_urls: int = 50, include_external: bool = False, formats: List[str] = ["sitemap"], ) -> Dict[str, Any]: """ Discover and map URLs from a website. Performs URL discovery without full content scraping. Returns structured sitemap and URL relationships. """ try: if max_urls > 200: raise FreeCrawlError(ErrorCode.INVALID_URL, "Maximum 200 URLs allowed") # Scrape the start page to get initial links result = await self.freecrawl_scrape( url=start_url, formats=["html"], javascript=True ) if "error" in result: return result # Parse all links soup = BeautifulSoup(result["html"], "html.parser") discovered_urls = set() internal_urls = set() external_urls = set() start_domain = urlparse(start_url).netloc # Extract all links for link in soup.find_all("a", href=True): href = link["href"] absolute_url = urljoin(start_url, href) if absolute_url not in discovered_urls: discovered_urls.add(absolute_url) url_domain = urlparse(absolute_url).netloc if url_domain == start_domain: internal_urls.add(absolute_url) else: external_urls.add(absolute_url) if len(discovered_urls) >= max_urls: break # Build sitemap structure sitemap = { "root": start_url, "internal_urls": list(internal_urls)[:max_urls], "external_urls": list(external_urls)[:50] if include_external else [], "total_discovered": len(discovered_urls), } # Analyze URL structure url_analysis = self._analyze_url_structure(list(internal_urls)) return { "start_url": start_url, "discovered_count": len(discovered_urls), "internal_count": len(internal_urls), "external_count": len(external_urls), "sitemap": sitemap, "url_analysis": url_analysis, "formats": formats, } except Exception as e: if isinstance(e, FreeCrawlError): return {"error": e.code.value, "message": e.message} else: return {"error": "processing_error", "message": str(e)} async def freecrawl_deep_research( self, topic: str, num_sources: int = 5, search_queries: Optional[List[str]] = None, include_academic: bool = False, max_depth: int = 1, ) -> Dict[str, Any]: """ Perform comprehensive research on a topic using multiple sources. Combines web search, content scraping, and analysis to gather comprehensive information about a topic from multiple sources. """ try: if num_sources > 20: raise FreeCrawlError( ErrorCode.INVALID_URL, "Maximum 20 sources allowed" ) # Generate search queries if not provided if not search_queries: search_queries = [ topic, f"{topic} overview", f"{topic} guide", f"what is {topic}", ] research_results = { "topic": topic, "search_queries": search_queries, "sources": [], "summary": {}, "timestamp": datetime.now().isoformat(), } all_sources = [] # Perform searches for each query for query in search_queries[:3]: # Limit to 3 queries search_result = await self.freecrawl_search( query=query, num_results=num_sources, scrape_results=False ) if "error" not in search_result and "results" in search_result: all_sources.extend(search_result["results"][:3]) # Top 3 per query # Remove duplicates and limit sources unique_sources = [] seen_urls = set() for source in all_sources: if source["url"] not in seen_urls and len(unique_sources) < num_sources: unique_sources.append(source) seen_urls.add(source["url"]) # Scrape content from unique sources if unique_sources: urls_to_scrape = [source["url"] for source in unique_sources] scraped_content = await self.freecrawl_batch_scrape( urls=urls_to_scrape, formats=["markdown", "text"], concurrency=3, continue_on_error=True, ) # Combine search results with scraped content for i, (source, content) in enumerate( zip(unique_sources, scraped_content) ): if "error" not in content: research_source = { "rank": i + 1, "title": source.get("title", ""), "url": source["url"], "snippet": source.get("snippet", ""), "content_preview": content.get("text", "")[:500] + "..." if content.get("text") else "", "word_count": len(content.get("text", "").split()) if content.get("text") else 0, "scraped_at": content.get("metadata", {}).get("timestamp"), } research_results["sources"].append(research_source) # Generate research summary total_words = sum( source.get("word_count", 0) for source in research_results["sources"] ) research_results["summary"] = { "total_sources": len(research_results["sources"]), "successful_scrapes": len( [ s for s in research_results["sources"] if s.get("word_count", 0) > 0 ] ), "total_words_gathered": total_words, "average_words_per_source": total_words // len(research_results["sources"]) if research_results["sources"] else 0, "research_depth": max_depth, "academic_sources_included": include_academic, } return research_results except Exception as e: if isinstance(e, FreeCrawlError): return {"error": e.code.value, "message": e.message} else: return {"error": "processing_error", "message": str(e)} def _should_crawl_url( self, url: str, start_domain: str, same_domain_only: bool, include_patterns: Optional[List[str]], exclude_patterns: Optional[List[str]], visited_urls: set, ) -> bool: """Check if URL should be crawled based on filters""" if url in visited_urls: return False if not self._validate_url(url): return False url_domain = urlparse(url).netloc if same_domain_only and url_domain != start_domain: return False if include_patterns: if not any(pattern in url for pattern in include_patterns): return False if exclude_patterns: if any(pattern in url for pattern in exclude_patterns): return False return True def _analyze_url_structure(self, urls: List[str]) -> Dict[str, Any]: """Analyze URL structure and patterns""" if not urls: return {} # Group by path segments path_segments = {} extensions = {} for url in urls: parsed = urlparse(url) path_parts = [p for p in parsed.path.split("/") if p] # Count path depth depth = len(path_parts) if depth not in path_segments: path_segments[depth] = 0 path_segments[depth] += 1 # Count file extensions if path_parts: last_part = path_parts[-1] if "." in last_part: ext = last_part.split(".")[-1].lower() if ext not in extensions: extensions[ext] = 0 extensions[ext] += 1 return { "total_urls": len(urls), "path_depth_distribution": path_segments, "file_extensions": extensions, "average_depth": sum( depth * count for depth, count in path_segments.items() ) / len(urls) if urls else 0, } # === Helper Methods === def _validate_url(self, url: str) -> bool: """Validate URL format and safety""" try: parsed = urlparse(url) # Must have scheme and netloc if not parsed.scheme or not parsed.netloc: return False # Must be HTTP/HTTPS if parsed.scheme not in ["http", "https"]: return False # Check for blocked domains if parsed.netloc in self.config.blocked_domains: return False # Check for private IPs try: ip = socket.gethostbyname(parsed.netloc) ip_obj = ipaddress.ip_address(ip) if ip_obj.is_private or ip_obj.is_loopback: return False except (socket.gaierror, ValueError): pass # Hostname resolution failed, allow to proceed return True except Exception: return False async def _check_browsers(self) -> bool: """Check if Playwright browsers are installed""" try: async with async_playwright() as p: browser = await p.chromium.launch(headless=True) await browser.close() return True except Exception: return False async def _install_browsers(self): """Install Playwright browsers""" try: process = await asyncio.create_subprocess_exec( sys.executable, "-m", "playwright", "install", "chromium", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: logger.error(f"Failed to install browsers: {stderr.decode()}") raise FreeCrawlError( ErrorCode.PROCESSING_ERROR, "Failed to install browsers" ) logger.info("Playwright browsers installed successfully") except Exception as e: logger.error(f"Browser installation error: {e}") raise FreeCrawlError( ErrorCode.PROCESSING_ERROR, f"Browser installation failed: {str(e)}" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/freecrawl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server