#!/usr/bin/env python3
"""
Feed 列表调试脚本
详细调试 Feed 列表获取功能
"""
import asyncio
import json
from browser.browser import BrowserManager
from xiaohongshu.feeds import FeedsListAction
from loguru import logger
def _preview(value, limit):
    """Return at most *limit* leading characters of *value* as text.

    A missing value (``None``, e.g. a JSON ``null`` returned by the page)
    is rendered as ``'N/A'`` so that slicing can never raise.
    """
    return 'N/A' if value is None else str(value)[:limit]


def _print_feed_samples(feeds_data):
    """Print the first three feeds and the key layout of the first feed.

    Expects *feeds_data* to be a non-empty sequence of dicts, as checked by
    the caller before invoking this helper.
    """
    print("\n9. 前 3 条 Feed:")
    for i, feed in enumerate(feeds_data[:3], 1):
        # ``noteCard`` may be present but null; treat that like "missing".
        note_card = feed.get('noteCard') or {}
        feed_title = _preview(note_card.get('displayTitle'), 50)
        feed_id = feed.get('id', 'N/A')
        print(f" {i}. [{feed_id}] {feed_title}...")

    print("\n10. 第一条 Feed 的数据结构:")
    first_feed = feeds_data[0]
    print(f" Keys: {list(first_feed.keys())}")
    print(f" ID: {first_feed.get('id')}")
    print(f" xsecToken: {_preview(first_feed.get('xsecToken'), 20)}...")
    print(f" modelType: {first_feed.get('modelType')}")
    note_card = first_feed.get('noteCard') or {}
    print(f" NoteCard keys: {list(note_card.keys())}")


async def _probe_alternative_paths(page):
    """Report which candidate paths inside ``__INITIAL_STATE__`` hold data."""
    print(" ✗ 无法获取 feeds 数据")
    print("\n 尝试其他数据路径...")
    alternative_data = await page.evaluate("""
        () => {
            const state = window.__INITIAL_STATE__;
            // 尝试不同的路径
            const paths = [
                'feed.feeds._value',
                'feed.feeds.value',
                'feed.list',
                'feeds',
                'explore.feeds'
            ];
            const results = {};
            for (const path of paths) {
                try {
                    const parts = path.split('.');
                    let current = state;
                    for (const part of parts) {
                        current = current[part];
                        if (!current) break;
                    }
                    if (current) {
                        results[path] = Array.isArray(current) ? current.length : typeof current;
                    }
                } catch (e) {
                    // ignore
                }
            }
            return results;
        }
    """)
    print(f" 可能的数据路径: {json.dumps(alternative_data, indent=2)}")


async def _inspect_initial_state(page):
    """Dump the feed-related layout of ``window.__INITIAL_STATE__``.

    Prints the presence of the ``feed`` / ``feeds`` / ``_value`` keys, then
    reads ``state.feed.feeds._value``. When that yields data, sample feeds
    are printed and ``FeedsListAction.get_feeds_list`` is exercised;
    otherwise a set of alternative paths is probed.
    """
    print("\n7. 检查数据结构...")
    state_structure = await page.evaluate("""
        () => {
            const state = window.__INITIAL_STATE__;
            return {
                hasFeed: 'feed' in state,
                hasFeeds: state.feed && 'feeds' in state.feed,
                hasFeedsValue: state.feed && state.feed.feeds && '_value' in state.feed.feeds,
                feedKeys: state.feed ? Object.keys(state.feed) : [],
                feedsKeys: state.feed && state.feed.feeds ? Object.keys(state.feed.feeds) : []
            };
        }
    """)
    print(f" 数据结构: {json.dumps(state_structure, indent=2, ensure_ascii=False)}")

    print("\n8. 尝试获取 feeds 数据...")
    feeds_data = await page.evaluate("""
        () => {
            const state = window.__INITIAL_STATE__;
            if (state && state.feed && state.feed.feeds && state.feed.feeds._value) {
                return state.feed.feeds._value;
            }
            return null;
        }
    """)

    if not feeds_data:
        await _probe_alternative_paths(page)
        return

    print(f" ✓ 成功获取 {len(feeds_data)} 条 Feed")
    _print_feed_samples(feeds_data)

    # Exercise the production code path against the same page.
    print("\n11. 测试 FeedsListAction...")
    action = FeedsListAction(page)
    try:
        feeds = await action.get_feeds_list()
        print(f" ✓ FeedsListAction 成功获取 {len(feeds)} 条 Feed")
    except Exception as e:
        # Keep going: the screenshot/HTML dump is still useful after a failure.
        import traceback

        print(f" ✗ FeedsListAction 失败: {e}")
        traceback.print_exc()


