
Katamari MCP Server

by ciphernaut
web_scrape.py (9.18 kB)
""" Web scraping capability implementation. """ import asyncio import aiohttp from typing import Dict, Any, Optional from urllib.parse import urlparse import logging from bs4 import BeautifulSoup import re logger = logging.getLogger(__name__) class WebScrapeCapability: """Web page scraping with content extraction.""" def __init__(self): self.session: Optional[aiohttp.ClientSession] = None async def _ensure_session(self): """Ensure HTTP session is available.""" if self.session is None or self.session.closed: self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=30), headers={'User-Agent': 'Mozilla/5.0 (compatible; KatamariMCP/1.0)'} ) def _is_valid_url(self, url: str) -> bool: """Validate URL format.""" try: result = urlparse(url) return all([result.scheme, result.netloc]) except Exception: return False async def fetch_page(self, url: str) -> Dict[str, Any]: """Fetch web page content.""" await self._ensure_session() if not self._is_valid_url(url): raise ValueError(f"Invalid URL: {url}") try: async with self.session.get(url) as response: if response.status != 200: raise Exception(f"HTTP {response.status}: {response.reason}") content_type = response.headers.get('content-type', '').lower() if 'text/html' not in content_type: raise Exception(f"Unsupported content type: {content_type}") html = await response.text() return { 'url': url, 'status_code': response.status, 'content_type': content_type, 'html': html, 'content_length': len(html) } except aiohttp.ClientError as e: raise Exception(f"Network error: {str(e)}") except Exception as e: raise Exception(f"Fetch failed: {str(e)}") def _extract_text_content(self, soup: BeautifulSoup) -> str: """Extract clean text content from HTML.""" # Remove script and style elements for script in soup(["script", "style", "nav", "footer", "header"]): script.decompose() # Get main content areas main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|body')) if main_content: text = main_content.get_text() else: text = soup.get_text() # Clean up text lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = ' '.join(chunk for chunk in chunks if chunk) return text def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: """Extract page metadata.""" metadata = {'url': url} # Title title_tag = soup.find('title') metadata['title'] = title_tag.get_text().strip() if title_tag else '' # Meta description desc_tag = soup.find('meta', attrs={'name': 'description'}) if desc_tag: metadata['description'] = desc_tag.get('content', '').strip() # Meta keywords keywords_tag = soup.find('meta', attrs={'name': 'keywords'}) if keywords_tag: metadata['keywords'] = keywords_tag.get('content', '').strip() # Open Graph tags og_tags = {} for meta in soup.find_all('meta', property=re.compile(r'^og:')): og_tags[meta.get('property', '')] = meta.get('content', '') if og_tags: metadata['open_graph'] = og_tags # Headings headings = {} for level in ['h1', 'h2', 'h3']: tags = soup.find_all(level) headings[level] = [tag.get_text().strip() for tag in tags if tag.get_text().strip()] if any(headings.values()): metadata['headings'] = headings # Links links = [] for a_tag in soup.find_all('a', href=True): href = a_tag.get('href') text = a_tag.get_text().strip() if href and text: links.append({'url': href, 'text': text}) metadata['links'] = links[:20] # Limit to first 20 links return metadata def _convert_to_markdown(self, soup: 
BeautifulSoup, metadata: Dict[str, Any]) -> str: """Convert HTML content to markdown format.""" markdown_parts = [] # Title if metadata.get('title'): markdown_parts.append(f"# {metadata['title']}\n") # Metadata if metadata.get('description'): markdown_parts.append(f"*{metadata['description']}*\n") # Headings and content current_section = [] for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'ul', 'ol', 'blockquote']): if element.name.startswith('h'): # Save previous section if current_section: markdown_parts.extend(current_section) current_section = [] # Add heading level = int(element.name[1]) heading_text = element.get_text().strip() if heading_text: current_section.append(f"{'#' * (level + 1)} {heading_text}\n") elif element.name == 'p': text = element.get_text().strip() if text: current_section.append(f"{text}\n") elif element.name in ['ul', 'ol']: items = [] for li in element.find_all('li', recursive=False): item_text = li.get_text().strip() if item_text: prefix = '-' if element.name == 'ul' else '1.' items.append(f"{prefix} {item_text}") if items: current_section.extend(items) current_section.append('') elif element.name == 'blockquote': text = element.get_text().strip() if text: current_section.append(f"> {text}\n") # Add final section if current_section: markdown_parts.extend(current_section) # Add links section if available if metadata.get('links'): markdown_parts.append("\n## Links\n") for link in metadata['links'][:10]: markdown_parts.append(f"- [{link['text']}]({link['url']})") return '\n'.join(markdown_parts) async def scrape(self, url: str, format: str = 'markdown') -> Dict[str, Any]: """Scrape web page content.""" if not url or not url.strip(): raise ValueError("URL cannot be empty") if format not in ['text', 'markdown']: raise ValueError("Format must be 'text' or 'markdown'") try: # Fetch page page_data = await self.fetch_page(url) html = page_data['html'] # Parse HTML soup = BeautifulSoup(html, 'html.parser') # Extract metadata metadata = self._extract_metadata(soup, url) # Extract content if format == 'text': content = self._extract_text_content(soup) else: # markdown content = self._convert_to_markdown(soup, metadata) return { 'url': url, 'title': metadata.get('title', ''), 'format': format, 'content': content, 'metadata': metadata, 'content_length': len(content), 'status': 'success' } except Exception as e: logger.error(f"Scraping failed for {url}: {e}") return { 'url': url, 'format': format, 'content': '', 'error': str(e), 'status': 'error' } async def close(self): """Close HTTP session.""" if self.session and not self.session.closed: await self.session.close() # Global instance _web_scrape = WebScrapeCapability() async def web_scrape(url: str, format: str = 'markdown') -> Dict[str, Any]: """Main web scraping function.""" return await _web_scrape.scrape(url, format) async def close_web_scrape(): """Cleanup function.""" if _web_scrape.session and not _web_scrape.session.closed: await _web_scrape.session.close()
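A minimal usage sketch for the module above. It assumes the file is importable as `web_scrape`; the target URL and the `asyncio.run` entry point are illustrative placeholders, not part of the Katamari MCP server itself.

# Illustrative only: calls the module-level helpers defined above.
# The URL is a placeholder and the import path is an assumption.
import asyncio

from web_scrape import web_scrape, close_web_scrape


async def main() -> None:
    try:
        result = await web_scrape("https://example.com", format="markdown")
        if result["status"] == "success":
            print(result["title"])
            print(result["content"][:500])  # preview the first 500 characters
        else:
            print(f"Scrape failed: {result['error']}")
    finally:
        await close_web_scrape()  # release the shared aiohttp session


if __name__ == "__main__":
    asyncio.run(main())

Calling `close_web_scrape()` in the `finally` block matters because the capability keeps a single shared `aiohttp.ClientSession` alive across calls.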

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.