web_scrape.py
"""
Web scraping capability implementation.
"""
import asyncio
import aiohttp
from typing import Dict, Any, Optional
from urllib.parse import urlparse
import logging
from bs4 import BeautifulSoup
import re
logger = logging.getLogger(__name__)
class WebScrapeCapability:
"""Web page scraping with content extraction."""
def __init__(self):
self.session: Optional[aiohttp.ClientSession] = None
async def _ensure_session(self):
"""Ensure HTTP session is available."""
if self.session is None or self.session.closed:
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
headers={'User-Agent': 'Mozilla/5.0 (compatible; KatamariMCP/1.0)'}
)
def _is_valid_url(self, url: str) -> bool:
"""Validate URL format."""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
async def fetch_page(self, url: str) -> Dict[str, Any]:
"""Fetch web page content."""
await self._ensure_session()
if not self._is_valid_url(url):
raise ValueError(f"Invalid URL: {url}")
try:
async with self.session.get(url) as response:
if response.status != 200:
raise Exception(f"HTTP {response.status}: {response.reason}")
content_type = response.headers.get('content-type', '').lower()
if 'text/html' not in content_type:
raise Exception(f"Unsupported content type: {content_type}")
html = await response.text()
return {
'url': url,
'status_code': response.status,
'content_type': content_type,
'html': html,
'content_length': len(html)
}
        except aiohttp.ClientError as e:
            raise Exception(f"Network error: {e}") from e
        except Exception as e:
            # Also wraps the HTTP-status and content-type errors raised above
            raise Exception(f"Fetch failed: {e}") from e
def _extract_text_content(self, soup: BeautifulSoup) -> str:
"""Extract clean text content from HTML."""
        # Drop script/style tags plus common boilerplate sections (nav, footer, header)
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()
# Get main content areas
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|body'))
if main_content:
text = main_content.get_text()
else:
text = soup.get_text()
        # Collapse runs of whitespace (including newlines) into single spaces
        text = ' '.join(text.split())
return text
def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
"""Extract page metadata."""
metadata = {'url': url}
# Title
title_tag = soup.find('title')
metadata['title'] = title_tag.get_text().strip() if title_tag else ''
# Meta description
desc_tag = soup.find('meta', attrs={'name': 'description'})
if desc_tag:
metadata['description'] = desc_tag.get('content', '').strip()
# Meta keywords
keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
if keywords_tag:
metadata['keywords'] = keywords_tag.get('content', '').strip()
# Open Graph tags
og_tags = {}
for meta in soup.find_all('meta', property=re.compile(r'^og:')):
og_tags[meta.get('property', '')] = meta.get('content', '')
if og_tags:
metadata['open_graph'] = og_tags
# Headings
headings = {}
for level in ['h1', 'h2', 'h3']:
tags = soup.find_all(level)
headings[level] = [tag.get_text().strip() for tag in tags if tag.get_text().strip()]
if any(headings.values()):
metadata['headings'] = headings
# Links
links = []
for a_tag in soup.find_all('a', href=True):
href = a_tag.get('href')
text = a_tag.get_text().strip()
if href and text:
links.append({'url': href, 'text': text})
metadata['links'] = links[:20] # Limit to first 20 links
return metadata
def _convert_to_markdown(self, soup: BeautifulSoup, metadata: Dict[str, Any]) -> str:
"""Convert HTML content to markdown format."""
markdown_parts = []
# Title
if metadata.get('title'):
markdown_parts.append(f"# {metadata['title']}\n")
        # Description as an italicized lead-in
if metadata.get('description'):
markdown_parts.append(f"*{metadata['description']}*\n")
# Headings and content
current_section = []
for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'ul', 'ol', 'blockquote']):
if element.name.startswith('h'):
# Save previous section
if current_section:
markdown_parts.extend(current_section)
current_section = []
# Add heading
level = int(element.name[1])
heading_text = element.get_text().strip()
if heading_text:
current_section.append(f"{'#' * (level + 1)} {heading_text}\n")
elif element.name == 'p':
text = element.get_text().strip()
if text:
current_section.append(f"{text}\n")
elif element.name in ['ul', 'ol']:
items = []
for li in element.find_all('li', recursive=False):
item_text = li.get_text().strip()
if item_text:
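                            # Markdown renumbers ordered lists, so '1.' works for every item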
prefix = '-' if element.name == 'ul' else '1.'
items.append(f"{prefix} {item_text}")
if items:
current_section.extend(items)
current_section.append('')
elif element.name == 'blockquote':
text = element.get_text().strip()
if text:
current_section.append(f"> {text}\n")
# Add final section
if current_section:
markdown_parts.extend(current_section)
# Add links section if available
if metadata.get('links'):
markdown_parts.append("\n## Links\n")
for link in metadata['links'][:10]:
markdown_parts.append(f"- [{link['text']}]({link['url']})")
return '\n'.join(markdown_parts)
async def scrape(self, url: str, format: str = 'markdown') -> Dict[str, Any]:
"""Scrape web page content."""
if not url or not url.strip():
raise ValueError("URL cannot be empty")
if format not in ['text', 'markdown']:
raise ValueError("Format must be 'text' or 'markdown'")
try:
# Fetch page
page_data = await self.fetch_page(url)
html = page_data['html']
# Parse HTML
soup = BeautifulSoup(html, 'html.parser')
# Extract metadata
metadata = self._extract_metadata(soup, url)
# Extract content
if format == 'text':
content = self._extract_text_content(soup)
else: # markdown
content = self._convert_to_markdown(soup, metadata)
return {
'url': url,
'title': metadata.get('title', ''),
'format': format,
'content': content,
'metadata': metadata,
'content_length': len(content),
'status': 'success'
}
except Exception as e:
logger.error(f"Scraping failed for {url}: {e}")
return {
'url': url,
'format': format,
'content': '',
'error': str(e),
'status': 'error'
}
async def close(self):
"""Close HTTP session."""
if self.session and not self.session.closed:
await self.session.close()
# Global instance
_web_scrape = WebScrapeCapability()
async def web_scrape(url: str, format: str = 'markdown') -> Dict[str, Any]:
"""Main web scraping function."""
return await _web_scrape.scrape(url, format)
async def close_web_scrape():
    """Cleanup function."""
    await _web_scrape.close()
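
# Example usage -- a minimal, illustrative sketch; 'https://example.com' is a
# placeholder target, not something this module requires.
async def _demo() -> None:
    """Fetch one page and print a short preview of the scraped markdown."""
    try:
        result = await web_scrape('https://example.com', format='markdown')
        if result['status'] == 'success':
            print(result['title'])
            print(result['content'][:500])
        else:
            print(f"Scrape failed: {result['error']}")
    finally:
        await close_web_scrape()

if __name__ == '__main__':
    asyncio.run(_demo())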