"""
Web scraping tools for Percepta MCP server.
"""
import json
import logging
import re
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup

from ..config import Settings

logger = logging.getLogger(__name__)
class WebScraper:
"""Web scraping and data extraction tools."""
def __init__(self, settings: Settings):
self.settings = settings
self.session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create HTTP session."""
if not self.session or self.session.closed:
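            # A browser-like User-Agent reduces the chance of being rejected by
            # sites that block default HTTP client identifiers.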
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
timeout = aiohttp.ClientTimeout(total=30)
self.session = aiohttp.ClientSession(headers=headers, timeout=timeout)
return self.session
    async def scrape(self, url: str, selector: Optional[str] = None, extract_type: str = "text") -> Dict[str, Any]:
        """Scrape a web page and extract content.

        An optional CSS selector limits extraction to matching elements;
        extract_type controls what is returned (see _extract_data).
        """
try:
logger.info(f"Scraping URL: {url}")
session = await self._get_session()
async with session.get(url) as response:
if response.status != 200:
return {
"success": False,
"error": f"HTTP {response.status}: {response.reason}",
"url": url
}
content = await response.text()
content_type = response.headers.get('content-type', '')
                # Parse as HTML regardless of content type; content_type is
                # returned to the caller for reference only.
soup = BeautifulSoup(content, 'html.parser')
# Extract data based on selector and type
extracted_data = await self._extract_data(soup, selector, extract_type, url)
return {
"success": True,
"url": url,
"title": soup.title.string if soup.title else None,
"content_type": content_type,
"data": extracted_data,
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Scraping error for {url}: {e}")
return {
"success": False,
"error": str(e),
"url": url
}
    async def _extract_data(self, soup: BeautifulSoup, selector: Optional[str], extract_type: str, base_url: str) -> Any:
        """Extract data from parsed HTML.

        With a CSS selector, extract_type may be "text", "html", "attributes",
        or "links"; without one, "text", "html", "links", "images", "metadata",
        and "structured" are supported. Unknown types fall back to text.
        """
try:
if selector:
elements = soup.select(selector)
if not elements:
return None
if extract_type == "text":
return [elem.get_text(strip=True) for elem in elements]
elif extract_type == "html":
return [str(elem) for elem in elements]
elif extract_type == "attributes":
return [dict(elem.attrs) for elem in elements]
elif extract_type == "links":
links = []
for elem in elements:
if elem.name == 'a' and elem.get('href'):
links.append({
'url': urljoin(base_url, elem['href']),
'text': elem.get_text(strip=True),
'title': elem.get('title')
})
return links
else:
return [elem.get_text(strip=True) for elem in elements]
else:
# Extract all content based on type
if extract_type == "text":
return soup.get_text(strip=True)
elif extract_type == "html":
return str(soup)
elif extract_type == "links":
return await self._extract_all_links(soup, base_url)
elif extract_type == "images":
return await self._extract_images(soup, base_url)
elif extract_type == "metadata":
return await self._extract_metadata(soup)
elif extract_type == "structured":
return await self._extract_structured_data(soup)
else:
return soup.get_text(strip=True)
except Exception as e:
logger.error(f"Data extraction error: {e}")
return None
async def _extract_all_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, Any]]:
"""Extract all links from the page."""
links = []
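        # "internal" marks links whose host matches the scraped page's host.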
for a_tag in soup.find_all('a', href=True):
link_url = urljoin(base_url, a_tag['href'])
links.append({
'url': link_url,
'text': a_tag.get_text(strip=True),
'title': a_tag.get('title'),
'internal': urlparse(link_url).netloc == urlparse(base_url).netloc
})
return links
async def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, Any]]:
"""Extract all images from the page."""
images = []
for img_tag in soup.find_all('img'):
src = img_tag.get('src')
if src:
images.append({
'url': urljoin(base_url, src),
'alt': img_tag.get('alt'),
'title': img_tag.get('title'),
'width': img_tag.get('width'),
'height': img_tag.get('height')
})
return images
async def _extract_metadata(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract page metadata."""
metadata = {}
# Basic metadata
if soup.title:
metadata['title'] = soup.title.string
# Meta tags
meta_tags = {}
for meta in soup.find_all('meta'):
name = meta.get('name') or meta.get('property')
content = meta.get('content')
if name and content:
meta_tags[name] = content
metadata['meta_tags'] = meta_tags
# Open Graph data
og_data = {}
for meta in soup.find_all('meta', property=re.compile(r'^og:')):
property_name = meta.get('property')
content = meta.get('content')
if property_name and content:
og_data[property_name] = content
metadata['open_graph'] = og_data
# Twitter Card data
twitter_data = {}
for meta in soup.find_all('meta', attrs={'name': re.compile(r'^twitter:')}):
name = meta.get('name')
content = meta.get('content')
if name and content:
twitter_data[name] = content
metadata['twitter_card'] = twitter_data
return metadata
async def _extract_structured_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract structured data (JSON-LD, microdata, etc.)."""
structured_data = {}
# JSON-LD
json_ld_scripts = soup.find_all('script', type='application/ld+json')
json_ld_data = []
for script in json_ld_scripts:
try:
data = json.loads(script.string)
json_ld_data.append(data)
except (json.JSONDecodeError, TypeError):
continue
structured_data['json_ld'] = json_ld_data
# Microdata (simplified extraction)
microdata = []
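        # Properties of nested itemscope elements are also attributed to the
        # enclosing item; this is intentionally a simplified first pass.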
for elem in soup.find_all(attrs={'itemscope': True}):
item = {'type': elem.get('itemtype')}
properties = {}
for prop_elem in elem.find_all(attrs={'itemprop': True}):
prop_name = prop_elem.get('itemprop')
prop_value = prop_elem.get('content') or prop_elem.get_text(strip=True)
properties[prop_name] = prop_value
item['properties'] = properties
microdata.append(item)
structured_data['microdata'] = microdata
return structured_data
    async def crawl_sitemap(self, url: str, max_urls: int = 100) -> Dict[str, Any]:
        """Crawl a sitemap XML file and collect up to max_urls entries.

        Sitemap index files are not followed recursively; their <loc> entries
        are returned like any other URL.
        """
try:
logger.info(f"Crawling sitemap: {url}")
session = await self._get_session()
async with session.get(url) as response:
if response.status != 200:
return {
"success": False,
"error": f"HTTP {response.status}: {response.reason}",
"url": url
}
content = await response.text()
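                # The 'xml' feature requires the lxml package to be installed.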
soup = BeautifulSoup(content, 'xml')
urls = []
for loc in soup.find_all('loc'):
if len(urls) >= max_urls:
break
url_data = {'url': loc.get_text(strip=True)}
# Try to find associated metadata
parent = loc.parent
if parent:
lastmod = parent.find('lastmod')
if lastmod:
url_data['lastmod'] = lastmod.get_text(strip=True)
changefreq = parent.find('changefreq')
if changefreq:
url_data['changefreq'] = changefreq.get_text(strip=True)
priority = parent.find('priority')
if priority:
url_data['priority'] = priority.get_text(strip=True)
urls.append(url_data)
return {
"success": True,
"sitemap_url": url,
"urls": urls,
"total_found": len(urls),
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Sitemap crawling error: {e}")
return {
"success": False,
"error": str(e),
"url": url
}
async def extract_forms(self, url: str) -> Dict[str, Any]:
"""Extract all forms from a web page."""
try:
logger.info(f"Extracting forms from: {url}")
session = await self._get_session()
async with session.get(url) as response:
if response.status != 200:
return {
"success": False,
"error": f"HTTP {response.status}: {response.reason}",
"url": url
}
content = await response.text()
soup = BeautifulSoup(content, 'html.parser')
forms = []
for form in soup.find_all('form'):
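                    # BeautifulSoup returns the 'class' attribute as a list of names.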
form_data = {
'action': form.get('action', ''),
'method': form.get('method', 'GET').upper(),
'enctype': form.get('enctype', ''),
'id': form.get('id'),
'class': form.get('class'),
'fields': []
}
# Extract form fields
for field in form.find_all(['input', 'select', 'textarea']):
field_data = {
'tag': field.name,
'type': field.get('type', ''),
'name': field.get('name'),
'id': field.get('id'),
'placeholder': field.get('placeholder'),
'required': field.has_attr('required'),
'value': field.get('value', '')
}
if field.name == 'select':
options = []
for option in field.find_all('option'):
options.append({
'value': option.get('value', ''),
'text': option.get_text(strip=True),
'selected': option.has_attr('selected')
})
field_data['options'] = options
form_data['fields'].append(field_data)
forms.append(form_data)
return {
"success": True,
"url": url,
"forms": forms,
"total_forms": len(forms),
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Form extraction error: {e}")
return {
"success": False,
"error": str(e),
"url": url
}
async def close(self) -> None:
"""Close HTTP session."""
if self.session and not self.session.closed:
await self.session.close()
self.session = None
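

# Minimal usage sketch, not part of the server wiring: it shows the
# scrape/close lifecycle. It assumes Settings() can be constructed with
# defaults, which may not match how Percepta actually builds its config,
# and it must be run as a module (python -m ...) so the relative import
# of Settings resolves.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        scraper = WebScraper(Settings())
        try:
            result = await scraper.scrape("https://example.com", extract_type="metadata")
            print(json.dumps(result, indent=2, default=str))
        finally:
            await scraper.close()

    asyncio.run(_demo())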