async def debug_feeds():
    """Run a step-by-step diagnostic of feed-list retrieval.

    Starts a non-headless browser, opens the explore page, inspects
    ``window.__INITIAL_STATE__``, and finally writes ``debug_feeds.png`` and
    ``debug_feeds.html`` to the current working directory. Progress is
    reported with ``print``.

    Exceptions derived from ``Exception`` raised after the browser has
    started are printed with a traceback and not re-raised; the browser is
    closed in all cases. Errors while constructing or starting the browser
    propagate to the caller.
    """
    import traceback

    # Only artifacts that were actually written are listed in the summary.
    generated_files = []

    print("=" * 60)
    print("Feed 列表调试")
    print("=" * 60)

    # Non-headless so the page can be watched while the script runs.
    print("\n1. 启动浏览器...")
    browser = BrowserManager(headless=False)
    await browser.start()
    print(" ✓ 浏览器已启动")

    try:
        print("\n2. 创建页面...")
        page = await browser.new_page()
        print(" ✓ 页面已创建")

        print("\n3. 导航到小红书探索页...")
        await page.goto("https://www.xiaohongshu.com/explore")
        print(" ✓ 页面已加载")

        # A load-state timeout is tolerated: the later checks show what is missing.
        print("\n4. 等待页面加载...")
        try:
            await page.wait_for_load_state("load", timeout=60000)
            print(" ✓ 页面基本加载完成")
        except Exception as e:
            print(f" ⚠️ 等待超时,继续执行: {e}")

        # Let the page settle, then give its JavaScript time to run.
        await asyncio.sleep(3)
        await asyncio.sleep(2)

        page_title = await page.title()
        print(f"\n5. 页面标题: {page_title}")
        print(f" 当前 URL: {page.url}")

        print("\n6. 检查 __INITIAL_STATE__...")
        has_initial_state = await page.evaluate("""
            () => {
                return typeof window.__INITIAL_STATE__ !== 'undefined';
            }
        """)
        print(f" __INITIAL_STATE__ 存在: {has_initial_state}")

        if has_initial_state:
            await _inspect_initial_state(page)
        else:
            print(" ✗ __INITIAL_STATE__ 不存在")
            print("\n 可能的原因:")
            print(" - 页面还未完全加载")
            print(" - 需要登录")
            print(" - 页面结构已变化")

        print("\n12. 保存页面截图...")
        await page.screenshot(path="debug_feeds.png")
        generated_files.append(" - debug_feeds.png (页面截图)")
        print(" ✓ 截图已保存: debug_feeds.png")

        print("\n13. 保存页面 HTML...")
        html = await page.content()
        with open("debug_feeds.html", "w", encoding="utf-8") as f:
            f.write(html)
        generated_files.append(" - debug_feeds.html (页面 HTML)")
        print(" ✓ HTML 已保存: debug_feeds.html")

        # Keep the window open for manual inspection.
        # NOTE(review): under asyncio.run a Ctrl+C usually reaches the
        # coroutine as a task cancellation rather than KeyboardInterrupt; the
        # `finally` below still closes the browser in that case — confirm on
        # the target Python version.
        print("\n按 Ctrl+C 关闭浏览器...")
        try:
            await asyncio.sleep(30)
        except KeyboardInterrupt:
            pass
    except Exception as e:
        print(f"\n✗ 调试过程出错: {e}")
        traceback.print_exc()
    finally:
        print("\n14. 关闭浏览器...")
        await browser.close()
        print(" ✓ 浏览器已关闭")

    print("\n" + "=" * 60)
    print("调试完成")
    print("=" * 60)
    if generated_files:
        print("\n生成的文件:")
        for entry in generated_files:
            print(entry)
if __name__ == "__main__":
    # Script entry point: run the async diagnostic and report how it ended.
    import traceback

    try:
        asyncio.run(debug_feeds())
    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop the session, not an error.
        print("\n\n调试被用户中断")
    except Exception as exc:
        # Anything that escaped debug_feeds is shown with its full traceback.
        print(f"\n\n调试失败: {exc}")
        traceback.print_exc()