
URL Text Fetcher MCP Server

by billallison
server.py (20.8 kB)
import asyncio
import requests
from bs4 import BeautifulSoup
import os
from pathlib import Path
import time
import threading
import logging
from urllib.parse import urlparse
import ipaddress
from typing import List
from mcp.server.fastmcp import FastMCP

# Version information
__version__ = "2.0.0"  # FastMCP modernized version
__implementation__ = "FastMCP"

# Setup logging to stderr (not stdout for MCP compliance)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
mcp = FastMCP("url-text-fetcher")


# Load environment variables from .env file if it exists
def load_env():
    env_path = Path(__file__).parent.parent.parent / '.env'
    if env_path.exists():
        with open(env_path) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and '=' in line:
                    key, value = line.split('=', 1)
                    if value and key not in os.environ:
                        os.environ[key] = value

load_env()


# Configuration from environment variables with validation
def get_int_env(key: str, default: int) -> int:
    try:
        return int(os.getenv(key, str(default)))
    except ValueError:
        logger.warning(f"Invalid {key} value, using default: {default}")
        return default

# Environment configuration
BRAVE_API_KEY = os.getenv('BRAVE_API_KEY', '')
BRAVE_RATE_LIMIT_RPS = get_int_env('BRAVE_RATE_LIMIT_RPS', 1)  # Default to free tier
REQUEST_TIMEOUT = get_int_env('REQUEST_TIMEOUT', 10)
CONTENT_LENGTH_LIMIT = get_int_env('CONTENT_LENGTH_LIMIT', 5000)
MAX_RESPONSE_SIZE = get_int_env('MAX_RESPONSE_SIZE', 10485760)  # 10MB default

# Validate rate limit configuration
if BRAVE_RATE_LIMIT_RPS < 1:
    logger.warning(f"Invalid BRAVE_RATE_LIMIT_RPS ({BRAVE_RATE_LIMIT_RPS}), using default: 1")
    BRAVE_RATE_LIMIT_RPS = 1
elif BRAVE_RATE_LIMIT_RPS > 50:
    logger.warning(f"Rate limit ({BRAVE_RATE_LIMIT_RPS}) exceeds maximum tier (50), capping at 50")
    BRAVE_RATE_LIMIT_RPS = 50

# Calculate minimum interval between requests (in seconds)
MIN_REQUEST_INTERVAL = 1.0 / BRAVE_RATE_LIMIT_RPS

logger.info(f"Brave Search rate limit configured: {BRAVE_RATE_LIMIT_RPS} requests/second (interval: {MIN_REQUEST_INTERVAL:.3f}s)")

# Log version information at startup
logger.info(f"URL Text Fetcher MCP Server v{__version__} ({__implementation__}) starting up")
logger.info(f"Environment: Python {'.'.join(map(str, __import__('sys').version_info[:2]))}, MCP SDK, Brave Search API")

# Standard HTTP headers for requests
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (compatible; MCP-URL-Fetcher/1.0)',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate',
    'DNT': '1',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1'
}

# Thread-safe rate limiting for Brave Search API
rate_limit_lock = threading.Lock()
last_brave_request = [0]  # Using list for mutable reference


def sanitize_query(query: str) -> str:
    """Sanitize search query to prevent injection attacks and malformed requests."""
    if not query or not isinstance(query, str):
        return ""

    # Remove null bytes and control characters
    query = ''.join(char for char in query if ord(char) >= 32 or char in '\t\n\r')

    # Limit query length to prevent abuse
    max_query_length = 500
    if len(query) > max_query_length:
        query = query[:max_query_length]
        logger.warning(f"Query truncated to {max_query_length} characters")

    # Remove potentially dangerous patterns
    dangerous_patterns = ['<script', 'javascript:', 'data:', 'vbscript:']
    query_lower = query.lower()
    for pattern in dangerous_patterns:
        if pattern in query_lower:
            logger.warning(f"Potentially dangerous pattern detected in query: {pattern}")
            query = query.replace(pattern, '')

    return query.strip()


def sanitize_url(url: str) -> str:
    """Basic URL sanitization and normalization."""
    if not url or not isinstance(url, str):
        return ""

    # Remove whitespace and control characters
    url = ''.join(char for char in url if ord(char) >= 32 or char in '\t\n\r')
    url = url.strip()

    # Ensure URL has protocol
    if url and not url.startswith(('http://', 'https://')):
        # Don't auto-add protocol for security reasons
        logger.warning(f"URL missing protocol: {url}")
        return ""

    return url


