#!/usr/bin/env python3
"""
Crawl4AI MCP Server
A FastMCP server that provides web scraping and crawling capabilities using Crawl4AI.
This server exposes three core tools:
1. get_page_structure - The "eyes" for analyzing webpage structure
2. crawl_with_schema - The "hands" for executing precise extraction schemas
3. take_screenshot - Media capture for visual representation
A fourth tool, server_status, reports server health and capabilities.
This server is designed to work with client-side AI that acts as the "brain"
to analyze and command the scraping operations.
Architecture:
- FastMCP handles MCP protocol and tool registration
- AsyncWebCrawler provides web scraping capabilities
- Proper logging to stderr prevents MCP stdio corruption
- All tools use async/await patterns for non-blocking operation
"""
import logging
import sys
from typing import Dict, Any
from pathlib import Path
from fastmcp import FastMCP, Context
from pydantic import Field
from typing_extensions import Annotated
from crawl4ai import AsyncWebCrawler
# Configure logging to stderr to prevent MCP stdio corruption
# This is critical - MCP uses stdio for communication, so all logging must go to stderr
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stderr,
force=True # Override any existing handlers
)
# Create logger for this module
logger = logging.getLogger(__name__)
# Ensure no print statements pollute stdio
def safe_print(*args, **kwargs):
"""Safe printing that goes to stderr instead of stdout."""
print(*args, file=sys.stderr, **kwargs)
# Initialize FastMCP server with comprehensive configuration
mcp = FastMCP(
name="Crawl4AI-MCP-Server",
version="1.0.0",
instructions=(
"This server provides web scraping capabilities using Crawl4AI. "
"The server acts as the 'hands and eyes' while the client AI acts as the 'brain'. "
"\n\nAvailable tools:\n"
"⢠get_page_structure: Analyze webpage structure and content\n"
"⢠crawl_with_schema: Execute precise data extraction using schemas\n"
"⢠take_screenshot: Capture visual representation of webpages\n"
"\nAll tools support proper error handling and async operation."
)
)
@mcp.tool()
async def server_status(ctx: Context) -> Dict[str, Any]:
"""
Get the current status and capabilities of the Crawl4AI MCP server.
This tool provides comprehensive information about server health, available features,
configuration status, and operational capabilities. Use this to verify server
connectivity and understand what web scraping capabilities are available.
Returns:
dict: Server status information including:
- server_name: The name of the MCP server
- version: Current server version
- status: Operational status (operational/error)
- transport: Communication transport type (stdio)
- working_directory: Current server working directory
- capabilities: List of available server capabilities
- dependencies: Status of key dependencies
- message: Human-readable status message
Example response:
{
"server_name": "Crawl4AI-MCP-Server",
"version": "1.0.0",
"status": "operational",
"capabilities": ["web_crawling", "content_extraction", "screenshot_capture", "schema_based_extraction"]
}
"""
try:
await ctx.info("Checking server status...")
# Get current working directory and basic system info
cwd = Path.cwd()
status = {
"server_name": "Crawl4AI-MCP-Server",
"version": "1.0.0",
"status": "operational",
"transport": "stdio",
"working_directory": str(cwd),
"capabilities": [
"web_crawling",
"content_extraction",
"screenshot_capture",
"schema_based_extraction"
],
"dependencies": {
"fastmcp": "installed",
"crawl4ai": "ready",
"playwright": "configured"
},
"message": "Server is ready to accept crawling requests"
}
await ctx.info("Server status check completed successfully")
return status
except Exception as e:
error_msg = f"Failed to get server status: {str(e)}"
logger.error(error_msg)
await ctx.error(error_msg)
return {
"status": "error",
"error": error_msg,
"server_name": "Crawl4AI-MCP-Server"
}
@mcp.tool()
async def get_page_structure(
url: Annotated[str, Field(description="The URL of the webpage to analyze")],
format: Annotated[str, Field(description="Output format: 'html' for cleaned HTML or 'markdown' for raw markdown", pattern="^(html|markdown)$")] = "html",
ctx: Context = None
) -> str:
"""
Fetch and analyze the structural content of a webpage for AI analysis.
This is the fundamental "eyes" tool that provides the raw material for client AI
to understand webpage structure. It returns clean, structured content without
executing any extraction schemas.
Args:
url: The URL of the webpage to crawl and analyze
format: Output format - 'html' for cleaned HTML or 'markdown' for raw markdown
ctx: MCP context for logging and progress reporting
Returns:
str: The webpage content in the requested format (HTML or Markdown)
Raises:
Exception: If the webpage cannot be accessed or processed
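    Example (illustrative call and output shape, not real data):
        get_page_structure(url="https://example.com", format="markdown")

        returns a metadata comment header followed by the page content:

        <!-- Webpage Analysis Results -->
        <!-- URL: https://example.com -->
        <!-- Format: markdown -->
        <!-- Content Length: 1234 characters -->
        <!-- Success: True -->

        # Example Domain
        ...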
"""
if ctx:
await ctx.info(f"Starting webpage analysis for: {url}")
try:
# Validate URL format
if not url.startswith(('http://', 'https://')):
error_msg = f"Invalid URL format: {url}. URL must start with http:// or https://"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return f"ERROR: {error_msg}"
# Initialize AsyncWebCrawler
async with AsyncWebCrawler(verbose=False) as crawler:
if ctx:
await ctx.info("Initializing web crawler...")
# Crawl the webpage
if ctx:
await ctx.report_progress(progress=25, total=100, message="Fetching webpage...")
result = await crawler.arun(url=url)
if ctx:
await ctx.report_progress(progress=75, total=100, message="Processing content...")
# Check if crawling was successful
if not result.success:
error_msg = f"Failed to crawl {url}: {result.error_message or 'Unknown error'}"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return f"ERROR: {error_msg}"
# Return content in requested format
if format == "markdown":
content = result.markdown.raw_markdown if result.markdown else ""
if not content:
content = "No markdown content available"
if ctx:
await ctx.info(f"Returning markdown content ({len(content)} characters)")
else: # html format (default)
content = result.cleaned_html or ""
if not content:
content = "No HTML content available"
if ctx:
await ctx.info(f"Returning cleaned HTML content ({len(content)} characters)")
if ctx:
await ctx.report_progress(progress=100, total=100, message="Analysis complete")
# Add metadata header for context
metadata = "<!-- Webpage Analysis Results -->\n"
metadata += f"<!-- URL: {url} -->\n"
metadata += f"<!-- Format: {format} -->\n"
metadata += f"<!-- Content Length: {len(content)} characters -->\n"
metadata += f"<!-- Success: {result.success} -->\n\n"
return metadata + content
except Exception as e:
error_msg = f"Unexpected error analyzing {url}: {str(e)}"
logger.error(error_msg, exc_info=True)
if ctx:
await ctx.error(error_msg)
return f"ERROR: {error_msg}"
@mcp.tool()
async def crawl_with_schema(
url: Annotated[str, Field(description="The URL of the webpage to crawl and extract data from")],
extraction_schema: Annotated[str, Field(description="JSON string containing the extraction schema with field names and CSS selectors. Example: '{\"title\": \"h1\", \"price\": \".price\", \"description\": \".desc\"}'")],
ctx: Context = None
) -> str:
"""
Execute precision data extraction using AI-generated schemas with JsonCssExtractionStrategy.
This is the 'hands' tool that performs targeted data extraction based on schemas
provided by the client AI. It uses CSS selectors to extract specific data points
from webpages and returns structured JSON results.
Args:
url: The URL of the webpage to crawl and extract data from
extraction_schema: JSON string defining field names and their CSS selectors
ctx: MCP context for logging and progress reporting
Returns:
str: JSON string containing the extracted data according to the schema
Raises:
Exception: If the webpage cannot be accessed, schema is invalid, or extraction fails
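    Example (illustrative; field names, selectors, and values are hypothetical):
        extraction_schema = '{"title": "h1", "price": ".price"}'

        A successful call returns JSON shaped like:

        {
          "url": "https://example.com/product",
          "extraction_schema": {"title": "h1", "price": ".price"},
          "extracted_data": {"title": "Sample Product", "price": "$9.99"},
          "success": true,
          "timestamp": null
        }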
"""
if ctx:
await ctx.info(f"Starting schema-based extraction for: {url}")
try:
        # Imported locally so these tool-specific dependencies stay scoped to this function
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
import json
# Validate URL format
if not url.startswith(('http://', 'https://')):
error_msg = f"Invalid URL format: {url}. URL must start with http:// or https://"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return json.dumps({"error": error_msg})
# Validate and parse extraction schema
try:
schema_dict = json.loads(extraction_schema)
if not isinstance(schema_dict, dict):
raise ValueError("Schema must be a JSON object")
if not schema_dict:
raise ValueError("Schema cannot be empty")
except (json.JSONDecodeError, ValueError) as e:
error_msg = f"Invalid extraction schema: {str(e)}"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return json.dumps({"error": error_msg})
if ctx:
await ctx.info(f"Parsed extraction schema with {len(schema_dict)} fields: {list(schema_dict.keys())}")
# Create JsonCssExtractionStrategy with the provided schema
extraction_strategy = JsonCssExtractionStrategy(schema_dict, verbose=False)
# Initialize AsyncWebCrawler with extraction strategy
async with AsyncWebCrawler(verbose=False) as crawler:
if ctx:
await ctx.info("Initializing web crawler with extraction strategy...")
# Crawl the webpage with schema extraction
if ctx:
await ctx.report_progress(progress=25, total=100, message="Fetching webpage...")
result = await crawler.arun(
url=url,
extraction_strategy=extraction_strategy
)
if ctx:
await ctx.report_progress(progress=75, total=100, message="Extracting data with schema...")
# Check if crawling was successful
if not result.success:
error_msg = f"Failed to crawl {url}: {result.error_message or 'Unknown error'}"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return json.dumps({"error": error_msg})
# Get extracted data
extracted_data = result.extracted_content
if ctx:
await ctx.report_progress(progress=90, total=100, message="Processing extracted data...")
# Validate extracted data
if not extracted_data:
if ctx:
await ctx.info("No data extracted - returning empty result")
return json.dumps({"extracted_data": {}, "message": "No data matched the extraction schema"})
# Try to parse extracted data as JSON if it's a string
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If parsing fails, wrap the string in a result object
extracted_data = {"raw_extracted_content": extracted_data}
if ctx:
await ctx.report_progress(progress=100, total=100, message="Extraction complete")
await ctx.info(f"Successfully extracted {len(extracted_data) if isinstance(extracted_data, dict) else 1} data points")
# Return structured JSON response
response = {
"url": url,
"extraction_schema": schema_dict,
"extracted_data": extracted_data,
"success": True,
"timestamp": result.success_timestamp if hasattr(result, 'success_timestamp') else None
}
return json.dumps(response, indent=2, ensure_ascii=False)
except Exception as e:
error_msg = f"Unexpected error during schema extraction from {url}: {str(e)}"
logger.error(error_msg, exc_info=True)
if ctx:
await ctx.error(error_msg)
return json.dumps({
"error": error_msg,
"url": url,
"success": False
}, indent=2)
@mcp.tool()
async def take_screenshot(
url: Annotated[str, Field(description="The URL of the webpage to capture as a screenshot")],
ctx: Context = None
) -> str:
"""
Capture a visual screenshot of a webpage for media representation.
This is the visual capture tool that provides screenshot images of webpages
for the client AI to analyze. It returns base64-encoded image data that can
be processed by FastMCP's native image handling capabilities.
Args:
url: The URL of the webpage to capture
ctx: MCP context for logging and progress reporting
Returns:
str: JSON string containing base64-encoded screenshot data and metadata
Raises:
Exception: If the webpage cannot be accessed or screenshot fails
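    Example response (illustrative; the base64 payload is truncated here):
        {
          "url": "https://example.com",
          "screenshot_data": "iVBORw0KGgoAAAANSUhEUg...",
          "format": "base64_png",
          "success": true,
          "timestamp": null,
          "metadata": {"data_size": 123456, "image_format": "PNG"}
        }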
"""
if ctx:
await ctx.info(f"Starting screenshot capture for: {url}")
try:
        # Imported locally so these tool-specific dependencies stay scoped to this function
import json
import base64
from crawl4ai import CrawlerRunConfig
# Validate URL format
if not url.startswith(('http://', 'https://')):
error_msg = f"Invalid URL format: {url}. URL must start with http:// or https://"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return json.dumps({"error": error_msg})
# Configure crawler to capture screenshots
config = CrawlerRunConfig(
screenshot=True,
verbose=False
)
# Initialize AsyncWebCrawler with screenshot configuration
async with AsyncWebCrawler(verbose=False) as crawler:
if ctx:
await ctx.info("Initializing web crawler for screenshot capture...")
# Crawl the webpage with screenshot enabled
if ctx:
await ctx.report_progress(progress=25, total=100, message="Loading webpage...")
result = await crawler.arun(url=url, config=config)
if ctx:
await ctx.report_progress(progress=75, total=100, message="Capturing screenshot...")
# Check if crawling was successful
if not result.success:
error_msg = f"Failed to capture screenshot from {url}: {result.error_message or 'Unknown error'}"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return json.dumps({"error": error_msg})
# Get screenshot data
screenshot_data = result.screenshot
if ctx:
await ctx.report_progress(progress=90, total=100, message="Processing screenshot data...")
# Validate screenshot data
if not screenshot_data:
error_msg = "No screenshot data captured - screenshot may have failed"
if ctx:
await ctx.error(error_msg)
logger.error(error_msg)
return json.dumps({"error": error_msg})
# Screenshot is typically base64 encoded already
if isinstance(screenshot_data, str):
# If it's already a base64 string, use it directly
screenshot_base64 = screenshot_data
else:
# If it's binary data, encode it to base64
screenshot_base64 = base64.b64encode(screenshot_data).decode('utf-8')
if ctx:
await ctx.report_progress(progress=100, total=100, message="Screenshot capture complete")
await ctx.info(f"Successfully captured screenshot ({len(screenshot_base64)} characters)")
# Return structured JSON response with screenshot data
response = {
"url": url,
"screenshot_data": screenshot_base64,
"format": "base64_png",
"success": True,
"timestamp": result.success_timestamp if hasattr(result, 'success_timestamp') else None,
"metadata": {
"data_size": len(screenshot_base64),
"image_format": "PNG"
}
}
return json.dumps(response, indent=2)
except Exception as e:
error_msg = f"Unexpected error during screenshot capture from {url}: {str(e)}"
logger.error(error_msg, exc_info=True)
if ctx:
await ctx.error(error_msg)
return json.dumps({
"error": error_msg,
"url": url,
"success": False
}, indent=2)
# Main server entry point
def main():
"""Main entry point for the Crawl4AI MCP server."""
try:
logger.info("Initializing Crawl4AI MCP Server...")
logger.info("Server configuration:")
logger.info(f" - Name: {mcp.name}")
logger.info(" - Transport: stdio (MCP compatible)")
logger.info(" - Logging: stderr (stdio-safe)")
logger.info(" - Tools: server_status, get_page_structure, crawl_with_schema, take_screenshot")
logger.info("Starting server with stdio transport...")
# Run the FastMCP server with stdio transport (default for MCP)
mcp.run(transport="stdio")
except KeyboardInterrupt:
logger.info("Server shutdown requested by user")
except Exception as e:
logger.error(f"Server startup failed: {str(e)}")
raise
if __name__ == "__main__":
main()