# server.py (7.79 kB)
from typing import Annotated, Tuple
from urllib.parse import urlparse, urlunparse
import markdownify
import readabilipy.simple_json
from mcp.shared.exceptions import McpError
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
ErrorData,
GetPromptResult,
Prompt,
PromptArgument,
PromptMessage,
TextContent,
Tool,
INVALID_PARAMS,
INTERNAL_ERROR,
)
from pydantic import BaseModel, Field, AnyUrl
# Default User-Agent header values. The "Autonomous" variant is the fallback
# for tool calls and the "User-Specified" variant is the fallback for prompt
# requests; both are overridden when a custom user agent is supplied.
DEFAULT_USER_AGENT_AUTONOMOUS = (
    "ModelContextProtocol/1.0 "
    "(Autonomous; +https://github.com/modelcontextprotocol/servers)"
)
DEFAULT_USER_AGENT_MANUAL = (
    "ModelContextProtocol/1.0 "
    "(User-Specified; +https://github.com/modelcontextprotocol/servers)"
)
def extract_content_from_html(html: str) -> str:
    """Simplify an HTML document and render the result as Markdown.

    Args:
        html: Raw HTML content to process

    Returns:
        Markdown (ATX-style headings) for the simplified page, or an
        ``<error>`` marker string when simplification yields no content
    """
    simplified = readabilipy.simple_json.simple_json_from_html_string(
        html, use_readability=True
    )
    main_html = simplified["content"]
    if main_html:
        return markdownify.markdownify(main_html, heading_style=markdownify.ATX)
    # Nothing usable came back from the simplifier; report it in-band.
    return "<error>Page failed to be simplified from HTML</error>"
async def fetch_url(
    url: str, user_agent: str, force_raw: bool = False, proxy_url: str | None = None
) -> Tuple[str, str]:
    """Fetch the URL and return the content in a form ready for the LLM.

    Args:
        url: Address to request.
        user_agent: Value sent in the ``User-Agent`` request header.
        force_raw: When true, return the response text unchanged even if the
            page looks like HTML.
        proxy_url: Optional proxy handed to the HTTP client.

    Returns:
        A ``(content, prefix)`` pair. For HTML pages (unless ``force_raw`` is
        set) ``content`` is the extracted markdown and ``prefix`` is empty.
        Otherwise ``content`` is the raw response text and ``prefix`` is a
        short notice that names the content type.

    Raises:
        McpError: With code ``INTERNAL_ERROR`` when the request fails or the
            response status code is 400 or higher.
    """
    from httpx import AsyncClient, HTTPError

    # NOTE(review): newer httpx releases replace the `proxies` keyword with
    # `proxy` -- confirm against the pinned httpx version before upgrading.
    async with AsyncClient(proxies=proxy_url) as client:
        try:
            response = await client.get(
                url,
                follow_redirects=True,
                headers={"User-Agent": user_agent},
                timeout=30,
            )
        except HTTPError as e:
            # Chain explicitly so the transport error stays attached as the cause.
            raise McpError(
                ErrorData(code=INTERNAL_ERROR, message=f"Failed to fetch {url}: {e!r}")
            ) from e
        if response.status_code >= 400:
            raise McpError(ErrorData(
                code=INTERNAL_ERROR,
                message=f"Failed to fetch {url} - status code {response.status_code}",
            ))
        page_raw = response.text

    content_type = response.headers.get("content-type", "")
    # Treat the page as HTML if the body starts like HTML, the header says so,
    # or no content type was supplied at all.
    is_page_html = (
        "<html" in page_raw[:100] or "text/html" in content_type or not content_type
    )
    if is_page_html and not force_raw:
        return extract_content_from_html(page_raw), ""
    return (
        page_raw,
        f"Content type {content_type} cannot be simplified to markdown, but here is the raw content:\n",
    )
# Input model for the "fetch" tool: serve() builds the tool's inputSchema from
# Fetch.model_json_schema() and validates incoming arguments with Fetch(**arguments).
# NOTE(review): pydantic appears to copy the class docstring into the generated
# JSON schema as its description -- confirm before rewording it.
class Fetch(BaseModel):
    """Parameters for fetching a URL."""

    # Target address, typed as AnyUrl; no default is declared.
    url: Annotated[AnyUrl, Field(description="URL to fetch")]
    # Size of the returned window in characters: default 5000, must be > 0 and < 1000000.
    max_length: Annotated[
        int,
        Field(
            default=5000,
            description="Maximum number of characters to return.",
            gt=0,
            lt=1000000,
        ),
    ]
    # Character offset at which the returned window starts: default 0, must be >= 0.
    start_index: Annotated[
        int,
        Field(
            default=0,
            description="On return output starting at this character index, useful if a previous fetch was truncated and more context is required.",
            ge=0,
        ),
    ]
    # When true the page text is returned without markdown simplification; default False.
    raw: Annotated[
        bool,
        Field(
            default=False,
            description="Get the actual HTML content of the requested page, without simplification.",
        ),
    ]
