"""
MCP Web Scraper - Adapted from original scraper for MCP compatibility
"""
import asyncio
import csv
import json
import logging
from datetime import datetime
from urllib.parse import urljoin, urlparse, quote_plus

import requests
from bs4 import BeautifulSoup
logger = logging.getLogger("mcp-web-scraper")
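
# Illustrative sketch only (not part of the original scraper): the db_manager
# passed to MCPWebScraper is assumed to provide roughly the interface below,
# inferred from the calls made further down in this module. The method
# signatures and integer-ID return types are assumptions, not a definitive contract.
from typing import Protocol

class DatabaseManagerProtocol(Protocol):
    def insert_url(self, url: str, title: str, content: str) -> int: ...
    def get_or_create_keyword(self, keyword: str) -> int: ...
    def add_keyword_match(self, url_id: int, keyword_id: int, context: str) -> None: ...
    def get_scraping_results(self, keyword_filter=None) -> list: ...
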
class MCPWebScraper:
def __init__(self, db_manager, max_depth=3, delay=1):
self.db_manager = db_manager
self.max_depth = max_depth
self.delay = delay
self.visited_urls = set()
self.keywords = []
self.is_running = False
self.current_depth = 0
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
# Disable SSL verification to ignore certificate errors
self.session.verify = False
# Suppress SSL warnings
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
async def scrape_url(self, url: str, keywords: list, extract_links: bool = False, max_depth: int = 1):
"""Scrape a single URL for keywords"""
self.keywords = keywords
self.visited_urls.clear()
logger.info(f"Starting scrape of {url} for keywords: {keywords}")
results = {
"url": url,
"keywords": keywords,
"matches": [],
"stats": {"pages_crawled": 0, "total_matches": 0}
}
try:
            if extract_links:
                # Crawl recursively, following internal links up to max_depth
                await self._crawl_recursive_async(url, 0, max_depth, results)
else:
matches = await self._process_single_url(url)
results["matches"].extend(matches)
results["stats"]["pages_crawled"] = 1
results["stats"]["total_matches"] = len(matches)
except Exception as e:
logger.error(f"Scraping error: {e}")
results["error"] = str(e)
return results
async def search_and_scrape(self, query: str, keywords: list, search_engine_url: str = "https://searx.gophernuttz.us/search/", max_results: int = 10):
"""Search for query and scrape results"""
self.keywords = keywords
self.visited_urls.clear()
logger.info(f"Searching for '{query}' and scraping results")
results = {
"query": query,
"keywords": keywords,
"search_results": [],
"stats": {"pages_crawled": 0, "total_matches": 0}
}
try:
# Execute search
search_urls = await self._execute_search(query, search_engine_url, max_results)
# Process each search result
for url in search_urls:
if len(results["search_results"]) >= max_results:
break
matches = await self._process_single_url(url)
search_result = {
"url": url,
"matches": matches,
"match_count": len(matches)
}
results["search_results"].append(search_result)
results["stats"]["pages_crawled"] += 1
results["stats"]["total_matches"] += len(matches)
# Respect delay between requests
await asyncio.sleep(self.delay)
except Exception as e:
logger.error(f"Search and scrape error: {e}")
results["error"] = str(e)
return results
async def _execute_search(self, query: str, search_engine_url: str, max_results: int):
"""Execute search query and return result URLs"""
try:
# URL encode the query
encoded_query = quote_plus(query)
search_url = f"{search_engine_url}?q={encoded_query}"
logger.info(f"Executing search: {search_url}")
# Fetch search results page
response = self.session.get(search_url, timeout=10)
response.raise_for_status()
# Parse search results
soup = BeautifulSoup(response.content, 'html.parser')
# Extract result URLs
result_urls = []
selectors = [
'a[href*="http"]',
'.result a',
'.url a',
'h4 a',
'.result_header a',
'.result-url a',
'a.result-url',
'.search-result a',
'article a',
'.mainline a',
]
for selector in selectors:
links = soup.select(selector)
for link in links:
href = link.get('href', '')
if href and href.startswith(('http://', 'https://')):
# Filter out search engine internal links
if 'searx' not in href.lower() and search_engine_url not in href:
result_urls.append(href)
if len(result_urls) >= max_results:
return result_urls
# If no URLs found via selectors, try all external links
if not result_urls:
all_links = soup.find_all('a', href=True)
for link in all_links:
href = link.get('href', '')
if href.startswith(('http://', 'https://')):
if ('searx' not in href.lower() and
search_engine_url not in href and
'about:blank' not in href and
'javascript:' not in href):
result_urls.append(href)
if len(result_urls) >= max_results:
return result_urls
return result_urls[:max_results]
except Exception as e:
logger.error(f"Search execution error: {e}")
return []
async def _process_single_url(self, url: str):
"""Process a single URL for keyword matches"""
if url in self.visited_urls:
return []
self.visited_urls.add(url)
try:
# Fetch the page
response = self.session.get(url, timeout=10)
response.raise_for_status()
# Parse HTML
soup = BeautifulSoup(response.content, 'html.parser')
# Extract page title and content
            title = soup.title.string.strip() if soup.title and soup.title.string else "No title"
content = self._extract_content(soup)
# Search for keywords
matches = self._search_keywords(content, url)
# Store results in database
if matches:
url_id = self.db_manager.insert_url(url, title, content)
for keyword, context in matches:
keyword_id = self.db_manager.get_or_create_keyword(keyword)
if url_id and keyword_id:
self.db_manager.add_keyword_match(url_id, keyword_id, context)
return matches
except requests.RequestException as e:
logger.warning(f"Failed to fetch {url}: {e}")
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return []
async def _crawl_recursive_async(self, url: str, depth: int, max_depth: int, results: dict):
"""Recursively crawl URLs (async wrapper)"""
        if depth > max_depth or url in self.visited_urls:
            return
        try:
            # _process_single_url records the URL in visited_urls; marking it
            # here as well would make that call skip the page and return no matches.
            matches = await self._process_single_url(url)
results["matches"].extend(matches)
results["stats"]["pages_crawled"] += 1
results["stats"]["total_matches"] += len(matches)
# Extract and follow internal links
if depth < max_depth:
links = self._extract_links(url)
for link in links:
await self._crawl_recursive_async(link, depth + 1, max_depth, results)
except Exception as e:
logger.error(f"Crawling error at {url}: {e}")
def _extract_links(self, base_url: str):
"""Extract internal links from a URL"""
try:
response = self.session.get(base_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
base_domain = urlparse(base_url).netloc
links = set()
for link in soup.find_all('a', href=True):
href = link['href']
if not href or href.startswith('#'):
continue
# Convert relative URLs to absolute
absolute_url = urljoin(base_url, href)
# Parse the URL
parsed = urlparse(absolute_url)
# Only follow internal links
if parsed.netloc == base_domain:
clean_url = parsed.scheme + "://" + parsed.netloc + parsed.path
if parsed.query:
clean_url += "?" + parsed.query
links.add(clean_url)
return list(links)
except Exception as e:
logger.error(f"Link extraction error: {e}")
return []
def _extract_content(self, soup):
"""Extract meaningful content from the page"""
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text from main content areas
content_areas = []
main_selectors = [
'main', 'article', '.content', '#content', '.main', '#main',
'.post', '.entry', '.article', '.blog-post'
]
for selector in main_selectors:
elements = soup.select(selector)
for element in elements:
content_areas.append(element.get_text(strip=True))
# If no specific content areas found, use body
if not content_areas:
body = soup.find('body')
if body:
content_areas.append(body.get_text(strip=True))
return ' '.join(content_areas)
    def _search_keywords(self, content, url):
        """Search for keywords in content (case-insensitive)"""
        matches = []
        content_lower = content.lower()
        for keyword in self.keywords:
            # Lowercase the keyword as well so mixed-case keywords still match
            if keyword.lower() in content_lower:
                # Find context around the keyword
                context = self._get_keyword_context(content, keyword)
                matches.append((keyword, context))
        return matches
def _get_keyword_context(self, content, keyword, context_length=100):
"""Get context around the keyword match"""
try:
# Find first occurrence
index = content.lower().find(keyword.lower())
if index == -1:
return ""
# Extract context
start = max(0, index - context_length)
end = min(len(content), index + len(keyword) + context_length)
context = content[start:end]
# Add ellipsis if needed
if start > 0:
context = "..." + context
if end < len(content):
context = context + "..."
return context
        except Exception:
            return ""
def export_results(self, format: str = "json", keyword_filter: str = None, output_path: str = None):
"""Export scraping results to various formats"""
results = self.db_manager.get_scraping_results(keyword_filter=keyword_filter)
if format == "json":
if output_path:
with open(output_path, 'w') as f:
json.dump(results, f, indent=2)
return {"status": "success", "file": output_path, "format": "json"}
else:
return results
elif format == "markdown":
markdown_content = self._generate_markdown(results, keyword_filter)
if output_path:
with open(output_path, 'w') as f:
f.write(markdown_content)
return {"status": "success", "file": output_path, "format": "markdown"}
else:
return {"content": markdown_content, "format": "markdown"}
elif format == "csv":
csv_content = self._generate_csv(results)
if output_path:
with open(output_path, 'w', newline='') as f:
f.write(csv_content)
return {"status": "success", "file": output_path, "format": "csv"}
else:
return {"content": csv_content, "format": "csv"}
else:
raise ValueError(f"Unsupported format: {format}")
def _generate_markdown(self, results, keyword_filter):
"""Generate markdown report"""
markdown = f"# Web Scraping Results\n\n"
if keyword_filter:
markdown += f"**Filter:** {keyword_filter}\n\n"
markdown += f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
for result in results:
markdown += f"## {result['title']}\n\n"
markdown += f"**URL:** {result['url']}\n\n"
markdown += f"**Keywords Found:** {', '.join(result['keywords'])} ({result['match_count']} matches)\n\n"
for keyword, context in result['keyword_matches']:
markdown += f"### {keyword}\n\n"
markdown += f"{context}\n\n"
markdown += "---\n\n"
return markdown
def _generate_csv(self, results):
"""Generate CSV report"""
        import io
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['URL', 'Title', 'Keywords', 'Match Count', 'Context'])
# Write data
for result in results:
for keyword, context in result['keyword_matches']:
writer.writerow([
result['url'],
result['title'],
keyword,
result['match_count'],
context.replace('\n', ' ').replace('\r', ' ')
])
return output.getvalue()
def get_stats(self):
"""Get scraping statistics"""
return {
'visited_urls': len(self.visited_urls),
'active_keywords': len(self.keywords),
'is_running': self.is_running,
'max_depth': self.max_depth,
'delay': self.delay
}
def get_config(self):
"""Get current configuration"""
return {
'max_depth': self.max_depth,
'delay': self.delay,
'user_agent': self.session.headers.get('User-Agent'),
'ssl_verification': self.session.verify
}