Skip to main content
Glama

Scrapy MCP Server

by ThreeFish-AI
advanced_features.py (26.9 kB)
"""Advanced scraping features including anti-detection and form handling.""" import asyncio import random from typing import Dict, Any, Optional from urllib.parse import urljoin import logging from selenium import webdriver from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait, Select from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.action_chains import ActionChains from selenium.common.exceptions import TimeoutException, NoSuchElementException from fake_useragent import UserAgent import undetected_chromedriver as uc from playwright.async_api import async_playwright from .config import settings logger = logging.getLogger(__name__) class AntiDetectionScraper: """Advanced scraper with anti-detection capabilities.""" def __init__(self) -> None: self.ua = UserAgent() self.driver = None self.page = None self.browser = None self.context = None self.playwright = None async def _get_undetected_chrome_driver(self) -> webdriver.Chrome: """Get undetected Chrome driver for anti-bot detection.""" options = ChromeOptions() if settings.browser_headless: options.add_argument("--headless") # Anti-detection options options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("--disable-extensions") options.add_argument("--disable-plugins") options.add_argument("--disable-images") options.add_argument("--disable-javascript") options.add_argument("--window-size=1920,1080") options.add_argument("--start-maximized") # Randomize user agent if settings.use_random_user_agent: options.add_argument(f"--user-agent={self.ua.random}") # Additional stealth options options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option("useAutomationExtension", False) 
options.add_argument("--disable-blink-features=AutomationControlled") # Proxy support if settings.use_proxy and settings.proxy_url: options.add_argument(f"--proxy-server={settings.proxy_url}") # Use undetected-chromedriver driver = uc.Chrome(options=options) # Execute stealth scripts driver.execute_script( "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" ) return driver async def _setup_playwright_browser(self) -> None: """Setup Playwright browser for advanced scraping.""" self.playwright = await async_playwright().start() # Launch browser with stealth settings self.browser = await self.playwright.chromium.launch( headless=settings.browser_headless, args=[ "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--disable-extensions", "--window-size=1920,1080", ], ) # Create context with randomized settings context_options = { "viewport": {"width": 1920, "height": 1080}, "user_agent": self.ua.random if settings.use_random_user_agent else settings.default_user_agent, } if settings.use_proxy and settings.proxy_url: context_options["proxy"] = {"server": settings.proxy_url} self.context = await self.browser.new_context(**context_options) # Add stealth scripts await self.context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined, }); Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5], }); Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'], }); """) self.page = await self.context.new_page() async def scrape_with_stealth( self, url: str, method: str = "selenium", extract_config: Optional[Dict[str, Any]] = None, wait_for_element: Optional[str] = None, scroll_page: bool = False, ) -> Dict[str, Any]: """ Scrape using stealth techniques to avoid detection. 
Args: url: URL to scrape method: "selenium" or "playwright" extract_config: Data extraction configuration wait_for_element: Element to wait for scroll_page: Whether to scroll the page to load dynamic content """ try: if method == "selenium": return await self._scrape_with_selenium_stealth( url, extract_config, wait_for_element, scroll_page ) elif method == "playwright": return await self._scrape_with_playwright_stealth( url, extract_config, wait_for_element, scroll_page ) else: raise ValueError(f"Unknown stealth method: {method}") except Exception as e: logger.error(f"Stealth scraping failed for {url}: {str(e)}") return {"error": str(e), "url": url} finally: await self.cleanup() async def _scrape_with_selenium_stealth( self, url: str, extract_config: Optional[Dict[str, Any]], wait_for_element: Optional[str], scroll_page: bool, ) -> Dict[str, Any]: """Scrape using Selenium with stealth techniques.""" self.driver = await self._get_undetected_chrome_driver() # Random delay before navigation await asyncio.sleep(random.uniform(1, 3)) self.driver.get(url) # Wait for page load await asyncio.sleep(random.uniform(2, 4)) # Wait for specific element if specified if wait_for_element: try: WebDriverWait(self.driver, settings.browser_timeout).until( EC.presence_of_element_located((By.CSS_SELECTOR, wait_for_element)) ) except TimeoutException: logger.warning(f"Timeout waiting for element: {wait_for_element}") # Scroll page to load dynamic content if scroll_page: await self._scroll_page_selenium() # Human-like mouse movements await self._simulate_human_behavior_selenium() # Extract data result = await self._extract_data_selenium(extract_config) result["url"] = self.driver.current_url return result async def _scrape_with_playwright_stealth( self, url: str, extract_config: Optional[Dict[str, Any]], wait_for_element: Optional[str], scroll_page: bool, ) -> Dict[str, Any]: """Scrape using Playwright with stealth techniques.""" await self._setup_playwright_browser() # Random delay 
before navigation await asyncio.sleep(random.uniform(1, 3)) # Navigate with random timeout await self.page.goto(url, timeout=60000, wait_until="domcontentloaded") # Wait for specific element if specified if wait_for_element: try: await self.page.wait_for_selector( wait_for_element, timeout=settings.browser_timeout * 1000 ) except Exception: logger.warning(f"Timeout waiting for element: {wait_for_element}") # Scroll page to load dynamic content if scroll_page: await self._scroll_page_playwright() # Human-like interactions await self._simulate_human_behavior_playwright() # Extract data result = await self._extract_data_playwright(extract_config) result["url"] = self.page.url return result async def _scroll_page_selenium(self) -> None: """Scroll page naturally to trigger dynamic content loading.""" total_height = self.driver.execute_script("return document.body.scrollHeight") current_height = 0 while current_height < total_height: # Scroll down by a random amount scroll_amount = random.randint(200, 600) self.driver.execute_script(f"window.scrollBy(0, {scroll_amount});") # Random delay between scrolls await asyncio.sleep(random.uniform(0.5, 2.0)) current_height += scroll_amount # Check if page height changed (lazy loading) new_height = self.driver.execute_script("return document.body.scrollHeight") if new_height > total_height: total_height = new_height async def _scroll_page_playwright(self) -> None: """Scroll page naturally using Playwright.""" await self.page.evaluate(""" new Promise((resolve) => { let totalHeight = 0; const distance = 100; const timer = setInterval(() => { const scrollHeight = document.body.scrollHeight; window.scrollBy(0, distance); totalHeight += distance; if(totalHeight >= scrollHeight){ clearInterval(timer); resolve(); } }, 100); }) """) async def _simulate_human_behavior_selenium(self) -> None: """Simulate human-like behavior to avoid detection.""" try: # Random mouse movements actions = ActionChains(self.driver) # Move mouse to random 
positions for _ in range(random.randint(2, 5)): x = random.randint(100, 800) y = random.randint(100, 600) actions.move_by_offset(x, y) await asyncio.sleep(random.uniform(0.1, 0.5)) actions.perform() # Random delays await asyncio.sleep(random.uniform(1, 3)) except Exception as e: logger.debug(f"Error simulating human behavior: {str(e)}") async def _simulate_human_behavior_playwright(self) -> None: """Simulate human-like behavior using Playwright.""" try: # Random mouse movements for _ in range(random.randint(2, 4)): x = random.randint(100, 800) y = random.randint(100, 600) await self.page.mouse.move(x, y) await asyncio.sleep(random.uniform(0.1, 0.5)) # Random delays await asyncio.sleep(random.uniform(1, 2)) except Exception as e: logger.debug(f"Error simulating human behavior: {str(e)}") async def _extract_data_selenium( self, extract_config: Optional[Dict[str, Any]] ) -> Dict[str, Any]: """Extract data using Selenium.""" from bs4 import BeautifulSoup soup = BeautifulSoup(self.driver.page_source, "html.parser") result = {"title": self.driver.title, "content": {}} # Get meta description try: meta_desc = self.driver.find_element( By.CSS_SELECTOR, "meta[name='description']" ) result["meta_description"] = meta_desc.get_attribute("content") except NoSuchElementException: result["meta_description"] = None if extract_config: for key, config in extract_config.items(): try: if isinstance(config, str): elements = self.driver.find_elements(By.CSS_SELECTOR, config) result["content"][key] = [elem.text for elem in elements] elif isinstance(config, dict): selector = config.get("selector") attr = config.get("attr") multiple = config.get("multiple", False) if multiple: elements = self.driver.find_elements( By.CSS_SELECTOR, selector ) if attr == "text": extracted = [elem.text for elem in elements] elif attr: extracted = [ elem.get_attribute(attr) for elem in elements ] else: extracted = [ elem.get_attribute("outerHTML") for elem in elements ] else: try: element = 
self.driver.find_element( By.CSS_SELECTOR, selector ) if attr == "text": extracted = element.text elif attr: extracted = element.get_attribute(attr) else: extracted = element.get_attribute("outerHTML") except Exception: extracted = None result["content"][key] = extracted except Exception as e: logger.warning(f"Failed to extract {key}: {str(e)}") result["content"][key] = None else: # Default extraction result["content"]["text"] = soup.get_text(strip=True) result["content"]["links"] = [ { "url": urljoin(self.driver.current_url, str(a.get("href", ""))), "text": a.get_text(strip=True), } for a in soup.find_all("a", href=True) if hasattr(a, "get") ] return result async def _extract_data_playwright( self, extract_config: Optional[Dict[str, Any]] ) -> Dict[str, Any]: """Extract data using Playwright.""" result = {"title": await self.page.title(), "content": {}} # Get meta description try: meta_desc = await self.page.get_attribute( "meta[name='description']", "content" ) result["meta_description"] = meta_desc except Exception: result["meta_description"] = None if extract_config: for key, config in extract_config.items(): try: if isinstance(config, str): elements = await self.page.query_selector_all(config) texts = [] for element in elements: text = await element.text_content() texts.append(text) result["content"][key] = texts elif isinstance(config, dict): selector = config.get("selector") attr = config.get("attr") multiple = config.get("multiple", False) if multiple: elements = await self.page.query_selector_all(selector) extracted = [] for element in elements: if attr == "text": value = await element.text_content() elif attr: value = await element.get_attribute(attr) else: value = await element.inner_html() extracted.append(value) else: element = await self.page.query_selector(selector) if element: if attr == "text": extracted = await element.text_content() elif attr: extracted = await element.get_attribute(attr) else: extracted = await element.inner_html() else: 
extracted = None result["content"][key] = extracted except Exception as e: logger.warning(f"Failed to extract {key}: {str(e)}") result["content"][key] = None else: # Default extraction result["content"]["text"] = await self.page.text_content("body") # Extract links links = [] link_elements = await self.page.query_selector_all("a[href]") for link_elem in link_elements: href = await link_elem.get_attribute("href") text = await link_elem.text_content() if href: links.append( { "url": urljoin(self.page.url, href), "text": text.strip() if text else "", } ) result["content"]["links"] = links return result async def cleanup(self) -> None: """Clean up resources.""" if self.driver: self.driver.quit() self.driver = None if self.page: await self.page.close() self.page = None if self.context: await self.context.close() self.context = None if self.browser: await self.browser.close() self.browser = None if self.playwright: await self.playwright.stop() self.playwright = None class FormHandler: """Handle form interactions and submissions.""" def __init__(self, driver_or_page: Any) -> None: self.driver_or_page = driver_or_page # Simple check for Playwright page self.is_playwright = hasattr(driver_or_page, "fill") async def fill_form( self, form_data: Dict[str, Any], submit: bool = False, submit_button_selector: Optional[str] = None, ) -> Dict[str, Any]: """ Fill and optionally submit a form. 
Args: form_data: Dict mapping field selectors to values submit: Whether to submit the form submit_button_selector: Selector for submit button (if not using default) """ try: results = {} for field_selector, value in form_data.items(): field_result = await self._fill_field(field_selector, value) results[field_selector] = field_result if submit: submit_result = await self._submit_form(submit_button_selector) results["_submit"] = submit_result return {"success": True, "results": results} except Exception as e: logger.error(f"Error filling form: {str(e)}") return {"success": False, "error": str(e)} async def _fill_field(self, selector: str, value: Any) -> Dict[str, Any]: """Fill a single form field.""" try: if self.is_playwright: return await self._fill_field_playwright(selector, value) else: return await self._fill_field_selenium(selector, value) except Exception as e: return {"success": False, "error": str(e)} async def _fill_field_selenium(self, selector: str, value: Any) -> Dict[str, Any]: """Fill field using Selenium.""" try: element = self.driver_or_page.find_element(By.CSS_SELECTOR, selector) tag_name = element.tag_name.lower() input_type = element.get_attribute("type") if tag_name == "select": # Handle select dropdown select = Select(element) if isinstance(value, int): select.select_by_index(value) elif str(value).isdigit(): select.select_by_value(str(value)) else: select.select_by_visible_text(str(value)) elif input_type in ["checkbox", "radio"]: # Handle checkbox/radio if value and not element.is_selected(): element.click() elif not value and element.is_selected(): element.click() elif input_type == "file": # Handle file upload element.send_keys(str(value)) else: # Handle text inputs element.clear() element.send_keys(str(value)) return {"success": True, "value": value} except Exception as e: return {"success": False, "error": str(e)} async def _fill_field_playwright(self, selector: str, value: Any) -> Dict[str, Any]: """Fill field using Playwright.""" try: 
element = await self.driver_or_page.query_selector(selector) if not element: return {"success": False, "error": "Element not found"} tag_name = await element.evaluate("el => el.tagName.toLowerCase()") input_type = await element.get_attribute("type") if tag_name == "select": # Handle select dropdown if isinstance(value, int): await element.select_option(index=value) else: await element.select_option(label=str(value)) elif input_type in ["checkbox", "radio"]: # Handle checkbox/radio is_checked = await element.is_checked() if value and not is_checked: await element.check() elif not value and is_checked: await element.uncheck() elif input_type == "file": # Handle file upload await element.set_input_files(str(value)) else: # Handle text inputs await element.fill(str(value)) return {"success": True, "value": value} except Exception as e: return {"success": False, "error": str(e)} async def _submit_form( self, submit_button_selector: Optional[str] = None ) -> Dict[str, Any]: """Submit the form.""" try: if self.is_playwright: return await self._submit_form_playwright(submit_button_selector) else: return await self._submit_form_selenium(submit_button_selector) except Exception as e: return {"success": False, "error": str(e)} async def _submit_form_selenium( self, submit_button_selector: Optional[str] = None ) -> Dict[str, Any]: """Submit form using Selenium.""" try: if submit_button_selector: # Use specific submit button submit_button = self.driver_or_page.find_element( By.CSS_SELECTOR, submit_button_selector ) submit_button.click() else: # Try to find submit button automatically submit_selectors = [ "input[type='submit']", "button[type='submit']", "button:contains('Submit')", "input[value*='Submit']", "button:contains('Send')", ] for selector in submit_selectors: try: if "contains" in selector: # Use XPath for text content xpath = f"//button[contains(text(), 'Submit')] | //button[contains(text(), 'Send')]" submit_button = self.driver_or_page.find_element( By.XPATH, xpath ) 
else: submit_button = self.driver_or_page.find_element( By.CSS_SELECTOR, selector ) submit_button.click() break except NoSuchElementException: continue else: # If no submit button found, try submitting the form directly form = self.driver_or_page.find_element(By.TAG_NAME, "form") form.submit() # Wait for page to load after submission await asyncio.sleep(2) return {"success": True, "new_url": self.driver_or_page.current_url} except Exception as e: return {"success": False, "error": str(e)} async def _submit_form_playwright( self, submit_button_selector: Optional[str] = None ) -> Dict[str, Any]: """Submit form using Playwright.""" try: if submit_button_selector: # Use specific submit button await self.driver_or_page.click(submit_button_selector) else: # Try to find submit button automatically submit_selectors = [ "input[type='submit']", "button[type='submit']", "text=Submit", "text=Send", ] for selector in submit_selectors: try: await self.driver_or_page.click(selector) break except Exception: continue else: # If no submit button found, press Enter on the form await self.driver_or_page.keyboard.press("Enter") # Wait for navigation or response try: await self.driver_or_page.wait_for_load_state( "networkidle", timeout=10000 ) except Exception: pass # Ignore timeout, form might submit without navigation return {"success": True, "new_url": self.driver_or_page.url} except Exception as e: return {"success": False, "error": str(e)}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreeFish-AI/scrapy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.