def is_safe_url(url: str) -> bool:
    """Validate URL is safe to fetch - prevents SSRF attacks."""
    try:
        parsed = urlparse(url)

        # Only allow http/https
        if parsed.scheme not in ['http', 'https']:
            return False

        hostname = parsed.hostname
        if not hostname:
            return False

        # Block common internal/metadata hostnames
        blocked_hostnames = [
            'localhost',
            'metadata.google.internal',
            '169.254.169.254',  # AWS/GCP metadata
            'metadata'
        ]
        if hostname.lower() in blocked_hostnames:
            return False

        # Try to resolve hostname to IP to check for internal addresses
        try:
            import socket
            ip = socket.gethostbyname(hostname)
            ip_obj = ipaddress.ip_address(ip)

            # Block private/internal IP ranges
            if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
                return False
        except socket.gaierror:
            # DNS resolution failed - domain doesn't exist or network issue
            # For safety in production, we should block unknown domains
            return False
        except ValueError:
            # Invalid IP address format
            return False

        return True
    except Exception:
        # Any other parsing error - block to be safe
        return False


def fetch_url_content(url: str) -> str:
    """Helper function to fetch text content from a URL with safety checks."""
    # Validate URL safety first
    if not is_safe_url(url):
        logger.warning(f"SECURITY: Blocked unsafe URL: {url}")
        return "Error: URL not allowed for security reasons"

    try:
        # Log request for monitoring
        logger.info(f"REQUEST: Fetching content from {url}")

        # Make request with streaming to check size
        resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, stream=True)
        resp.raise_for_status()

        # Log response details
        logger.info(f"RESPONSE: {resp.status_code} from {url}, Content-Type: {resp.headers.get('Content-Type', 'unknown')}")

        # Check content length header
        content_length = resp.headers.get('Content-Length')
        if content_length and int(content_length) > MAX_RESPONSE_SIZE:
            logger.warning(f"SECURITY: Content too large: {content_length} bytes for {url}")
            return f"Error: Content too large ({content_length} bytes, max {MAX_RESPONSE_SIZE})"

        # Read content with size limit
        content_chunks = []
        total_size = 0

        try:
            for chunk in resp.iter_content(chunk_size=8192, decode_unicode=True):
                if chunk:  # filter out keep-alive new chunks
                    total_size += len(chunk)
                    if total_size > MAX_RESPONSE_SIZE:
                        logger.warning(f"SECURITY: Content exceeded size limit for {url}")
                        return f"Error: Content exceeded size limit ({MAX_RESPONSE_SIZE} bytes)"
                    content_chunks.append(chunk)
        except UnicodeDecodeError:
            # If we can't decode as text, it's probably binary content
            logger.warning(f"CONTENT: Unable to decode content as text from {url}")
            return "Error: Unable to decode content as text"

        html_content = ''.join(content_chunks)

        # Parse with BeautifulSoup
        soup = BeautifulSoup(html_content, "html.parser")

        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()

        text_content = soup.get_text(separator="\n", strip=True)

        # Limit final content length
        if len(text_content) > CONTENT_LENGTH_LIMIT:
            logger.info(f"CONTENT: Truncating content from {url} ({len(text_content)} -> {CONTENT_LENGTH_LIMIT} chars)")
            text_content = text_content[:CONTENT_LENGTH_LIMIT] + "... [Content truncated]"

        logger.info(f"SUCCESS: Fetched {len(text_content)} characters from {url}")
        return text_content

    except requests.RequestException as e:
        logger.error(f"REQUEST_ERROR: Failed to fetch {url}: {e}")
        return "Error: Unable to fetch URL content"
    except Exception as e:
        logger.error(f"UNEXPECTED_ERROR: Processing {url}: {e}", exc_info=True)
        return "Error: An unexpected error occurred while processing the URL"


