web_fetcher.py (18.8 kB)
#!/usr/bin/env python
"""
Web Fetcher Module - Comprehensive web fetching with pagination support
Integrates Jina AI Reader, Serper scrape, and traditional HTTP fetching
All English implementation
"""

import httpx
import asyncio
import json
import re
import uuid
import hashlib
from typing import Dict, List, Any, Optional, Union
from urllib.parse import urljoin, urlparse, quote
from loguru import logger
from bs4 import BeautifulSoup


class WebFetcher:
    """Comprehensive web fetcher with multiple methods and pagination support"""

    def __init__(self, jina_api_key: Optional[str] = None, serper_api_key: Optional[str] = None):
        """Initialize web fetcher

        Args:
            jina_api_key: Jina API key (optional)
            serper_api_key: Serper API key (optional)
        """
        self.jina_api_key = jina_api_key
        self.serper_api_key = serper_api_key
        self.timeout = 30

        # Jina API endpoints
        self.jina_read_api = "https://r.jina.ai/"
        self.jina_search_api = "https://s.jina.ai/"
        self.jina_search_api_premium = "https://svip.jina.ai/"

        # Page content storage with TTL
        self.page_cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = 3600  # 1 hour TTL for page cache

    async def fetch_url(self, url: str, use_jina: bool = True, with_image_alt: bool = False,
                        max_length: int = 50000, page_number: int = 1) -> Dict[str, Any]:
        """Fetch web content with intelligent pagination support

        Args:
            url: Target URL
            use_jina: Whether to prioritize Jina AI
            with_image_alt: Whether to include image alt text
            max_length: Maximum content length per page
            page_number: Specific page to retrieve (for paginated content)

        Returns:
            Dictionary containing content, pagination info, etc.
        """
        try:
            # If requesting specific page, check cache first
            if page_number > 1:
                return await self._get_cached_page(url, page_number)

            # Try different fetching methods in priority order
            if use_jina:
                result = await self._fetch_with_jina(url, with_image_alt)
                if result and result.get('success'):
                    return await self._process_content(url, result, max_length)

            # Fallback: Use Serper scrape
            if self.serper_api_key:
                result = await self._fetch_with_serper(url)
                if result and result.get('success'):
                    return await self._process_content(url, result, max_length)

            # Last fallback: Traditional HTTP fetching
            result = await self._fetch_with_httpx(url)
            if result and result.get('success'):
                return await self._process_content(url, result, max_length)

            return {
                'success': False,
                'error': 'All fetching methods failed',
                'url': url
            }

        except Exception as e:
            logger.error(f"❌ Web fetching failed: {e}")
            return {
                'success': False,
                'error': str(e),
                'url': url
            }

    async def _fetch_with_jina(self, url: str, with_image_alt: bool = False) -> Dict[str, Any]:
        """Fetch using Jina AI Reader"""
        try:
            headers = {
                'Accept': 'application/json',
                'Content-Type': 'application/json',
            }

            if self.jina_api_key:
                headers['Authorization'] = f'Bearer {self.jina_api_key}'

            # Add optional headers
            extra_headers = {}
            if with_image_alt:
                extra_headers["X-With-Generated-Alt"] = "true"
            extra_headers["X-With-Links-Summary"] = "all"
            extra_headers["X-Retain-Images"] = "none"
            extra_headers["X-Respond-With"] = "markdown"

            headers.update(extra_headers)

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(self.jina_read_api, headers=headers, json={"url": url})
                response.raise_for_status()

                content = response.text
                if content and content.strip():
                    return {
                        'success': True,
                        'content': content,
                        'method': 'jina',
                        'format': 'markdown'
                    }
                else:
                    return {'success': False, 'error': 'Jina returned empty content'}

        except Exception as e:
            logger.error(f"Jina fetching failed: {e}")
            return {'success': False, 'error': str(e)}

    async def _fetch_with_serper(self, url: str) -> Dict[str, Any]:
        """Fetch using Serper scrape"""
        try:
            headers = {
                'Content-Type': 'application/json',
                'X-API-KEY': self.serper_api_key
            }

            payload = {
                'url': url,
                'includeMarkdown': True
            }

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    'https://scrape.serper.dev',
                    headers=headers,
                    json=payload
                )
                response.raise_for_status()

                data = response.json()
                content = data.get('markdown') or data.get('text', '')

                if content.strip():
                    return {
                        'success': True,
                        'content': content,
                        'method': 'serper',
                        'format': 'markdown' if data.get('markdown') else 'text',
                        'metadata': data.get('metadata', {})
                    }
                else:
                    return {'success': False, 'error': 'Serper returned empty content'}

        except Exception as e:
            logger.error(f"Serper fetching failed: {e}")
            return {'success': False, 'error': str(e)}

    async def _fetch_with_httpx(self, url: str) -> Dict[str, Any]:
        """Fetch using traditional HTTP and convert to Markdown"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }

            async with httpx.AsyncClient(timeout=self.timeout, headers=headers) as client:
                response = await client.get(url)
                response.raise_for_status()

                content_type = response.headers.get('content-type', '').lower()

                if 'text/html' in content_type:
                    # Parse HTML and convert to Markdown
                    soup = BeautifulSoup(response.text, 'html.parser')

                    # Remove scripts and styles
                    for script in soup(["script", "style"]):
                        script.decompose()

                    # Extract main content
                    content = self._html_to_markdown(soup)

                    return {
                        'success': True,
                        'content': content,
                        'method': 'httpx',
                        'format': 'markdown',
                        'title': soup.find('title').get_text() if soup.find('title') else ''
                    }
                else:
                    # Return text content directly
                    return {
                        'success': True,
                        'content': response.text,
                        'method': 'httpx',
                        'format': 'text'
                    }

        except Exception as e:
            logger.error(f"HTTP fetching failed: {e}")
            return {'success': False, 'error': str(e)}

    def _html_to_markdown(self, soup: BeautifulSoup) -> str:
        """Convert HTML to simple Markdown format"""
        content_parts = []

        # Extract title
        title = soup.find('title')
        if title:
            content_parts.append(f"# {title.get_text().strip()}\n")

        # Extract main content area
        main_content = (soup.find('main') or
                        soup.find('article') or
                        soup.find('div', class_=re.compile(r'content|main|article', re.I)) or
                        soup.find('body'))

        if main_content:
            # Process headings
            for i, heading in enumerate(main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])):
                level = int(heading.name[1])
                text = heading.get_text().strip()
                if text:
                    content_parts.append(f"{'#' * level} {text}\n")

            # Process paragraphs
            for p in main_content.find_all('p'):
                text = p.get_text().strip()
                if text:
                    content_parts.append(f"{text}\n")

            # Process lists
            for ul in main_content.find_all('ul'):
                for li in ul.find_all('li'):
                    text = li.get_text().strip()
                    if text:
                        content_parts.append(f"- {text}\n")

            # Process links
            for a in main_content.find_all('a', href=True):
                text = a.get_text().strip()
                href = a.get('href')
                if text and href:
                    content_parts.append(f"[{text}]({href})\n")

        return '\n'.join(content_parts) if content_parts else soup.get_text()

    async def _process_content(self, url: str, result: Dict[str, Any], max_length: int) -> Dict[str, Any]:
        """Process fetching result with pagination functionality"""
        content = result.get('content', '')

        if len(content) <= max_length:
            # Content doesn't need pagination
            processed_result = {
                'url': url,
                'method': result.get('method', 'unknown'),
                'format': result.get('format', 'text'),
                'success': True,
                'content': content,
                'total_length': len(content),
                'is_paginated': False,
                'pages': 1,
                'current_page': 1,
                'page_info': f"Complete content ({len(content)} characters)"
            }
        else:
            # Needs pagination
            pages = self._split_content_into_pages(content, max_length)

            # Create unique page ID for this session
            session_id = str(uuid.uuid4())[:8]
            content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
            page_id = f"{session_id}_{content_hash}_{len(pages)}"

            self.page_cache[page_id] = {
                'pages': pages,
                'metadata': result,
                'url': url,
                'created_at': asyncio.get_event_loop().time()
            }

            processed_result = {
                'url': url,
                'method': result.get('method', 'unknown'),
                'format': result.get('format', 'text'),
                'success': True,
                'content': pages[0],  # Return first page
                'total_length': len(content),
                'is_paginated': True,
                'pages': len(pages),
                'current_page': 1,
                'page_id': page_id,
                'page_info': f"Page 1 of {len(pages)} (total {len(content)} characters)",
                'next_page_hint': f"To get next page, use: fetch_url(url='{url}', page_number=2) or get other pages by changing page_number parameter"
            }

        # Add metadata
        if 'metadata' in result:
            processed_result['metadata'] = result['metadata']
        if 'title' in result:
            processed_result['title'] = result['title']

        return processed_result

    async def _get_cached_page(self, url: str, page_number: int) -> Dict[str, Any]:
        """Get cached page content"""
        # Clean expired cache entries first
        self._cleanup_expired_cache()

        # Find matching cache entry for this URL
        for page_id, cached_data in self.page_cache.items():
            if cached_data.get('url') == url:
                pages = cached_data['pages']

                if page_number < 1 or page_number > len(pages):
                    return {
                        'success': False,
                        'error': f'Invalid page number, should be between 1-{len(pages)}',
                        'url': url
                    }

                page_content = pages[page_number - 1]

                return {
                    'success': True,
                    'url': url,
                    'content': page_content,
                    'current_page': page_number,
                    'total_pages': len(pages),
                    'page_info': f"Page {page_number} of {len(pages)}",
                    'is_paginated': True,
                    'page_id': page_id,
                    'metadata': cached_data.get('metadata', {})
                }

        # If no cache found, need to re-fetch
        return {
            'success': False,
            'error': f'Page cache not found for URL: {url}. Please fetch the URL first.',
            'url': url
        }

    def _split_content_into_pages(self, content: str, max_length: int) -> List[str]:
        """Intelligently split content into pages, preferring paragraph boundaries"""
        if len(content) <= max_length:
            return [content]

        pages = []
        current_page = ""

        # Split by paragraphs
        paragraphs = content.split('\n\n')

        for paragraph in paragraphs:
            # If current paragraph is too long, need to force split
            if len(paragraph) > max_length:
                # Save current page first
                if current_page:
                    pages.append(current_page.strip())
                    current_page = ""

                # Force split long paragraph
                for i in range(0, len(paragraph), max_length):
                    chunk = paragraph[i:i + max_length]
                    pages.append(chunk)
                continue

            # Check if adding current paragraph exceeds limit
            if len(current_page) + len(paragraph) + 2 > max_length:  # +2 for \n\n
                if current_page:
                    pages.append(current_page.strip())
                current_page = paragraph
            else:
                if current_page:
                    current_page += '\n\n' + paragraph
                else:
                    current_page = paragraph

        # Add last page
        if current_page:
            pages.append(current_page.strip())

        return pages

    def _cleanup_expired_cache(self):
        """Clean up expired cache entries"""
        current_time = asyncio.get_event_loop().time()
        expired_keys = []

        for page_id, cached_data in self.page_cache.items():
            if current_time - cached_data.get('created_at', 0) > self.cache_ttl:
                expired_keys.append(page_id)

        for key in expired_keys:
            del self.page_cache[key]

        if expired_keys:
            logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")

    def clear_cache(self):
        """Clear page cache"""
        self.page_cache.clear()
        logger.info("Page cache cleared")

    async def search_jina(self, query: str, num_results: int = 10) -> List[Dict[str, Any]]:
        """Search using Jina AI"""
        try:
            # If have API key and requesting more than 10 results, use premium API
            if self.jina_api_key and num_results > 10:
                return await self._jina_premium_search(query, num_results)
            else:
                return await self._jina_basic_search(query, num_results)

        except Exception as e:
            error_msg = f"Jina search failed: {str(e)}"
            logger.error(error_msg)
            return []

    async def _jina_basic_search(self, query: str, num_results: int = 10) -> List[Dict[str, Any]]:
        """Basic Jina search (free version)"""
        encoded_query = quote(query)
        jina_url = f"{self.jina_search_api}{encoded_query}"

        headers = {
            "Accept": "application/json"
        }

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(jina_url, headers=headers)
            response.raise_for_status()

            data = response.json()

            # Convert to unified format
            if isinstance(data, list):
                return data[:num_results]
            elif isinstance(data, dict) and 'results' in data:
                return data['results'][:num_results]
            else:
                return []

    async def _jina_premium_search(self, query: str, num_results: int = 30) -> List[Dict[str, Any]]:
        """Premium Jina search (requires API key)"""
        if not self.jina_api_key:
            raise Exception("Premium search requires Jina API key")

        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.jina_api_key}'
        }

        payload = {
            "q": query,
            "num": min(num_results, 100)  # API limit
        }

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(self.jina_search_api_premium, headers=headers, json=payload)
            response.raise_for_status()

            data = response.json()
            return data.get('results', [])
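A rough usage sketch, not part of the module itself: it assumes the file above is importable as web_fetcher, and the URL is a placeholder. It shows the intended flow of fetch_url, where the first call fetches (and paginates, if the content exceeds max_length) and later calls with page_number read from the in-memory cache.

import asyncio
from web_fetcher import WebFetcher  # assumes this file is on the import path

async def main():
    # API keys are optional; missing ones just shorten the fallback chain
    # (Jina reader, then Serper if a key is set, then plain httpx)
    fetcher = WebFetcher(jina_api_key=None, serper_api_key=None)

    # First fetch; long content is split into pages and cached
    result = await fetcher.fetch_url("https://example.com", max_length=50000)
    print(result.get("page_info"))
    print(result.get("content", "")[:200])

    # Subsequent pages come from the page cache via page_number
    if result.get("is_paginated"):
        page2 = await fetcher.fetch_url("https://example.com", page_number=2)
        print(page2.get("page_info"))

asyncio.run(main())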

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sailaoda/search-fusion-mcp'
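The same request can be made from Python with httpx (the library already used by web_fetcher.py), assuming the endpoint accepts the unauthenticated GET shown above; the response structure is whatever the Glama API returns and is not assumed here.

import httpx

# Same request as the curl command above; prints the raw JSON from the Glama MCP API
response = httpx.get("https://glama.ai/api/mcp/v1/servers/sailaoda/search-fusion-mcp")
response.raise_for_status()
print(response.json())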

If you have feedback or need assistance with the MCP directory API, please join our Discord server.