"""
反检测工具模块
提供各种反机器人检测的工具函数和类
"""
import random
import asyncio
from typing import Optional, Tuple
from datetime import datetime, timedelta
from playwright.async_api import Page, BrowserContext
from loguru import logger
class HumanBehavior:
"""人类行为模拟器"""
@staticmethod
async def random_delay(min_ms: int = 500, max_ms: int = 2000) -> None:
"""
随机延迟,模拟人类思考时间
Args:
min_ms: 最小延迟(毫秒)
max_ms: 最大延迟(毫秒)
"""
delay = random.uniform(min_ms / 1000, max_ms / 1000)
await asyncio.sleep(delay)
@staticmethod
async def typing_delay() -> None:
"""打字延迟,模拟人类打字速度"""
delay = random.uniform(0.05, 0.15)
await asyncio.sleep(delay)
@staticmethod
async def reading_delay(text_length: int = 100) -> None:
"""
阅读延迟,根据文本长度模拟阅读时间
Args:
text_length: 文本长度
"""
# 假设每个字符阅读时间 20-50ms
delay = text_length * random.uniform(0.02, 0.05)
# 最少 0.5 秒,最多 5 秒
delay = max(0.5, min(delay, 5.0))
await asyncio.sleep(delay)
@staticmethod
async def random_mouse_movement(page: Page, steps: Optional[int] = None) -> None:
"""
随机鼠标移动,模拟人类鼠标轨迹
Args:
page: Playwright 页面对象
steps: 移动步数,None 则随机
"""
try:
# 获取视口大小
viewport = page.viewport_size
if not viewport:
viewport = {"width": 1920, "height": 1080}
# 随机目标位置
x = random.randint(100, viewport["width"] - 100)
y = random.randint(100, viewport["height"] - 100)
# 随机步数
if steps is None:
steps = random.randint(10, 30)
await page.mouse.move(x, y, steps=steps)
await asyncio.sleep(random.uniform(0.1, 0.3))
except Exception as e:
logger.debug(f"鼠标移动失败: {e}")
@staticmethod
async def random_scroll(page: Page) -> None:
"""随机滚动页面,模拟浏览行为"""
try:
# 随机滚动距离
scroll_distance = random.randint(100, 500)
direction = random.choice([1, -1]) # 向下或向上
await page.evaluate(f"""
window.scrollBy({{
top: {scroll_distance * direction},
behavior: 'smooth'
}});
""")
await asyncio.sleep(random.uniform(0.5, 1.5))
except Exception as e:
logger.debug(f"页面滚动失败: {e}")
@staticmethod
async def human_type(page_element, text: str, min_delay: float = 0.05, max_delay: float = 0.15) -> None:
"""
模拟人类打字,逐字输入带随机延迟
Args:
page_element: 页面元素
text: 要输入的文本
min_delay: 最小延迟(秒)
max_delay: 最大延迟(秒)
"""
for char in text:
await page_element.type(char)
delay = random.uniform(min_delay, max_delay)
await asyncio.sleep(delay)
class RateLimiter:
"""操作频率限制器"""
def __init__(self):
self.last_action_time: dict[str, datetime] = {}
self.action_counts: dict[str, int] = {}
self.daily_limits: dict[str, int] = {
"publish": 10, # 每日发布限制
"comment": 50, # 每日评论限制
"like": 100, # 每日点赞限制
"favorite": 50, # 每日收藏限制
}
async def wait_if_needed(
self,
action_type: str,
min_interval: int = 5,
max_interval: int = 10
) -> None:
"""
确保操作间隔,避免频繁操作
Args:
action_type: 操作类型
min_interval: 最小间隔(秒)
max_interval: 最大间隔(秒)
"""
now = datetime.now()
# 检查上次操作时间
if action_type in self.last_action_time:
elapsed = (now - self.last_action_time[action_type]).total_seconds()
required_interval = random.uniform(min_interval, max_interval)
if elapsed < required_interval:
wait_time = required_interval - elapsed
logger.info(f"速率限制: {action_type} 需要等待 {wait_time:.1f} 秒")
await asyncio.sleep(wait_time)
# 更新最后操作时间
self.last_action_time[action_type] = datetime.now()
def check_daily_limit(self, action_type: str) -> bool:
"""
检查是否超过每日限制
Args:
action_type: 操作类型
Returns:
是否可以继续操作
"""
if action_type not in self.daily_limits:
return True
current_count = self.action_counts.get(action_type, 0)
limit = self.daily_limits[action_type]
if current_count >= limit:
logger.warning(f"已达到每日 {action_type} 限制: {current_count}/{limit}")
return False
return True
def increment_action_count(self, action_type: str) -> None:
"""增加操作计数"""
self.action_counts[action_type] = self.action_counts.get(action_type, 0) + 1
def reset_daily_counts(self) -> None:
"""重置每日计数"""
self.action_counts.clear()
logger.info("已重置每日操作计数")
class BrowserFingerprint:
"""浏览器指纹配置"""
@staticmethod
def get_random_viewport() -> dict:
"""获取随机视口大小"""
viewports = [
{"width": 1920, "height": 1080},
{"width": 1366, "height": 768},
{"width": 1536, "height": 864},
{"width": 1440, "height": 900},
]
return random.choice(viewports)
@staticmethod
def get_user_agent() -> str:
"""获取真实的 User-Agent"""
user_agents = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
]
return random.choice(user_agents)
@staticmethod
def get_context_options() -> dict:
"""获取浏览器上下文配置"""
return {
"viewport": BrowserFingerprint.get_random_viewport(),
"user_agent": BrowserFingerprint.get_user_agent(),
"locale": "zh-CN",
"timezone_id": "Asia/Shanghai",
"permissions": ["geolocation", "notifications"],
"geolocation": {
"latitude": 31.2304 + random.uniform(-0.1, 0.1),
"longitude": 121.4737 + random.uniform(-0.1, 0.1),
},
"color_scheme": "light",
"extra_http_headers": {
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
},
}
@staticmethod
async def apply_stealth(page: Page) -> None:
"""
应用反检测脚本
注意: 这是一个简化版本,如果需要更强的反检测,建议使用 playwright-stealth
"""
try:
# 隐藏 webdriver 标识
await page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
""")
# 修改 plugins 长度
await page.add_init_script("""
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
""")
# 修改 languages
await page.add_init_script("""
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en']
});
""")
# 覆盖 permissions
await page.add_init_script("""
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
""")
logger.debug("已应用反检测脚本")
except Exception as e:
logger.warning(f"应用反检测脚本失败: {e}")
class AntiDetectionManager:
"""反检测管理器 - 统一管理所有反检测功能"""
def __init__(self, enable_stealth: bool = True):
self.enable_stealth = enable_stealth
self.human_behavior = HumanBehavior()
self.rate_limiter = RateLimiter()
self.fingerprint = BrowserFingerprint()
async def prepare_page(self, page: Page) -> None:
"""
准备页面,应用反检测措施
Args:
page: Playwright 页面对象
"""
if self.enable_stealth:
await self.fingerprint.apply_stealth(page)
# 随机初始鼠标移动
await self.human_behavior.random_mouse_movement(page)
async def before_action(self, action_type: str, min_delay: int = 1000, max_delay: int = 3000) -> bool:
"""
操作前检查
Args:
action_type: 操作类型
min_delay: 最小延迟(毫秒)
max_delay: 最大延迟(毫秒)
Returns:
是否可以继续操作
"""
# 检查每日限制
if not self.rate_limiter.check_daily_limit(action_type):
return False
# 等待速率限制
await self.rate_limiter.wait_if_needed(
action_type,
min_interval=min_delay / 1000,
max_interval=max_delay / 1000
)
# 随机延迟
await self.human_behavior.random_delay(min_delay, max_delay)
return True
async def after_action(self, action_type: str, page: Optional[Page] = None) -> None:
"""
操作后处理
Args:
action_type: 操作类型
page: Playwright 页面对象(可选)
"""
# 增加操作计数
self.rate_limiter.increment_action_count(action_type)
# 随机行为
if page and random.random() < 0.3: # 30% 概率
await self.human_behavior.random_mouse_movement(page)
# 全局实例
_anti_detection_manager: Optional[AntiDetectionManager] = None
def get_anti_detection_manager(enable_stealth: bool = True) -> AntiDetectionManager:
"""获取全局反检测管理器实例"""
global _anti_detection_manager
if _anti_detection_manager is None:
_anti_detection_manager = AntiDetectionManager(enable_stealth=enable_stealth)
return _anti_detection_manager