Skip to main content
Glama
publish.py75 kB
""" 小红书发布功能 实现图文和视频内容的发布 """ import asyncio import os import platform from pathlib import Path from typing import List, Optional from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError from loguru import logger try: from fastmcp import Context except ImportError: Context = None from ..config import ( PublishImageContent, PublishVideoContent, PublishResponse, XiaohongshuUrls, XiaohongshuSelectors, BrowserConfig, PublishConfig, ) from ..config.settings import settings class PublishAction: """发布操作类""" def __init__(self, page: Page): """ 初始化发布操作 Args: page: Playwright页面对象 """ self.page = page async def publish(self, content: PublishImageContent, context: Optional[Context] = None) -> PublishResponse: """ 发布图文内容 Args: content: 发布内容 Returns: 发布结果 """ try: logger.info(f"开始发布图文内容: {content.title}") # 导航到发布页面 if context: await context.report_progress(progress=10, total=100) await self._navigate_to_publish_page(context) # 选择图文发布标签 if context: await context.report_progress(progress=20, total=100) await self._select_image_publish_tab() # 上传图片 if context: await context.report_progress(progress=30, total=100) await self._upload_images(content.images) # 等待图片上传完成 if context: await context.report_progress(progress=50, total=100) await self._wait_for_upload_complete(len(content.images)) # 处理可能出现的权限弹窗 await self._dismiss_permission_popups() # 填写内容 if context: await context.report_progress(progress=70, total=100) await self._fill_content(content.title, content.content, content.tags or []) # 再次处理可能出现的权限弹窗(填写内容时可能触发) await self._dismiss_permission_popups() # 点击发布按钮 if context: await context.report_progress(progress=90, total=100) await self._click_publish_button(is_video=False) # 等待发布完成 if context: await context.report_progress(progress=95, total=100) note_id = await self._wait_for_publish_complete() if context: await context.report_progress(progress=100, total=100) logger.info(f"图文发布成功: {note_id}") return PublishResponse( success=True, message="发布成功", note_id=note_id ) except Exception as e: logger.error(f"发布图文失败: {e}") return PublishResponse( success=False, message=f"发布失败: {str(e)}", error="PUBLISH_FAILED" ) async def publish_video(self, content: PublishVideoContent, context: Optional[Context] = None) -> PublishResponse: """ 发布视频内容 Args: content: 视频内容 context: 上下文对象,用于进度报告 Returns: 发布结果 """ try: logger.info(f"开始发布视频内容: {content.title}") # 导航到发布页面 if context: await context.report_progress(progress=10, total=100) await self._navigate_to_publish_page(context) # 选择视频发布标签 if context: await context.report_progress(progress=20, total=100) await self._select_video_publish_tab() # 验证会话有效性 logger.info("验证会话有效性...") try: # 等待页面稳定 await asyncio.sleep(2) # 检查是否被重定向到登录页 current_url = self.page.url if "/login" in current_url: raise Exception("会话已失效,页面重定向到登录页,请重新登录") # 测试会话有效性 is_valid = await self._test_session_validity() if not is_valid: logger.warning("会话验证失败,但继续尝试上传") else: logger.info("会话验证通过") except Exception as e: if "会话已失效" in str(e): raise logger.warning(f"会话验证时出现异常: {e}") # 上传视频(内部已包含等待上传完成的逻辑) if context: await context.report_progress(progress=30, total=100) await self._upload_video(content.video_path, content.cover_path) # 发送进度通知:视频上传完成 if context: await context.report_progress(progress=60, total=100) # 处理可能出现的权限弹窗 await self._dismiss_permission_popups() # 填写内容 if context: await context.report_progress(progress=70, total=100) await self._fill_content(content.title, content.content, content.tags or []) # 再次处理可能出现的权限弹窗(填写内容时可能触发) await self._dismiss_permission_popups() # 点击发布按钮 if context: await context.report_progress(progress=90, total=100) await self._click_publish_button(is_video=True) # 等待发布完成 if context: await context.report_progress(progress=95, total=100) note_id = await self._wait_for_publish_complete() if context: await context.report_progress(progress=100, total=100) logger.info(f"视频发布成功: {note_id}") return PublishResponse( success=True, message="发布成功", note_id=note_id ) except Exception as e: logger.error(f"发布视频失败: {e}") return PublishResponse( success=False, message=f"发布失败: {str(e)}", error="PUBLISH_FAILED" ) async def _navigate_to_publish_page(self, context: Optional[Context] = None): """导航到发布页面""" logger.info("导航到发布页面") # 发送进度报告:开始导航 if context: await context.report_progress(progress=5, total=100) # 使用更快的等待策略进行页面导航,避免长时间卡住 logger.info(f"开始跳转到发布页面: {XiaohongshuUrls.PUBLISH_URL}") navigation_timeout = 30000 # 30秒超时,避免长时间等待 try: # 使用 "load" 而不是 "networkidle",更快且更可靠 await self.page.goto( XiaohongshuUrls.PUBLISH_URL, wait_until="load", # 等待页面load事件,比networkidle更快 timeout=navigation_timeout ) logger.info("页面跳转完成") except Exception as e: logger.warning(f"页面跳转时出现异常: {e},继续执行") # 发送进度报告:页面加载中 if context: await context.report_progress(progress=8, total=100) # 不再等待networkidle,直接进入后续步骤,通过循环检测来判断页面是否准备好 logger.info("跳过网络空闲等待,直接进入页面检测流程") # 等待2秒,让页面稳定 logger.info("等待2秒,让页面稳定...") await asyncio.sleep(2) logger.info("页面稳定等待完成") # 循环检测,最多15次,每次间隔1秒 logger.info("开始循环检测页面是否准备好(最多15次)...") max_attempts = 15 page_ready = False for attempt in range(1, max_attempts + 1): logger.info(f"第 {attempt}/{max_attempts} 次检测...") try: # 1. 点击空白处去除弹窗 try: viewport_size = self.page.viewport_size if viewport_size: # 点击页面左侧中间位置(宽为0,长为中间) click_x = 0 click_y = viewport_size["height"] // 2 await self.page.mouse.click(click_x, click_y) logger.info(f"第 {attempt} 次检测:已点击空白处,尝试关闭弹窗") except Exception as e: logger.info(f"第 {attempt} 次检测:点击空白处时出现异常(可忽略): {e}") # 2. 判断是否可以hover(尝试hover到按钮容器) try: logger.info(f"第 {attempt} 次检测:尝试查找按钮容器...") btn_element = await self.page.query_selector('//div[@class="btn"]') if btn_element: # 尝试hover,如果成功说明页面准备好了 logger.info(f"第 {attempt} 次检测:找到按钮容器,尝试hover...") await btn_element.hover() logger.info(f"第 {attempt} 次检测:成功hover到按钮容器,页面已准备好") page_ready = True break else: logger.info(f"第 {attempt} 次检测:未找到按钮容器,继续等待") except Exception as e: logger.info(f"第 {attempt} 次检测:hover失败 - {e}") except Exception as e: logger.info(f"第 {attempt} 次检测出现异常: {e}") # 如果还没准备好,等待1秒后继续 if not page_ready and attempt < max_attempts: logger.info(f"第 {attempt} 次检测:页面未准备好,等待1秒后继续...") await asyncio.sleep(1) if page_ready: logger.info("确认已进入发布页面,页面已准备好") else: logger.warning(f"经过 {max_attempts} 次检测,未能确认页面状态,但继续执行") # 发送进度报告:移除弹窗 if context: await context.report_progress(progress=10, total=100) async def _select_image_publish_tab(self): """选择图文发布标签""" logger.info("选择图文发布标签") try: # 步骤1: 先 hover 到按钮容器 logger.debug("悬停到按钮容器") btn_element = await self.page.wait_for_selector( '//div[@class="btn"]', timeout=BrowserConfig.ELEMENT_TIMEOUT ) if btn_element: await btn_element.hover() await asyncio.sleep(0.3) # 等待悬停效果 logger.debug("已悬停到按钮容器") else: logger.warning("未找到按钮容器,直接尝试点击图文标签") # 步骤2: 点击图文发布标签 logger.debug("点击图文发布标签") tab_element = await self.page.wait_for_selector( XiaohongshuSelectors.PUBLISH_TAB, timeout=BrowserConfig.ELEMENT_TIMEOUT ) if tab_element: await tab_element.click() await asyncio.sleep(1) # 等待标签切换 logger.info("已选择图文发布标签") else: raise Exception("找不到图文发布标签") except PlaywrightTimeoutError: raise Exception("等待图文发布标签超时") async def _select_video_publish_tab(self): """选择视频发布标签""" logger.info("选择视频发布标签") try: # 步骤1: 先 hover 到按钮容器 logger.debug("悬停到按钮容器") btn_element = await self.page.wait_for_selector( '//div[@class="btn"]', timeout=BrowserConfig.ELEMENT_TIMEOUT ) if btn_element: await btn_element.hover() await asyncio.sleep(0.3) # 等待悬停效果 logger.debug("已悬停到按钮容器") else: logger.warning("未找到按钮容器,直接尝试点击视频标签") # 步骤2: 点击视频发布标签 logger.debug("点击视频发布标签") video_tab = await self.page.wait_for_selector( '//div[normalize-space(.)="上传视频"][@class="container"]', timeout=BrowserConfig.ELEMENT_TIMEOUT ) if video_tab: await video_tab.click() await asyncio.sleep(1) # 等待标签切换 logger.info("已选择视频发布标签") else: raise Exception("找不到视频发布标签") except PlaywrightTimeoutError: raise Exception("等待视频发布标签超时") async def _ensure_on_publish_page(self, timeout_seconds: int = 10): """确保当前停留在发布页面""" logger.info("检查是否停留在发布页面") deadline = asyncio.get_event_loop().time() + timeout_seconds last_url = None attempt = 0 while asyncio.get_event_loop().time() < deadline: attempt += 1 current_url = self.page.url if current_url != last_url: logger.debug(f"当前页面URL: {current_url}") last_url = current_url if "/publish/publish" in current_url and "/login" not in current_url: logger.info("确认停留在发布页面") return if "/login" in current_url: logger.warning("检测到登录页面,尝试重新导航到发布页") try: await self.page.goto( XiaohongshuUrls.PUBLISH_URL, wait_until="networkidle", timeout=BrowserConfig.PAGE_LOAD_TIMEOUT ) await asyncio.sleep(1) continue except Exception as e: logger.warning(f"重新导航到发布页失败: {e}") logger.debug(f"不在发布页,等待页面稳定 (第 {attempt} 次检测)") await asyncio.sleep(1) raise Exception("无法保持在发布页面,请检查登录状态") async def _simulate_human_behavior(self): """模拟人类行为,降低被检测风险""" import random try: # 随机延迟 0.5-2 秒 delay = random.uniform(0.5, 2.0) logger.debug(f"随机延迟 {delay:.2f} 秒") await asyncio.sleep(delay) # 模拟鼠标移动 viewport_size = self.page.viewport_size if viewport_size: width = viewport_size['width'] height = viewport_size['height'] # 随机移动鼠标 2-4 次 move_count = random.randint(2, 4) for _ in range(move_count): x = random.randint(100, width - 100) y = random.randint(100, height - 100) await self.page.mouse.move(x, y) await asyncio.sleep(random.uniform(0.1, 0.3)) logger.debug(f"模拟了 {move_count} 次鼠标移动") # 随机滚动页面 scroll_distance = random.randint(-200, 200) await self.page.evaluate(f"window.scrollBy(0, {scroll_distance})") await asyncio.sleep(random.uniform(0.2, 0.5)) logger.debug(f"模拟页面滚动: {scroll_distance}px") except Exception as e: logger.debug(f"模拟人类行为时出错: {e},继续执行") async def _test_session_validity(self): """测试会话有效性,通过触发一个需要认证的操作""" logger.info("测试会话有效性...") try: # 尝试执行一个需要认证的 JavaScript 调用 # 小红书会在页面加载时检查登录状态 result = await self.page.evaluate(""" () => { // 检查是否有用户信息 if (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.user) { return { valid: true, user: window.__INITIAL_STATE__.user }; } return { valid: false }; } """) if result and result.get('valid'): logger.info("会话有效,检测到用户信息") return True else: logger.warning("会话可能无效,未检测到用户信息") return False except Exception as e: logger.warning(f"测试会话有效性时出错: {e}") return False async def _upload_images(self, image_paths: List[str]): """ 上传图片 Args: image_paths: 图片路径列表 """ if not image_paths: raise Exception("图片路径列表不能为空") # 验证文件存在性 for path in image_paths: if not os.path.exists(path): raise Exception(f"图片文件不存在: {path}") logger.info(f"开始上传 {len(image_paths)} 张图片") try: # 等待上传输入框 upload_input = await self.page.wait_for_selector( XiaohongshuSelectors.UPLOAD_INPUT, timeout=BrowserConfig.ELEMENT_TIMEOUT ) if not upload_input: raise Exception("找不到图片上传输入框") # 尝试批量上传所有图片 try: logger.info(f"尝试批量上传 {len(image_paths)} 张图片") await upload_input.set_input_files(image_paths) logger.info("批量上传成功,等待所有图片上传完成") except Exception as e: # 如果批量上传失败,尝试逐个上传 error_msg = str(e) if "Non-multiple" in error_msg or "single file" in error_msg.lower(): logger.warning(f"输入框不支持批量上传,改为逐个上传: {error_msg}") # 逐个上传 for index, image_path in enumerate(image_paths, 1): logger.info(f"上传第 {index}/{len(image_paths)} 张图片: {image_path}") # 每次上传前重新查找上传输入框(因为上传后DOM可能会变化) upload_input = await self.page.wait_for_selector( XiaohongshuSelectors.UPLOAD_INPUT, timeout=BrowserConfig.ELEMENT_TIMEOUT ) if not upload_input: raise Exception(f"找不到图片上传输入框(第 {index} 张)") # 逐个上传 await upload_input.set_input_files([image_path]) # 等待当前图片上传完成(检查已上传的图片数量) await asyncio.sleep(1) # 给一点时间让上传开始 # 等待上传进度更新 max_wait = 10 # 最多等待10秒 waited = 0 while waited < max_wait: uploaded_count = len(await self.page.query_selector_all( XiaohongshuSelectors.UPLOADED_IMAGE )) if uploaded_count >= index: logger.info(f"第 {index} 张图片上传完成") break await asyncio.sleep(0.5) waited += 0.5 # 如果不是最后一张,等待一小段时间再上传下一张 if index < len(image_paths): await asyncio.sleep(0.5) else: # 其他错误,直接抛出 raise # 等待所有图片上传完成 await self._wait_for_upload_complete(len(image_paths)) logger.info("所有图片上传完成") except PlaywrightTimeoutError: raise Exception("等待图片上传输入框超时") async def _upload_video(self, video_path: str, cover_path: Optional[str] = None): """ 上传视频 Args: video_path: 视频路径 cover_path: 封面路径 """ if not os.path.exists(video_path): raise Exception(f"视频文件不存在: {video_path}") logger.info(f"开始上传视频: {video_path}") # 记录当前URL initial_url = self.page.url logger.info(f"上传前页面URL: {initial_url}") # 设置导航监听器,用于调试 navigation_events = [] console_messages = [] network_errors = [] def on_framenavigated(frame): """框架导航事件监听器""" try: if frame == self.page.main_frame: url = frame.url event_info = { "timestamp": asyncio.get_event_loop().time(), "url": url, "type": "framenavigated" } navigation_events.append(event_info) logger.warning(f"[导航监听] 主框架导航: {url}") except Exception as e: logger.warning(f"[导航监听] 记录框架导航失败: {e}") def on_console(msg): """控制台消息监听器""" try: msg_text = msg.text msg_type = msg.type # 只记录警告和错误,以及包含导航、跳转、redirect等关键词的消息 if msg_type in ['warning', 'error'] or any(keyword in msg_text.lower() for keyword in ['navigate', 'redirect', '跳转', '导航', 'location', 'href']): console_messages.append({ "timestamp": asyncio.get_event_loop().time(), "type": msg_type, "text": msg_text }) logger.warning(f"[控制台监听] {msg_type.upper()}: {msg_text}") except Exception as e: logger.debug(f"[控制台监听] 记录消息失败: {e}") def on_response(response): """网络响应监听器,用于捕捉认证失败""" try: status = response.status if status != 401: return url = response.url event_info = { "timestamp": asyncio.get_event_loop().time(), "status": status, "url": url } network_errors.append(event_info) logger.warning(f"[网络监听] 捕捉到 401 响应: {url}") except Exception as e: logger.debug(f"[网络监听] 记录响应失败: {e}") # 注册监听器 self.page.on("framenavigated", on_framenavigated) self.page.on("console", on_console) self.page.on("response", on_response) try: # 等待视频上传输入框 logger.info("等待视频上传输入框...") video_input = await self.page.wait_for_selector( "//input[@class='upload-input']", timeout=BrowserConfig.ELEMENT_TIMEOUT ) if not video_input: raise Exception("找不到视频上传输入框") logger.info("找到视频上传输入框,准备上传文件...") await self._ensure_on_publish_page() current_url_before_upload = self.page.url logger.info(f"上传前当前URL: {current_url_before_upload}") # 模拟人类行为:随机延迟和鼠标移动 logger.info("模拟人类行为...") await self._simulate_human_behavior() # 上传视频 logger.info("开始设置文件到上传输入框...") await video_input.set_input_files([video_path]) logger.info("文件已设置到上传输入框") # 等待一小段时间,观察是否有立即的导航 await asyncio.sleep(1) current_url_after_upload = self.page.url logger.info(f"上传后1秒当前URL: {current_url_after_upload}") if current_url_before_upload != current_url_after_upload: logger.warning(f"检测到URL变化: {current_url_before_upload} -> {current_url_after_upload}") # 如果重定向到登录页面(401错误),直接抛出异常 if "/login" in current_url_after_upload and "redirectReason=401" in current_url_after_upload: logger.error("上传文件时触发401错误,会话已失效") logger.error("这通常是因为:") logger.error("1. Cookie 已过期或失效") logger.error("2. 小红书检测到自动化行为") logger.error("3. 需要重新登录以刷新会话") raise Exception( "上传失败:会话已失效 (401 Unauthorized)。" "请执行以下操作之一:\n" "1. 调用 xiaohongshu_cleanup_login_session 清理会话\n" "2. 调用 xiaohongshu_start_login_session(fresh=True) 重新登录\n" "3. 手动在浏览器中登录小红书创作者中心" ) # 等待视频上传完成(视频上传时间较长) logger.info("开始等待视频上传完成...") await self._wait_for_video_upload_complete() # 记录最终URL和所有导航事件 final_url = self.page.url logger.info(f"上传完成后最终URL: {final_url}") logger.info(f"共检测到 {len(navigation_events)} 次导航事件") for i, event in enumerate(navigation_events, 1): logger.info(f"导航事件 {i}: {event}") logger.info(f"共检测到 {len(console_messages)} 条相关控制台消息") for i, msg in enumerate(console_messages, 1): logger.info(f"控制台消息 {i}: [{msg['type']}] {msg['text']}") logger.info(f"共检测到 {len(network_errors)} 条 401 响应") for i, error in enumerate(network_errors, 1): logger.info(f"401 响应 {i}: {error['url']}") # 如果有封面,上传封面 if cover_path and os.path.exists(cover_path): await self._upload_video_cover(cover_path) logger.info("视频上传完成") except PlaywrightTimeoutError: raise Exception("等待视频上传输入框超时") except Exception as e: # 记录错误时的URL和导航事件 error_url = self.page.url logger.error(f"上传视频时发生错误,当前URL: {error_url}") logger.error(f"错误发生前共检测到 {len(navigation_events)} 次导航事件") for i, event in enumerate(navigation_events, 1): logger.error(f"导航事件 {i}: {event}") logger.error(f"错误发生前共检测到 {len(console_messages)} 条相关控制台消息") for i, msg in enumerate(console_messages, 1): logger.error(f"控制台消息 {i}: [{msg['type']}] {msg['text']}") logger.error(f"错误发生前共检测到 {len(network_errors)} 条 401 响应") for i, error in enumerate(network_errors, 1): logger.error(f"401 响应 {i}: {error['url']}") if network_errors or ("/login" in error_url and "redirectReason=401" in error_url): raise Exception("视频上传失败:检测到登录状态失效 (401),请重新登录后重试") from e raise finally: # 移除监听器 try: self.page.remove_listener("framenavigated", on_framenavigated) self.page.remove_listener("console", on_console) self.page.remove_listener("response", on_response) except Exception as e: logger.debug(f"移除监听器失败: {e}") async def _upload_video_cover(self, cover_path: str): """ 上传视频封面 Args: cover_path: 封面路径 """ logger.info(f"上传视频封面: {cover_path}") try: # 等待封面上传按钮 cover_button = await self.page.wait_for_selector( "text=上传封面", timeout=BrowserConfig.ELEMENT_TIMEOUT ) if cover_button: await cover_button.click() # 等待封面上传输入框 cover_input = await self.page.wait_for_selector( "input[type='file'][accept*='image']", timeout=BrowserConfig.ELEMENT_TIMEOUT ) if cover_input: await cover_input.set_input_files([cover_path]) await asyncio.sleep(2) # 等待封面上传 logger.info("视频封面上传完成") except PlaywrightTimeoutError: logger.warning("上传视频封面超时,使用默认封面") async def _wait_for_upload_complete(self, expected_count: int): """ 等待图片上传完成 Args: expected_count: 期望的图片数量 """ logger.info(f"等待 {expected_count} 张图片上传完成") timeout = BrowserConfig.UPLOAD_TIMEOUT start_time = asyncio.get_event_loop().time() while True: current_time = asyncio.get_event_loop().time() if (current_time - start_time) * 1000 > timeout: raise Exception("等待图片上传完成超时") # 检查已上传的图片数量 uploaded_images = await self.page.query_selector_all( XiaohongshuSelectors.UPLOADED_IMAGE ) if len(uploaded_images) >= expected_count: logger.info(f"图片上传完成,共 {len(uploaded_images)} 张") break # 添加进度日志,减少用户焦虑 logger.info(f"上传进度: {len(uploaded_images)}/{expected_count}") await asyncio.sleep(2) # 增加检查间隔,减少CPU占用 async def _wait_for_video_upload_complete(self): """等待视频上传完成""" logger.info("等待视频上传完成") timeout = 5 * 60 * 1000 # 5分钟超时时间(毫秒) start_time = asyncio.get_event_loop().time() navigation_detected = False last_url = self.page.url check_count = 0 login_detected_at: Optional[float] = None logger.info(f"[等待上传] 初始URL: {last_url}") while True: current_time = asyncio.get_event_loop().time() elapsed_seconds = (current_time - start_time) check_count += 1 if elapsed_seconds * 1000 > timeout: raise Exception("等待视频上传完成超时") # 监控URL变化 try: current_url = self.page.url if current_url != last_url: logger.warning(f"[等待上传] URL变化检测 (检查 #{check_count}, 已等待 {elapsed_seconds:.1f}秒):") logger.warning(f" 从: {last_url}") logger.warning(f" 到: {current_url}") last_url = current_url navigation_detected = True if "/login" in current_url and "redirectReason=401" in current_url: if login_detected_at is None: login_detected_at = elapsed_seconds logger.warning("[等待上传] 检测到进入登录页,监控会话恢复") elif elapsed_seconds - login_detected_at > 15: raise Exception("检测到会话反复跳转至登录页,可能需要重新登录") else: login_detected_at = None except Exception as e: logger.debug(f"[等待上传] 获取URL失败: {e}") try: # 如果检测到导航,等待页面加载完成 if navigation_detected: logger.info("检测到页面导航,等待页面加载完成...") try: await self.page.wait_for_load_state("networkidle", timeout=10000) await asyncio.sleep(1) # 额外等待1秒确保页面稳定 navigation_detected = False logger.info("页面导航完成,继续检查上传状态") # 导航后检查是否需要重新选择视频标签 # 如果找不到发布按钮,可能需要重新选择标签 temp_button = await self.page.query_selector(XiaohongshuSelectors.VIDEO_PUBLISH_BUTTON) if not temp_button: logger.info("导航后未找到发布按钮,尝试重新选择视频标签") try: await self._select_video_publish_tab() await asyncio.sleep(1) # 等待标签切换 except Exception as e: logger.warning(f"重新选择视频标签失败: {e},继续检查") except PlaywrightTimeoutError: logger.warning("等待页面加载超时,继续检查") # 检查发布按钮是否可点击(视频上传完成的标志) # 每次循环都重新获取元素,避免元素失效 publish_button = await self.page.query_selector(XiaohongshuSelectors.VIDEO_PUBLISH_BUTTON) if publish_button: try: # 检查按钮是否可见 is_visible = await publish_button.is_visible() if is_visible: # 检查按钮是否被禁用 is_disabled = await publish_button.is_disabled() if not is_disabled: # 检查按钮class是否包含disabled class_name = await publish_button.get_attribute("class") if class_name and "disabled" not in class_name: logger.info("视频上传完成,发布按钮可点击") break except Exception as e: # 如果元素失效(可能是导航导致),标记导航并继续 if "Execution context was destroyed" in str(e) or "navigation" in str(e).lower(): logger.warning(f"检测到页面导航: {e}") navigation_detected = True await asyncio.sleep(2) continue else: raise # 检查是否有错误 try: error_element = await self.page.query_selector(XiaohongshuSelectors.ERROR_MESSAGE) if error_element: error_text = await error_element.text_content() raise Exception(f"视频上传失败: {error_text}") except Exception as e: # 如果是导航错误,继续等待 if "Execution context was destroyed" in str(e) or "navigation" in str(e).lower(): logger.warning(f"检查错误时检测到导航: {e}") navigation_detected = True await asyncio.sleep(2) continue elif "视频上传失败" in str(e): raise except Exception as e: # 捕获导航相关的错误 if "Execution context was destroyed" in str(e) or "navigation" in str(e).lower(): logger.warning(f"检测到页面导航: {e}") navigation_detected = True await asyncio.sleep(2) continue else: # 其他错误直接抛出 raise # 定期输出状态日志(每10次检查或每5秒) if check_count % 10 == 0 or elapsed_seconds % 5 < 2: try: current_url_status = self.page.url publish_button_status = "未找到" try: temp_button = await self.page.query_selector(XiaohongshuSelectors.VIDEO_PUBLISH_BUTTON) if temp_button: is_visible = await temp_button.is_visible() is_disabled = await temp_button.is_disabled() publish_button_status = f"可见={is_visible}, 禁用={is_disabled}" except: pass logger.info(f"[等待上传] 状态检查 #{check_count} - 已等待 {elapsed_seconds:.1f}秒, URL: {current_url_status}, 发布按钮: {publish_button_status}") except Exception as e: logger.debug(f"[等待上传] 状态检查失败: {e}") await asyncio.sleep(2) async def _fill_content(self, title: str, content: str, tags: List[str]): """ 填写发布内容 Args: title: 标题 content: 正文内容 tags: 标签列表 """ logger.info(f"填写发布内容: 标题={title}, 正文={content}, 标签={tags}") # 填写标题 await self._input_title(title) # 填写正文(如果有) if content: await self._input_content(content) # 添加标签 if tags: await self._input_tags(tags) async def _input_title(self, title: str): """ 输入标题 Args: title: 标题内容 """ logger.info(f"输入标题: {title}") try: title_input = await self.page.wait_for_selector( XiaohongshuSelectors.TITLE_INPUT, timeout=BrowserConfig.ELEMENT_TIMEOUT ) if title_input: # 使用 fill() 方法,它会自动清空输入框然后填入新内容 await title_input.fill(title) logger.info("标题输入完成") else: raise Exception("找不到标题输入框") except PlaywrightTimeoutError: raise Exception("等待标题输入框超时") async def _find_content_editor(self): """ 定位正文编辑区(查找容器内的可编辑元素) Returns: 编辑器元素,如果找不到则返回None """ try: # 先找到编辑器容器 container = await self.page.wait_for_selector( XiaohongshuSelectors.CONTENT_TEXTAREA, timeout=BrowserConfig.ELEMENT_TIMEOUT, state="visible" ) if not container: logger.warning("未找到正文编辑器容器") return None # 在容器内查找可编辑元素(contenteditable 或 textarea/input) # 优先查找 contenteditable 元素 editable_element = await container.query_selector('[contenteditable="true"]') if editable_element: logger.debug("找到 contenteditable 编辑器") return editable_element # 如果没有 contenteditable,查找 textarea 或 input textarea = await container.query_selector('textarea') if textarea: logger.debug("找到 textarea 编辑器") return textarea input_elem = await container.query_selector('input') if input_elem: logger.debug("找到 input 编辑器") return input_elem # 如果都没找到,尝试使用容器本身(某些情况下容器可能就是可编辑的) logger.debug("使用容器作为编辑器") return container except PlaywrightTimeoutError: logger.warning("未找到正文编辑器") return None except Exception as e: logger.warning(f"查找正文编辑器失败: {e}") return None async def _input_content(self, content: str): """ 输入正文内容(模拟手动输入,包括处理换行符) Args: content: 正文内容 """ import random logger.info(f"输入正文内容(模拟手动输入),长度: {len(content)} 字符") editor = await self._find_content_editor() if not editor: raise Exception("找不到正文输入框") try: await editor.click() await asyncio.sleep(0.3) # 检查元素是否可编辑 is_contenteditable = await editor.get_attribute("contenteditable") if is_contenteditable == "true": # 对于 contenteditable 元素,清空内容 await editor.evaluate("(el) => { el.textContent = ''; el.innerHTML = ''; }") await asyncio.sleep(0.2) else: # 对于 input/textarea,清空内容 await editor.fill("") await asyncio.sleep(0.2) # 模拟手动输入:逐字符输入,处理换行符 logger.info("开始模拟手动输入...") # 将内容按换行符分割,然后逐段输入 lines = content.split('\n') total_lines = len(lines) for line_idx, line in enumerate(lines): if line: # 如果行不为空,逐字符输入 # 逐字符输入,模拟真实打字速度 for char in line: await self.page.keyboard.type(char, delay=random.randint(30, 80)) # 偶尔添加更长的延迟,模拟思考停顿 if random.random() < 0.05: # 5% 的概率 await asyncio.sleep(random.uniform(0.1, 0.3)) # 如果不是最后一行,按 Enter 键换行 if line_idx < total_lines - 1: await asyncio.sleep(random.uniform(0.1, 0.2)) # 换行前短暂停顿 await self.page.keyboard.press("Enter") await asyncio.sleep(random.uniform(0.1, 0.2)) # 换行后短暂停顿 # 等待输入完成 await asyncio.sleep(0.3) # 验证内容是否已输入 if is_contenteditable == "true": actual_content = await editor.text_content() else: actual_content = await editor.input_value() if actual_content and len(actual_content.strip()) > 0: logger.info(f"模拟手动输入完成,实际输入长度: {len(actual_content)} 字符") else: logger.warning("输入后验证内容为空,但继续执行") # 再次点击确保焦点 await editor.click() await asyncio.sleep(0.2) logger.info("正文内容输入完成") except PlaywrightTimeoutError as e: logger.error(f"输入正文内容超时: {e}") raise Exception(f"输入正文内容失败: 超时 - {str(e)}") except Exception as e: logger.error(f"输入正文内容失败: {e}") raise Exception(f"输入正文内容失败: {e}") async def _input_tags(self, tags: List[str]): """ 输入标签(在正文编辑区以"#话题"形式输入) Args: tags: 标签列表 """ if not tags: return logger.info(f"输入标签: {tags}") # 定位正文编辑区 editor = await self._find_content_editor() if not editor: logger.warning("找不到正文编辑器,无法输入标签") return # 进入"可输入话题"的状态 await self._prepare_for_tag_input(editor) # 逐个输入标签 for tag in tags: await self._input_single_tag_in_editor(editor, tag) await asyncio.sleep(0.5) # 每个标签完成后等待500ms,给页面时间渲染标签块 async def _prepare_for_tag_input(self, editor): """ 进入"可输入话题"的状态 Args: editor: 编辑器元素 """ logger.debug("准备输入话题状态") try: # 点击编辑器确保焦点 await editor.click() await asyncio.sleep(0.3) # 方法1: 优先使用快捷键移动到文本末尾(最可靠的方法) logger.debug("使用快捷键移动光标到文本末尾") try: # 尝试使用 Ctrl+End (Windows/Linux) 或 Cmd+End (Mac) if platform.system() == 'Darwin': # Mac await self.page.keyboard.press("Meta+End") else: # Windows/Linux await self.page.keyboard.press("Control+End") await asyncio.sleep(0.3) # 等待光标移动完成 logger.debug("已使用快捷键移动光标") except Exception as e: logger.warning(f"快捷键移动光标失败: {e},尝试备用方案") # 备用:使用 End 键 try: await self.page.keyboard.press("End") await asyncio.sleep(0.3) except Exception: pass # 方法2: 使用 JavaScript 验证并确保光标在末尾 logger.debug("使用 JavaScript 验证并确保光标在文本末尾") cursor_at_end = await editor.evaluate(""" (element) => { try { // 检查是否是 textarea 或 input if (element.tagName === 'TEXTAREA' || element.tagName === 'INPUT') { // 验证光标位置 const cursorPos = element.selectionStart || element.selectionEnd; const textLength = element.value.length; if (cursorPos === textLength) { return true; // 光标已在末尾 } // 如果不在末尾,移动到末尾 element.focus(); element.setSelectionRange(textLength, textLength); return true; } // 对于 contenteditable 元素(如 ProseMirror 编辑器) if (element.contentEditable === 'true' || element.isContentEditable) { const selection = window.getSelection(); if (!selection || selection.rangeCount === 0) { // 没有选择,创建新的范围到末尾 const range = document.createRange(); range.selectNodeContents(element); range.collapse(false); selection.removeAllRanges(); selection.addRange(range); return true; } // 获取当前范围 const range = selection.getRangeAt(0); const container = range.endContainer; // 检查是否在末尾 let isAtEnd = false; if (container.nodeType === Node.TEXT_NODE) { // 文本节点:检查是否在文本末尾 isAtEnd = (range.endOffset === container.textContent.length); } else { // 元素节点:检查是否是最后一个子节点 isAtEnd = (!container.nextSibling && (!container.parentNode || container.parentNode === element || !container.parentNode.nextSibling)); } if (!isAtEnd) { // 不在末尾,移动到末尾 const newRange = document.createRange(); // 找到最后一个文本节点 let lastTextNode = null; const walker = document.createTreeWalker( element, NodeFilter.SHOW_TEXT, null ); let node; while (node = walker.nextNode()) { lastTextNode = node; } if (lastTextNode) { // 设置到最后一个文本节点的末尾 newRange.setStart(lastTextNode, lastTextNode.textContent.length); newRange.setEnd(lastTextNode, lastTextNode.textContent.length); } else { // 没有文本节点,移动到元素末尾 newRange.selectNodeContents(element); newRange.collapse(false); } selection.removeAllRanges(); selection.addRange(newRange); } return true; } return false; } catch (e) { console.error('移动光标失败:', e); return false; } } """) if not cursor_at_end: logger.warning("JavaScript 方法可能失败,再次尝试快捷键") # 再次尝试快捷键 try: if platform.system() == 'Darwin': await self.page.keyboard.press("Meta+End") else: await self.page.keyboard.press("Control+End") await asyncio.sleep(0.3) except Exception: pass # 等待光标移动完成 await asyncio.sleep(0.2) # 回车两次:创建新的段落或行,避免在已有inline元素中插入#导致联想不弹出 logger.debug("创建新行用于输入标签") await self.page.keyboard.press("Enter") await asyncio.sleep(0.2) await self.page.keyboard.press("Enter") await asyncio.sleep(0.3) # 增加等待时间,确保新行创建完成 logger.debug("已进入可输入话题状态") except Exception as e: logger.warning(f"准备输入话题状态失败: {e}") async def _input_single_tag_in_editor(self, editor, tag: str): """ 在编辑器中输入单个标签 Args: editor: 编辑器元素 tag: 标签内容(会自动去掉左侧的#) """ # 规范化:去掉左侧的# normalized_tag = tag.lstrip("#").strip() if not normalized_tag: logger.warning(f"标签为空,跳过: {tag}") return logger.debug(f"输入标签: {normalized_tag}") try: # 确保编辑器有焦点 await editor.click() await asyncio.sleep(0.2) # 确保光标在文本末尾(每次输入标签前都检查) logger.debug("确保光标在文本末尾") try: # 使用快捷键快速移动到末尾 if platform.system() == 'Darwin': await self.page.keyboard.press("Meta+End") else: await self.page.keyboard.press("Control+End") await asyncio.sleep(0.2) except Exception: # 备用:使用 End 键 try: await self.page.keyboard.press("End") await asyncio.sleep(0.2) except Exception: pass # 触发联想:先输入# await self.page.keyboard.type("#", delay=50) await asyncio.sleep(0.1) # 逐字符输入标签名(每字符约50ms延时) for ch in normalized_tag: await self.page.keyboard.type(ch, delay=50) # 等待联想容器出现(约1s) await asyncio.sleep(1.0) # 查找联想容器并选择第一项 picked = await self._try_pick_topic_suggestion() if not picked: # 未找到或没有联想项:输入一个空格结束当前话题,使其作为"自由话题"插入 await self.page.keyboard.press("Space") logger.debug(f"标签作为自由话题插入: {normalized_tag}") else: logger.debug(f"标签通过联想项选择: {normalized_tag}") except Exception as e: logger.warning(f"输入标签失败: {normalized_tag}, 错误: {e}") # 尝试输入空格作为兜底 try: await self.page.keyboard.press("Space") except Exception: pass async def _try_pick_topic_suggestion(self) -> bool: """ 尝试选择第一条话题建议 Returns: 是否成功选择 """ try: # 等待联想容器出现 container = await self.page.wait_for_selector( XiaohongshuSelectors.TOPIC_SUGGEST_CONTAINER, timeout=1000, state="visible" ) if container: # 使用 xpath 在整个页面中查找第一项(因为 xpath 是绝对路径) # 或者如果 xpath 是相对路径,可以在容器内查找 # 这里使用 page.query_selector 因为 xpath 是绝对路径 item = await self.page.query_selector(XiaohongshuSelectors.TOPIC_SUGGEST_ITEM) if item: # 检查项是否在容器内(可选验证) is_visible = await item.is_visible() if is_visible: await item.click() await asyncio.sleep(0.3) logger.debug("成功选择话题联想项") return True else: logger.debug("话题联想项不可见") else: logger.debug("联想容器存在但无建议项") else: logger.debug("未找到话题联想容器") except PlaywrightTimeoutError: logger.debug("等待话题联想容器超时") except Exception as e: logger.debug(f"选择话题建议失败: {e}") return False async def _click_publish_button(self, is_video: bool = False): """ 点击发布按钮 Args: is_video: 是否为视频发布 """ logger.info(f"点击发布按钮 ({'视频' if is_video else '图文'})") # 检查是否启用发布阻塞测试模式 if settings.PUBLISH_BLOCK_TEST: logger.warning("=" * 60) logger.warning(f"⚠️ 发布阻塞测试模式已启用 (PUBLISH_BLOCK_TEST=true)") logger.warning(f" 当前为 {'视频' if is_video else '图文'} 发布模式") logger.warning(f" 发布流程已阻塞,不会点击发布按钮") logger.warning(f" 请手动检查页面内容是否正确") logger.warning(f" 如需继续发布,请在 .env 中设置 PUBLISH_BLOCK_TEST=false") logger.warning("=" * 60) # 阻塞等待,直到用户手动操作或程序被中断 try: while True: await asyncio.sleep(10) # 每10秒输出一次提示 logger.info("发布阻塞测试模式:仍在等待中... (按 Ctrl+C 中断)") except KeyboardInterrupt: logger.info("发布阻塞测试模式:用户中断,退出阻塞") raise Exception("发布阻塞测试模式:用户中断,未执行发布操作") try: if is_video: # 视频发布:等待 button.publishBtn 变为可点击(无 disabled 属性且可见) # 因为视频处理需要较长时间,按钮可点击即表示处理完成 logger.info("等待视频发布按钮变为可点击...") publish_button = await self.page.wait_for_selector( XiaohongshuSelectors.VIDEO_PUBLISH_BUTTON, timeout=BrowserConfig.ELEMENT_TIMEOUT * 3, # 视频处理可能需要更长时间 state="visible" ) # 等待按钮变为可点击(无 disabled 属性) max_wait = 60 # 最多等待60秒 wait_interval = 0.5 waited = 0 while waited < max_wait: is_disabled = await publish_button.get_attribute("disabled") if is_disabled is None or is_disabled == "false": # 按钮可点击 break await asyncio.sleep(wait_interval) waited += wait_interval logger.debug(f"等待视频处理完成... ({waited:.1f}s)") if waited >= max_wait: logger.warning("视频处理超时,但继续尝试点击发布按钮") await publish_button.click() logger.info("视频发布按钮已点击") else: # 图文发布:使用 xpath 精确匹配"发布"按钮 # 注意:使用 xpath 的 normalize-space 确保精确匹配文本内容 publish_button = await self.page.wait_for_selector( XiaohongshuSelectors.IMAGE_PUBLISH_BUTTON, timeout=BrowserConfig.ELEMENT_TIMEOUT, state="visible" ) await publish_button.click() logger.info("图文发布按钮已点击") # 等待页面响应 await asyncio.sleep(1) except PlaywrightTimeoutError: raise Exception("等待发布按钮超时") except Exception as e: logger.error(f"点击发布按钮失败: {e}") raise Exception(f"点击发布按钮失败: {e}") async def _wait_for_publish_complete(self) -> Optional[str]: """ 等待发布完成 Returns: 发布成功返回笔记ID,失败返回None """ logger.info("等待发布完成") timeout = BrowserConfig.DEFAULT_TIMEOUT * 2 # 增加发布等待时间 start_time = asyncio.get_event_loop().time() last_log_time = start_time check_count = 0 # 记录初始URL initial_url = self.page.url while True: current_time = asyncio.get_event_loop().time() elapsed = (current_time - start_time) * 1000 check_count += 1 if elapsed > timeout: logger.error(f"等待发布完成超时({timeout}ms),已等待 {elapsed:.0f}ms") raise Exception(f"等待发布完成超时({timeout}ms)") # 1. 检查URL是否改变(发布成功后可能跳转到笔记页面) current_url = self.page.url if current_url != initial_url and "/discovery/item/" in current_url: logger.info(f"检测到URL变化,可能已跳转到笔记页面: {current_url}") logger.info("发布成功") return None # 不提取笔记ID,直接返回成功 # 2. 检查是否发布成功(多种方式) success_selectors = [ "text=发布成功", "text=笔记发布成功", "//div[contains(text(), '发布成功')]", "//div[contains(text(), '笔记发布成功')]" ] for success_selector in success_selectors: try: success_element = await self.page.query_selector(success_selector) if success_element: logger.info("检测到发布成功提示") logger.info("发布成功") return None # 不提取笔记ID,直接返回成功 except Exception: pass # 3. 检查是否有错误 error_selectors = [ XiaohongshuSelectors.ERROR_MESSAGE, "text=发布失败", "text=上传失败", "//div[contains(@class, 'error')]", "//div[contains(@class, 'toast-error')]" ] for error_selector in error_selectors: try: error_element = await self.page.query_selector(error_selector) if error_element: error_text = await error_element.text_content() if error_text and len(error_text.strip()) > 0: error_text_lower = error_text.lower() # 检查是否是正常的上传状态提示(不是错误) is_uploading_status = ( "图片上传中" in error_text or "上传中" in error_text or "请稍后" in error_text or "正在上传" in error_text or "处理中" in error_text ) if is_uploading_status: # 这是正常的上传状态,继续等待 if current_time - last_log_time >= 5.0: logger.info(f"检测到上传状态提示: {error_text},继续等待...") continue # 检测内容字符数限制相关的错误(如 "1232 /1000") is_char_limit_error = ( "/1000" in error_text or ("字符" in error_text and "1000" in error_text) or ("字数" in error_text and "1000" in error_text) or (error_text.strip().replace(" ", "").replace("/", "").isdigit() and "/1000" in error_text) ) if is_char_limit_error: # 抛出明确的错误信息 error_msg = ( f"内容字符数超过平台限制!" f"错误信息: {error_text}。" f"小红书平台限制内容最多1000字符,请缩短内容后重试。" ) logger.error(error_msg) raise Exception(error_msg) logger.error(f"检测到错误信息: {error_text}") raise Exception(f"发布失败: {error_text}") except Exception as e: if "发布失败" in str(e) or "内容字符数超过平台限制" in str(e): raise # 4. 检查发布按钮是否还存在(如果不存在,可能在发布中) publish_button = self.page.locator(XiaohongshuSelectors.IMAGE_PUBLISH_BUTTON) is_button_visible = await publish_button.is_visible() # 5. 每5秒记录一次详细状态(避免日志过多) if current_time - last_log_time >= 5.0: logger.info(f"正在发布中... (已等待 {elapsed/1000:.1f}s / {timeout/1000:.1f}s, 检查次数: {check_count}, 按钮可见: {is_button_visible})") last_log_time = current_time await asyncio.sleep(2) # 检查间隔 async def _dismiss_permission_popups(self): """关闭权限请求弹窗(如位置权限)""" try: # 方法1: 通过 JavaScript 阻止地理位置请求,避免触发权限弹窗 await self.page.evaluate(""" () => { // 阻止地理位置请求 if (navigator.geolocation) { navigator.geolocation.getCurrentPosition = function(success, error) { if (error) { error({ code: 1, message: "User denied Geolocation" }); } }; navigator.geolocation.watchPosition = function(success, error) { if (error) { error({ code: 1, message: "User denied Geolocation" }); } }; navigator.geolocation.clearWatch = function() {}; } } """) # 方法2: 尝试查找并点击浏览器权限弹窗的拒绝按钮 # 浏览器权限弹窗通常在页面加载后立即出现 await asyncio.sleep(0.5) # 尝试多种可能的拒绝按钮文本和选择器 deny_selectors = [ 'button:has-text("Never allow")', 'button:has-text("拒绝")', 'button:has-text("Block")', 'button:has-text("不允许")', 'button[aria-label*="Never allow"]', 'button[aria-label*="拒绝"]', '[role="button"]:has-text("Never allow")', '[role="button"]:has-text("拒绝")', ] for selector in deny_selectors: try: deny_button = await self.page.query_selector(selector) if deny_button: # 检查按钮是否可见 is_visible = await deny_button.is_visible() if is_visible: await deny_button.click() logger.info(f"已点击权限弹窗拒绝按钮: {selector}") await asyncio.sleep(0.3) break except Exception: continue # 方法3: 尝试按 ESC 键关闭弹窗(如果存在) try: await self.page.keyboard.press("Escape") await asyncio.sleep(0.2) except Exception: pass except Exception as e: logger.debug(f"处理权限弹窗时出错: {e}") async def _remove_popups(self): """移除弹窗""" try: # 等待页面加载完毕 - 检查上传容器是否存在或等待最多3秒 await self._wait_for_page_loaded() # 查找并关闭可能的弹窗 popup_close = await self.page.query_selector(XiaohongshuSelectors.POPUP_CLOSE) if popup_close: await popup_close.click() await asyncio.sleep(0.5) # 查找并点击短笔记提示按钮 short_note_tooltip_button = await self.page.query_selector('//button[contains(@class, "short-note-rooltip-button")]') if short_note_tooltip_button: await short_note_tooltip_button.click() await asyncio.sleep(0.5) # 点击空白区域关闭遮罩 modal_mask = await self.page.query_selector(XiaohongshuSelectors.MODAL_MASK) if modal_mask: await modal_mask.click() await asyncio.sleep(0.5) except Exception as e: logger.debug(f"移除弹窗时出错: {e}") async def _wait_for_page_loaded(self): """等待页面加载完毕""" try: # 等待上传容器出现或超时3秒 await self.page.wait_for_selector( '//div[contains(@class, "upload-container")]', timeout=3000, state="visible" ) logger.debug("页面加载完毕 - 上传容器已出现") except PlaywrightTimeoutError: # 超时也继续执行,可能页面已经加载完毕但没有上传容器 logger.debug("等待上传容器超时,继续执行") except Exception as e: logger.debug(f"等待页面加载时出错: {e}") async def _click_empty_position(self): """点击页面空白位置""" try: # 点击页面中央空白区域 await self.page.click("body", position={"x": 500, "y": 300}) await asyncio.sleep(0.5) except Exception as e: logger.debug(f"点击空白位置时出错: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luyike221/xiaohongshu-mcp-python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server