#!/usr/bin/env python3
"""
Fetch as Markdown MCP Server
A Model Context Protocol server that fetches web pages and converts them to clean markdown,
focusing on main content extraction while minimizing context overhead.
"""
import asyncio
import logging
import re
import time
from typing import Any
from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import Tool
import mcp.types as types
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("fetch-markdown-mcp")
# Initialize the MCP server
server = Server("fetch-as-markdown")
# Rate limiting
last_request_time = 0
MIN_REQUEST_INTERVAL = 1.0 # seconds between requests
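# Consecutive fetches are spaced at least MIN_REQUEST_INTERVAL apart: the fetch
# handler sleeps for whatever remains of the interval before issuing a request.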
# User agent for respectful scraping
USER_AGENT = "Mozilla/5.0 (compatible; MCP-Fetch-As-Markdown/1.0; +https://github.com/modelcontextprotocol/fetch-as-markdown)"
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""
List available tools.
    Each tool describes its arguments with a JSON Schema.
"""
return [
Tool(
name="fetch",
description="Fetch a web page and convert it to clean markdown format, focusing on main content",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL of the web page to fetch and convert to markdown"
},
"include_links": {
"type": "boolean",
"description": "Whether to preserve links in the markdown output (default: true)",
"default": True
},
"include_images": {
"type": "boolean",
"description": "Whether to include image references in the markdown output (default: false)",
"default": False
},
"timeout": {
"type": "integer",
"description": "Request timeout in seconds (default: 10)",
"default": 10,
"minimum": 5,
"maximum": 30
}
},
"required": ["url"]
}
)
]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent]:
"""
Handle tool execution requests.
"""
if name == "fetch":
return await handle_fetch_page(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
async def handle_fetch_page(arguments: dict[str, Any]) -> list[types.TextContent]:
"""Handle web page fetching and markdown conversion requests."""
url = arguments.get("url")
include_links = arguments.get("include_links", True)
include_images = arguments.get("include_images", False)
timeout = arguments.get("timeout", 10)
if not url:
raise ValueError("URL parameter is required")
    # Validate URL format
    try:
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError("URL must include a scheme and host (e.g. https://example.com)")
    except ValueError as e:
        return [types.TextContent(
            type="text",
            text=f"# URL Validation Error\n\n**URL:** {url}\n**Error:** {str(e)}"
        )]
results = []
try:
# Rate limiting
global last_request_time
current_time = time.time()
time_since_last = current_time - last_request_time
if time_since_last < MIN_REQUEST_INTERVAL:
await asyncio.sleep(MIN_REQUEST_INTERVAL - time_since_last)
last_request_time = time.time()
# Fetch the page
headers = {
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
        # requests is synchronous, so run it in a worker thread to avoid blocking the event loop
        response = await asyncio.to_thread(requests.get, url, headers=headers, timeout=timeout, allow_redirects=True)
        response.raise_for_status()
# Get final URL after redirects
final_url = response.url
# Parse HTML
soup = BeautifulSoup(response.content, 'html.parser')
# Extract title
title = soup.find('title')
page_title = title.get_text().strip() if title else "Untitled"
# Remove unwanted elements to reduce context
unwanted_selectors = [
'script', 'style', 'nav', 'header', 'footer', 'aside',
'.advertisement', '.ad', '.ads', '.sidebar', '.navigation',
'.menu', '.navbar', '.header', '.footer', '.social-media',
'.comments', '.comment-section', '.related-posts', '.share',
'.popup', '.modal', '.overlay', '.cookie-notice', '.banner',
'[role="banner"]', '[role="navigation"]', '[role="complementary"]',
'.breadcrumb', '.pagination', '.tags', '.categories'
]
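        # decompose() removes each matching element and its children from the
        # parse tree so they never reach the markdown conversion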
for selector in unwanted_selectors:
for element in soup.select(selector):
element.decompose()
# Try to find main content area
main_content = None
main_selectors = [
'main', 'article', '.main-content', '.content', '.post-content',
'.entry-content', '.article-content', '#content', '#main',
'.container .content', '.page-content', '.single-content'
]
for selector in main_selectors:
main_content = soup.select_one(selector)
if main_content:
break
# If no main content found, use body but clean it up more
if not main_content:
main_content = soup.find('body')
if main_content:
# Remove more elements for body content
for element in main_content.find_all(['header', 'nav', 'footer', 'aside']):
element.decompose()
# If still nothing, use the whole soup
if not main_content:
main_content = soup
        # Strip non-essential attributes to reduce size; link/image exclusion is
        # handled at conversion time below
        attrs_to_keep = ['href', 'src', 'alt', 'title']
        for element in main_content.find_all():
            for attr in list(element.attrs):
                if attr not in attrs_to_keep:
                    del element.attrs[attr]
        # Convert to markdown, stripping link/image markup the caller opted out of
        strip_tags = []
        if not include_links:
            strip_tags.append('a')
        if not include_images:
            strip_tags.append('img')
        markdown_content = md(str(main_content), heading_style='ATX', bullets='-', strip=strip_tags or None)
# Clean up the markdown
markdown_content = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown_content) # Remove excessive blank lines
markdown_content = re.sub(r'[ \t]+\n', '\n', markdown_content) # Remove trailing whitespace
markdown_content = markdown_content.strip()
# Add metadata
results.append(types.TextContent(
type="text",
text=f"# Fetch Metadata\n\n**Original URL:** {url}\n**Final URL:** {final_url}\n**Title:** {page_title}\n**Content Length:** {len(markdown_content)} characters\n**Status Code:** {response.status_code}\n**Content Type:** {response.headers.get('content-type', 'unknown')}"
))
# Add the markdown content
results.append(types.TextContent(
type="text",
text=f"# {page_title}\n\n{markdown_content}"
))
except requests.exceptions.Timeout:
results.append(types.TextContent(
type="text",
text=f"# Fetch Error\n\n**URL:** {url}\n**Error:** Request timeout after {timeout} seconds"
))
except requests.exceptions.ConnectionError:
results.append(types.TextContent(
type="text",
text=f"# Fetch Error\n\n**URL:** {url}\n**Error:** Connection error - unable to reach the server"
))
except requests.exceptions.HTTPError as e:
results.append(types.TextContent(
type="text",
text=f"# Fetch Error\n\n**URL:** {url}\n**Error:** HTTP error {e.response.status_code} - {e.response.reason}"
))
except requests.exceptions.RequestException as e:
results.append(types.TextContent(
type="text",
text=f"# Fetch Error\n\n**URL:** {url}\n**Error:** Request failed - {str(e)}"
))
except Exception as e:
logger.error(f"Unexpected error fetching {url}: {e}")
results.append(types.TextContent(
type="text",
text=f"# Fetch Error\n\n**URL:** {url}\n**Error:** Unexpected error - {str(e)}"
))
return results
async def main():
# Run the server using stdin/stdout streams
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="fetch-as-markdown",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
def cli():
"""Entry point for the mcp-fetch-as-markdown command."""
asyncio.run(main())
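# An MCP client typically launches this server as a stdio subprocess; a
# hypothetical client configuration entry might look like:
#   {"command": "mcp-fetch-as-markdown", "args": []}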
if __name__ == "__main__":
cli()