# server.py (6 kB)
"""
Main MCP Data Fetch Server implementation with sandboxed working directory.
"""
import logging
from pathlib import Path
from typing import Optional
import aiohttp
from config import REQUEST_TIMEOUT, USER_AGENT
from security import SecurityValidator
from models import MCPMessage, MCPError
from tools import get_tool_definitions
from handlers import ToolHandlers
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class MCPDataFetchServer:
    """
    Secure MCP server for fetching web content with sandboxed working directory.

    NO JavaScript execution, enhanced security, cookie/banner handling.

    Incoming messages are routed by ``handle_message``; ``tools/call``
    requests are forwarded to the matching ``ToolHandlers`` coroutine.
    """

    # Maps each tool name accepted by ``tools/call`` to the name of the
    # ToolHandlers coroutine that is awaited for it.
    _TOOL_HANDLERS = {
        "fetch_webpage": "handle_fetch_webpage",
        "extract_links": "handle_extract_links",
        "download_file": "handle_download_file",
        "get_page_metadata": "handle_get_metadata",
        "check_url": "handle_check_url",
    }

    def __init__(self, working_dir: Path, cache_dir: Path):
        """
        Initialize server with sandboxed directories.

        Args:
            working_dir: Sandboxed working directory (all operations confined here)
            cache_dir: Cache directory (must be within working_dir)

        Raises:
            ValueError: If the resolved cache_dir is not inside the resolved
                working_dir. Nothing is created on disk in that case.
        """
        self.working_dir = working_dir.resolve()
        self.cache_dir = cache_dir.resolve()

        # Ensure cache_dir is within working_dir. The check runs on resolved
        # paths and before any directory is created.
        try:
            self.cache_dir.relative_to(self.working_dir)
        except ValueError:
            # The relative_to() error adds nothing to our message, so the
            # implicit exception context is suppressed.
            raise ValueError(
                f"Cache directory {self.cache_dir} must be within "
                f"working directory {self.working_dir}"
            ) from None

        # Create directories
        self.working_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # The HTTP session is created lazily by _get_session().
        self.session: Optional["aiohttp.ClientSession"] = None
        self.security = SecurityValidator(self.working_dir)
        self.handlers = ToolHandlers(self)

        logger.info("Data fetch server started")
        logger.info("Working directory (sandbox): %s", self.working_dir)
        logger.info("Cache directory: %s", self.cache_dir)

    async def _get_session(self) -> "aiohttp.ClientSession":
        """
        Get or create session with cookie handling.

        Returns:
            The current session, or a newly created one when no session
            exists yet or the previous one has been closed.
        """
        if self.session is not None and not self.session.closed:
            return self.session

        timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
        # Cookie jar for handling cookies (including consent cookies).
        # unsafe=True makes the jar also accept cookies from hosts that are
        # addressed by IP rather than by domain name.
        jar = aiohttp.CookieJar(unsafe=True)
        headers = {
            'User-Agent': USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            headers=headers,
            cookie_jar=jar,
        )
        return self.session

    async def close(self):
        """Close the HTTP session if one exists and is still open."""
        if self.session and not self.session.closed:
            await self.session.close()

    async def handle_message(self, message: "MCPMessage") -> Optional["MCPMessage"]:
        """
        Handle incoming messages.

        Args:
            message: Incoming message; its ``method`` selects the handler.

        Returns:
            The response for ``initialize``, ``tools/list`` and ``tools/call``.
            For any other method: ``None`` when the message carries no id
            (a notification, which must not be answered), otherwise a
            -32601 "Method not found" error. Any exception raised while
            handling is logged and returned as a -32603 error response.
        """
        try:
            method = message.method
            if method == "initialize":
                return self.handle_initialize(message)
            if method == "tools/list":
                return self.handle_tools_list(message)
            if method == "tools/call":
                return await self.handle_tool_call(message)

            if message.id is None:
                # JSON-RPC notification (e.g. "notifications/initialized"):
                # there is no request id to answer, so no response is sent.
                return None

            return self.create_error_response(
                message.id, -32601, f"Method not found: {message.method}"
            )
        except Exception as e:
            logger.error(f"Error handling message: {e}", exc_info=True)
            return self.create_error_response(
                message.id, -32603, f"Internal error: {str(e)}"
            )

    def handle_initialize(self, message: "MCPMessage") -> "MCPMessage":
        """Build the ``initialize`` response (protocol version, capabilities, server info)."""
        result = {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {
                "name": "mcp-datafetch-server",
                "version": "2.1.0"
            },
        }
        return MCPMessage(id=message.id, result=result)

    def handle_tools_list(self, message: "MCPMessage") -> "MCPMessage":
        """Build the ``tools/list`` response from ``get_tool_definitions()``."""
        tools = get_tool_definitions()
        result = {"tools": [tool.to_dict() for tool in tools]}
        return MCPMessage(id=message.id, result=result)

    async def handle_tool_call(self, message: "MCPMessage") -> "MCPMessage":
        """
        Handle tool call.

        Reads ``name`` and ``arguments`` from ``message.params`` and awaits
        the matching ToolHandlers coroutine with ``(message.id, arguments)``.

        Returns:
            The handler's response; a -32602 error for a missing or unknown
            tool name; a -32603 error if anything raises during the call.
        """
        try:
            # Absent params / arguments are treated as empty objects so the
            # request is rejected as "Unknown tool" (-32602) instead of
            # failing with an AttributeError reported as an internal error.
            params = message.params
            if params is None:
                params = {}
            tool_name = params.get("name")
            arguments = params.get("arguments")
            if arguments is None:
                arguments = {}

            # Only strings can be tool names; the isinstance guard also keeps
            # unhashable values away from the dict lookup.
            handler_name = (
                self._TOOL_HANDLERS.get(tool_name)
                if isinstance(tool_name, str)
                else None
            )
            if handler_name is None:
                return self.create_error_response(
                    message.id, -32602, f"Unknown tool: {tool_name}"
                )

            handler = getattr(self.handlers, handler_name)
            return await handler(message.id, arguments)
        except Exception as e:
            logger.error(f"Error executing tool: {e}", exc_info=True)
            return self.create_error_response(
                message.id, -32603, f"Tool execution error: {str(e)}"
            )

    def create_error_response(self, message_id, code: int, message: str) -> "MCPMessage":
        """
        Create error response.

        Args:
            message_id: Id of the request being answered.
            code: Numeric error code placed in the error object.
            message: Human-readable error description.

        Returns:
            An MCPMessage whose ``error`` is ``MCPError(code, message).to_dict()``.
        """
        return MCPMessage(id=message_id, error=MCPError(code, message).to_dict())