# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/walksoda/crawl-mcp'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
Crawl4AI MCP Server - FastMCP 2.0 Version
Uses FastMCP 2.0.0 which doesn't have banner output issues.
Clean STDIO transport compatible for perfect MCP communication.
"""
import os
import sys
import warnings
# Set environment variables before any imports
# NOTE: this server speaks MCP over STDIO, so any stray output on stdout
# (banners, warnings, log lines) corrupts the protocol stream.  The
# settings below silence FastMCP and Python warnings before anything
# else gets imported.
os.environ["FASTMCP_QUIET"] = "true"
os.environ["FASTMCP_NO_BANNER"] = "true"
os.environ["FASTMCP_SILENT"] = "true"
os.environ["PYTHONWARNINGS"] = "ignore"
# A "dumb" terminal and a plain shell discourage libraries from emitting
# rich/ANSI banner output.
os.environ["TERM"] = "dumb"
os.environ["SHELL"] = "/bin/sh"
warnings.filterwarnings("ignore")
warnings.simplefilter("ignore")
import logging
# Suppress every log record at or below CRITICAL so nothing leaks onto
# the transport.
logging.disable(logging.CRITICAL)
# Import FastMCP 2.0 - no banner output!
from fastmcp import FastMCP
from typing import Any, Dict, List, Optional, Union, Annotated
from pydantic import Field, BaseModel
# Import refactored modules
from .utils import estimate_tokens, apply_token_limit
from .server_helpers import (
_load_heavy_imports,
_load_tool_modules,
_ensure_browser_setup,
get_system_diagnostics,
_convert_result_to_dict,
_process_content_fields,
_should_trigger_fallback,
get_tool_modules,
is_heavy_imports_loaded,
is_tools_imported,
is_browser_setup_done,
_apply_content_slicing,
_apply_search_content_slicing,
_get_search_cache_key,
_get_cached_search_result,
_cache_search_result,
)
from .validators import validate_crawl_url_params, validate_content_slicing_params
# Create MCP server with clean initialization
mcp = FastMCP("Crawl4AI")
# Backward compatibility aliases
# (older call sites referenced the underscore-prefixed names)
_estimate_tokens = estimate_tokens
_apply_token_limit_fallback = apply_token_limit
_validate_crawl_url_params = validate_crawl_url_params
# Global module references for lazy loading (populated by _load_tool_modules)
# They stay None until the first tool invocation calls _get_tool_modules(),
# which keeps server start-up fast.
web_crawling = None
search = None
youtube = None
file_processing = None
utilities = None
# Flipped to True once the tool modules above have been imported.
_tools_imported = False
def _get_tool_modules():
    """Ensure the lazily-imported tool modules are loaded.

    Populates the module-level references (web_crawling, search, youtube,
    file_processing, utilities) on first use and returns True once the
    modules are available, False otherwise.
    """
    global web_crawling, search, youtube, file_processing, utilities, _tools_imported
    if _tools_imported:
        return _tools_imported
    _load_tool_modules()
    # Unpack straight into the module-level slots.
    (web_crawling, search, youtube,
     file_processing, utilities) = get_tool_modules()
    _tools_imported = is_tools_imported()
    return _tools_imported
@mcp.tool()
async def crawl_url(
    url: Annotated[str, Field(description="URL to crawl")],
    css_selector: Annotated[Optional[str], Field(description="CSS selector for extraction")] = None,
    extract_media: Annotated[bool, Field(description="Extract images/videos")] = False,
    take_screenshot: Annotated[bool, Field(description="Take screenshot")] = False,
    generate_markdown: Annotated[bool, Field(description="Generate markdown")] = True,
    include_cleaned_html: Annotated[bool, Field(description="Include cleaned HTML")] = False,
    wait_for_selector: Annotated[Optional[str], Field(description="Wait for element to load")] = None,
    timeout: Annotated[int, Field(description="Timeout in seconds")] = 60,
    wait_for_js: Annotated[bool, Field(description="Wait for JavaScript")] = False,
    auto_summarize: Annotated[bool, Field(description="Auto-summarize large content")] = False,
    use_undetected_browser: Annotated[bool, Field(description="Bypass bot detection")] = False,
    content_limit: Annotated[int, Field(description="Max characters to return (0=unlimited)")] = 0,
    content_offset: Annotated[int, Field(description="Start position for content (0-indexed)")] = 0,
) -> dict:
    """Extract web page content with JavaScript support. Use wait_for_js=true for SPAs. Use content_offset/content_limit for pagination."""
    # Input validation (URL shape, timeout bounds) — returns an error dict on failure.
    validation_error = _validate_crawl_url_params(url, timeout)
    if validation_error:
        return validation_error
    # Content slicing validation
    slicing_error = validate_content_slicing_params(content_limit, content_offset)
    if slicing_error:
        return slicing_error
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available",
            "error_code": "modules_unavailable"
        }
    try:
        result = await web_crawling.crawl_url(
            url=url, css_selector=css_selector, extract_media=extract_media,
            take_screenshot=take_screenshot, generate_markdown=generate_markdown,
            include_cleaned_html=include_cleaned_html,
            wait_for_selector=wait_for_selector, timeout=timeout, wait_for_js=wait_for_js,
            auto_summarize=auto_summarize, use_undetected_browser=use_undetected_browser
        )
        result_dict = _convert_result_to_dict(result)
        result_dict = _process_content_fields(result_dict, include_cleaned_html, generate_markdown)
        result_dict = _apply_content_slicing(result_dict, content_limit, content_offset)
        # Record if undetected browser was used in initial request
        if use_undetected_browser:
            result_dict["undetected_browser_used"] = True
        # Check if fallback is needed (e.g. empty/blocked content)
        should_fallback, fallback_reason = _should_trigger_fallback(result_dict, generate_markdown)
        if not should_fallback:
            return _apply_token_limit_fallback(result_dict, max_tokens=25000)
        # Try fallback with undetected browser
        fallback_dict = await _execute_fallback(
            url=url, css_selector=css_selector, extract_media=extract_media,
            take_screenshot=take_screenshot, generate_markdown=generate_markdown,
            include_cleaned_html=include_cleaned_html, wait_for_selector=wait_for_selector,
            timeout=timeout, wait_for_js=wait_for_js, auto_summarize=auto_summarize,
            fallback_reason=fallback_reason, original_error=None,
            content_limit=content_limit, content_offset=content_offset
        )
        return _apply_token_limit_fallback(fallback_dict, max_tokens=25000)
    except Exception as e:
        # Determine error type for better diagnostics
        error_type = type(e).__name__
        error_message = str(e)
        try:
            # The initial crawl raised; retry once with the undetected browser.
            fallback_dict = await _execute_fallback(
                url=url, css_selector=css_selector, extract_media=extract_media,
                take_screenshot=take_screenshot, generate_markdown=generate_markdown,
                include_cleaned_html=include_cleaned_html, wait_for_selector=wait_for_selector,
                timeout=timeout, wait_for_js=wait_for_js, auto_summarize=auto_summarize,
                fallback_reason=f"Exception during initial crawl: {error_type}",
                original_error=error_message,
                content_limit=content_limit, content_offset=content_offset
            )
            return _apply_token_limit_fallback(fallback_dict, max_tokens=25000)
        except Exception as fallback_error:
            # Hoist the stringification: it was computed twice below.
            fallback_message = str(fallback_error)
            return {
                "success": False,
                "url": url,
                # FIX: was an f-string with no placeholders (ruff F541).
                "error": "Both crawling methods failed",
                "error_code": "both_methods_failed",
                "diagnostics": {
                    "original_error": error_message,
                    "original_error_type": error_type,
                    "fallback_error": fallback_message,
                    "fallback_error_type": type(fallback_error).__name__
                },
                "retryable": "timeout" in error_message.lower() or "timeout" in fallback_message.lower(),
                "suggested_fix": "Try increasing timeout or using wait_for_js=true for JavaScript-heavy pages"
            }
async def _execute_fallback(
    url: str, css_selector: Optional[str], extract_media: bool,
    take_screenshot: bool, generate_markdown: bool, include_cleaned_html: bool,
    wait_for_selector: Optional[str], timeout: int, wait_for_js: bool, auto_summarize: bool,
    fallback_reason: str, original_error: Optional[str],
    content_limit: int = 0, content_offset: int = 0
) -> dict:
    """Execute fallback crawl with undetected browser and add diagnostics."""
    raw = await web_crawling.crawl_url_with_fallback(
        url=url, css_selector=css_selector, extract_media=extract_media,
        take_screenshot=take_screenshot, generate_markdown=generate_markdown,
        include_cleaned_html=include_cleaned_html,
        wait_for_selector=wait_for_selector, timeout=timeout, wait_for_js=wait_for_js,
        auto_summarize=auto_summarize, use_undetected_browser=True
    )
    shaped = _convert_result_to_dict(raw)
    shaped = _process_content_fields(shaped, include_cleaned_html, generate_markdown)
    shaped = _apply_content_slicing(shaped, content_limit, content_offset)
    # Always stamp the result with fallback diagnostics.
    diagnostics = {
        "fallback_used": True,
        "undetected_browser_used": True,
        "fallback_reason": fallback_reason,
    }
    if original_error:
        diagnostics["original_error"] = original_error
    shaped.update(diagnostics)
    return shaped
@mcp.tool()
async def extract_youtube_transcript(
    url: Annotated[str, Field(description="YouTube video URL")],
    languages: Annotated[Optional[Union[List[str], str]], Field(description="Language codes in preference order")] = ["ja", "en"],
    translate_to: Annotated[Optional[str], Field(description="Target language for translation")] = None,
    include_timestamps: Annotated[bool, Field(description="Include timestamps")] = False,
    preserve_formatting: Annotated[bool, Field(description="Preserve formatting")] = True,
    include_metadata: Annotated[bool, Field(description="Include video metadata")] = True,
    auto_summarize: Annotated[bool, Field(description="Summarize long transcripts")] = False,
    max_content_tokens: Annotated[int, Field(description="Max tokens before summarization")] = 15000,
    summary_length: Annotated[str, Field(description="'short'|'medium'|'long'")] = "medium",
    llm_provider: Annotated[Optional[str], Field(description="LLM provider")] = None,
    llm_model: Annotated[Optional[str], Field(description="LLM model")] = None,
    enable_crawl_fallback: Annotated[bool, Field(description="Enable page crawl fallback when API fails")] = True,
    fallback_timeout: Annotated[int, Field(description="Fallback crawl timeout in seconds")] = 60,
    enrich_metadata: Annotated[bool, Field(description="Enrich metadata (upload_date, view_count) via page crawl")] = True
) -> dict:
    """Extract YouTube transcripts with timestamps. Works with public captioned videos. Supports fallback to page crawl."""
    # NOTE: the mutable default for `languages` is safe here — it is only
    # ever reassigned, never mutated in place.
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available"
        }
    # Some MCP clients send array parameters as JSON-encoded strings
    # (e.g. '["ja", "en"]'); normalize those back into a real list.
    if isinstance(languages, str):
        try:
            import json
            parsed = json.loads(languages)
            if isinstance(parsed, list):
                languages = parsed
            elif isinstance(parsed, str):
                # FIX: json.loads('"en"') yields a bare string; previously
                # it was passed downstream unwrapped. Treat it as one code.
                languages = [parsed]
            else:
                # Unusable JSON value (number/object/null): use the default.
                languages = ["ja", "en"]
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            # Not valid JSON; salvage any quoted codes, else fall back.
            import re
            matches = re.findall(r'"([^"]*)"', languages)
            languages = matches if matches else ["ja", "en"]
    try:
        result = await youtube.extract_youtube_transcript(
            url=url, languages=languages, translate_to=translate_to,
            include_timestamps=include_timestamps, preserve_formatting=preserve_formatting,
            include_metadata=include_metadata, auto_summarize=auto_summarize,
            max_content_tokens=max_content_tokens, summary_length=summary_length,
            llm_provider=llm_provider, llm_model=llm_model,
            enable_crawl_fallback=enable_crawl_fallback, fallback_timeout=fallback_timeout,
            enrich_metadata=enrich_metadata
        )
        # Strip segments to reduce response size when timestamps not needed
        if not include_timestamps and result.get('transcript'):
            transcript = result['transcript']
            if 'segments' in transcript:
                # Keep segment count for reference but remove the actual data
                segment_count = len(transcript.get('segments', []))
                transcript['segments'] = []
                transcript['segments_stripped'] = True
                transcript['original_segment_count'] = segment_count
        # Apply token limit fallback to prevent MCP errors
        result_with_fallback = _apply_token_limit_fallback(result, max_tokens=25000)
        # Add YouTube-specific recommendations when truncation occurs
        if result_with_fallback.get("token_limit_applied") or result_with_fallback.get("emergency_truncation"):
            youtube_recommendations = [
                "For long YouTube videos, consider using crawl_url for different extraction options",
                f"Example: crawl_url(url='{url}', wait_for_js=true)",
            ]
            existing_recs = result_with_fallback.get("recommendations", [])
            result_with_fallback["recommendations"] = youtube_recommendations + existing_recs
            # If auto_summarize was False, add additional suggestion
            if not auto_summarize:
                result_with_fallback["suggestion"] = "Transcript was truncated due to MCP token limits. Consider using crawl_url for more content, or set auto_summarize=True (requires OPENAI_API_KEY)."
        return result_with_fallback
    except Exception as e:
        return {
            "success": False,
            "error": f"YouTube transcript error: {str(e)}"
        }
@mcp.tool()
async def batch_extract_youtube_transcripts(
    request: Annotated[Dict[str, Any], Field(description="Dict with: urls (max 3), languages, include_timestamps")]
) -> Dict[str, Any]:
    """Extract transcripts from multiple YouTube videos. Max 3 URLs per call."""
    # Bounded toolset: refuse oversized batches up front (MCP best practice).
    if len(request.get('urls', [])) > 3:
        return {"success": False, "error": "Maximum 3 YouTube URLs allowed per batch. Split into multiple calls."}
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        raw = await youtube.batch_extract_youtube_transcripts(request)
        # Keep the response under the MCP token ceiling.
        capped = _apply_token_limit_fallback(raw, max_tokens=25000)
        # Suggest mitigations only for ordinary (non-emergency) truncation.
        if capped.get("token_limit_applied") and not capped.get("emergency_truncation"):
            capped["suggestion"] = "Batch transcript data was truncated due to MCP token limits. Consider reducing the number of videos or enabling auto_summarize for individual videos."
        return capped
    except Exception as e:
        return {
            "success": False,
            "error": f"Batch YouTube extraction error: {str(e)}"
        }
@mcp.tool()
async def get_youtube_video_info(
    video_url: Annotated[str, Field(description="YouTube video URL")],
    summarize_transcript: Annotated[bool, Field(description="Summarize transcript")] = False,
    max_tokens: Annotated[int, Field(description="Token limit for summarization")] = 25000,
    llm_provider: Annotated[Optional[str], Field(description="LLM provider")] = None,
    llm_model: Annotated[Optional[str], Field(description="LLM model")] = None,
    summary_length: Annotated[str, Field(description="'short'|'medium'|'long'")] = "medium",
    include_timestamps: Annotated[bool, Field(description="Include timestamps")] = True
) -> Dict[str, Any]:
    """Get YouTube video metadata and transcript availability."""
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        info = await youtube.get_youtube_video_info(
            video_url=video_url,
            summarize_transcript=summarize_transcript,
            max_tokens=max_tokens,
            llm_provider=llm_provider,
            llm_model=llm_model,
            summary_length=summary_length,
            include_timestamps=include_timestamps,
        )
        # Cap the payload so it stays within MCP message limits.
        capped = _apply_token_limit_fallback(info, max_tokens=25000)
        # Only nudge toward summarization when ordinary truncation occurred
        # and the caller had it switched off.
        if (capped.get("token_limit_applied") and not summarize_transcript
                and not capped.get("emergency_truncation")):
            capped["suggestion"] = "Video info was truncated due to MCP token limits. Consider setting summarize_transcript=True for long transcripts."
        return capped
    except Exception as e:
        return {"success": False, "error": f"YouTube video info error: {str(e)}"}
async def get_youtube_api_setup_guide() -> Dict[str, Any]:
    """Get youtube-transcript-api setup info. No API key required."""
    # Not registered as an MCP tool; callable internally only.
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        return await youtube.get_youtube_api_setup_guide()
    except Exception as e:
        return {"success": False, "error": f"YouTube API setup guide error: {str(e)}"}
@mcp.tool()
async def process_file(
    url: Annotated[str, Field(description="File URL (PDF, Office, ZIP)")],
    max_size_mb: Annotated[int, Field(description="Max file size in MB")] = 100,
    extract_all_from_zip: Annotated[bool, Field(description="Extract ZIP contents")] = True,
    include_metadata: Annotated[bool, Field(description="Include metadata")] = True,
    auto_summarize: Annotated[bool, Field(description="Auto-summarize large content")] = False,
    max_content_tokens: Annotated[int, Field(description="Max tokens before summarization")] = 15000,
    summary_length: Annotated[str, Field(description="'short'|'medium'|'long'")] = "medium",
    llm_provider: Annotated[Optional[str], Field(description="LLM provider")] = None,
    llm_model: Annotated[Optional[str], Field(description="LLM model")] = None,
    content_limit: Annotated[int, Field(description="Max characters to return (0=unlimited)")] = 0,
    content_offset: Annotated[int, Field(description="Start position for content (0-indexed)")] = 0
) -> dict:
    """Convert PDF, Word, Excel, PowerPoint, ZIP to markdown."""
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    wants_slicing = content_limit != 0 or content_offset != 0
    try:
        # Validate slicing parameters before doing any work.
        if wants_slicing:
            slicing_error = validate_content_slicing_params(content_limit, content_offset)
            if slicing_error:
                return slicing_error
        response = await file_processing.process_file(
            url=url, max_size_mb=max_size_mb, extract_all_from_zip=extract_all_from_zip,
            include_metadata=include_metadata, auto_summarize=auto_summarize,
            max_content_tokens=max_content_tokens, summary_length=summary_length,
            llm_provider=llm_provider, llm_model=llm_model
        )
        # Turn the FileProcessResponse object into a plain JSON-serializable dict.
        if hasattr(response, 'model_dump'):
            result_dict = response.model_dump()
        elif hasattr(response, 'dict'):
            result_dict = response.dict()
        else:
            # Last resort: pull the known fields off the object one by one.
            field_defaults = (
                ('success', False), ('url', None), ('filename', None),
                ('file_type', None), ('size_bytes', None), ('is_archive', False),
                ('content', None), ('title', None), ('metadata', None),
                ('archive_contents', None), ('error', None),
                ('processing_time', None),
            )
            result_dict = {
                name: getattr(response, name, default)
                for name, default in field_defaults
            }
        # Apply content slicing if requested.
        if wants_slicing:
            result_dict = _apply_content_slicing(result_dict, content_limit, content_offset)
        # Cap the payload so it stays within MCP message limits.
        capped = _apply_token_limit_fallback(result_dict, max_tokens=25000)
        # Suggest auto-summarize only for ordinary (non-emergency) truncation.
        if (capped.get("token_limit_applied") and not auto_summarize
                and not capped.get("emergency_truncation")):
            capped["suggestion"] = "Content was truncated due to MCP token limits. Consider setting auto_summarize=True for better content reduction."
        return capped
    except Exception as e:
        return {"success": False, "error": f"File processing error: {str(e)}"}
async def get_supported_file_formats() -> dict:
    """Get supported file formats (PDF, Office, ZIP) and their capabilities."""
    # Not registered as an MCP tool; callable internally only.
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        return await file_processing.get_supported_file_formats()
    except Exception as e:
        return {"success": False, "error": f"Get supported formats error: {str(e)}"}
async def enhanced_process_large_content(
    url: Annotated[str, Field(description="URL to process")],
    chunking_strategy: Annotated[str, Field(description="'topic'|'sentence'|'overlap'|'regex'")] = "sentence",
    filtering_strategy: Annotated[str, Field(description="'bm25'|'pruning'|'llm'")] = "bm25",
    filter_query: Annotated[Optional[str], Field(description="Keywords for BM25 filtering")] = None,
    max_chunk_tokens: Annotated[int, Field(description="Max tokens per chunk")] = 2000,
    chunk_overlap: Annotated[int, Field(description="Overlap tokens")] = 200,
    extract_top_chunks: Annotated[int, Field(description="Top chunks to extract")] = 5,
    similarity_threshold: Annotated[float, Field(description="Min similarity 0-1")] = 0.5,
    summarize_chunks: Annotated[bool, Field(description="Summarize chunks")] = False,
    merge_strategy: Annotated[str, Field(description="'hierarchical'|'linear'")] = "linear",
    final_summary_length: Annotated[str, Field(description="'short'|'medium'|'long'")] = "short"
) -> Dict[str, Any]:
    """Process large content with chunking and BM25 filtering.

    Currently always falls back to a basic crawl plus simple fixed-size
    chunking ("due to backend issues" per the original note); the chunking/
    filtering strategy parameters are echoed back in the response metadata.
    """
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available",
            "processing_time": None,
            "metadata": {},
            "url": url,
            "original_content_length": 0,
            "filtered_content_length": 0,
            "total_chunks": 0,
            "relevant_chunks": 0,
            "processing_method": "enhanced_large_content",
            "chunking_strategy_used": chunking_strategy,
            "filtering_strategy_used": filtering_strategy,
            "chunks": [],
            "chunk_summaries": None,
            "merged_summary": None,
            "final_summary": "Tool modules not available"
        }
    # Import before the try block: the `except asyncio.TimeoutError` clause
    # below needs the name bound regardless of where the try body fails.
    import asyncio
    try:
        # Always use fallback to basic crawling due to backend issues
        # BUGFIX: this diagnostic previously went to stdout, which is the
        # MCP STDIO protocol channel — route it to stderr instead.
        print(f"Processing URL with fallback method: {url}", file=sys.stderr)
        fallback_result = await asyncio.wait_for(
            web_crawling.crawl_url(
                url=url,
                generate_markdown=True,
                timeout=10
            ),
            timeout=10.0
        )
        if fallback_result and fallback_result.get("success", False):
            content = fallback_result.get("content", "")
            # Simple truncation as processing
            max_content = max_chunk_tokens * extract_top_chunks
            if len(content) > max_content:
                content = content[:max_content] + "... [truncated for processing limit]"
            # Create simple fixed-size chunks with a positional relevance score
            chunk_size = max_chunk_tokens
            chunks = []
            for i in range(0, min(len(content), max_content), chunk_size):
                chunk_content = content[i:i + chunk_size]
                if chunk_content.strip():
                    chunks.append({
                        "content": chunk_content,
                        "relevance_score": 1.0 - (i / max_content),
                        "chunk_index": len(chunks)
                    })
            # Take top chunks
            top_chunks = chunks[:extract_top_chunks]
            # Generate simple summary (plain prefix truncation)
            if summarize_chunks and len(content) > 1000:
                final_summary = content[:500] + "... [content summary]"
            else:
                final_summary = content[:300] + "..." if len(content) > 300 else content
            return {
                "success": True,
                "error": "Enhanced processing unavailable, used basic crawl with chunking",
                "processing_time": 10,
                "metadata": {"fallback_used": True, "processing_type": "basic_chunking"},
                "url": url,
                "original_content_length": len(fallback_result.get("content", "")),
                "filtered_content_length": len(content),
                "total_chunks": len(chunks),
                "relevant_chunks": len(top_chunks),
                "processing_method": "basic_crawl_with_chunking",
                "chunking_strategy_used": chunking_strategy,
                "filtering_strategy_used": "simple_truncation",
                "chunks": top_chunks,
                "chunk_summaries": None,
                "merged_summary": None,
                "final_summary": final_summary
            }
        else:
            raise Exception("Fallback crawling also failed")
    except asyncio.TimeoutError:
        return {
            "success": False,
            "error": "Processing timed out after 10 seconds",
            "processing_time": 10,
            "metadata": {"timeout": True},
            "url": url,
            "original_content_length": 0,
            "filtered_content_length": 0,
            "total_chunks": 0,
            "relevant_chunks": 0,
            "processing_method": "timeout_fallback",
            "chunking_strategy_used": chunking_strategy,
            "filtering_strategy_used": filtering_strategy,
            "chunks": [],
            "chunk_summaries": None,
            "merged_summary": None,
            "final_summary": "Processing timed out"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Enhanced processing error: {str(e)}",
            "processing_time": None,
            "metadata": {"error_type": type(e).__name__},
            "url": url,
            "original_content_length": 0,
            "filtered_content_length": 0,
            "total_chunks": 0,
            "relevant_chunks": 0,
            "processing_method": "enhanced_large_content",
            "chunking_strategy_used": chunking_strategy,
            "filtering_strategy_used": filtering_strategy,
            "chunks": [],
            "chunk_summaries": None,
            "merged_summary": None,
            "final_summary": f"Error occurred: {str(e)}"
        }
@mcp.tool()
async def deep_crawl_site(
    url: Annotated[str, Field(description="Starting URL")],
    max_depth: Annotated[int, Field(description="Link depth (1-2)")] = 2,
    max_pages: Annotated[int, Field(description="Max pages (max: 10)")] = 5,
    crawl_strategy: Annotated[str, Field(description="'bfs'|'dfs'|'best_first'")] = "bfs",
    include_external: Annotated[bool, Field(description="Follow external links")] = False,
    url_pattern: Annotated[Optional[str], Field(description="URL filter pattern")] = None,
    score_threshold: Annotated[float, Field(description="Min relevance 0-1")] = 0.0,
    extract_media: Annotated[bool, Field(description="Extract media")] = False,
    base_timeout: Annotated[int, Field(description="Timeout per page")] = 60
) -> Dict[str, Any]:
    """Crawl multiple pages from a site with configurable depth."""
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}

    def _single_page_response(page: Dict[str, Any], note: str, original_error: str) -> Dict[str, Any]:
        # Shape a single-URL fallback crawl like a deep-crawl result set.
        return {
            "success": True,
            "results": [{
                "url": url,
                "title": page.get("title", ""),
                "content": page.get("content", ""),
                "markdown": page.get("markdown", ""),
                "success": True
            }],
            "summary": {
                "total_crawled": 1,
                "successful": 1,
                "failed": 0,
                "fallback_used": True,
                "note": note
            },
            "original_error": original_error
        }

    try:
        result = await web_crawling.deep_crawl_site(
            url=url, max_depth=max_depth, max_pages=max_pages, crawl_strategy=crawl_strategy,
            include_external=include_external, url_pattern=url_pattern, score_threshold=score_threshold,
            extract_media=extract_media, base_timeout=base_timeout
        )
        if result.get("success", True):
            # Cap the payload so it stays within MCP message limits.
            return _apply_token_limit_fallback(result, max_tokens=25000)
        # Deep crawl reported failure: crawl just the entry URL instead.
        try:
            single = await web_crawling.crawl_url_with_fallback(
                url=url, generate_markdown=True, timeout=base_timeout
            )
            if single.get("success", False):
                shaped = _single_page_response(
                    single,
                    "Used fallback crawling for main URL only due to deep crawl failure",
                    result.get("error", "Deep crawl failed")
                )
                return _apply_token_limit_fallback(shaped, max_tokens=25000)
        except Exception as fallback_error:
            result["fallback_error"] = str(fallback_error)
        return result
    except Exception as e:
        # Deep crawl raised: last-ditch single-URL fallback before giving up.
        try:
            single = await web_crawling.crawl_url_with_fallback(
                url=url, generate_markdown=True, timeout=base_timeout
            )
            if single.get("success", False):
                shaped = _single_page_response(
                    single,
                    "Used fallback crawling for main URL only due to deep crawl exception",
                    str(e)
                )
                return _apply_token_limit_fallback(shaped, max_tokens=25000)
        except Exception:
            pass
        return {"success": False, "error": f"Deep crawl error: {str(e)}"}
@mcp.tool()
async def crawl_url_with_fallback(
    url: Annotated[str, Field(description="URL to crawl")],
    css_selector: Annotated[Optional[str], Field(description="CSS selector")] = None,
    extract_media: Annotated[bool, Field(description="Extract media")] = False,
    take_screenshot: Annotated[bool, Field(description="Take screenshot")] = False,
    generate_markdown: Annotated[bool, Field(description="Generate markdown")] = True,
    wait_for_selector: Annotated[Optional[str], Field(description="Element to wait for")] = None,
    timeout: Annotated[int, Field(description="Timeout in seconds")] = 60,
    wait_for_js: Annotated[bool, Field(description="Wait for JavaScript")] = False,
    auto_summarize: Annotated[bool, Field(description="Auto-summarize content")] = False,
    content_limit: Annotated[int, Field(description="Max characters to return (0=unlimited)")] = 0,
    content_offset: Annotated[int, Field(description="Start position for content (0-indexed)")] = 0,
) -> dict:
    """Crawl with fallback strategies for anti-bot sites. Use content_offset/content_limit for pagination."""
    # Content slicing validation
    slicing_error = validate_content_slicing_params(content_limit, content_offset)
    if slicing_error:
        return slicing_error
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available"
        }
    try:
        result = await web_crawling.crawl_url_with_fallback(
            url=url, css_selector=css_selector, extract_media=extract_media,
            take_screenshot=take_screenshot, generate_markdown=generate_markdown,
            wait_for_selector=wait_for_selector, timeout=timeout, wait_for_js=wait_for_js,
            auto_summarize=auto_summarize
        )
        # Convert to dict and apply content slicing
        result_dict = _convert_result_to_dict(result)
        result_dict = _apply_content_slicing(result_dict, content_limit, content_offset)
        # CONSISTENCY FIX: every other tool in this server caps its response
        # at 25k tokens to avoid MCP message-size errors; this one previously
        # returned the raw dict unbounded.
        return _apply_token_limit_fallback(result_dict, max_tokens=25000)
    except Exception as e:
        return {
            "success": False,
            "error": f"Fallback crawl error: {str(e)}"
        }
async def intelligent_extract(
    url: Annotated[str, Field(description="Target URL")],
    extraction_goal: Annotated[str, Field(description="Data to extract")],
    content_filter: Annotated[str, Field(description="'bm25'|'pruning'|'llm'")] = "bm25",
    filter_query: Annotated[Optional[str], Field(description="BM25 filter keywords")] = None,
    chunk_content: Annotated[bool, Field(description="Split content")] = False,
    use_llm: Annotated[bool, Field(description="Enable LLM")] = True,
    llm_provider: Annotated[Optional[str], Field(description="LLM provider")] = None,
    llm_model: Annotated[Optional[str], Field(description="LLM model")] = None,
    custom_instructions: Annotated[Optional[str], Field(description="LLM instructions")] = None
) -> Dict[str, Any]:
    """Extract specific data from web pages using LLM."""
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}

    def _fallback_payload(crawl: Dict[str, Any], original_error: str) -> Dict[str, Any]:
        # Package raw fallback-crawl content in the intelligent-extract shape.
        body = crawl.get("markdown", "") or crawl.get("content", "")
        preview = body[:2000] + ("..." if len(body) > 2000 else "")
        return {
            "success": True,
            "url": url,
            "extraction_goal": extraction_goal,
            "extracted_data": {
                "raw_content": preview,
                "note": "Fallback extraction - manual processing may be needed"
            },
            "content": crawl.get("content", ""),
            "markdown": crawl.get("markdown", ""),
            "fallback_used": True,
            "original_error": original_error
        }

    try:
        result = await web_crawling.intelligent_extract(
            url=url, extraction_goal=extraction_goal, content_filter=content_filter,
            filter_query=filter_query, chunk_content=chunk_content, use_llm=use_llm,
            llm_provider=llm_provider, llm_model=llm_model, custom_instructions=custom_instructions
        )
        if result.get("success", True):
            # Cap the payload so it stays within MCP message limits.
            return _apply_token_limit_fallback(result, max_tokens=25000)
        # LLM extraction reported failure: try a plain fallback crawl.
        try:
            crawl = await web_crawling.crawl_url_with_fallback(
                url=url, generate_markdown=True, timeout=60
            )
            if crawl.get("success", False):
                body = crawl.get("markdown", "") or crawl.get("content", "")
                # Only report success when the fallback actually got content.
                if body.strip():
                    shaped = _fallback_payload(
                        crawl, result.get("error", "Intelligent extraction failed")
                    )
                    return _apply_token_limit_fallback(shaped, max_tokens=25000)
        except Exception as fallback_error:
            result["fallback_error"] = str(fallback_error)
        return result
    except Exception as e:
        # Extraction raised: last-ditch plain crawl before giving up.
        try:
            crawl = await web_crawling.crawl_url_with_fallback(
                url=url, generate_markdown=True, timeout=60
            )
            if crawl.get("success", False):
                shaped = _fallback_payload(crawl, str(e))
                return _apply_token_limit_fallback(shaped, max_tokens=25000)
        except Exception:
            pass
        return {"success": False, "error": f"Intelligent extraction error: {str(e)}"}
def _regex_entity_fallback(
    content: str,
    entity_types: List[str],
    deduplicate: bool
) -> Dict[str, List[str]]:
    """Best-effort regex entity extraction used when the main extractor fails.

    Only emails, phone numbers and URLs are supported here; other requested
    entity types (date, ip, price, ...) are silently skipped.

    Args:
        content: Raw page text (markdown or HTML) to scan.
        entity_types: Requested type names; both singular ("email") and
            plural ("emails") spellings are accepted.
        deduplicate: When True, collapse duplicate matches (order not preserved).

    Returns:
        Mapping of plural entity-type name -> list of matches; types with
        no matches are omitted.
    """
    import re
    requested = set(entity_types)
    entities: Dict[str, List[str]] = {}
    if requested & {"email", "emails"}:
        emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', content)
        if emails:
            entities["emails"] = list(set(emails)) if deduplicate else emails
    if requested & {"phone", "phones"}:
        # Loose international-ish pattern; may over-match digit runs.
        phones = re.findall(r'[\+]?[1-9]?[0-9]{7,15}', content)
        if phones:
            entities["phones"] = list(set(phones)) if deduplicate else phones
    if requested & {"url", "urls"}:
        urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', content)
        if urls:
            entities["urls"] = list(set(urls)) if deduplicate else urls
    return entities


async def extract_entities(
    url: Annotated[str, Field(description="Target URL")],
    entity_types: Annotated[List[str], Field(description="Types: email, phone, url, date, ip, price")],
    custom_patterns: Annotated[Optional[Dict[str, str]], Field(description="Custom regex patterns")] = None,
    include_context: Annotated[bool, Field(description="Include context")] = True,
    deduplicate: Annotated[bool, Field(description="Remove duplicates")] = True,
    use_llm: Annotated[bool, Field(description="Use LLM for NER")] = False,
    llm_provider: Annotated[Optional[str], Field(description="LLM provider")] = None,
    llm_model: Annotated[Optional[str], Field(description="LLM model")] = None
) -> Dict[str, Any]:
    """Extract entities (emails, phones, etc.) from web pages.

    Delegates to web_crawling.extract_entities; if that fails or raises, the
    page is re-fetched via crawl_url_with_fallback and a basic regex pass
    (_regex_entity_fallback) is applied. Responses are capped at 25k tokens.
    """
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available"
        }
    try:
        result = await web_crawling.extract_entities(
            url=url, entity_types=entity_types, custom_patterns=custom_patterns,
            include_context=include_context, deduplicate=deduplicate, use_llm=use_llm,
            llm_provider=llm_provider, llm_model=llm_model
        )
        # Check if entity extraction was successful
        if result.get("success", True):
            # Apply token limit fallback before returning
            return _apply_token_limit_fallback(result, max_tokens=25000)
        # If entity extraction failed, try with fallback crawling
        try:
            fallback_crawl = await web_crawling.crawl_url_with_fallback(
                url=url, generate_markdown=True, timeout=60
            )
            if fallback_crawl.get("success", False):
                content = fallback_crawl.get("content", "") or fallback_crawl.get("markdown", "")
                # Basic regex-based entity extraction on fallback content
                entities = _regex_entity_fallback(content, entity_types, deduplicate)
                fallback_response = {
                    "success": True,
                    "url": url,
                    "entities": entities,
                    "entity_types": entity_types,
                    "total_found": sum(len(v) for v in entities.values()),
                    "content": content[:500] + ("..." if len(content) > 500 else ""),
                    "fallback_used": True,
                    "note": "Basic regex extraction used - some entity types may not be fully supported",
                    "original_error": result.get("error", "Entity extraction failed")
                }
                # Apply token limit fallback before returning
                return _apply_token_limit_fallback(fallback_response, max_tokens=25000)
        except Exception as fallback_error:
            result["fallback_error"] = str(fallback_error)
        return result
    except Exception as e:
        # If entity extraction throws an exception, try basic fallback
        try:
            fallback_crawl = await web_crawling.crawl_url_with_fallback(
                url=url, generate_markdown=True, timeout=60
            )
            if fallback_crawl.get("success", False):
                content = fallback_crawl.get("content", "") or fallback_crawl.get("markdown", "")
                # Basic regex-based entity extraction
                entities = _regex_entity_fallback(content, entity_types, deduplicate)
                fallback_response = {
                    "success": True,
                    "url": url,
                    "entities": entities,
                    "entity_types": entity_types,
                    "total_found": sum(len(v) for v in entities.values()),
                    "content": content[:500] + ("..." if len(content) > 500 else ""),
                    "fallback_used": True,
                    "note": "Basic regex extraction used - some entity types may not be fully supported",
                    "original_error": str(e)
                }
                # Apply token limit fallback before returning
                return _apply_token_limit_fallback(fallback_response, max_tokens=25000)
        except Exception:
            pass
        return {
            "success": False,
            "error": f"Entity extraction error: {str(e)}"
        }
@mcp.tool()
async def extract_structured_data(
    url: Annotated[str, Field(description="Target URL")],
    extraction_type: Annotated[str, Field(description="'css'|'llm'|'table'")] = "css",
    css_selectors: Annotated[Optional[Dict[str, str]], Field(description="CSS selector mapping")] = None,
    extraction_schema: Annotated[Optional[Dict[str, str]], Field(description="Schema definition")] = None,
    generate_markdown: Annotated[bool, Field(description="Generate markdown")] = False,
    wait_for_js: Annotated[bool, Field(description="Wait for JavaScript")] = False,
    timeout: Annotated[int, Field(description="Timeout in seconds")] = 30,
    use_llm_table_extraction: Annotated[bool, Field(description="Use LLM table extraction")] = False,
    table_chunking_strategy: Annotated[str, Field(description="'intelligent'|'fixed'|'semantic'")] = "intelligent"
) -> Dict[str, Any]:
    """Extract structured data using CSS selectors or LLM.

    Dispatch order:
      1. LLM table extraction when extraction_type == "table" or
         use_llm_table_extraction is true; if it raises and css_selectors
         were given, processing falls through to CSS extraction.
      2. CSS-selector extraction: crawl the page, then post-process the
         HTML with BeautifulSoup using the css_selectors mapping.
      3. Otherwise: a plain crawl whose raw content (first 500 chars) is
         returned under "extracted_data".

    Every path routes its response through _apply_token_limit_fallback
    (25k-token cap) before returning.
    """
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available"
        }
    try:
        # NEW: LLM Table Extraction mode
        if extraction_type == "table" or use_llm_table_extraction:
            try:
                result = await web_crawling.extract_structured_data(
                    url=url,
                    extraction_type="llm_table",
                    extraction_schema=extraction_schema,
                    generate_markdown=generate_markdown,
                    wait_for_js=wait_for_js,
                    timeout=timeout,
                    chunking_strategy=table_chunking_strategy
                )
                if result.get("success", False):
                    result["processing_method"] = "llm_table_extraction"
                    result["features_used"] = ["intelligent_chunking", "massive_table_support"]
                # Apply token limit fallback before returning
                # NOTE(review): an unsuccessful (but non-raising) table result is
                # returned as-is here rather than falling through to CSS mode.
                return _apply_token_limit_fallback(result, max_tokens=25000)
            except Exception as table_error:
                # Fallback to CSS extraction if table extraction fails
                if css_selectors:
                    extraction_type = "css"
                else:
                    return {
                        "success": False,
                        "error": f"LLM table extraction failed: {str(table_error)}",
                        "suggested_fallback": "Try with css_selectors or extraction_type='css'"
                    }
        # CSS selectors provided and extraction_type is css
        if css_selectors and extraction_type == "css":
            # Use basic crawling with CSS selector post-processing
            try:
                # Basic crawl first
                crawl_result = await web_crawling.crawl_url(
                    url=url,
                    generate_markdown=generate_markdown,
                    wait_for_js=wait_for_js,
                    timeout=timeout
                )
                # If initial crawl fails, try fallback
                if not crawl_result.get("success", False) or not crawl_result.get("content", "").strip():
                    fallback_result = await web_crawling.crawl_url_with_fallback(
                        url=url,
                        generate_markdown=generate_markdown,
                        wait_for_js=wait_for_js,
                        timeout=timeout,
                        use_undetected_browser=True
                    )
                    if fallback_result.get("success", False):
                        crawl_result = fallback_result
                        crawl_result["fallback_used"] = True
                    else:
                        # Both crawls failed: surface the failed result unchanged.
                        return crawl_result
                # Enhanced CSS selector extraction with table detection
                from bs4 import BeautifulSoup
                html_content = crawl_result.get("content", "")
                soup = BeautifulSoup(html_content, 'html.parser')
                extracted_data = {}
                tables_found = []
                # Enhanced table detection and extraction
                tables = soup.find_all('table')
                if tables and use_llm_table_extraction:
                    for i, table in enumerate(tables):
                        table_data = {
                            "table_index": i,
                            "headers": [],
                            "rows": [],
                            "extraction_method": "enhanced_css_with_table_support"
                        }
                        # Extract headers
                        # NOTE(review): this grabs the table's first cells of any
                        # kind (th or td), not only genuine header cells.
                        headers = table.find_all(['th', 'td'])
                        if headers:
                            table_data["headers"] = [h.get_text().strip() for h in headers[:10]]  # Limit for performance
                        # Extract first few rows
                        rows = table.find_all('tr')
                        for j, row in enumerate(rows[:5]):  # Limit for performance
                            cells = row.find_all(['td', 'th'])
                            row_data = [cell.get_text().strip() for cell in cells]
                            if row_data:
                                table_data["rows"].append(row_data)
                        tables_found.append(table_data)
                # Standard CSS selector extraction
                for key, selector in css_selectors.items():
                    elements = soup.select(selector)
                    if elements:
                        # Single match -> scalar string; multiple -> list of strings.
                        if len(elements) == 1:
                            extracted_data[key] = elements[0].get_text().strip()
                        else:
                            extracted_data[key] = [elem.get_text().strip() for elem in elements]
                    else:
                        extracted_data[key] = None
                result = {
                    "success": True,
                    "url": url,
                    "extracted_data": extracted_data,
                    "processing_method": "enhanced_css_selector_extraction",
                    "content": crawl_result.get("content", ""),
                    "markdown": crawl_result.get("markdown", "")
                }
                if tables_found:
                    result["tables_detected"] = len(tables_found)
                    result["table_data"] = tables_found
                    result["table_extraction_enhanced"] = True
                if crawl_result.get("fallback_used"):
                    result["fallback_used"] = True
                # Apply token limit fallback before returning
                return _apply_token_limit_fallback(result, max_tokens=25000)
            except ImportError:
                # If BeautifulSoup not available, try fallback crawl
                try:
                    fallback_result = await web_crawling.crawl_url_with_fallback(
                        url=url,
                        generate_markdown=generate_markdown,
                        wait_for_js=wait_for_js,
                        timeout=timeout,
                        use_undetected_browser=True
                    )
                    if fallback_result.get("success", False):
                        fallback_response = {
                            "success": True,
                            "url": url,
                            "extracted_data": {"raw_content": fallback_result.get("content", "")[:500] + "..."},
                            "processing_method": "fallback_crawl_only",
                            "content": fallback_result.get("content", ""),
                            "markdown": fallback_result.get("markdown", ""),
                            "fallback_used": True,
                            "note": "BeautifulSoup not available - CSS extraction skipped"
                        }
                        # Apply token limit fallback before returning
                        return _apply_token_limit_fallback(fallback_response, max_tokens=25000)
                except Exception:
                    pass
                return {
                    "success": False,
                    "error": "BeautifulSoup not available for CSS extraction"
                }
            except Exception as e:
                # Try fallback on CSS extraction error
                try:
                    fallback_result = await web_crawling.crawl_url_with_fallback(
                        url=url,
                        generate_markdown=generate_markdown,
                        wait_for_js=wait_for_js,
                        timeout=timeout,
                        use_undetected_browser=True
                    )
                    if fallback_result.get("success", False):
                        fallback_response = {
                            "success": True,
                            "url": url,
                            "extracted_data": {"raw_content": fallback_result.get("content", "")[:500] + "..."},
                            "processing_method": "fallback_crawl_after_css_error",
                            "content": fallback_result.get("content", ""),
                            "markdown": fallback_result.get("markdown", ""),
                            "fallback_used": True,
                            "original_error": str(e)
                        }
                        # Apply token limit fallback before returning
                        return _apply_token_limit_fallback(fallback_response, max_tokens=25000)
                except Exception:
                    pass
                return {
                    "success": False,
                    "error": f"CSS extraction error: {str(e)}"
                }
        else:
            # Fallback to basic crawling or LLM extraction
            crawl_result = await web_crawling.crawl_url(
                url=url,
                generate_markdown=generate_markdown,
                wait_for_js=wait_for_js,
                timeout=timeout
            )
            # If basic crawl fails, try fallback
            if not crawl_result.get("success", False) or not crawl_result.get("content", "").strip():
                fallback_result = await web_crawling.crawl_url_with_fallback(
                    url=url,
                    generate_markdown=generate_markdown,
                    wait_for_js=wait_for_js,
                    timeout=timeout,
                    use_undetected_browser=True
                )
                if fallback_result.get("success", False):
                    crawl_result = fallback_result
                    crawl_result["fallback_used"] = True
            if crawl_result.get("success", False):
                crawl_result["processing_method"] = "basic_crawl_fallback"
                crawl_result["note"] = "Used basic crawling - structured extraction not configured"
                crawl_result["extracted_data"] = {"raw_content": crawl_result.get("content", "")[:500] + "..."}
            # Apply token limit fallback before returning
            return _apply_token_limit_fallback(crawl_result, max_tokens=25000)
    except Exception as e:
        # Final fallback attempt
        try:
            fallback_result = await web_crawling.crawl_url_with_fallback(
                url=url,
                generate_markdown=generate_markdown,
                wait_for_js=wait_for_js,
                timeout=timeout,
                use_undetected_browser=True
            )
            if fallback_result.get("success", False):
                fallback_response = {
                    "success": True,
                    "url": url,
                    "extracted_data": {"raw_content": fallback_result.get("content", "")[:500] + "..."},
                    "processing_method": "emergency_fallback",
                    "content": fallback_result.get("content", ""),
                    "markdown": fallback_result.get("markdown", ""),
                    "fallback_used": True,
                    "original_error": str(e)
                }
                # Apply token limit fallback before returning
                return _apply_token_limit_fallback(fallback_response, max_tokens=25000)
        except Exception:
            pass
        return {
            "success": False,
            "error": f"Structured extraction error: {str(e)}"
        }
@mcp.tool()
async def search_google(
    request: Annotated[Dict[str, Any], Field(description="Dict with: query (required), num_results, search_genre, language, region, recent_days")]
) -> Dict[str, Any]:
    """Search Google with genre filtering. Genres: academic, news, technical, commercial, social.

    Optional request keys content_limit / content_offset slice result
    content; when slicing is requested, full results are cached so later
    slices of the same request avoid a new search. Responses are capped at
    25k tokens.
    """
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available"
        }
    try:
        # Coerce optional slicing parameters to int (handles float/string from JSON).
        try:
            content_limit = int(request.get('content_limit', 0))
            content_offset = int(request.get('content_offset', 0))
        except (TypeError, ValueError) as e:
            return {
                "success": False,
                "error": f"Invalid content slicing parameter type: {str(e)}",
                "error_code": "invalid_slicing_param_type"
            }
        # Slicing is in effect whenever either parameter is non-zero.
        slicing_requested = content_limit != 0 or content_offset != 0
        if slicing_requested:
            slicing_error = validate_content_slicing_params(content_limit, content_offset)
            if slicing_error:
                return slicing_error
        # Check cache when slicing is requested
        cache_key = None
        if slicing_requested:
            cache_key = _get_search_cache_key(request)
            cached = _get_cached_search_result(cache_key)
            if cached:
                # Apply slicing to a deep copy so the cached entry stays intact.
                import copy
                result_copy = copy.deepcopy(cached)
                result_copy = _apply_search_content_slicing(result_copy, content_limit, content_offset)
                result_copy['cache_hit'] = True
                return _apply_token_limit_fallback(result_copy, max_tokens=25000)
        # Execute search (the search module reads query/num_results from request).
        result = await search.search_google(request)
        # Store in cache if slicing is requested and search succeeded
        if result.get('success') and cache_key:
            import copy
            _cache_search_result(cache_key, copy.deepcopy(result))
        # Apply content slicing if requested
        if slicing_requested:
            result = _apply_search_content_slicing(result, content_limit, content_offset)
            result['cache_hit'] = False
        # Apply token limit fallback to prevent MCP errors
        return _apply_token_limit_fallback(result, max_tokens=25000)
    except Exception as e:
        return {
            "success": False,
            "error": f"Google search error: {str(e)}"
        }
@mcp.tool()
async def batch_search_google(
    request: Annotated[Dict[str, Any], Field(description="Dict with: queries (max 3), num_results_per_query, search_genre, recent_days")]
) -> Dict[str, Any]:
    """Perform multiple Google searches. Max 3 queries per call."""
    # Reject oversized batches up front (MCP best practice: bounded toolsets).
    if len(request.get('queries', [])) > 3:
        return {"success": False, "error": "Maximum 3 queries allowed per batch. Split into multiple calls."}
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        batch_result = await search.batch_search_google(request)
        # Cap the combined payload at 25k tokens before handing it back.
        return _apply_token_limit_fallback(batch_result, max_tokens=25000)
    except Exception as e:
        return {
            "success": False,
            "error": f"Batch search error: {str(e)}"
        }
@mcp.tool()
async def search_and_crawl(
    request: Annotated[Dict[str, Any], Field(description="Dict with: search_query (required), crawl_top_results, search_genre, recent_days")]
) -> Dict[str, Any]:
    """Search Google and crawl top results. Combines search with full content extraction.

    Request keys:
        search_query (str, required): Query string.
        crawl_top_results (int): Pages to crawl, capped at 3 (default 2).
        search_genre / recent_days: Forwarded to the search module.
        generate_markdown (bool): Default True.
        max_content_per_page (int): Per-page content/markdown cap in
            characters (default 5000).

    Failed pages are retried via crawl_url_with_fallback; if the combined
    search+crawl raises, a search-only retry followed by per-URL fallback
    crawls is attempted. Responses are capped at 25k tokens.
    """
    if not _get_tool_modules():
        return {
            "success": False,
            "error": "Tool modules not available"
        }
    try:
        # Extract parameters from request
        search_query = request.get('search_query')
        if not search_query:
            return {
                "success": False,
                "error": "search_query is required in request"
            }
        # Bounded toolset: never crawl more than 3 results per call.
        crawl_top_results = min(request.get('crawl_top_results', 2), 3)
        search_genre = request.get('search_genre')
        recent_days = request.get('recent_days')
        generate_markdown = request.get('generate_markdown', True)
        max_content_per_page = request.get('max_content_per_page', 5000)
        result = await search.search_and_crawl(
            search_query=search_query,
            crawl_top_results=crawl_top_results,
            search_genre=search_genre,
            recent_days=recent_days,
            generate_markdown=generate_markdown
        )
        # Check for failed crawls and apply fallback
        if result and isinstance(result, dict) and "crawled_pages" in result:
            failed_pages = []
            for i, page in enumerate(result["crawled_pages"]):
                if isinstance(page, dict):
                    # A page with success=False or empty content counts as failed.
                    if not page.get("success", True) or not page.get("content", "").strip():
                        failed_pages.append((i, page.get("url", "")))
            # Apply fallback to failed pages
            for idx, url in failed_pages:
                if url:
                    try:
                        fallback_result = await web_crawling.crawl_url_with_fallback(
                            url=url,
                            generate_markdown=generate_markdown,
                            timeout=30
                        )
                        if fallback_result.get("success", False):
                            fallback_result["fallback_used"] = True
                            # Update the page data
                            result["crawled_pages"][idx].update(fallback_result)
                    except Exception as fallback_error:
                        result["crawled_pages"][idx]["fallback_error"] = str(fallback_error)
        # Truncate content if too large
        if result and isinstance(result, dict):
            if "crawled_pages" in result:
                for page in result["crawled_pages"]:
                    if isinstance(page, dict):
                        if "content" in page and len(page["content"]) > max_content_per_page:
                            page["content"] = page["content"][:max_content_per_page] + "... [truncated for size limit]"
                        if "markdown" in page and len(page["markdown"]) > max_content_per_page:
                            page["markdown"] = page["markdown"][:max_content_per_page] + "... [truncated for size limit]"
        # Apply token limit fallback before returning
        return _apply_token_limit_fallback(result, max_tokens=25000)
    except Exception as e:
        # If search_and_crawl fails entirely, try with fallback crawling
        try:
            # First try to get search results only
            search_result = await search.search_google({
                "query": request.get('search_query', ''),
                "num_results": request.get('crawl_top_results', 2)
            })
            if search_result.get("success", False) and "results" in search_result:
                # Extract URLs and crawl with fallback
                urls = [item.get("url", "") for item in search_result["results"] if item.get("url")]
                crawled_pages = []
                generate_markdown = request.get('generate_markdown', True)
                max_content_per_page = request.get('max_content_per_page', 5000)
                for url in urls[:request.get('crawl_top_results', 2)]:
                    try:
                        fallback_result = await web_crawling.crawl_url_with_fallback(
                            url=url,
                            generate_markdown=generate_markdown,
                            timeout=30
                        )
                        # Only successful fallback crawls are kept in this path.
                        if fallback_result.get("success", False):
                            fallback_result["fallback_used"] = True
                            fallback_result["original_search_crawl_error"] = str(e)
                            # Truncate if needed
                            if "content" in fallback_result and len(fallback_result["content"]) > max_content_per_page:
                                fallback_result["content"] = fallback_result["content"][:max_content_per_page] + "... [truncated for size limit]"
                            if "markdown" in fallback_result and len(fallback_result["markdown"]) > max_content_per_page:
                                fallback_result["markdown"] = fallback_result["markdown"][:max_content_per_page] + "... [truncated for size limit]"
                            crawled_pages.append(fallback_result)
                    except Exception as individual_error:
                        crawled_pages.append({
                            "success": False,
                            "url": url,
                            "error": f"Individual crawl failed: {str(individual_error)}",
                            "original_search_crawl_error": str(e)
                        })
                fallback_response = {
                    "success": True,
                    "query": request.get('search_query', ''),
                    "search_results": search_result.get("results", []),
                    "crawled_pages": crawled_pages,
                    "fallback_used": True,
                    "original_error": str(e)
                }
                # Apply token limit fallback before returning
                return _apply_token_limit_fallback(fallback_response, max_tokens=25000)
        except Exception as fallback_error:
            pass
        return {
            "success": False,
            "error": f"Search and crawl error: {str(e)}"
        }
async def get_search_genres() -> Dict[str, Any]:
    """Return the search genres supported for targeted searching."""
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        # Delegate directly to the search module.
        return await search.get_search_genres()
    except Exception as e:
        return {"success": False, "error": f"Get search genres error: {str(e)}"}
async def get_llm_config_info() -> Dict[str, Any]:
    """Return the current LLM configuration and available providers."""
    if not _get_tool_modules():
        return {"success": False, "error": "Tool modules not available"}
    try:
        # Delegate directly to the utilities module.
        return await utilities.get_llm_config_info()
    except Exception as e:
        return {"success": False, "error": f"LLM config info error: {str(e)}"}
@mcp.tool()
async def batch_crawl(
    urls: Annotated[List[str], Field(description="URLs to crawl (max 3)")],
    base_timeout: Annotated[int, Field(description="Timeout per URL (default: 30)")] = 30,
    generate_markdown: Annotated[bool, Field(description="Generate markdown (default: True)")] = True,
    extract_media: Annotated[bool, Field(description="Extract media (default: False)")] = False,
    wait_for_js: Annotated[bool, Field(description="Wait for JS (default: False)")] = False,
    max_concurrent: Annotated[int, Field(description="Max concurrent (default: 3)")] = 3
) -> List[Dict[str, Any]]:
    """Crawl multiple URLs with fallback. Max 3 URLs per call.

    Runs utilities.batch_crawl under an overall timeout, retries any
    failed/empty result with an undetected-browser fallback, converts
    CrawlResponse objects to plain dicts for JSON serialization, and caps
    the combined payload at 25k tokens.
    """
    # URL limit check (MCP best practice: bounded toolsets)
    if len(urls) > 3:
        return [{"success": False, "error": "Maximum 3 URLs allowed per batch. Split into multiple calls."}]
    if not _get_tool_modules():
        return [{"success": False, "error": "Tool modules not available"}]
    try:
        # Build config from individual parameters
        config = {
            "generate_markdown": generate_markdown,
            "extract_media": extract_media,
            "wait_for_js": wait_for_js,
            "max_concurrent": max_concurrent,
            "use_undetected_browser": False  # Default to False for batch
        }
        # Add timeout handling - optimized for faster response
        import asyncio
        # Reduced timeout: base + 10s per URL (instead of base * URLs)
        total_timeout = base_timeout + (len(urls) * 10) + 30  # More reasonable timeout
        result = await asyncio.wait_for(
            utilities.batch_crawl(urls, config, base_timeout),
            timeout=total_timeout
        )
        # Check for failed crawls and apply fallback with undetected browser
        if isinstance(result, list):
            failed_urls = []
            for i, crawl_result in enumerate(result):
                # Handle both dict and CrawlResponse objects
                if hasattr(crawl_result, 'success'):
                    # CrawlResponse object
                    success = crawl_result.success
                    markdown = getattr(crawl_result, 'markdown', '') or ''
                else:
                    # Dictionary object
                    success = crawl_result.get("success", True)
                    markdown = crawl_result.get("markdown", "") or ""
                # A crawl with no markdown is treated as failed too.
                if not success or not markdown.strip():
                    failed_urls.append((i, urls[i]))
            # Apply fallback to failed URLs with undetected browser
            if failed_urls:
                for idx, url in failed_urls:
                    try:
                        fallback_result = await web_crawling.crawl_url_with_fallback(
                            url=url,
                            generate_markdown=generate_markdown,
                            extract_media=extract_media,
                            wait_for_js=wait_for_js,
                            timeout=base_timeout,
                            use_undetected_browser=True  # Force undetected for fallback
                        )
                        # Handle both dict and CrawlResponse objects for fallback
                        if hasattr(fallback_result, 'success'):
                            # CrawlResponse object - convert to dict
                            if fallback_result.success:
                                fallback_dict = fallback_result.dict()
                                fallback_dict["fallback_used"] = True
                                fallback_dict["undetected_browser_used"] = True
                                result[idx] = fallback_dict
                        else:
                            # Dictionary object
                            if fallback_result.get("success", False):
                                fallback_result["fallback_used"] = True
                                fallback_result["undetected_browser_used"] = True
                                result[idx] = fallback_result
                    except Exception as fallback_error:
                        # Handle CrawlResponse objects in error case
                        if hasattr(result[idx], 'dict'):
                            error_dict = result[idx].dict()
                            error_dict["fallback_error"] = str(fallback_error)
                            result[idx] = error_dict
                        else:
                            result[idx]["fallback_error"] = str(fallback_error)
        # Convert CrawlResponse objects to dictionaries for JSON serialization
        dict_results = []
        for crawl_result in result:
            if hasattr(crawl_result, 'dict'):
                # CrawlResponse object
                dict_results.append(crawl_result.dict())
            else:
                # Already a dictionary
                dict_results.append(crawl_result)
        # Apply token limit fallback to the entire batch result
        batch_response = {"batch_results": dict_results, "total_urls": len(urls)}
        final_result = _apply_token_limit_fallback(batch_response, max_tokens=25000)
        # Return just the batch_results list if no token limits were applied
        if not final_result.get("token_limit_applied"):
            return dict_results
        else:
            # If token limits were applied, return the modified structure with metadata
            return final_result.get("batch_results", dict_results)
    except asyncio.TimeoutError:
        return [{
            "success": False,
            "error": f"Batch crawl timed out after {total_timeout} seconds"
        }]
    except Exception as e:
        # If batch crawl fails entirely, try individual fallbacks with undetected browser
        try:
            fallback_results = []
            for url in urls:
                try:
                    fallback_result = await web_crawling.crawl_url_with_fallback(
                        url=url,
                        generate_markdown=generate_markdown,
                        extract_media=extract_media,
                        wait_for_js=wait_for_js,
                        timeout=base_timeout,
                        use_undetected_browser=True  # Force undetected for emergency fallback
                    )
                    # Handle CrawlResponse objects in emergency fallback
                    if hasattr(fallback_result, 'success'):
                        # CrawlResponse object - convert to dict
                        fallback_dict = fallback_result.dict()
                        if fallback_result.success:
                            fallback_dict["fallback_used"] = True
                            fallback_dict["undetected_browser_used"] = True
                            fallback_dict["original_batch_error"] = str(e)
                        fallback_results.append(fallback_dict)
                    else:
                        # Dictionary object
                        if fallback_result.get("success", False):
                            fallback_result["fallback_used"] = True
                            fallback_result["undetected_browser_used"] = True
                            fallback_result["original_batch_error"] = str(e)
                            fallback_results.append(fallback_result)
                except Exception as individual_error:
                    fallback_results.append({
                        "success": False,
                        "url": url,
                        "error": f"Individual fallback failed: {str(individual_error)}",
                        "original_batch_error": str(e)
                    })
            # Apply token limit fallback to emergency results
            batch_response = {"batch_results": fallback_results, "total_urls": len(urls)}
            final_result = _apply_token_limit_fallback(batch_response, max_tokens=25000)
            return final_result.get("batch_results", fallback_results)
        except Exception:
            return [{
                "success": False,
                "error": f"Batch crawl error: {str(e)}"
            }]
@mcp.tool()
async def multi_url_crawl(
    url_configurations: Annotated[Dict[str, Dict], Field(description="URL-config mapping (max 5 URLs). Example: {'https://site1.com': {'wait_for_js': true}}")],
    pattern_matching: Annotated[str, Field(description="Pattern: 'wildcard' or 'regex' (default: wildcard)")] = "wildcard",
    default_config: Annotated[Optional[Dict], Field(description="Default config")] = None,
    base_timeout: Annotated[int, Field(description="Timeout per URL (default: 30)")] = 30,
    max_concurrent: Annotated[int, Field(description="Max concurrent (default: 3)")] = 3
) -> List[Dict[str, Any]]:
    """Multi-URL crawl with pattern-based config. Max 5 URL patterns per call.

    Each literal key of url_configurations becomes a crawl target; the first
    matching pattern's config (merged over default_config) is applied. Failed
    crawls are retried with an undetected-browser fallback; the combined
    result is capped at 25k tokens.

    NOTE(review): only literal (non-wildcard) keys are collected as crawl
    targets, so pure wildcard/regex patterns currently contribute
    configuration but no URLs of their own — confirm this is intentional.
    """
    # URL limit check (MCP best practice: bounded toolsets)
    if len(url_configurations) > 5:
        return [{"success": False, "error": "Maximum 5 URL configurations allowed per batch. Split into multiple calls."}]
    if not _get_tool_modules():
        return [{
            "success": False,
            "error": "Tool modules not available"
        }]
    try:
        import re
        import fnmatch
        # Extract all URLs from configurations
        all_urls = []
        for pattern in url_configurations.keys():
            # For now, treat patterns as literal URLs if they don't contain wildcards
            if pattern_matching == "wildcard" and ('*' not in pattern and '?' not in pattern):
                all_urls.append(pattern)
        if not all_urls:
            return [{
                "success": False,
                "error": "No valid URLs found in configurations. Use direct URLs or wildcard patterns."
            }]
        results = []
        # Process each URL with its matched configuration
        for url in all_urls:
            matched_config = default_config or {}
            pattern_used = "default"
            # Find matching pattern and configuration
            for pattern, config in url_configurations.items():
                pattern_matches = False
                if pattern_matching == "wildcard":
                    pattern_matches = fnmatch.fnmatch(url, pattern) or url == pattern
                elif pattern_matching == "regex":
                    try:
                        pattern_matches = bool(re.match(pattern, url))
                    except re.error:
                        # Skip syntactically invalid regex patterns.
                        continue
                if pattern_matches:
                    # First match wins; its config overrides the defaults.
                    matched_config = {**matched_config, **config}
                    pattern_used = pattern
                    break
            # Apply configuration with fallback to defaults
            crawl_config = {
                "generate_markdown": matched_config.get("generate_markdown", True),
                "extract_media": matched_config.get("extract_media", False),
                "wait_for_js": matched_config.get("wait_for_js", False),
                "timeout": matched_config.get("timeout", base_timeout),
                "use_undetected_browser": matched_config.get("use_undetected_browser", False),
                "css_selector": matched_config.get("css_selector")
            }
            try:
                # Crawl with pattern-specific configuration
                result = await web_crawling.crawl_url(
                    url=url,
                    **{k: v for k, v in crawl_config.items() if v is not None}
                )
                # Convert CrawlResponse to dict
                if hasattr(result, 'model_dump'):
                    result_dict = result.model_dump()
                elif hasattr(result, 'dict'):
                    result_dict = result.dict()
                else:
                    result_dict = result
                # Add configuration metadata to result
                if result_dict.get("success", True):
                    result_dict["pattern_matched"] = pattern_used
                    result_dict["configuration_applied"] = crawl_config
                    result_dict["multi_url_config_used"] = True
                    result = result_dict
                else:
                    # Try fallback with undetected browser if initial fails
                    fallback_result = await web_crawling.crawl_url_with_fallback(
                        url=url,
                        **{k: v for k, v in crawl_config.items() if v is not None and k != 'use_undetected_browser'},
                        use_undetected_browser=True
                    )
                    # Convert fallback result to dict too
                    if hasattr(fallback_result, 'model_dump'):
                        result = fallback_result.model_dump()
                    elif hasattr(fallback_result, 'dict'):
                        result = fallback_result.dict()
                    else:
                        result = fallback_result
                    if result.get("success", False):
                        result["pattern_matched"] = pattern_used
                        result["configuration_applied"] = crawl_config
                        result["multi_url_config_used"] = True
                        result["fallback_used"] = True
                results.append(result)
            except Exception as e:
                # Error handling with pattern information
                error_result = {
                    "success": False,
                    "url": url,
                    "error": f"Multi-URL crawl error: {str(e)}",
                    "pattern_matched": pattern_used,
                    "configuration_applied": crawl_config,
                    "multi_url_config_used": True
                }
                results.append(error_result)
        # Apply token limit fallback to the entire multi-URL result
        batch_response = {"multi_url_results": results, "total_urls": len(all_urls)}
        final_result = _apply_token_limit_fallback(batch_response, max_tokens=25000)
        # Return just the results list if no token limits were applied
        if not final_result.get("token_limit_applied"):
            return results
        else:
            # If token limits were applied, return the modified structure
            return final_result.get("multi_url_results", results)
    except Exception as e:
        return [{
            "success": False,
            "error": f"Multi-URL configuration error: {str(e)}"
        }]
def get_tool_selection_guide() -> dict:
    """Get tool-to-use-case mapping guide for all available tools."""
    # Category -> tool-name listings, assembled piecewise for readability.
    categories = {
        "web_crawling": [
            "crawl_url", "deep_crawl_site", "crawl_url_with_fallback",
            "intelligent_extract", "extract_entities", "extract_structured_data",
        ],
        "youtube": [
            "extract_youtube_transcript", "batch_extract_youtube_transcripts",
            "get_youtube_video_info", "get_youtube_api_setup_guide",
        ],
        "search": ["search_google", "batch_search_google", "search_and_crawl", "get_search_genres"],
        "batch": ["batch_crawl", "multi_url_crawl"],
        "files": ["process_file", "get_supported_file_formats", "enhanced_process_large_content"],
        "config": ["get_llm_config_info", "get_tool_selection_guide"],
        "diagnostics": ["get_system_diagnostics"],
    }
    # Free-form feature notes and usage recommendations.
    notes = {
        "new_v074_features": {
            "undetected_browser": "Enhanced crawl_url with use_undetected_browser parameter",
            "llm_table_extraction": "Revolutionary table extraction in extract_structured_data with use_llm_table_extraction",
            "multi_url_config": "Pattern-based configuration matching in multi_url_crawl tool",
            "intelligent_chunking": "Massive table support with adaptive chunking strategies",
        },
        "best_practices": {
            "bot_detection": "Use crawl_url with undetected browser mode for difficult sites",
            "table_data": "Enable LLM table extraction for complex table structures",
            "mixed_domains": "Use multi_url_crawl for site-specific optimization",
            "fallback_reliability": "All tools now include automatic fallback mechanisms",
        },
    }
    return {**categories, **notes}
def main():
    """Clean main entry point - FastMCP 2.0 with no banner issues"""
    import sys
    args = sys.argv[1:]
    if args and args[0] == "--help":
        print("Crawl4AI MCP Server - FastMCP 2.0 Version")
        print("Usage: python -m crawl4ai_mcp.server [--transport TRANSPORT]")
        print("Transports: stdio (default), streamable-http, sse")
        return
    # Defaults for the optional CLI flags.
    options = {"transport": "stdio", "host": "127.0.0.1", "port": 8000}
    pos = 0
    while pos < len(args):
        flag = args[pos]
        if flag in ("--transport", "--host", "--port") and pos + 1 < len(args):
            value = args[pos + 1]
            options[flag[2:]] = int(value) if flag == "--port" else value
            pos += 2
        else:
            # Unrecognized (or value-less trailing) argument: skip it.
            pos += 1
    transport = options["transport"]
    # Run server - clean FastMCP 2.0 execution
    try:
        if transport == "stdio":
            mcp.run()
        elif transport in ("streamable-http", "http"):
            mcp.run(transport="streamable-http", host=options["host"], port=options["port"])
        elif transport == "sse":
            mcp.run(transport="sse", host=options["host"], port=options["port"])
        else:
            print(f"Unknown transport: {transport}")
            sys.exit(1)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        # Stay quiet on stdio so protocol output is not corrupted.
        if transport != "stdio":
            print(f"Server error: {e}", file=sys.stderr)
        sys.exit(1)
# Allow running the server directly as a script.
if __name__ == "__main__":
    main()