search_url

Retrieve plain-text content from any web page URL with robust error handling. An optional timeout (1–60 seconds, default 10) limits how long the fetch may take.

Instructions

Fetch and return plain-text content from a web page URL. Handles various content types and provides comprehensive error handling.
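
As a usage illustration, the sketch below calls the tool over MCP from Python. It assumes the server runs as a local stdio process; the launch command shown (`websurfer-mcp`) and the target URL are placeholders, not taken from this page.

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main() -> None:
        # Placeholder launch command; point this at however the server is installed.
        params = StdioServerParameters(command="websurfer-mcp", args=[])

        async with stdio_client(params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # Only the two schema parameters are passed: required url, optional timeout.
                result = await session.call_tool(
                    "search_url",
                    {"url": "https://example.com", "timeout": 15},
                )
                # The handler always returns TextContent items.
                for item in result.content:
                    print(item.text)

    asyncio.run(main())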

Input Schema

Name     Required  Description                                                      Default
timeout  No        Optional timeout in seconds                                      10
url      Yes       The URL to fetch content from. Must be a valid HTTP/HTTPS URL.   —

Implementation Reference

  • The main handler function for the 'search_url' tool. It validates the URL, extracts text content using TextExtractor, formats the response with metadata, and handles errors.
    @self.server.call_tool()
    async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
        """Handle tool calls."""
        if name != "search_url":
            raise ValueError(f"Unknown tool: {name}")

        url = arguments.get("url")
        timeout = arguments.get("timeout", self.config.default_timeout)

        if not url:
            return [TextContent(
                type="text",
                text="Error: URL parameter is required"
            )]

        try:
            # Validate URL
            validation_result = self.url_validator.validate(url)
            if not validation_result.is_valid:
                return [TextContent(
                    type="text",
                    text=f"Error: Invalid URL - {validation_result.error_message}"
                )]

            # Get extractor and extract text content
            extractor = await self._get_extractor()
            result = await extractor.extract_text(
                url,
                timeout=timeout,
                user_agent=self.config.user_agent
            )

            if result.success:
                # Prepare response with metadata
                response_text = f"Content from: {url}\n"
                response_text += f"Content-Type: {result.content_type}\n"
                response_text += f"Status Code: {result.status_code}\n"
                if result.title:
                    response_text += f"Title: {result.title}\n"
                response_text += "\n--- Content ---\n"
                response_text += result.text_content

                return [TextContent(
                    type="text",
                    text=response_text
                )]
            else:
                return [TextContent(
                    type="text",
                    text=f"Error fetching content from {url}: {result.error_message}"
                )]

        except Exception as e:
            logger.exception(f"Unexpected error processing URL {url}")
            return [TextContent(
                type="text",
                text=f"Unexpected error: {str(e)}"
            )]
  • The input schema defining the parameters for the 'search_url' tool: required 'url' string and optional 'timeout' number.
    inputSchema={
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "The URL to fetch content from. Must be a valid HTTP/HTTPS URL."
            },
            "timeout": {
                "type": "number",
                "description": "Optional timeout in seconds (default: 10)",
                "default": 10,
                "minimum": 1,
                "maximum": 60
            }
        },
        "required": ["url"]
    }
  • Registration of the 'search_url' tool in the list_tools handler, including name, description, and schema.
        Tool(
            name="search_url",
            description="Fetch and return plain-text content from a web page URL. "
                        "Handles various content types and provides comprehensive error handling.",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The URL to fetch content from. Must be a valid HTTP/HTTPS URL."
                    },
                    "timeout": {
                        "type": "number",
                        "description": "Optional timeout in seconds (default: 10)",
                        "default": 10,
                        "minimum": 1,
                        "maximum": 60
                    }
                },
                "required": ["url"]
            }
        )
    ]
  • Core helper that performs the HTTP request with aiohttp, applies rate limiting, checks status codes, content type, and size, and extracts readable text using trafilatura or BeautifulSoup. (A hedged sketch of the rate-limiting helper it calls appears after this list.)
    async def extract_text(self, url: str, timeout: int = 10, user_agent: str = None) -> ExtractionResult:
        """
        Extract text content from a URL.

        Args:
            url: The URL to fetch content from
            timeout: Request timeout in seconds
            user_agent: User agent string for the request

        Returns:
            ExtractionResult with extracted content or error details
        """
        # Apply rate limiting
        if not self._check_rate_limit():
            return ExtractionResult(
                success=False,
                error_message="Rate limit exceeded. Please try again later.",
                url=url
            )

        # Record request time for rate limiting
        self.request_times.append(time.time())

        try:
            # Ensure session exists
            await self._ensure_session(timeout)

            headers = {
                'User-Agent': user_agent or 'MCP-URL-Search-Server/1.0.0',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.5',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            }

            async with self.session.get(url, headers=headers) as response:
                status_code = response.status
                content_type = response.headers.get('content-type', '').lower()

                # Check status code
                if status_code == 404:
                    return ExtractionResult(
                        success=False,
                        error_message="Page not found (404)",
                        status_code=status_code,
                        url=url
                    )
                elif status_code == 403:
                    return ExtractionResult(
                        success=False,
                        error_message="Access forbidden (403)",
                        status_code=status_code,
                        url=url
                    )
                elif status_code == 429:
                    return ExtractionResult(
                        success=False,
                        error_message="Too many requests (429). Please try again later.",
                        status_code=status_code,
                        url=url
                    )
                elif status_code >= 400:
                    return ExtractionResult(
                        success=False,
                        error_message=f"HTTP error {status_code}",
                        status_code=status_code,
                        url=url
                    )

                # Check content type
                if not self._is_supported_content_type(content_type):
                    return ExtractionResult(
                        success=False,
                        error_message=f"Unsupported content type: {content_type}",
                        content_type=content_type,
                        status_code=status_code,
                        url=url
                    )

                # Check content length
                content_length = response.headers.get('content-length')
                if content_length and int(content_length) > 10 * 1024 * 1024:  # 10MB
                    return ExtractionResult(
                        success=False,
                        error_message="Content too large (>10MB)",
                        content_type=content_type,
                        status_code=status_code,
                        url=url
                    )

                # Read content
                try:
                    content = await response.text()
                except UnicodeDecodeError as e:
                    return ExtractionResult(
                        success=False,
                        error_message=f"Failed to decode content: {str(e)}",
                        content_type=content_type,
                        status_code=status_code,
                        url=url
                    )

                # Extract text content
                extracted_text, title = self._extract_text_content(content, content_type)

                if not extracted_text:
                    return ExtractionResult(
                        success=False,
                        error_message="No readable text content found",
                        content_type=content_type,
                        status_code=status_code,
                        url=url
                    )

                return ExtractionResult(
                    success=True,
                    text_content=extracted_text,
                    title=title,
                    content_type=content_type,
                    status_code=status_code,
                    url=url
                )

        except asyncio.TimeoutError:
            return ExtractionResult(
                success=False,
                error_message=f"Request timed out after {timeout} seconds",
                url=url
            )
        except aiohttp.ClientError as e:
            return ExtractionResult(
                success=False,
                error_message=f"Network error: {str(e)}",
                url=url
            )
        except Exception as e:
            logger.exception(f"Unexpected error extracting text from {url}")
            return ExtractionResult(
                success=False,
                error_message=f"Unexpected error: {str(e)}",
                url=url
            )
  • Helper function for URL validation: checks format, scheme (HTTP/HTTPS only), blocked domains and private IP ranges, and overall length. (An illustrative set of blocked-IP patterns appears after this list.)
    def validate(self, url: str) -> ValidationResult:
        """
        Validate a URL for safety and accessibility.

        Args:
            url: The URL to validate

        Returns:
            ValidationResult with validation status and details
        """
        if not url:
            return ValidationResult(
                is_valid=False,
                error_message="URL cannot be empty"
            )

        # Basic format validation
        if not isinstance(url, str):
            return ValidationResult(
                is_valid=False,
                error_message="URL must be a string"
            )

        # Normalize URL (add protocol if missing)
        normalized_url = self._normalize_url(url)

        # Use validators library for basic validation
        if not validators.url(normalized_url):
            return ValidationResult(
                is_valid=False,
                error_message="Invalid URL format"
            )

        # Parse URL for detailed validation
        try:
            parsed = urlparse(normalized_url)
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                error_message=f"Failed to parse URL: {str(e)}"
            )

        # Check scheme
        if parsed.scheme.lower() not in {'http', 'https'}:
            if parsed.scheme.lower() in self.blocked_schemes:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Blocked scheme: {parsed.scheme}"
                )
            else:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Unsupported scheme: {parsed.scheme}. Only HTTP and HTTPS are allowed."
                )

        # Check for blocked domains
        hostname = parsed.hostname
        if hostname:
            hostname_lower = hostname.lower()

            # Check blocked domains
            if hostname_lower in self.blocked_domains:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Access to {hostname} is not allowed"
                )

            # Check blocked IP ranges
            for pattern in self.blocked_ip_patterns:
                if re.match(pattern, hostname_lower):
                    return ValidationResult(
                        is_valid=False,
                        error_message="Access to private IP ranges is not allowed"
                    )

        # Check URL length
        if len(normalized_url) > 2048:
            return ValidationResult(
                is_valid=False,
                error_message="URL is too long (max 2048 characters)"
            )

        return ValidationResult(
            is_valid=True,
            normalized_url=normalized_url
        )
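
The extractor above appends to `self.request_times` and gates requests through `_check_rate_limit`, but that helper is not shown on this page. The following standalone sketch shows one plausible sliding-window implementation; the class name and per-minute budget are illustrative assumptions, not taken from the source.

    import time

    class SlidingWindowRateLimiter:
        """Illustrative stand-in for the extractor's rate limiting: allow at most
        max_per_minute requests in any rolling 60-second window."""

        def __init__(self, max_per_minute: int = 30) -> None:
            self.max_per_minute = max_per_minute  # assumed budget, not from the source
            self.request_times: list[float] = []

        def check(self) -> bool:
            # Discard timestamps older than 60 seconds, then compare the
            # remaining count against the per-minute budget.
            cutoff = time.time() - 60
            self.request_times = [t for t in self.request_times if t > cutoff]
            return len(self.request_times) < self.max_per_minute

        def record(self) -> None:
            # Mirrors self.request_times.append(time.time()) in extract_text.
            self.request_times.append(time.time())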
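Similarly, `validate` consults `self.blocked_ip_patterns` to reject private address ranges without listing the patterns here. The snippet below shows an illustrative pattern set and check in that spirit; the exact patterns used by the server are not given on this page.

    import re

    # Illustrative patterns (not from the source) covering common private ranges.
    BLOCKED_IP_PATTERNS = [
        r"^127\.",                       # loopback
        r"^10\.",                        # RFC 1918 10.0.0.0/8
        r"^192\.168\.",                  # RFC 1918 192.168.0.0/16
        r"^172\.(1[6-9]|2\d|3[01])\.",   # RFC 1918 172.16.0.0/12
        r"^169\.254\.",                  # link-local
    ]

    def is_blocked_host(hostname: str) -> bool:
        """Return True if the hostname matches any blocked pattern."""
        host = hostname.lower()
        return any(re.match(pattern, host) for pattern in BLOCKED_IP_PATTERNS)

    # Example: private addresses are rejected, public hostnames pass.
    assert is_blocked_host("192.168.1.10")
    assert not is_blocked_host("example.com")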
