Skip to main content
Glama
anti_bot.py13 kB
""" 反爬虫策略工具类 提供统一的反爬虫策略和人类行为模拟 """ import asyncio import random from typing import Optional from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError from loguru import logger class AntiBotStrategy: """反爬虫策略工具类""" @staticmethod async def add_random_delay(base_delay: float = 1.0, max_extra: int = 3, seed: Optional[str] = None) -> None: """ 添加随机延迟,模拟人类行为 Args: base_delay: 基础延迟时间(秒) max_extra: 最大额外延迟时间(秒) seed: 用于生成随机数的种子 """ if seed: extra_delay = hash(seed) % max_extra else: extra_delay = random.randint(0, max_extra - 1) delay = base_delay + extra_delay logger.debug(f"添加随机延迟: {delay}秒") await asyncio.sleep(delay) @staticmethod async def simulate_human_navigation(page: Page, url: str, timeout: int = 60000) -> None: """ 模拟人类导航行为 Args: page: Playwright页面对象 url: 目标URL timeout: 超时时间(毫秒) """ logger.debug(f"模拟人类导航到: {url}") # 使用更自然的等待策略 await page.goto(url, wait_until="domcontentloaded", timeout=timeout) # 等待页面稳定 await asyncio.sleep(2) # 模拟滚动行为 await AntiBotStrategy.simulate_scroll_behavior(page) # 等待网络空闲 try: await page.wait_for_load_state("networkidle", timeout=30000) except PlaywrightTimeoutError: logger.warning("等待网络空闲超时,继续执行") @staticmethod async def simulate_scroll_behavior(page: Page, scroll_distance: int = 100) -> None: """ 模拟人类滚动行为 Args: page: Playwright页面对象 scroll_distance: 滚动距离 """ logger.debug("模拟滚动行为") # 向下滚动一点 await page.evaluate(f"window.scrollTo(0, {scroll_distance})") await asyncio.sleep(0.5) # 滚回顶部 await page.evaluate("window.scrollTo(0, 0)") await asyncio.sleep(0.3) @staticmethod async def simulate_natural_scrolling(page: Page, scroll_count: int = 3) -> None: """ 模拟自然的分步滚动行为 Args: page: Playwright页面对象 scroll_count: 滚动步数 """ logger.debug(f"模拟自然滚动,步数: {scroll_count}") try: current_height = await page.evaluate("document.body.scrollHeight") scroll_step = current_height // (scroll_count + 1) for i in range(scroll_count): # 添加随机延迟 await asyncio.sleep(0.5 + random.random()) # 滚动到指定位置 scroll_position = scroll_step * (i + 1) await page.evaluate(f"window.scrollTo(0, {scroll_position})") # 短暂停留 await asyncio.sleep(0.3) # 最后滚动到底部 await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") except Exception as e: logger.warning(f"模拟自然滚动失败: {e}") @staticmethod async def wait_for_page_stable(page: Page, timeout: int = 10000) -> None: """ 等待页面稳定 Args: page: Playwright页面对象 timeout: 超时时间(毫秒) """ try: await page.wait_for_load_state("networkidle", timeout=timeout) except PlaywrightTimeoutError: logger.warning("等待页面稳定超时,继续执行") @staticmethod async def extract_initial_state_safely(page: Page) -> str: """ 安全地提取__INITIAL_STATE__数据,参考search.py的成功实现 Args: page: Playwright页面对象 Returns: 序列化后的状态数据 """ logger.debug("安全提取__INITIAL_STATE__数据") # 等待__INITIAL_STATE__加载完成 await page.wait_for_function("() => window.__INITIAL_STATE__ !== undefined", timeout=30000) # 使用与search.py相同的方法提取数据 initial_state_js = """ () => { if (window.__INITIAL_STATE__) { // 安全地序列化,避免循环引用 try { return JSON.stringify(window.__INITIAL_STATE__, (key, value) => { // 跳过可能导致循环引用的属性 if (key === 'dep' || key === 'computed' || typeof value === 'function') { return undefined; } return value; }); } catch (e) { console.log('JSON序列化失败,尝试提取关键部分:', e.message); // 如果还是有问题,尝试提取我们需要的部分 const state = window.__INITIAL_STATE__; const result = {}; // 提取用户相关数据 if (state.user) { result.user = {}; if (state.user.userPageData) { result.user.userPageData = state.user.userPageData; } if (state.user.notes) { result.user.notes = {}; if (state.user.notes._rawValue) { result.user.notes._rawValue = state.user.notes._rawValue; } } } // 提取笔记相关数据 if (state.note) { result.note = state.note; } // 提取评论数据 if (state.comments) { result.comments = state.comments; } // 提取feed数据 if (state.feed) { result.feed = state.feed; } // 提取搜索相关数据 if (state.Main && state.Main.feedData) { result.Main = { feedData: state.Main.feedData }; } try { return JSON.stringify(result); } catch (e2) { console.log('备用方案也失败:', e2.message); return "{}"; } } } return ""; } """ result = await page.evaluate(initial_state_js) return result @staticmethod async def extract_feed_detail_state(page: Page) -> str: """ 专门用于笔记详情页的数据提取方法 去除 Vue 响应式的 _rawValue / _value 层和响应式字段,仅保留核心内容 Args: page: Playwright页面对象 Returns: 清理后的状态数据 JSON 字符串 """ logger.debug("提取笔记详情页状态数据(去除Vue响应式)") # 等待__INITIAL_STATE__加载完成 await page.wait_for_function("() => window.__INITIAL_STATE__ !== undefined", timeout=30000) # 提取并清理 Vue 响应式数据的 JavaScript 代码 extract_js = """ () => { if (!window.__INITIAL_STATE__) { return ""; } // 递归函数:去除 Vue 响应式包装 function unwrapVueReactive(obj, visited = new WeakSet()) { // 处理 null 和 undefined if (obj === null || obj === undefined) { return obj; } // 处理循环引用 if (visited.has(obj)) { return null; } // 处理基本类型 if (typeof obj !== 'object') { return obj; } // 处理数组 if (Array.isArray(obj)) { visited.add(obj); return obj.map(item => unwrapVueReactive(item, visited)); } // 处理 Vue 响应式对象 // Vue 3 使用 _rawValue, Vue 2 可能使用 _value if (obj._rawValue !== undefined) { visited.add(obj); return unwrapVueReactive(obj._rawValue, visited); } if (obj._value !== undefined && typeof obj._value === 'object') { visited.add(obj); return unwrapVueReactive(obj._value, visited); } // 跳过 Vue 内部属性 const vueInternalKeys = [ '__v_isRef', '__v_isReactive', '__v_isReadonly', '__v_raw', 'dep', 'computed', '_shallow', '_dirty', 'effect', 'deps', 'activeEffect', 'targetMap' ]; // 构建清理后的对象 const result = {}; visited.add(obj); for (const key in obj) { // 跳过 Vue 内部属性 if (vueInternalKeys.includes(key)) { continue; } // 跳过函数 if (typeof obj[key] === 'function') { continue; } // 跳过以 _ 开头的 Vue 内部属性(除了我们需要的) if (key.startsWith('__') && key !== '__INITIAL_STATE__') { continue; } // 递归处理值 try { result[key] = unwrapVueReactive(obj[key], visited); } catch (e) { // 如果处理失败,跳过该属性 continue; } } return result; } try { // 提取并清理状态数据 const state = window.__INITIAL_STATE__; const cleanedState = unwrapVueReactive(state); // 序列化为 JSON return JSON.stringify(cleanedState, (key, value) => { // 再次过滤函数和 Vue 内部属性 if (typeof value === 'function') { return undefined; } if (key && (key.startsWith('__v_') || key === 'dep' || key === 'computed')) { return undefined; } return value; }); } catch (e) { console.error('提取笔记详情状态失败:', e); // 如果失败,尝试提取核心部分 try { const state = window.__INITIAL_STATE__; const coreData = { global: state.global, user: state.user, note: state.note, board: state.board, login: state.login, feed: state.feed, layout: state.layout, search: state.search, notification: state.notification, redMoji: state.redMoji }; // 清理核心数据 const cleanedCore = unwrapVueReactive(coreData); return JSON.stringify(cleanedCore); } catch (e2) { console.error('备用提取方案也失败:', e2); return "{}"; } } } """ result = await page.evaluate(extract_js) return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luyike221/xiaohongshu-mcp-python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server