Skip to main content
Glama

Google Search Tool

by iwanghc
html_extractor.py (11.5 kB)
""" HTML提取模块 """ import json import re from datetime import datetime from pathlib import Path from typing import Optional from playwright.async_api import async_playwright, Page from common.types import HtmlResponse from common import logger from .fingerprint import get_random_delay class HtmlExtractor: """HTML提取器""" def __init__(self): # Google域名列表 self.google_domains = [ "https://www.google.com", "https://www.google.co.uk", "https://www.google.ca", "https://www.google.com.au" ] async def extract_html(self, query: str, timeout: int, state_file: str, no_save_state: bool, locale: str, saved_state, fingerprint_file: str, save_to_file: bool = False, output_path: str = None) -> HtmlResponse: """提取Google搜索页面的HTML""" async with async_playwright() as p: # 初始化浏览器,添加更多参数以避免检测 browser = await p.chromium.launch( headless=True, # 总是以无头模式启动 timeout=timeout * 2, # 增加浏览器启动超时时间 args=[ "--disable-blink-features=AutomationControlled", "--disable-features=IsolateOrigins,site-per-process", "--disable-site-isolation-trials", "--disable-web-security", "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-accelerated-2d-canvas", "--no-first-run", "--no-zygote", "--disable-gpu", "--hide-scrollbars", "--mute-audio", "--disable-background-networking", "--disable-background-timer-throttling", "--disable-backgrounding-occluded-windows", "--disable-breakpad", "--disable-component-extensions-with-background-pages", "--disable-extensions", "--disable-features=TranslateUI", "--disable-ipc-flooding-protection", "--disable-renderer-backgrounding", "--enable-features=NetworkService,NetworkServiceInProcess", "--force-color-profile=srgb", "--metrics-recording-only" ] ) logger.info("浏览器已成功启动!") try: # 这里实现获取HTML的具体逻辑(简化版本) # 由于篇幅限制,这里使用简化的实现 context = await browser.new_context() page = await context.new_page() # 使用保存的Google域名或随机选择一个 if saved_state.google_domain: selected_domain = saved_state.google_domain logger.info(f"使用保存的Google域名: {selected_domain}") else: import random 
selected_domain = random.choice(self.google_domains) saved_state.google_domain = selected_domain logger.info(f"随机选择Google域名: {selected_domain}") logger.info("正在访问Google搜索页面...") # 访问Google搜索页面 await page.goto(selected_domain, timeout=timeout, wait_until="networkidle") # 输入搜索关键词并执行搜索(简化版本) search_input = await page.wait_for_selector("textarea[name='q'], input[name='q']", timeout=5000) await search_input.click() await page.keyboard.type(query, delay=get_random_delay(10, 30)) await page.wait_for_timeout(get_random_delay(100, 300)) await page.keyboard.press("Enter") logger.info("正在等待搜索结果页面加载完成...") # 等待页面加载完成 await page.wait_for_load_state("networkidle", timeout=timeout) # 获取当前页面URL final_url = page.url logger.info(f"搜索结果页面已加载,准备提取HTML: {final_url}") # 添加额外的等待时间,确保页面完全加载和稳定 logger.info("等待页面稳定...") await page.wait_for_timeout(1000) # 等待1秒,让页面完全稳定 # 再次等待网络空闲,确保所有异步操作完成 await page.wait_for_load_state("networkidle", timeout=timeout) # 获取页面HTML内容 full_html = await page.content() # 移除CSS和JavaScript内容,只保留纯HTML html = re.sub(r'<style\b[^<]*(?:(?!<\/style>)<[^<]*)*<\/style>', '', full_html, flags=re.IGNORECASE) html = re.sub(r'<link\s+[^>]*rel=["\']stylesheet["\'][^>]*>', '', html, flags=re.IGNORECASE) html = re.sub(r'<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>', '', html, flags=re.IGNORECASE) logger.info(f"成功获取并清理页面HTML内容: originalLength={len(full_html)}, cleanedLength={len(html)}") # 如果需要,将HTML保存到文件并截图 saved_file_path = None screenshot_path = None if save_to_file: # 生成默认文件名(如果未提供) if not output_path: # 确保目录存在 output_dir = Path("./google-search-html") output_dir.mkdir(exist_ok=True) # 生成文件名:查询词-时间戳.html timestamp = datetime.now().isoformat().replace(":", "-").replace(".", "-") sanitized_query = re.sub(r'[^a-zA-Z0-9]', "_", query)[:50] output_path = output_dir / f"{sanitized_query}-{timestamp}.html" else: # 使用用户指定的路径 output_path = Path(output_path) # 如果是目录,在目录下生成文件名 if output_path.is_dir() or not output_path.suffix: if output_path.is_dir(): output_dir = output_path else: 
output_dir = output_path output_dir.mkdir(parents=True, exist_ok=True) # 在指定目录下生成文件名 timestamp = datetime.now().isoformat().replace(":", "-").replace(".", "-") sanitized_query = re.sub(r'[^a-zA-Z0-9]', "_", query)[:50] output_path = output_dir / f"{sanitized_query}-{timestamp}.html" # 确保文件目录存在 file_dir = output_path.parent file_dir.mkdir(parents=True, exist_ok=True) # 写入HTML文件 with open(output_path, 'w', encoding='utf-8') as f: f.write(html) saved_file_path = str(output_path) logger.info(f"清理后的HTML内容已保存到文件: {output_path}") # 保存网页截图 screenshot_file_path = str(output_path).replace('.html', '.png') # 截取整个页面的截图 logger.info("正在截取网页截图...") await page.screenshot(path=screenshot_file_path, full_page=True) screenshot_path = screenshot_file_path logger.info(f"网页截图已保存: {screenshot_file_path}") try: # 保存浏览器状态(除非用户指定了不保存) if not no_save_state: logger.info(f"正在保存浏览器状态: {state_file}") # 确保目录存在 state_dir = Path(state_file).parent state_dir.mkdir(parents=True, exist_ok=True) # 保存状态 await context.storage_state(path=state_file) logger.info("浏览器状态保存成功!") # 保存指纹配置 try: fingerprint_data = { 'fingerprint': { 'device_name': saved_state.fingerprint.device_name, 'locale': saved_state.fingerprint.locale, 'timezone_id': saved_state.fingerprint.timezone_id, 'color_scheme': saved_state.fingerprint.color_scheme, 'reduced_motion': saved_state.fingerprint.reduced_motion, 'forced_colors': saved_state.fingerprint.forced_colors } if saved_state.fingerprint else None, 'google_domain': saved_state.google_domain } with open(fingerprint_file, 'w', encoding='utf-8') as f: json.dump(fingerprint_data, f, indent=2, ensure_ascii=False) logger.info(f"指纹配置已保存: {fingerprint_file}") except Exception as fingerprint_error: logger.error(f"保存指纹配置时发生错误: {fingerprint_error}") else: logger.info("根据用户设置,不保存浏览器状态") except Exception as error: logger.error(f"保存浏览器状态时发生错误: {error}") # 返回HTML响应 return HtmlResponse( query=query, html=html, url=final_url, saved_path=saved_file_path, screenshot_path=screenshot_path, 
original_html_length=len(full_html) ) except Exception as error: logger.error(f"获取页面HTML过程中发生错误: {error}") # 返回错误信息 raise Exception(f"获取Google搜索页面HTML失败: {str(error)}") finally: # 关闭浏览器 logger.info("正在关闭浏览器...") await browser.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/iwanghc/mcp_web_search'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.