Skip to main content
Glama

MCP Data Fetch Server

by undici77
handlers.py19.1 kB
""" Request handlers for MCP tools with sandboxed file operations. """ import re import logging import hashlib from pathlib import Path from typing import Dict, Any, List from urllib.parse import urlparse, urljoin import aiofiles from bs4 import BeautifulSoup from markdownify import markdownify as md from models import MCPMessage, MCPError, ContentBlock, ToolResult from config import MAX_CONTENT_LENGTH, MAX_DOWNLOAD_SIZE logger = logging.getLogger(__name__) class ToolHandlers: """Handlers for all tool operations.""" def __init__(self, server): self.server = server async def handle_fetch_webpage(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage: """Fetch webpage with security and banner removal.""" try: url = arguments.get("url") format_type = arguments.get("format", "markdown") include_links = arguments.get("include_links", True) include_images = arguments.get("include_images", False) remove_banners = arguments.get("remove_banners", True) if not url: return self._create_error_response( message_id, -32602, "Missing required parameter: url" ) # Security validation is_safe, error_msg = self.server.security.is_safe_url(url) if not is_safe: return self._create_error_response( message_id, -32602, f"Unsafe URL: {error_msg}" ) # Check for prompt injection in URL if self.server.security.is_prompt_injection(url): return self._create_error_response( message_id, -32602, "Potential prompt injection detected in URL" ) # Fetch page session = await self.server._get_session() async with session.get(url, allow_redirects=True) as response: if response.status != 200: return self._create_error_response( message_id, -32603, f"HTTP error: {response.status}" ) content_type = response.headers.get('Content-Type', '') if 'text/html' not in content_type: return self._create_error_response( message_id, -32602, f"Not HTML content: {content_type}" ) # Check content length content_length = response.headers.get('Content-Length') if content_length and int(content_length) > MAX_CONTENT_LENGTH: return self._create_error_response( message_id, -32602, f"Content too large: {content_length} bytes" ) html = await response.text() # Sanitize HTML html = self.server.security.sanitize_html_content(html) # Parse with BeautifulSoup soup = BeautifulSoup(html, 'html.parser') # Remove cookie banners and popups if remove_banners: soup = self._remove_banners_and_popups(soup) # Extract content if format_type == "markdown": content = md(str(soup.body or soup), heading_style="ATX") elif format_type == "text": content = soup.get_text(separator='\n', strip=True) else: # html content = str(soup) # Check for prompt injection in content if self.server.security.is_prompt_injection(content[:5000]): logger.warning(f"Potential prompt injection detected in content from {url}") content = "[WARNING: Potential prompt injection detected in content]\n\n" + content # Build output output = f"# Webpage Content\n\n**URL**: {url}\n**Format**: {format_type}\n\n---\n\n{content}" # Add links if include_links: links = self._extract_links_from_soup(soup, url) if links: output += f"\n\n## Links ({len(links)})\n\n" for link in links[:50]: output += f"- [{link['text']}]({link['url']})\n" # Add images if include_images: images = [urljoin(url, img.get('src')) for img in soup.find_all('img') if img.get('src')] if images: output += f"\n\n## Images ({len(images)})\n\n" for img_url in images[:30]: output += f"- {img_url}\n" result = ToolResult([ContentBlock("text", output)]) return MCPMessage(id=message_id, result=result.to_dict()) except Exception as e: logger.error(f"Fetch error: {e}", exc_info=True) return self._create_error_response( message_id, -32603, f"Fetch error: {str(e)}" ) async def handle_extract_links(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage: """Extract links from page.""" try: url = arguments.get("url") filter_type = arguments.get("filter", "all") if not url: return self._create_error_response( message_id, -32602, "Missing required parameter: url" ) is_safe, error_msg = self.server.security.is_safe_url(url) if not is_safe: return self._create_error_response( message_id, -32602, f"Unsafe URL: {error_msg}" ) # Fetch page session = await self.server._get_session() async with session.get(url) as response: html = await response.text() html = self.server.security.sanitize_html_content(html) soup = BeautifulSoup(html, 'html.parser') all_links = self._extract_links_from_soup(soup, url) # Filter if filter_type == "internal": links = [l for l in all_links if l['internal']] elif filter_type == "external": links = [l for l in all_links if not l['internal']] elif filter_type == "resources": links = [l for l in all_links if self._is_resource_url(l['url'])] else: links = all_links # Output output = f"# Links from {url}\n\n**Filter**: {filter_type}\n**Total**: {len(links)}\n\n" for i, link in enumerate(links, 1): link_type = "🔗 Internal" if link['internal'] else "🌐 External" output += f"{i}. {link_type}: **{link['text']}**\n {link['url']}\n\n" result = ToolResult([ContentBlock("text", output)]) return MCPMessage(id=message_id, result=result.to_dict()) except Exception as e: logger.error(f"Link extraction error: {e}", exc_info=True) return self._create_error_response( message_id, -32603, f"Extraction error: {str(e)}" ) async def handle_download_file(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage: """Download file securely to sandboxed cache directory.""" try: url = arguments.get("url") filename = arguments.get("filename") if not url: return self._create_error_response( message_id, -32602, "Missing required parameter: url" ) is_safe, error_msg = self.server.security.is_safe_url(url) if not is_safe: return self._create_error_response( message_id, -32602, f"Unsafe URL: {error_msg}" ) # Generate/sanitize filename if filename: filename = str(self.server.security.sanitize_path(filename)) else: parsed = urlparse(url) filename = str(self.server.security.sanitize_path( Path(parsed.path).name or f"download_{hashlib.md5(url.encode()).hexdigest()[:8]}" )) # Get safe path within cache directory dest_path, is_safe_path = self.server.security.get_safe_path(filename, self.server.cache_dir) if not is_safe_path: return self._create_error_response( message_id, -32602, "Path traversal attempt detected" ) # Download session = await self.server._get_session() total_size = 0 async with session.get(url) as response: if response.status != 200: return self._create_error_response( message_id, -32603, f"HTTP error: {response.status}" ) content_length = response.headers.get('Content-Length') if content_length and int(content_length) > MAX_DOWNLOAD_SIZE: return self._create_error_response( message_id, -32602, f"File too large: {content_length} bytes" ) async with aiofiles.open(dest_path, 'wb') as f: async for chunk in response.content.iter_chunked(8192): total_size += len(chunk) if total_size > MAX_DOWNLOAD_SIZE: dest_path.unlink() return self._create_error_response( message_id, -32602, "File exceeded size limit during download" ) await f.write(chunk) # Get relative path from working directory for cross-agent access relative_path = self.server.security.get_relative_path(dest_path) output = f"# File Downloaded\n\n" output += f"**URL**: {url}\n" output += f"**Filename**: {filename}\n" output += f"**Size**: {total_size:,} bytes ({total_size / 1024:.2f} KB)\n" output += f"**Relative Path**: {relative_path}\n" output += f"\n*IMPORTANT: Use the relative path to access this file from other agents configured with the same working directory.*\n" result = ToolResult([ContentBlock("text", output)]) return MCPMessage(id=message_id, result=result.to_dict()) except Exception as e: logger.error(f"Download error: {e}", exc_info=True) return self._create_error_response( message_id, -32603, f"Download error: {str(e)}" ) async def handle_get_metadata(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage: """Extract page metadata.""" try: url = arguments.get("url") if not url: return self._create_error_response( message_id, -32602, "Missing required parameter: url" ) is_safe, error_msg = self.server.security.is_safe_url(url) if not is_safe: return self._create_error_response( message_id, -32602, f"Unsafe URL: {error_msg}" ) session = await self.server._get_session() async with session.get(url) as response: html = await response.text() html = self.server.security.sanitize_html_content(html) soup = BeautifulSoup(html, 'html.parser') metadata = { 'url': url, 'title': soup.title.string if soup.title else None, 'description': None, 'keywords': None, 'og': {}, 'twitter': {}, } for meta in soup.find_all('meta'): name = meta.get('name', '').lower() property_val = meta.get('property', '').lower() content = meta.get('content', '') if name == 'description': metadata['description'] = content elif name == 'keywords': metadata['keywords'] = content elif property_val.startswith('og:'): metadata['og'][property_val[3:]] = content elif name.startswith('twitter:'): metadata['twitter'][name[8:]] = content # Format output output = f"# Page Metadata\n\n**URL**: {url}\n\n" if metadata['title']: output += f"**Title**: {metadata['title']}\n\n" if metadata['description']: output += f"**Description**: {metadata['description']}\n\n" if metadata['keywords']: output += f"**Keywords**: {metadata['keywords']}\n\n" if metadata['og']: output += "## Open Graph\n\n" for key, value in metadata['og'].items(): output += f"- **{key}**: {value}\n" output += "\n" if metadata['twitter']: output += "## Twitter Cards\n\n" for key, value in metadata['twitter'].items(): output += f"- **{key}**: {value}\n" result = ToolResult([ContentBlock("text", output)]) return MCPMessage(id=message_id, result=result.to_dict()) except Exception as e: logger.error(f"Metadata error: {e}", exc_info=True) return self._create_error_response( message_id, -32603, f"Metadata extraction error: {str(e)}" ) async def handle_check_url(self, message_id: Any, arguments: Dict[str, Any]) -> MCPMessage: """Check URL status.""" try: url = arguments.get("url") if not url: return self._create_error_response( message_id, -32602, "Missing required parameter: url" ) is_safe, error_msg = self.server.security.is_safe_url(url) if not is_safe: return self._create_error_response( message_id, -32602, f"Unsafe URL: {error_msg}" ) session = await self.server._get_session() async with session.head(url, allow_redirects=True) as response: status = response.status headers = dict(response.headers) final_url = str(response.url) output = f"# URL Check\n\n" output += f"**Original URL**: {url}\n" if final_url != url: output += f"**Final URL**: {final_url}\n" output += f"**Status Code**: {status}\n" output += f"**Status**: {'✅ OK' if 200 <= status < 300 else '⚠️ Error'}\n\n" output += "## Headers\n\n" for key in ['Content-Type', 'Content-Length', 'Last-Modified', 'Server']: if key in headers: output += f"- **{key}**: {headers[key]}\n" if 'Content-Length' in headers: size = int(headers['Content-Length']) output += f"\n**Size**: {size:,} bytes ({size / 1024:.2f} KB)\n" result = ToolResult([ContentBlock("text", output)]) return MCPMessage(id=message_id, result=result.to_dict()) except Exception as e: logger.error(f"URL check error: {e}", exc_info=True) return self._create_error_response( message_id, -32603, f"Check error: {str(e)}" ) def _remove_banners_and_popups(self, soup: BeautifulSoup) -> BeautifulSoup: """Remove cookie banners, popups, and overlays.""" # Common banner/popup selectors selectors = [ {'class': re.compile(r'cookie', re.I)}, {'class': re.compile(r'banner', re.I)}, {'class': re.compile(r'consent', re.I)}, {'class': re.compile(r'gdpr', re.I)}, {'class': re.compile(r'popup', re.I)}, {'class': re.compile(r'modal', re.I)}, {'class': re.compile(r'overlay', re.I)}, {'id': re.compile(r'cookie', re.I)}, {'id': re.compile(r'consent', re.I)}, {'id': re.compile(r'banner', re.I)}, ] for selector in selectors: for element in soup.find_all(attrs=selector): element.decompose() # Remove fixed/sticky positioned elements (often used for banners) for element in soup.find_all(style=re.compile(r'position\s*:\s*(fixed|sticky)', re.I)): element.decompose() return soup def _extract_links_from_soup(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]: """Extract links from soup.""" links = [] base_domain = urlparse(base_url).netloc for a in soup.find_all('a', href=True): href = a.get('href') text = a.get_text(strip=True) if not href or href.startswith(('#', 'javascript:', 'mailto:', 'tel:')): continue full_url = urljoin(base_url, href) # Validate URL is_safe, _ = self.server.security.is_safe_url(full_url) if not is_safe: continue link_domain = urlparse(full_url).netloc links.append({ 'url': full_url, 'text': text or '[no text]', 'internal': link_domain == base_domain }) return links def _is_resource_url(self, url: str) -> bool: """Check if URL is a resource.""" resource_extensions = { '.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp', '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.zip', '.rar', '.tar', '.gz', '.mp3', '.mp4', '.avi', '.mov' } path = urlparse(url).path.lower() return any(path.endswith(ext) for ext in resource_extensions) def _create_error_response(self, message_id: Any, code: int, message: str) -> MCPMessage: """Create error response.""" return MCPMessage(id=message_id, error=MCPError(code, message).to_dict())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/undici77/MCPDataFetchServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server