def brave_search(query: str, count: int = 10) -> List[dict]:
    """Perform a Brave search and return results with thread-safe rate limiting."""
    if not BRAVE_API_KEY:
        logger.error("Brave Search API key not configured")
        raise ValueError("BRAVE_API_KEY environment variable is required")

    # Thread-safe rate limiting: ensure minimum interval between requests
    with rate_limit_lock:
        current_time = time.time()
        time_since_last_request = current_time - last_brave_request[0]

        if time_since_last_request < MIN_REQUEST_INTERVAL:
            sleep_time = MIN_REQUEST_INTERVAL - time_since_last_request
            logger.info(f"Rate limiting: sleeping for {sleep_time:.3f} seconds (limit: {BRAVE_RATE_LIMIT_RPS} req/s)")
            time.sleep(sleep_time)

        last_brave_request[0] = time.time()

    url = "https://api.search.brave.com/res/v1/web/search"
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; MCP-URL-Fetcher/1.0)',
        'Accept': 'application/json',  # Brave API requires application/json or */*
        "X-Subscription-Token": BRAVE_API_KEY
    }
    params = {
        "q": query,
        "count": count,
        "search_lang": "en",
        "country": "US",
        "safesearch": "moderate"
    }

    try:
        logger.info(f"SEARCH_REQUEST: Making Brave Search for '{query}' (count={count})")
        response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)

        # Log response details for debugging
        logger.info(f"SEARCH_RESPONSE: Status {response.status_code}, Content-Type: {response.headers.get('Content-Type', 'unknown')}")

        response.raise_for_status()
        data = response.json()

        results = []
        if 'web' in data and 'results' in data['web']:
            for result in data['web']['results']:
                results.append({
                    'title': result.get('title', ''),
                    'url': result.get('url', ''),
                    'description': result.get('description', ''),
                })
        else:
            logger.warning(f"Unexpected response structure: {data}")

        logger.info(f"SEARCH_SUCCESS: Found {len(results)} results for '{query}'")
        return results

    except requests.HTTPError as e:
        logger.error(f"Brave Search API error: {e.response.status_code}")
        logger.error(f"Response headers: {dict(e.response.headers)}")
        logger.error(f"Response body: {e.response.text}")

        if e.response.status_code == 401:
            raise Exception("Invalid or missing API key - check your BRAVE_API_KEY")
        elif e.response.status_code == 403:
            raise Exception("API key does not have permission for this endpoint")
        elif e.response.status_code == 422:
            raise Exception("Search request was rejected - please check your query")
        elif e.response.status_code == 429:
            raise Exception("Rate limit exceeded - please wait before making another request")
        else:
            raise Exception(f"Search service error: {e.response.status_code} - {e.response.text}")
    except requests.RequestException as e:
        logger.error(f"Network error during search: {e}")
        raise Exception("Network error occurred during search")
    except Exception as e:
        logger.error(f"Unexpected error in brave_search: {e}", exc_info=True)
        raise Exception("An unexpected error occurred during search")


@mcp.tool()
async def get_server_info() -> str:
    """Get information about this MCP server including version, implementation, and capabilities.

    Returns:
        Server information including version, implementation type, and available features
    """
    info = [
        f"URL Text Fetcher MCP Server",
        f"Version: {__version__}",
        f"Implementation: {__implementation__}",
        f"Brave Search Rate Limit: {BRAVE_RATE_LIMIT_RPS} requests/second",
        f"Request Timeout: {REQUEST_TIMEOUT} seconds",
        f"Content Limit: {CONTENT_LENGTH_LIMIT:,} characters",
        f"Max Response Size: {MAX_RESPONSE_SIZE:,} bytes",
        "",
        "Available Tools:",
        "• fetch_url_text - Download visible text from any URL",
        "• fetch_page_links - Extract all links from a webpage",
        "• brave_search_and_fetch - Search web and fetch content from top results",
        "• test_brave_search - Test Brave Search API connectivity",
        "• get_server_info - Display this server information",
        "",
        "Security Features:",
        "• SSRF protection against internal network access",
        "• Input sanitization for URLs and search queries",
        "• Content size limiting and memory protection",
        "• Thread-safe rate limiting for API requests",
        "",
        f"Brave API Key: {'✓ Configured' if BRAVE_API_KEY else '✗ Missing'}"
    ]
    return "\n".join(info)


@mcp.tool()
async def test_brave_search(query: str = "test") -> str:
    """Test the Brave Search API connection and configuration.

    Args:
        query: Test query to search for (default: "test")
    """
    if not BRAVE_API_KEY:
        return "❌ Error: BRAVE_API_KEY environment variable not set"

    try:
        logger.info(f"Testing Brave Search API with query: '{query}'")
        results = brave_search(query, count=1)

        if results:
            result = results[0]
            return f"""✅ Brave Search API Test Successful!
Query: {query}
Found: {len(results)} result(s)
First Result:
Title: {result.get('title', 'No title')}
URL: {result.get('url', 'No URL')}
Description: {result.get('description', 'No description')}
API Key: ✓ Valid (length: {len(BRAVE_API_KEY)})
Rate Limit: {BRAVE_RATE_LIMIT_RPS} requests/second"""
        else:
            return f"⚠️ API connection successful but no results found for query: '{query}'"

    except Exception as e:
        return f"❌ Brave Search API Test Failed: {str(e)}"


