# handlers.py
"""
Request handlers for MCP tools with sandboxed file operations.
"""
import re
import logging
import hashlib
from pathlib import Path
from typing import Dict, Any, List
from urllib.parse import urlparse, urljoin
import aiofiles
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from models import MCPMessage, MCPError, ContentBlock, ToolResult
from config import MAX_CONTENT_LENGTH, MAX_DOWNLOAD_SIZE
logger = logging.getLogger(__name__)
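# JSON-RPC 2.0 error codes used by every handler below (per the spec):
#   -32602  Invalid params  (missing arguments, unsafe URLs, size limits)
#   -32603  Internal error  (HTTP failures, unexpected exceptions)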
class ToolHandlers:
"""Handlers for all tool operations."""
def __init__(self, server):
self.server = server
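        # The injected server is only assumed to provide what the handlers
        # below actually use: server.security (URL/path/content checks),
        # server._get_session() (a shared aiohttp session), and
        # server.cache_dir (the sandboxed download directory).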
async def handle_fetch_webpage(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage:
"""Fetch webpage with security and banner removal."""
try:
url = arguments.get("url")
format_type = arguments.get("format", "markdown")
include_links = arguments.get("include_links", True)
include_images = arguments.get("include_images", False)
remove_banners = arguments.get("remove_banners", True)
if not url:
return self._create_error_response(
message_id, -32602, "Missing required parameter: url"
)
# Security validation
is_safe, error_msg = self.server.security.is_safe_url(url)
if not is_safe:
return self._create_error_response(
message_id, -32602, f"Unsafe URL: {error_msg}"
)
# Check for prompt injection in URL
if self.server.security.is_prompt_injection(url):
return self._create_error_response(
message_id, -32602, "Potential prompt injection detected in URL"
)
# Fetch page
session = await self.server._get_session()
async with session.get(url, allow_redirects=True) as response:
if response.status != 200:
return self._create_error_response(
message_id, -32603, f"HTTP error: {response.status}"
)
content_type = response.headers.get('Content-Type', '')
if 'text/html' not in content_type:
return self._create_error_response(
message_id, -32602, f"Not HTML content: {content_type}"
)
                # Check the declared length, but don't trust it alone:
                # Content-Length may be absent (chunked) or simply wrong.
                content_length = response.headers.get('Content-Length')
                if content_length and int(content_length) > MAX_CONTENT_LENGTH:
                    return self._create_error_response(
                        message_id, -32602, f"Content too large: {content_length} bytes"
                    )
                raw = await response.content.read(MAX_CONTENT_LENGTH + 1)
                if len(raw) > MAX_CONTENT_LENGTH:
                    return self._create_error_response(
                        message_id, -32602, f"Content too large: exceeds {MAX_CONTENT_LENGTH} bytes"
                    )
                html = raw.decode(response.charset or 'utf-8', errors='replace')
# Sanitize HTML
html = self.server.security.sanitize_html_content(html)
# Parse with BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
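                # 'html.parser' is the stdlib parser: slower than lxml but
                # dependency-free and forgiving of real-world broken markup.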
# Remove cookie banners and popups
if remove_banners:
soup = self._remove_banners_and_popups(soup)
# Extract content
if format_type == "markdown":
content = md(str(soup.body or soup), heading_style="ATX")
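                    # markdownify's ATX heading style emits "# Heading" lines
                    # rather than setext underlines.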
elif format_type == "text":
content = soup.get_text(separator='\n', strip=True)
else: # html
content = str(soup)
# Check for prompt injection in content
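                # Only the first 5000 characters are scanned, as a cost
                # heuristic; an injection deeper in the page goes unflagged.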
if self.server.security.is_prompt_injection(content[:5000]):
logger.warning(f"Potential prompt injection detected in content from {url}")
content = "[WARNING: Potential prompt injection detected in content]\n\n" + content
# Build output
output = f"# Webpage Content\n\n**URL**: {url}\n**Format**: {format_type}\n\n---\n\n{content}"
# Add links
if include_links:
links = self._extract_links_from_soup(soup, url)
if links:
output += f"\n\n## Links ({len(links)})\n\n"
for link in links[:50]:
output += f"- [{link['text']}]({link['url']})\n"
# Add images
if include_images:
images = [urljoin(url, img.get('src')) for img in soup.find_all('img') if img.get('src')]
if images:
output += f"\n\n## Images ({len(images)})\n\n"
for img_url in images[:30]:
output += f"- {img_url}\n"
result = ToolResult([ContentBlock("text", output)])
return MCPMessage(id=message_id, result=result.to_dict())
except Exception as e:
logger.error(f"Fetch error: {e}", exc_info=True)
return self._create_error_response(
message_id, -32603, f"Fetch error: {str(e)}"
)
async def handle_extract_links(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage:
"""Extract links from page."""
try:
url = arguments.get("url")
filter_type = arguments.get("filter", "all")
if not url:
return self._create_error_response(
message_id, -32602, "Missing required parameter: url"
)
is_safe, error_msg = self.server.security.is_safe_url(url)
if not is_safe:
return self._create_error_response(
message_id, -32602, f"Unsafe URL: {error_msg}"
)
# Fetch page
session = await self.server._get_session()
            async with session.get(url) as response:
                if response.status != 200:
                    return self._create_error_response(
                        message_id, -32603, f"HTTP error: {response.status}"
                    )
                html = await response.text()
html = self.server.security.sanitize_html_content(html)
soup = BeautifulSoup(html, 'html.parser')
all_links = self._extract_links_from_soup(soup, url)
# Filter
if filter_type == "internal":
links = [l for l in all_links if l['internal']]
elif filter_type == "external":
links = [l for l in all_links if not l['internal']]
elif filter_type == "resources":
links = [l for l in all_links if self._is_resource_url(l['url'])]
else:
links = all_links
# Output
output = f"# Links from {url}\n\n**Filter**: {filter_type}\n**Total**: {len(links)}\n\n"
for i, link in enumerate(links, 1):
link_type = "🔗 Internal" if link['internal'] else "🌐 External"
output += f"{i}. {link_type}: **{link['text']}**\n {link['url']}\n\n"
result = ToolResult([ContentBlock("text", output)])
return MCPMessage(id=message_id, result=result.to_dict())
except Exception as e:
logger.error(f"Link extraction error: {e}", exc_info=True)
return self._create_error_response(
message_id, -32603, f"Extraction error: {str(e)}"
)
async def handle_download_file(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage:
"""Download file securely to sandboxed cache directory."""
try:
url = arguments.get("url")
filename = arguments.get("filename")
if not url:
return self._create_error_response(
message_id, -32602, "Missing required parameter: url"
)
is_safe, error_msg = self.server.security.is_safe_url(url)
if not is_safe:
return self._create_error_response(
message_id, -32602, f"Unsafe URL: {error_msg}"
)
# Generate/sanitize filename
if filename:
filename = str(self.server.security.sanitize_path(filename))
else:
parsed = urlparse(url)
filename = str(self.server.security.sanitize_path(
Path(parsed.path).name or f"download_{hashlib.md5(url.encode()).hexdigest()[:8]}"
))
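            # e.g. a URL whose path has no basename ("https://host/") falls
            # back to a generated name like "download_9e107d9d" (the first 8
            # hex chars of the URL's md5).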
# Get safe path within cache directory
dest_path, is_safe_path = self.server.security.get_safe_path(filename, self.server.cache_dir)
if not is_safe_path:
return self._create_error_response(
message_id, -32602, "Path traversal attempt detected"
)
# Download
session = await self.server._get_session()
total_size = 0
async with session.get(url) as response:
if response.status != 200:
return self._create_error_response(
message_id, -32603, f"HTTP error: {response.status}"
)
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > MAX_DOWNLOAD_SIZE:
return self._create_error_response(
message_id, -32602, f"File too large: {content_length} bytes"
)
                exceeded_limit = False
                async with aiofiles.open(dest_path, 'wb') as f:
                    async for chunk in response.content.iter_chunked(8192):
                        total_size += len(chunk)
                        if total_size > MAX_DOWNLOAD_SIZE:
                            # Stop writing and delete after the handle closes;
                            # unlinking a file that is still open fails on Windows.
                            exceeded_limit = True
                            break
                        await f.write(chunk)
                if exceeded_limit:
                    dest_path.unlink(missing_ok=True)
                    return self._create_error_response(
                        message_id, -32602, "File exceeded size limit during download"
                    )
# Get relative path from working directory for cross-agent access
relative_path = self.server.security.get_relative_path(dest_path)
output = f"# File Downloaded\n\n"
output += f"**URL**: {url}\n"
output += f"**Filename**: {filename}\n"
output += f"**Size**: {total_size:,} bytes ({total_size / 1024:.2f} KB)\n"
output += f"**Relative Path**: {relative_path}\n"
output += f"\n*IMPORTANT: Use the relative path to access this file from other agents configured with the same working directory.*\n"
result = ToolResult([ContentBlock("text", output)])
return MCPMessage(id=message_id, result=result.to_dict())
except Exception as e:
logger.error(f"Download error: {e}", exc_info=True)
return self._create_error_response(
message_id, -32603, f"Download error: {str(e)}"
)
async def handle_get_metadata(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage:
"""Extract page metadata."""
try:
url = arguments.get("url")
if not url:
return self._create_error_response(
message_id, -32602, "Missing required parameter: url"
)
is_safe, error_msg = self.server.security.is_safe_url(url)
if not is_safe:
return self._create_error_response(
message_id, -32602, f"Unsafe URL: {error_msg}"
)
session = await self.server._get_session()
            async with session.get(url) as response:
                if response.status != 200:
                    return self._create_error_response(
                        message_id, -32603, f"HTTP error: {response.status}"
                    )
                html = await response.text()
html = self.server.security.sanitize_html_content(html)
soup = BeautifulSoup(html, 'html.parser')
metadata = {
'url': url,
'title': soup.title.string if soup.title else None,
'description': None,
'keywords': None,
'og': {},
'twitter': {},
}
for meta in soup.find_all('meta'):
name = meta.get('name', '').lower()
property_val = meta.get('property', '').lower()
content = meta.get('content', '')
if name == 'description':
metadata['description'] = content
elif name == 'keywords':
metadata['keywords'] = content
elif property_val.startswith('og:'):
metadata['og'][property_val[3:]] = content
elif name.startswith('twitter:'):
metadata['twitter'][name[8:]] = content
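            # e.g. <meta property="og:title" content="X"> lands in
            # metadata['og']['title'], and <meta name="twitter:card" ...> in
            # metadata['twitter']['card'] (prefixes stripped by the slices).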
# Format output
output = f"# Page Metadata\n\n**URL**: {url}\n\n"
if metadata['title']:
output += f"**Title**: {metadata['title']}\n\n"
if metadata['description']:
output += f"**Description**: {metadata['description']}\n\n"
if metadata['keywords']:
output += f"**Keywords**: {metadata['keywords']}\n\n"
if metadata['og']:
output += "## Open Graph\n\n"
for key, value in metadata['og'].items():
output += f"- **{key}**: {value}\n"
output += "\n"
if metadata['twitter']:
output += "## Twitter Cards\n\n"
for key, value in metadata['twitter'].items():
output += f"- **{key}**: {value}\n"
result = ToolResult([ContentBlock("text", output)])
return MCPMessage(id=message_id, result=result.to_dict())
except Exception as e:
logger.error(f"Metadata error: {e}", exc_info=True)
return self._create_error_response(
message_id, -32603, f"Metadata extraction error: {str(e)}"
)
async def handle_check_url(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage:
"""Check URL status."""
try:
url = arguments.get("url")
if not url:
return self._create_error_response(
message_id, -32602, "Missing required parameter: url"
)
is_safe, error_msg = self.server.security.is_safe_url(url)
if not is_safe:
return self._create_error_response(
message_id, -32602, f"Unsafe URL: {error_msg}"
)
session = await self.server._get_session()
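            # HEAD transfers headers only; note that some servers reject
            # HEAD (e.g. 405) even where GET would succeed, which shows up
            # here as an error status.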
async with session.head(url, allow_redirects=True) as response:
status = response.status
headers = dict(response.headers)
final_url = str(response.url)
output = f"# URL Check\n\n"
output += f"**Original URL**: {url}\n"
if final_url != url:
output += f"**Final URL**: {final_url}\n"
output += f"**Status Code**: {status}\n"
output += f"**Status**: {'✅ OK' if 200 <= status < 300 else '⚠️ Error'}\n\n"
output += "## Headers\n\n"
for key in ['Content-Type', 'Content-Length', 'Last-Modified', 'Server']:
if key in headers:
output += f"- **{key}**: {headers[key]}\n"
if 'Content-Length' in headers:
size = int(headers['Content-Length'])
output += f"\n**Size**: {size:,} bytes ({size / 1024:.2f} KB)\n"
result = ToolResult([ContentBlock("text", output)])
return MCPMessage(id=message_id, result=result.to_dict())
except Exception as e:
logger.error(f"URL check error: {e}", exc_info=True)
return self._create_error_response(
message_id, -32603, f"Check error: {str(e)}"
)
def _remove_banners_and_popups(self, soup: BeautifulSoup) -> BeautifulSoup:
"""Remove cookie banners, popups, and overlays."""
# Common banner/popup selectors
selectors = [
{'class': re.compile(r'cookie', re.I)},
{'class': re.compile(r'banner', re.I)},
{'class': re.compile(r'consent', re.I)},
{'class': re.compile(r'gdpr', re.I)},
{'class': re.compile(r'popup', re.I)},
{'class': re.compile(r'modal', re.I)},
{'class': re.compile(r'overlay', re.I)},
{'id': re.compile(r'cookie', re.I)},
{'id': re.compile(r'consent', re.I)},
{'id': re.compile(r'banner', re.I)},
]
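        # e.g. <div class="cookie-banner"> is caught by the first selector;
        # for multi-valued attributes like class, BeautifulSoup matches the
        # regex against each class token separately.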
for selector in selectors:
for element in soup.find_all(attrs=selector):
element.decompose()
# Remove fixed/sticky positioned elements (often used for banners)
for element in soup.find_all(style=re.compile(r'position\s*:\s*(fixed|sticky)', re.I)):
element.decompose()
return soup
def _extract_links_from_soup(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]:
"""Extract links from soup."""
links = []
base_domain = urlparse(base_url).netloc
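        # "internal" means same netloc as the page itself: on
        # https://a.example/docs, the link "/guide" is internal while
        # https://b.example/ is external.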
for a in soup.find_all('a', href=True):
href = a.get('href')
text = a.get_text(strip=True)
if not href or href.startswith(('#', 'javascript:', 'mailto:', 'tel:')):
continue
full_url = urljoin(base_url, href)
# Validate URL
is_safe, _ = self.server.security.is_safe_url(full_url)
if not is_safe:
continue
link_domain = urlparse(full_url).netloc
links.append({
'url': full_url,
'text': text or '[no text]',
'internal': link_domain == base_domain
})
return links
def _is_resource_url(self, url: str) -> bool:
"""Check if URL is a resource."""
resource_extensions = {
'.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp',
'.pdf', '.doc', '.docx', '.xls', '.xlsx',
'.zip', '.rar', '.tar', '.gz',
'.mp3', '.mp4', '.avi', '.mov'
}
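        # Matching is against the URL path only, so query strings are
        # ignored: ".../report.pdf" -> True, ".../dl?file=a.zip" -> False.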
path = urlparse(url).path.lower()
return any(path.endswith(ext) for ext in resource_extensions)
def _create_error_response(self, message_id: Any, code: int, message: str) -> MCPMessage:
"""Create error response."""
return MCPMessage(id=message_id, error=MCPError(code, message).to_dict())