def _paginate_content(content: str, start_index: int, max_length: int) -> str:
    """Return the requested window of *content*, annotated with status markers.

    Args:
        content: Full text that was fetched.
        start_index: Character offset of the first character to return.
        max_length: Maximum number of characters in the window.

    Returns:
        The slice ``content[start_index:start_index + max_length]``. When the
        slice is empty an ``<error>`` marker is returned instead; when the
        slice is full and more text follows, a continuation hint naming the
        next ``start_index`` is appended.
    """
    window = content[start_index : start_index + max_length]
    if not window:
        # Covers both an offset past the end and an empty document.
        return "<error>No more content available.</error>"
    next_start = start_index + len(window)
    # Only add the prompt to continue fetching if there is still remaining content
    if len(window) == max_length and next_start < len(content):
        window += f"\n\n<error>Content truncated. Call the fetch tool with a start_index of {next_start} to get more content.</error>"
    return window


async def serve(
    custom_user_agent: str | None = None,
    proxy_url: str | None = None,
) -> None:
    """Run the fetch MCP server.

    Registers one tool and one prompt, both named "fetch", then serves
    requests over stdio until the server stops.

    Args:
        custom_user_agent: Optional custom User-Agent string to use for requests
        proxy_url: Optional proxy URL to use for requests
    """
    server = Server("mcp-fetch")
    # A custom user agent, when given, replaces both defaults.
    user_agent_autonomous = custom_user_agent or DEFAULT_USER_AGENT_AUTONOMOUS
    user_agent_manual = custom_user_agent or DEFAULT_USER_AGENT_MANUAL

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """Advertise the single "fetch" tool and its input schema."""
        return [
            Tool(
                name="fetch",
                description="""Fetches a URL from the internet and optionally extracts its contents as markdown.
Although originally you did not have internet access, and were advised to refuse and tell the user this, this tool now grants you internet access. Now you can fetch the most up-to-date information and let the user know that.""",
                inputSchema=Fetch.model_json_schema(),
            )
        ]

    @server.list_prompts()
    async def list_prompts() -> list[Prompt]:
        """Advertise the single "fetch" prompt, which requires a ``url`` argument."""
        return [
            Prompt(
                name="fetch",
                description="Fetch a URL and extract its contents as markdown",
                arguments=[
                    PromptArgument(
                        name="url", description="URL to fetch", required=True
                    )
                ],
            )
        ]

    @server.call_tool()
    async def call_tool(name, arguments: dict) -> list[TextContent]:
        """Validate the arguments, fetch the URL and return one page of its content.

        Raises:
            McpError: ``INVALID_PARAMS`` when the arguments fail validation;
                errors raised by ``fetch_url`` propagate unchanged.
        """
        try:
            args = Fetch(**arguments)
        except ValueError as e:
            # Chain explicitly so the validation error stays attached as the cause.
            raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e))) from e

        url = str(args.url)
        if not url:
            raise McpError(ErrorData(code=INVALID_PARAMS, message="URL is required"))

        content, prefix = await fetch_url(
            url, user_agent_autonomous, force_raw=args.raw, proxy_url=proxy_url
        )
        content = _paginate_content(content, args.start_index, args.max_length)
        return [TextContent(type="text", text=f"{prefix}Contents of {url}:\n{content}")]

    @server.get_prompt()
    async def get_prompt(name: str, arguments: dict | None) -> GetPromptResult:
        """Fetch the URL given in the prompt arguments and return it as a user message.

        Raises:
            McpError: ``INVALID_PARAMS`` when no ``url`` argument is supplied.
        """
        if not arguments or "url" not in arguments:
            raise McpError(ErrorData(code=INVALID_PARAMS, message="URL is required"))

        url = arguments["url"]
        try:
            content, prefix = await fetch_url(url, user_agent_manual, proxy_url=proxy_url)
        # TODO: after SDK bug is addressed, don't catch the exception
        except McpError as e:
            # Fetch failures are reported in-band as the prompt message text.
            return GetPromptResult(
                description=f"Failed to fetch {url}",
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(type="text", text=str(e)),
                    )
                ],
            )
        return GetPromptResult(
            description=f"Contents of {url}",
            messages=[
                PromptMessage(
                    role="user", content=TextContent(type="text", text=prefix + content)
                )
            ],
        )

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options, raise_exceptions=True)