@mcp.tool()
async def fetch_url_text(url: str) -> str:
    """Download all visible text from a URL.

    Args:
        url: The URL to fetch text from
    """
    # Sanitize URL input
    url = sanitize_url(url)
    if not url:
        return "Error: Invalid URL format"

    logger.info(f"Fetching URL text: {url}")
    content = fetch_url_content(url)
    return f"Text content from {url}:\n\n{content}"


@mcp.tool()
async def fetch_page_links(url: str) -> str:
    """Return a list of all links on the page.

    Args:
        url: The URL to fetch links from
    """
    # Sanitize URL input
    url = sanitize_url(url)
    if not url:
        return "Error: Invalid URL format"

    # Validate URL safety
    if not is_safe_url(url):
        logger.warning(f"Blocked unsafe URL for link fetching: {url}")
        return "Error: URL not allowed for security reasons"

    try:
        logger.info(f"Fetching page links: {url}")
        resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, stream=True)
        resp.raise_for_status()

        # Check content length
        content_length = resp.headers.get('Content-Length')
        if content_length and int(content_length) > MAX_RESPONSE_SIZE:
            return f"Error: Page too large ({content_length} bytes)"

        # Read content with size limit
        content_chunks = []
        total_size = 0
        for chunk in resp.iter_content(chunk_size=8192, decode_unicode=True):
            if chunk:
                total_size += len(chunk)
                if total_size > MAX_RESPONSE_SIZE:
                    return "Error: Page content too large"
                content_chunks.append(chunk)

        html_content = ''.join(content_chunks)
        soup = BeautifulSoup(html_content, "html.parser")
        links = [a.get('href') for a in soup.find_all('a', href=True) if a.get('href')]

        # Filter and clean links
        valid_links = []
        for link in links:
            if link.startswith(('http://', 'https://', '/')):
                valid_links.append(link)

        links_text = "\n".join(f"- {link}" for link in valid_links[:100])  # Limit to 100 links
        return f"Links found on {url} ({len(valid_links)} total, showing first 100):\n\n{links_text}"

    except requests.RequestException as e:
        logger.error(f"Request failed for {url}: {e}")
        return "Error: Unable to fetch page"
    except Exception as e:
        logger.error(f"Unexpected error fetching links from {url}: {e}", exc_info=True)
        return "Error: Unable to process page"


@mcp.tool()
async def brave_search_and_fetch(query: str, max_results: int = 3) -> str:
    """Search the web using Brave Search and automatically fetch content from the top results.

    Args:
        query: The search query
        max_results: Maximum number of results to fetch content for (default: 3, max: 10)
    """
    # Sanitize query input
    query = sanitize_query(query)
    if not query:
        return "Error: Invalid or empty search query"

    max_results = max(1, min(10, max_results))  # Clamp between 1-10

    try:
        logger.info(f"Performing Brave search: {query}")
        search_results = brave_search(query, count=max_results * 2)

        if not search_results:
            return f"No search results found for query: {query}"

        # Build response with search results and content
        response_parts = [f"Search Results for: {query}", "=" * 50, ""]

        fetched_count = 0
        for result in search_results:
            if fetched_count >= max_results:
                break

            title = result.get('title', 'No title')
            url = result.get('url', '')
            description = result.get('description', 'No description')

            response_parts.append(f"{fetched_count + 1}. {title}")
            response_parts.append(f" URL: {url}")
            response_parts.append(f" Description: {description}")

            # Fetch content from this URL
            if url:
                content = fetch_url_content(url)
                # Limit content per result
                max_content_per_result = CONTENT_LENGTH_LIMIT // max_results
                if len(content) > max_content_per_result:
                    content = content[:max_content_per_result] + "... [Truncated]"
                response_parts.append(f" Content: {content}")
                fetched_count += 1
            else:
                response_parts.append(" Content: No URL available")

            response_parts.append("")  # Add spacing

        final_response = "\n".join(response_parts)

        # Final length check
        if len(final_response) > CONTENT_LENGTH_LIMIT:
            final_response = final_response[:CONTENT_LENGTH_LIMIT] + "... [Response truncated]"

        return final_response

    except Exception as e:
        error_msg = str(e)
        logger.error(f"Search operation failed: {error_msg}", exc_info=True)
        # Return more detailed error information for debugging
        return f"Error: Search operation failed - {error_msg}"


def main():
    """Main entry point for the FastMCP server."""
    mcp.run(transport='stdio')


if __name__ == "__main__":
    # Run FastMCP server
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/billallison/brsearch-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.