
MCP Web Research Agent

scraper.py • 16 kB
""" MCP Web Scraper - Adapted from original scraper for MCP compatibility """ import asyncio import requests import re import time import logging from urllib.parse import urljoin, urlparse, quote_plus from bs4 import BeautifulSoup import threading import json import csv import os from datetime import datetime logger = logging.getLogger("mcp-web-scraper") class MCPWebScraper: def __init__(self, db_manager, max_depth=3, delay=1): self.db_manager = db_manager self.max_depth = max_depth self.delay = delay self.visited_urls = set() self.keywords = [] self.is_running = False self.current_depth = 0 self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) # Disable SSL verification to ignore certificate errors self.session.verify = False # Suppress SSL warnings import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) async def scrape_url(self, url: str, keywords: list, extract_links: bool = False, max_depth: int = 1): """Scrape a single URL for keywords""" self.keywords = keywords self.visited_urls.clear() logger.info(f"Starting scrape of {url} for keywords: {keywords}") results = { "url": url, "keywords": keywords, "matches": [], "stats": {"pages_crawled": 0, "total_matches": 0} } try: if extract_links: # Use threading for async-like behavior in sync context await self._crawl_recursive_async(url, 0, max_depth, results) else: matches = await self._process_single_url(url) results["matches"].extend(matches) results["stats"]["pages_crawled"] = 1 results["stats"]["total_matches"] = len(matches) except Exception as e: logger.error(f"Scraping error: {e}") results["error"] = str(e) return results async def search_and_scrape(self, query: str, keywords: list, search_engine_url: str = "https://searx.gophernuttz.us/search/", max_results: int = 10): """Search for query and scrape results""" self.keywords = keywords self.visited_urls.clear() logger.info(f"Searching for '{query}' and scraping results") results = { "query": query, "keywords": keywords, "search_results": [], "stats": {"pages_crawled": 0, "total_matches": 0} } try: # Execute search search_urls = await self._execute_search(query, search_engine_url, max_results) # Process each search result for url in search_urls: if len(results["search_results"]) >= max_results: break matches = await self._process_single_url(url) search_result = { "url": url, "matches": matches, "match_count": len(matches) } results["search_results"].append(search_result) results["stats"]["pages_crawled"] += 1 results["stats"]["total_matches"] += len(matches) # Respect delay between requests await asyncio.sleep(self.delay) except Exception as e: logger.error(f"Search and scrape error: {e}") results["error"] = str(e) return results async def _execute_search(self, query: str, search_engine_url: str, max_results: int): """Execute search query and return result URLs""" try: # URL encode the query encoded_query = quote_plus(query) search_url = f"{search_engine_url}?q={encoded_query}" logger.info(f"Executing search: {search_url}") # Fetch search results page response = self.session.get(search_url, timeout=10) response.raise_for_status() # Parse search results soup = BeautifulSoup(response.content, 'html.parser') # Extract result URLs result_urls = [] selectors = [ 'a[href*="http"]', '.result a', '.url a', 'h4 a', '.result_header a', '.result-url a', 'a.result-url', '.search-result a', 'article a', '.mainline a', ] for 
selector in selectors: links = soup.select(selector) for link in links: href = link.get('href', '') if href and href.startswith(('http://', 'https://')): # Filter out search engine internal links if 'searx' not in href.lower() and search_engine_url not in href: result_urls.append(href) if len(result_urls) >= max_results: return result_urls # If no URLs found via selectors, try all external links if not result_urls: all_links = soup.find_all('a', href=True) for link in all_links: href = link.get('href', '') if href.startswith(('http://', 'https://')): if ('searx' not in href.lower() and search_engine_url not in href and 'about:blank' not in href and 'javascript:' not in href): result_urls.append(href) if len(result_urls) >= max_results: return result_urls return result_urls[:max_results] except Exception as e: logger.error(f"Search execution error: {e}") return [] async def _process_single_url(self, url: str): """Process a single URL for keyword matches""" if url in self.visited_urls: return [] self.visited_urls.add(url) try: # Fetch the page response = self.session.get(url, timeout=10) response.raise_for_status() # Parse HTML soup = BeautifulSoup(response.content, 'html.parser') # Extract page title and content title = soup.title.string if soup.title else "No title" content = self._extract_content(soup) # Search for keywords matches = self._search_keywords(content, url) # Store results in database if matches: url_id = self.db_manager.insert_url(url, title, content) for keyword, context in matches: keyword_id = self.db_manager.get_or_create_keyword(keyword) if url_id and keyword_id: self.db_manager.add_keyword_match(url_id, keyword_id, context) return matches except requests.RequestException as e: logger.warning(f"Failed to fetch {url}: {e}") except Exception as e: logger.error(f"Error processing {url}: {e}") return [] async def _crawl_recursive_async(self, url: str, depth: int, max_depth: int, results: dict): """Recursively crawl URLs (async wrapper)""" if depth > max_depth or url in self.visited_urls: return self.visited_urls.add(url) try: matches = await self._process_single_url(url) results["matches"].extend(matches) results["stats"]["pages_crawled"] += 1 results["stats"]["total_matches"] += len(matches) # Extract and follow internal links if depth < max_depth: links = self._extract_links(url) for link in links: await self._crawl_recursive_async(link, depth + 1, max_depth, results) except Exception as e: logger.error(f"Crawling error at {url}: {e}") def _extract_links(self, base_url: str): """Extract internal links from a URL""" try: response = self.session.get(base_url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') base_domain = urlparse(base_url).netloc links = set() for link in soup.find_all('a', href=True): href = link['href'] if not href or href.startswith('#'): continue # Convert relative URLs to absolute absolute_url = urljoin(base_url, href) # Parse the URL parsed = urlparse(absolute_url) # Only follow internal links if parsed.netloc == base_domain: clean_url = parsed.scheme + "://" + parsed.netloc + parsed.path if parsed.query: clean_url += "?" 
+ parsed.query links.add(clean_url) return list(links) except Exception as e: logger.error(f"Link extraction error: {e}") return [] def _extract_content(self, soup): """Extract meaningful content from the page""" # Remove script and style elements for script in soup(["script", "style"]): script.decompose() # Get text from main content areas content_areas = [] main_selectors = [ 'main', 'article', '.content', '#content', '.main', '#main', '.post', '.entry', '.article', '.blog-post' ] for selector in main_selectors: elements = soup.select(selector) for element in elements: content_areas.append(element.get_text(strip=True)) # If no specific content areas found, use body if not content_areas: body = soup.find('body') if body: content_areas.append(body.get_text(strip=True)) return ' '.join(content_areas) def _search_keywords(self, content, url): """Search for keywords in content""" matches = [] content_lower = content.lower() for keyword in self.keywords: if keyword in content_lower: # Find context around the keyword context = self._get_keyword_context(content, keyword) matches.append((keyword, context)) return matches def _get_keyword_context(self, content, keyword, context_length=100): """Get context around the keyword match""" try: # Find first occurrence index = content.lower().find(keyword.lower()) if index == -1: return "" # Extract context start = max(0, index - context_length) end = min(len(content), index + len(keyword) + context_length) context = content[start:end] # Add ellipsis if needed if start > 0: context = "..." + context if end < len(content): context = context + "..." return context except: return "" def export_results(self, format: str = "json", keyword_filter: str = None, output_path: str = None): """Export scraping results to various formats""" results = self.db_manager.get_scraping_results(keyword_filter=keyword_filter) if format == "json": if output_path: with open(output_path, 'w') as f: json.dump(results, f, indent=2) return {"status": "success", "file": output_path, "format": "json"} else: return results elif format == "markdown": markdown_content = self._generate_markdown(results, keyword_filter) if output_path: with open(output_path, 'w') as f: f.write(markdown_content) return {"status": "success", "file": output_path, "format": "markdown"} else: return {"content": markdown_content, "format": "markdown"} elif format == "csv": csv_content = self._generate_csv(results) if output_path: with open(output_path, 'w', newline='') as f: f.write(csv_content) return {"status": "success", "file": output_path, "format": "csv"} else: return {"content": csv_content, "format": "csv"} else: raise ValueError(f"Unsupported format: {format}") def _generate_markdown(self, results, keyword_filter): """Generate markdown report""" markdown = f"# Web Scraping Results\n\n" if keyword_filter: markdown += f"**Filter:** {keyword_filter}\n\n" markdown += f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" for result in results: markdown += f"## {result['title']}\n\n" markdown += f"**URL:** {result['url']}\n\n" markdown += f"**Keywords Found:** {', '.join(result['keywords'])} ({result['match_count']} matches)\n\n" for keyword, context in result['keyword_matches']: markdown += f"### {keyword}\n\n" markdown += f"{context}\n\n" markdown += "---\n\n" return markdown def _generate_csv(self, results): """Generate CSV report""" import io import csv output = io.StringIO() writer = csv.writer(output) # Write header writer.writerow(['URL', 'Title', 'Keywords', 'Match Count', 'Context']) # 
Write data for result in results: for keyword, context in result['keyword_matches']: writer.writerow([ result['url'], result['title'], keyword, result['match_count'], context.replace('\n', ' ').replace('\r', ' ') ]) return output.getvalue() def get_stats(self): """Get scraping statistics""" return { 'visited_urls': len(self.visited_urls), 'active_keywords': len(self.keywords), 'is_running': self.is_running, 'max_depth': self.max_depth, 'delay': self.delay } def get_config(self): """Get current configuration""" return { 'max_depth': self.max_depth, 'delay': self.delay, 'user_agent': self.session.headers.get('User-Agent'), 'ssl_verification': self.session.verify }
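Usage sketch (not part of the repository): the snippet below shows one way MCPWebScraper could be exercised on its own, assuming the file is importable as scraper. The StubDBManager class is a hypothetical in-memory stand-in; its method names and signatures are inferred from the calls made in scraper.py (insert_url, get_or_create_keyword, add_keyword_match, get_scraping_results), and the real project presumably supplies its own db_manager.

import asyncio

from scraper import MCPWebScraper


class StubDBManager:
    """Hypothetical in-memory stand-in for the project's db_manager."""

    def __init__(self):
        self.urls = []        # (url, title, content) tuples
        self.keywords = {}    # keyword -> id
        self.matches = []     # (url_id, keyword_id, context) tuples

    def insert_url(self, url, title, content):
        self.urls.append((url, title, content))
        return len(self.urls)  # 1-based id

    def get_or_create_keyword(self, keyword):
        return self.keywords.setdefault(keyword, len(self.keywords) + 1)

    def add_keyword_match(self, url_id, keyword_id, context):
        self.matches.append((url_id, keyword_id, context))

    def get_scraping_results(self, keyword_filter=None):
        # Row shape guessed from _generate_markdown/_generate_csv: each entry
        # needs url, title, keywords, match_count, keyword_matches.
        return []


async def main():
    scraper = MCPWebScraper(StubDBManager(), max_depth=2, delay=1)

    # Single-page scrape for two keywords
    single = await scraper.scrape_url(
        "https://example.com",
        keywords=["python", "scraping"],
        extract_links=False,
    )
    print(single["stats"])

    # Search-driven scrape; search_engine_url defaults to the SearXNG
    # instance hard-coded in search_and_scrape
    searched = await scraper.search_and_scrape(
        query="model context protocol",
        keywords=["mcp"],
        max_results=5,
    )
    print(searched["stats"])


if __name__ == "__main__":
    asyncio.run(main())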
