"""发布图文功能"""
from typing import List, Optional
from pathlib import Path
import asyncio
import random
from playwright.async_api import Page
from loguru import logger
from anti_detection import get_anti_detection_manager
class PublishAction:
"""发布操作"""
def __init__(self, page: Page):
self.page = page
self.url_of_public = "https://creator.xiaohongshu.com/publish/publish?source=official"
self._anti_detection = get_anti_detection_manager()
async def publish(
self,
title: str,
content: str,
images: List[str],
tags: Optional[List[str]] = None
) -> bool:
"""发布图文内容"""
if not images:
raise ValueError("图片不能为空")
logger.info(f"Publishing content: {title}")
try:
# 操作前检查 - 速率限制和随机延迟
if not await self._anti_detection.before_action("publish", min_delay=2000, max_delay=5000):
raise Exception("发布操作被速率限制阻止")
# 1. 导航到发布页面
await self.page.goto(self.url_of_public, timeout=60000)
await self.page.wait_for_load_state("load", timeout=60000)
# 人类化延迟 - 模拟阅读页面
await self._anti_detection.human_behavior.reading_delay(50)
# 2. 移除遮挡元素
await self._remove_all_overlays()
# 3. 点击"上传图文" TAB(页面默认是"上传视频")
logger.info("点击'上传图文' TAB...")
if not await self._click_publish_tab("上传图文"):
raise Exception("点击上传图文 TAB 失败")
await self._anti_detection.human_behavior.random_delay(1500, 2500)
# 4. 上传图片
await self._upload_images(images)
# 5. 限制标签数量
if tags and len(tags) > 10:
logger.warning("标签数量超过10,截取前10个标签")
tags = tags[:10]
logger.info(f"发布内容: title={title}, images={len(images)}, tags={tags}")
# 6. 提交发布
await self._submit_publish(title, content, tags or [])
# 操作后处理
await self._anti_detection.after_action("publish", self.page)
logger.info("Content published successfully")
return True
except Exception as e:
logger.error(f"Failed to publish content: {e}")
raise
async def _click_publish_tab(self, tabname: str) -> bool:
"""点击发布 TAB"""
try:
# 随机延迟 - 模拟人类寻找元素
await self._anti_detection.human_behavior.random_delay(1000, 2000)
# 先移除所有可能的遮挡元素
logger.info("移除页面遮挡元素...")
await self._remove_all_overlays()
# 等待上传内容区域出现
try:
await self.page.wait_for_selector("div.upload-content", timeout=10000)
except:
logger.warning("未找到 upload-content,继续尝试...")
# 随机鼠标移动
await self._anti_detection.human_behavior.random_mouse_movement(self.page)
# 方法1: 使用 Playwright 的 has-text 选择器
logger.info(f"方法1: 尝试使用 has-text 选择器点击 '{tabname}'")
try:
tab = await self.page.query_selector(f'div.creator-tab:has-text("{tabname}")')
if tab:
await tab.click(timeout=5000)
logger.info("方法1成功")
return True
except Exception as e:
logger.warning(f"方法1失败: {e}")
# 方法2: 使用 JavaScript 强制点击
logger.info(f"方法2: 尝试使用 JavaScript 点击 '{tabname}'")
try:
clicked = await self.page.evaluate(f"""
() => {{
const tabs = document.querySelectorAll('div.creator-tab');
for (const tab of tabs) {{
if (tab.textContent.trim() === '{tabname}') {{
tab.click();
return true;
}}
}}
return false;
}}
""")
if clicked:
logger.info("方法2成功")
return True
except Exception as e:
logger.warning(f"方法2失败: {e}")
# 方法3: 查找所有 TAB 并逐个尝试
logger.info(f"方法3: 遍历所有 TAB 查找 '{tabname}'")
try:
tabs = await self.page.query_selector_all("div.creator-tab")
for tab in tabs:
text = await tab.text_content()
if text and tabname in text.strip():
# 先滚动到元素可见
await tab.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
# 使用 force 参数强制点击
await tab.click(force=True)
logger.info("方法3成功")
return True
except Exception as e:
logger.warning(f"方法3失败: {e}")
logger.error(f"所有方法都失败,无法点击 TAB - {tabname}")
return False
except Exception as e:
logger.error(f"点击发布 TAB 失败: {e}")
return False
async def _remove_all_overlays(self):
"""移除所有遮挡元素"""
try:
removed = await self.page.evaluate("""
() => {
const removed = [];
// 移除所有可能的遮挡元素
const selectors = [
'.d-popover',
'.modal',
'.dialog',
'[class*="mask"]',
'[class*="overlay"]',
'[class*="guide"]',
'[class*="tutorial"]',
'[class*="popup"]',
'[class*="toast"]'
];
selectors.forEach(sel => {
try {
const elements = document.querySelectorAll(sel);
elements.forEach(el => {
const style = window.getComputedStyle(el);
if (style.display !== 'none' && style.visibility !== 'hidden') {
el.remove();
removed.push(sel);
}
});
} catch (e) {
// 忽略错误
}
});
return removed;
}
""")
if removed:
logger.info(f"移除了 {len(removed)} 个遮挡元素")
except Exception as e:
logger.warning(f"移除遮挡元素失败: {e}")
async def _get_tab_element(self, tabname: str):
"""获取 TAB 元素"""
try:
tabs = await self.page.query_selector_all("div.creator-tab")
for tab in tabs:
# 检查是否可见
if not await tab.is_visible():
continue
# 检查文本
text = await tab.text_content()
if text and text.strip() == tabname:
return tab
return None
except Exception as e:
logger.debug(f"获取发布 TAB 元素失败: {e}")
return None
async def _is_element_blocked(self, elem) -> bool:
"""检查元素是否被遮挡"""
try:
result = await elem.evaluate("""
(element) => {
const rect = element.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) {
return true;
}
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
const target = document.elementFromPoint(x, y);
return !(target === element || element.contains(target));
}
""")
return result
except:
return False
async def _remove_pop_cover(self):
"""移除弹窗遮挡"""
try:
# 移除弹窗
popover = await self.page.query_selector("div.d-popover")
if popover:
await popover.evaluate("element => element.remove()")
# 点击空白位置
await self._click_empty_position()
except:
pass
async def _click_empty_position(self):
"""点击空白位置"""
x = 380 + random.randint(0, 100)
y = 20 + random.randint(0, 60)
await self.page.mouse.click(x, y)
async def _upload_images(self, image_paths: List[str]):
"""上传图片"""
# 验证文件路径
valid_paths = []
for path in image_paths:
if Path(path).exists():
valid_paths.append(path)
logger.info(f"获取有效图片:{path}")
else:
logger.warning(f"图片文件不存在: {path}")
if not valid_paths:
raise ValueError("没有有效的图片文件")
# 人类化延迟 - 模拟选择文件
await self._anti_detection.human_behavior.random_delay(800, 1500)
# 等待上传输入框
upload_input = await self.page.wait_for_selector(".upload-input", timeout=30000)
# 上传文件
await upload_input.set_input_files(valid_paths)
# 等待上传完成
await self._wait_for_upload_complete(len(valid_paths))
async def _wait_for_upload_complete(self, expected_count: int):
"""等待上传完成"""
max_wait_time = 60
check_interval = 0.5
start_time = asyncio.get_event_loop().time()
logger.info(f"开始等待图片上传完成, expected_count={expected_count}")
while asyncio.get_event_loop().time() - start_time < max_wait_time:
try:
# 检查已上传的图片
uploaded_images = await self.page.query_selector_all(".img-preview-area .pr")
current_count = len(uploaded_images)
logger.info(f"检测到已上传图片: current_count={current_count}, expected_count={expected_count}")
if current_count >= expected_count:
logger.info(f"所有图片上传完成, count={current_count}")
return
except:
pass
await asyncio.sleep(check_interval)
raise TimeoutError("上传超时,请检查网络连接和图片大小")
async def _submit_publish(self, title: str, content: str, tags: List[str]):
"""提交发布"""
# 1. 填写标题 - 人类化输入
title_input = await self.page.wait_for_selector("div.d-input input")
# 随机延迟
await self._anti_detection.human_behavior.random_delay(500, 1000)
# 使用人类化打字
await self._anti_detection.human_behavior.human_type(title_input, title)
# 检查标题长度
await asyncio.sleep(0.5)
await self._check_title_max_length()
logger.info("检查标题长度:通过")
await self._anti_detection.human_behavior.random_delay(800, 1500)
# 2. 填写内容
content_elem = await self._get_content_element()
if not content_elem:
raise Exception("没有找到内容输入框")
# 随机延迟
await self._anti_detection.human_behavior.random_delay(500, 1000)
# 使用人类化打字
await self._anti_detection.human_behavior.human_type(content_elem, content)
# 3. 输入标签
await self._input_tags(content_elem, tags)
await self._anti_detection.human_behavior.random_delay(1000, 2000)
# 检查正文长度
await self._check_content_max_length()
logger.info("检查正文长度:通过")
# 4. 点击发布按钮 - 随机鼠标移动
await self._anti_detection.human_behavior.random_mouse_movement(self.page)
submit_button = await self.page.wait_for_selector("div.submit div.d-button-content")
await submit_button.click()
await self._anti_detection.human_behavior.random_delay(2000, 3000)
async def _check_title_max_length(self):
"""检查标题长度"""
max_suffix = await self.page.query_selector("div.title-container div.max_suffix")
if max_suffix:
length_text = await max_suffix.text_content()
raise ValueError(f"标题长度超过限制: {length_text}")
async def _check_content_max_length(self):
"""检查正文长度"""
length_error = await self.page.query_selector("div.edit-container div.length-error")
if length_error:
length_text = await length_error.text_content()
raise ValueError(f"正文长度超过限制: {length_text}")
async def _get_content_element(self):
"""获取内容输入框"""
# 尝试方法1: 查找 ql-editor
content_elem = await self.page.query_selector("div.ql-editor")
if content_elem:
return content_elem
# 尝试方法2: 通过 placeholder 查找
content_elem = await self._find_textbox_by_placeholder()
if content_elem:
return content_elem
logger.warning("未找到内容输入框")
return None
async def _find_textbox_by_placeholder(self):
"""通过 placeholder 查找输入框"""
try:
# 查找所有 p 元素
p_elements = await self.page.query_selector_all("p")
# 查找包含指定 placeholder 的元素
for elem in p_elements:
placeholder = await elem.get_attribute("data-placeholder")
if placeholder and "输入正文描述" in placeholder:
# 向上查找 textbox 父元素
current = elem
for _ in range(5):
parent = await current.evaluate_handle("element => element.parentElement")
parent_elem = parent.as_element()
if not parent_elem:
break
role = await parent_elem.get_attribute("role")
if role == "textbox":
return parent_elem
current = parent_elem
return None
except:
return None
async def _input_tags(self, content_elem, tags: List[str]):
"""输入标签"""
if not tags:
return
await self._anti_detection.human_behavior.random_delay(800, 1500)
# 移动到内容末尾
for _ in range(20):
await content_elem.press("ArrowDown")
await asyncio.sleep(0.01)
# 换行
await content_elem.press("Enter")
await content_elem.press("Enter")
await self._anti_detection.human_behavior.random_delay(500, 1000)
# 输入每个标签
for tag in tags:
tag = tag.lstrip("#")
await self._input_single_tag(content_elem, tag)
async def _input_single_tag(self, content_elem, tag: str):
"""输入单个标签"""
# 输入 #
await content_elem.type("#")
await self._anti_detection.human_behavior.random_delay(200, 400)
# 逐字输入标签 - 使用人类化打字
await self._anti_detection.human_behavior.human_type(
content_elem,
tag,
min_delay=0.05,
max_delay=0.15
)
await self._anti_detection.human_behavior.random_delay(800, 1500)
# 尝试点击联想选项
try:
topic_container = await self.page.query_selector("#creator-editor-topic-container")
if topic_container:
first_item = await topic_container.query_selector(".item")
if first_item:
await first_item.click()
logger.info(f"成功点击标签联想选项: {tag}")
await self._anti_detection.human_behavior.random_delay(200, 400)
return
except:
pass
# 如果没有联想选项,输入空格
logger.warning(f"未找到标签联想选项,直接输入空格: {tag}")
await content_elem.type(" ")
await self._anti_detection.human_behavior.random_delay(300, 600)