Skip to main content
Glama
chenningling

Xiaohongshu (RedBook) MCP Server

by chenningling
xiaohongshu_mcp.py69.7 kB
from typing import Any, List, Dict, Optional import asyncio import json import os import pandas as pd from datetime import datetime from playwright.async_api import async_playwright from fastmcp import FastMCP # 初始化 FastMCP 服务器 mcp = FastMCP("xiaohongshu_scraper") # 全局变量 BROWSER_DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "browser_data") DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S") # 确保目录存在 os.makedirs(BROWSER_DATA_DIR, exist_ok=True) os.makedirs(DATA_DIR, exist_ok=True) # 用于存储浏览器上下文,以便在不同方法之间共享 browser_context = None main_page = None is_logged_in = False def process_url(url: str) -> str: """处理URL,确保格式正确并保留所有参数 Args: url: 原始URL Returns: str: 处理后的URL """ processed_url = url.strip() # 移除可能的@符号前缀 if processed_url.startswith('@'): processed_url = processed_url[1:] # 确保URL使用https协议 if processed_url.startswith('http://'): processed_url = 'https://' + processed_url[7:] elif not processed_url.startswith('https://'): processed_url = 'https://' + processed_url # 如果URL不包含www.xiaohongshu.com,则添加它 if 'xiaohongshu.com' in processed_url and 'www.xiaohongshu.com' not in processed_url: processed_url = processed_url.replace('xiaohongshu.com', 'www.xiaohongshu.com') return processed_url async def ensure_browser(): """确保浏览器已启动并登录""" global browser_context, main_page, is_logged_in if browser_context is None: # 启动浏览器 playwright_instance = await async_playwright().start() # 使用持久化上下文来保存用户状态 browser_context = await playwright_instance.chromium.launch_persistent_context( user_data_dir=BROWSER_DATA_DIR, headless=False, # 非隐藏模式,方便用户登录 viewport={"width": 1280, "height": 800}, timeout=60000 ) # 创建一个新页面 if browser_context.pages: main_page = browser_context.pages[0] else: main_page = await browser_context.new_page() # 设置页面级别的超时时间 main_page.set_default_timeout(60000) # 检查登录状态 if not is_logged_in: # 访问小红书首页 if main_page: # 添加空检查 await main_page.goto("https://www.xiaohongshu.com", timeout=60000) await asyncio.sleep(3) # 检查是否已登录 login_elements = await main_page.query_selector_all('text="登录"') if main_page else [] # 添加空检查 if login_elements: return False # 需要登录 else: is_logged_in = True return True # 已登录 else: return False # main_page为None,需要重新初始化 return True @mcp.tool() async def login() -> str: """登录小红书账号""" global is_logged_in await ensure_browser() if is_logged_in: return "已登录小红书账号" # 访问小红书登录页面 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" await main_page.goto("https://www.xiaohongshu.com", timeout=60000) await asyncio.sleep(3) # 查找登录按钮并点击 login_elements = await main_page.query_selector_all('text="登录"') if main_page else [] # 添加空检查 if login_elements: await login_elements[0].click() # 提示用户手动登录 message = "请在打开的浏览器窗口中完成登录操作。登录成功后,系统将自动继续。" # 等待用户登录成功 max_wait_time = 180 # 等待3分钟 wait_interval = 5 waited_time = 0 while waited_time < max_wait_time: # 检查是否已登录成功 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" still_login = await main_page.query_selector_all('text="登录"') if not still_login: is_logged_in = True await asyncio.sleep(2) # 等待页面加载 return "登录成功!" # 继续等待 await asyncio.sleep(wait_interval) waited_time += wait_interval return "登录等待超时。请重试或手动登录后再使用其他功能。" else: is_logged_in = True return "已登录小红书账号" @mcp.tool() async def search_notes(keywords: str, limit: int = 5) -> str: """根据关键词搜索笔记 Args: keywords: 搜索关键词 limit: 返回结果数量限制 """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号" if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" # 构建搜索URL并访问 search_url = f"https://www.xiaohongshu.com/search_result?keyword={keywords}" try: await main_page.goto(search_url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 等待页面完全加载 await asyncio.sleep(5) # 打印页面HTML用于调试 page_html = await main_page.content() print(f"页面HTML片段: {page_html[10000:10500]}...") # 使用更精确的选择器获取帖子卡片 print("尝试获取帖子卡片...") if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" post_cards = await main_page.query_selector_all('section.note-item') print(f"找到 {len(post_cards)} 个帖子卡片") if not post_cards: # 尝试备用选择器 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" post_cards = await main_page.query_selector_all('div[data-v-a264b01a]') print(f"使用备用选择器找到 {len(post_cards)} 个帖子卡片") post_links = [] post_titles = [] for card in post_cards: try: # 获取链接 link_element = await card.query_selector('a[href*="/search_result/"]') if card else None # 添加空检查 if not link_element: continue href = await link_element.get_attribute('href') if href and '/search_result/' in href: # 构建完整URL if href.startswith('/'): full_url = f"https://www.xiaohongshu.com{href}" else: full_url = href post_links.append(full_url) # 尝试获取帖子标题 try: # 打印卡片HTML用于调试 card_html = await card.inner_html() if card else "" # 添加空检查 print(f"卡片HTML片段: {card_html[:200]}...") # 首先尝试获取卡片内的footer中的标题 title_element = await card.query_selector('div.footer a.title span') if card else None # 添加空检查 if title_element: title = await title_element.text_content() print(f"找到标题(方法1): {title}") else: # 尝试直接获取标题元素 title_element = await card.query_selector('a.title span') if card else None # 添加空检查 if title_element: title = await title_element.text_content() print(f"找到标题(方法2): {title}") else: # 尝试获取任何可能的文本内容 text_elements = await card.query_selector_all('span') if card else [] # 添加空检查 potential_titles = [] for text_el in text_elements: text = await text_el.text_content() if text_el else "" # 添加空检查 if text and len(text.strip()) > 5: potential_titles.append(text.strip()) if potential_titles: # 选择最长的文本作为标题 title = max(potential_titles, key=len) if potential_titles else "未知标题" # 添加空检查 print(f"找到可能的标题(方法3): {title}") else: # 尝试直接获取卡片中的所有文本 if not card: # 添加空检查 title = "未知标题" print("卡片为空,使用默认值'未知标题'") else: all_text = await card.evaluate('el => Array.from(el.querySelectorAll("*")).map(node => node.textContent).filter(text => text && text.trim().length > 5)') if all_text and isinstance(all_text, list) and all_text: # 使用布尔检查替代len # 选择最长的文本作为标题 title = max(all_text, key=len) print(f"找到可能的标题(方法4): {title}") else: title = "未知标题" print("无法找到标题,使用默认值'未知标题'") # 如果获取到的标题为空,设为未知标题 if not title or (isinstance(title, str) and title.strip() == ""): # 增加类型检查 title = "未知标题" print("获取到的标题为空,使用默认值'未知标题'") except Exception as e: print(f"获取标题时出错: {str(e)}") title = "未知标题" post_titles.append(title) except Exception as e: print(f"处理帖子卡片时出错: {str(e)}") # 去重 unique_posts = [] seen_urls = set() for url, title in zip(post_links, post_titles): if url not in seen_urls: seen_urls.add(url) unique_posts.append({"url": url, "title": title}) # 限制返回数量 unique_posts = unique_posts[:limit] # 格式化返回结果 if unique_posts: result = "搜索结果:\n\n" for i, post in enumerate(unique_posts, 1): result += f"{i}. {post['title']}\n 链接: {post['url']}\n\n" return result else: return f"未找到与\"{keywords}\"相关的笔记" except Exception as e: return f"搜索笔记时出错: {str(e)}" @mcp.tool() async def get_note_content(url: str) -> str: """获取笔记内容 Args: url: 笔记 URL """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号" if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" try: # 使用通用URL处理函数 processed_url = process_url(url) print(f"处理后的URL: {processed_url}") # 访问帖子链接,保留完整参数 await main_page.goto(processed_url, timeout=60000) await asyncio.sleep(10) # 增加等待时间到10秒 # 检查是否加载了错误页面 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" error_page = await main_page.evaluate(''' () => { // 检查常见的错误信息 const errorTexts = [ "当前笔记暂时无法浏览", "内容不存在", "页面不存在", "内容已被删除" ]; for (const text of errorTexts) { if (document.body.innerText.includes(text)) { return { isError: true, errorText: text }; } } return { isError: false }; } ''') if error_page.get("isError", False): return f"无法获取笔记内容: {error_page.get('errorText', '未知错误')}\n请检查链接是否有效或尝试使用带有有效token的完整URL。" # 增强滚动操作以确保所有内容加载 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" await main_page.evaluate(''' () => { // 先滚动到页面底部 window.scrollTo(0, document.body.scrollHeight); setTimeout(() => { // 然后滚动到中间 window.scrollTo(0, document.body.scrollHeight / 2); }, 1000); setTimeout(() => { // 最后回到顶部 window.scrollTo(0, 0); }, 2000); } ''') await asyncio.sleep(3) # 等待滚动完成和内容加载 # 打印页面结构片段用于分析 try: print("打印页面结构片段用于分析") if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" page_structure = await main_page.evaluate(''' () => { // 获取笔记内容区域 const noteContent = document.querySelector('.note-content'); const detailDesc = document.querySelector('#detail-desc'); const commentArea = document.querySelector('.comments-container, .comment-list'); return { hasNoteContent: !!noteContent, hasDetailDesc: !!detailDesc, hasCommentArea: !!commentArea, noteContentHtml: noteContent ? noteContent.outerHTML.slice(0, 500) : null, detailDescHtml: detailDesc ? detailDesc.outerHTML.slice(0, 500) : null, commentAreaFirstChild: commentArea ? (commentArea.firstElementChild ? commentArea.firstElementChild.outerHTML.slice(0, 500) : null) : null, pageTitle: document.title, bodyText: document.body.innerText.slice(0, 500) }; } ''') print(f"页面结构分析: {json.dumps(page_structure, ensure_ascii=False, indent=2)}") # 再次检查内容是否可见 if "当前笔记暂时无法浏览" in page_structure.get("bodyText", ""): return "无法获取笔记内容: 当前笔记暂时无法浏览\n请检查链接是否有效或尝试使用带有有效token的完整URL。" except Exception as e: print(f"打印页面结构时出错: {str(e)}") # 获取帖子内容 post_content = {} # 获取帖子标题 - 方法1:使用id选择器 try: print("尝试获取标题 - 方法1:使用id选择器") if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" title_element = await main_page.query_selector('#detail-title') if title_element: title = await title_element.text_content() post_content["标题"] = title.strip() if title else "未知标题" print(f"方法1获取到标题: {post_content['标题']}") else: print("方法1未找到标题元素") post_content["标题"] = "未知标题" except Exception as e: print(f"方法1获取标题出错: {str(e)}") post_content["标题"] = "未知标题" # 获取帖子标题 - 方法2:使用class选择器 if post_content["标题"] == "未知标题": try: print("尝试获取标题 - 方法2:使用class选择器") if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" title_element = await main_page.query_selector('div.title') if title_element: title = await title_element.text_content() post_content["标题"] = title.strip() if title else "未知标题" print(f"方法2获取到标题: {post_content['标题']}") else: print("方法2未找到标题元素") except Exception as e: print(f"方法2获取标题出错: {str(e)}") # 获取帖子标题 - 方法3:使用JavaScript if post_content["标题"] == "未知标题": try: print("尝试获取标题 - 方法3:使用JavaScript") if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" title = await main_page.evaluate(''' () => { // 尝试多种可能的标题选择器 const selectors = [ '#detail-title', 'div.title', 'h1', 'div.note-content div.title' ]; for (const selector of selectors) { const el = document.querySelector(selector); if (el && el.textContent.trim()) { return el.textContent.trim(); } } return null; } ''') if title: post_content["标题"] = title print(f"方法3获取到标题: {post_content['标题']}") else: print("方法3未找到标题元素") except Exception as e: print(f"方法3获取标题出错: {str(e)}") # 获取作者 - 方法1:使用username类选择器 try: print("尝试获取作者 - 方法1:使用username类选择器") author_element = await main_page.query_selector('span.username') if author_element: author = await author_element.text_content() post_content["作者"] = author.strip() if author else "未知作者" print(f"方法1获取到作者: {post_content['作者']}") else: print("方法1未找到作者元素") post_content["作者"] = "未知作者" except Exception as e: print(f"方法1获取作者出错: {str(e)}") post_content["作者"] = "未知作者" # 获取作者 - 方法2:使用链接选择器 if post_content["作者"] == "未知作者": try: print("尝试获取作者 - 方法2:使用链接选择器") author_element = await main_page.query_selector('a.name') if author_element: author = await author_element.text_content() post_content["作者"] = author.strip() if author else "未知作者" print(f"方法2获取到作者: {post_content['作者']}") else: print("方法2未找到作者元素") except Exception as e: print(f"方法2获取作者出错: {str(e)}") # 获取作者 - 方法3:使用JavaScript if post_content["作者"] == "未知作者": try: print("尝试获取作者 - 方法3:使用JavaScript") author = await main_page.evaluate(''' () => { // 尝试多种可能的作者选择器 const selectors = [ 'span.username', 'a.name', '.author-wrapper .username', '.info .name' ]; for (const selector of selectors) { const el = document.querySelector(selector); if (el && el.textContent.trim()) { return el.textContent.trim(); } } return null; } ''') if author: post_content["作者"] = author print(f"方法3获取到作者: {post_content['作者']}") else: print("方法3未找到作者元素") except Exception as e: print(f"方法3获取作者出错: {str(e)}") # 获取发布时间 - 方法1:使用date类选择器 try: print("尝试获取发布时间 - 方法1:使用date类选择器") time_element = await main_page.query_selector('span.date') if time_element: time_text = await time_element.text_content() post_content["发布时间"] = time_text.strip() if time_text else "未知" print(f"方法1获取到发布时间: {post_content['发布时间']}") else: print("方法1未找到发布时间元素") post_content["发布时间"] = "未知" except Exception as e: print(f"方法1获取发布时间出错: {str(e)}") post_content["发布时间"] = "未知" # 获取发布时间 - 方法2:使用正则表达式匹配 if post_content["发布时间"] == "未知": try: print("尝试获取发布时间 - 方法2:使用正则表达式匹配") time_selectors = [ 'text=/编辑于/', r'text=/\d{2}-\d{2}/', r'text=/\d{4}-\d{2}-\d{2}/', r'text=/\d+月\d+日/', r'text=/\d+天前/', r'text=/\d+小时前/', 'text=/今天/', 'text=/昨天/' ] for selector in time_selectors: time_element = await main_page.query_selector(selector) if time_element: time_text = await time_element.text_content() post_content["发布时间"] = time_text.strip() if time_text else "未知" print(f"方法2获取到发布时间: {post_content['发布时间']}") break else: print(f"方法2未找到发布时间元素: {selector}") except Exception as e: print(f"方法2获取发布时间出错: {str(e)}") # 获取发布时间 - 方法3:使用JavaScript if post_content["发布时间"] == "未知": try: print("尝试获取发布时间 - 方法3:使用JavaScript") time_text = await main_page.evaluate(''' () => { // 尝试多种可能的时间选择器 const selectors = [ 'span.date', '.bottom-container .date', '.date' ]; for (const selector of selectors) { const el = document.querySelector(selector); if (el && el.textContent.trim()) { return el.textContent.trim(); } } // 尝试查找包含日期格式的文本 const dateRegexes = [ /编辑于\\s*([\\d-]+)/, /(\\d{2}-\\d{2})/, /(\\d{4}-\\d{2}-\\d{2})/, /(\\d+月\\d+日)/, /(\\d+天前)/, /(\\d+小时前)/, /(今天)/, /(昨天)/ ]; const allText = document.body.textContent; for (const regex of dateRegexes) { const match = allText.match(regex); if (match) { return match[0]; } } return null; } ''') if time_text: post_content["发布时间"] = time_text print(f"方法3获取到发布时间: {post_content['发布时间']}") else: print("方法3未找到发布时间元素") except Exception as e: print(f"方法3获取发布时间出错: {str(e)}") # 获取帖子正文内容 - 方法1:使用精确的ID和class选择器 try: print("尝试获取正文内容 - 方法1:使用精确的ID和class选择器") # 先明确标记评论区域 await main_page.evaluate(''' () => { const commentSelectors = [ '.comments-container', '.comment-list', '.feed-comment', 'div[data-v-aed4aacc]', // 根据您提供的评论HTML结构 '.content span.note-text' // 评论中的note-text结构 ]; for (const selector of commentSelectors) { const elements = document.querySelectorAll(selector); elements.forEach(el => { if (el) { el.setAttribute('data-is-comment', 'true'); console.log('标记评论区域:', el.tagName, el.className); } }); } } ''') # 先尝试获取detail-desc和note-text组合 content_element = await main_page.query_selector('#detail-desc .note-text') if content_element: # 检查是否在评论区域内 is_in_comment = await content_element.evaluate('(el) => !!el.closest("[data-is-comment=\'true\']") || false') if not is_in_comment: content_text = await content_element.text_content() if content_text and len(content_text.strip()) > 50: # 增加长度阈值 post_content["内容"] = content_text.strip() print(f"方法1获取到正文内容,长度: {len(post_content['内容'])}") else: print(f"方法1获取到的内容太短: {len(content_text.strip()) if content_text else 0}") post_content["内容"] = "未能获取内容" else: print("方法1找到的元素在评论区域内,跳过") post_content["内容"] = "未能获取内容" else: print("方法1未找到正文内容元素") post_content["内容"] = "未能获取内容" except Exception as e: print(f"方法1获取正文内容出错: {str(e)}") post_content["内容"] = "未能获取内容" # 获取帖子正文内容 - 方法2:使用XPath选择器 if post_content["内容"] == "未能获取内容": try: print("尝试获取正文内容 - 方法2:使用XPath选择器") # 使用XPath获取笔记内容区域 content_text = await main_page.evaluate(''' () => { const xpath = '//div[@id="detail-desc"]/span[@class="note-text"]'; const result = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null); const element = result.singleNodeValue; return element ? element.textContent.trim() : null; } ''') if content_text and len(content_text) > 20: post_content["内容"] = content_text print(f"方法2获取到正文内容,长度: {len(post_content['内容'])}") else: print(f"方法2获取到的内容太短或为空: {len(content_text) if content_text else 0}") except Exception as e: print(f"方法2获取正文内容出错: {str(e)}") # 获取帖子正文内容 - 方法3:使用JavaScript获取最长文本 if post_content["内容"] == "未能获取内容": try: print("尝试获取正文内容 - 方法3:使用JavaScript获取最长文本") content_text = await main_page.evaluate(''' () => { // 定义评论区域选择器 const commentSelectors = [ '.comments-container', '.comment-list', '.feed-comment', 'div[data-v-aed4aacc]', '.comment-item', '[data-is-comment="true"]' ]; // 找到所有评论区域 let commentAreas = []; for (const selector of commentSelectors) { const elements = document.querySelectorAll(selector); elements.forEach(el => commentAreas.push(el)); } // 查找可能的内容元素,排除评论区 const contentElements = Array.from(document.querySelectorAll('div#detail-desc, div.note-content, div.desc, span.note-text')) .filter(el => { // 检查是否在评论区域内 const isInComment = commentAreas.some(commentArea => commentArea && commentArea.contains(el)); if (isInComment) { console.log('排除评论区域内容:', el.tagName, el.className); return false; } const text = el.textContent.trim(); return text.length > 100 && text.length < 10000; }) .sort((a, b) => b.textContent.length - a.textContent.length); if (contentElements.length > 0) { console.log('找到内容元素:', contentElements[0].tagName, contentElements[0].className); return contentElements[0].textContent.trim(); } return null; } ''') if content_text and len(content_text) > 100: # 增加长度阈值 post_content["内容"] = content_text print(f"方法3获取到正文内容,长度: {len(post_content['内容'])}") else: print(f"方法3获取到的内容太短或为空: {len(content_text) if content_text else 0}") except Exception as e: print(f"方法3获取正文内容出错: {str(e)}") # 获取帖子正文内容 - 方法4:区分正文和评论内容 if post_content["内容"] == "未能获取内容": try: print("尝试获取正文内容 - 方法4:区分正文和评论内容") content_text = await main_page.evaluate(''' () => { // 首先尝试获取note-content区域 const noteContent = document.querySelector('.note-content'); if (noteContent) { // 查找note-text,这通常包含主要内容 const noteText = noteContent.querySelector('.note-text'); if (noteText && noteText.textContent.trim().length > 50) { return noteText.textContent.trim(); } // 如果没有找到note-text或内容太短,返回整个note-content if (noteContent.textContent.trim().length > 50) { return noteContent.textContent.trim(); } } // 如果上面的方法都失败了,尝试获取所有段落并拼接 const paragraphs = Array.from(document.querySelectorAll('p')) .filter(p => { // 排除评论区段落 const isInComments = p.closest('.comments-container, .comment-list'); return !isInComments && p.textContent.trim().length > 10; }); if (paragraphs.length > 0) { return paragraphs.map(p => p.textContent.trim()).join('\n\n'); } return null; } ''') if content_text and len(content_text) > 50: post_content["内容"] = content_text print(f"方法4获取到正文内容,长度: {len(post_content['内容'])}") else: print(f"方法4获取到的内容太短或为空: {len(content_text) if content_text else 0}") except Exception as e: print(f"方法4获取正文内容出错: {str(e)}") # 获取帖子正文内容 - 方法5:直接通过DOM结构定位 if post_content["内容"] == "未能获取内容": try: print("尝试获取正文内容 - 方法5:直接通过DOM结构定位") content_text = await main_page.evaluate(''' () => { // 根据您提供的HTML结构直接定位 const noteContent = document.querySelector('div.note-content'); if (noteContent) { const detailTitle = noteContent.querySelector('#detail-title'); const detailDesc = noteContent.querySelector('#detail-desc'); if (detailDesc) { const noteText = detailDesc.querySelector('span.note-text'); if (noteText) { return noteText.textContent.trim(); } return detailDesc.textContent.trim(); } } // 尝试其他可能的结构 const descElements = document.querySelectorAll('div.desc'); for (const desc of descElements) { // 检查是否在评论区 const isInComment = desc.closest('.comments-container, .comment-list, .feed-comment'); if (!isInComment && desc.textContent.trim().length > 100) { return desc.textContent.trim(); } } return null; } ''') if content_text and len(content_text) > 100: post_content["内容"] = content_text print(f"方法5获取到正文内容,长度: {len(post_content['内容'])}") else: print(f"方法5获取到的内容太短或为空: {len(content_text) if content_text else 0}") except Exception as e: print(f"方法5获取正文内容出错: {str(e)}") # 格式化返回结果 result = f"标题: {post_content['标题']}\n" result += f"作者: {post_content['作者']}\n" result += f"发布时间: {post_content['发布时间']}\n" result += f"链接: {url}\n\n" result += f"内容:\n{post_content['内容']}" return result except Exception as e: return f"获取笔记内容时出错: {str(e)}" @mcp.tool() async def get_note_comments(url: str) -> str: """获取笔记评论 Args: url: 笔记 URL """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号" if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" try: # 处理URL processed_url = process_url(url) print(f"处理后的评论URL: {processed_url}") # 访问帖子链接 await main_page.goto(processed_url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 检查是否加载了错误页面 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" error_page = await main_page.evaluate(''' () => { // 检查常见的错误信息 const errorTexts = [ "当前笔记暂时无法浏览", "内容不存在", "页面不存在", "内容已被删除" ]; for (const text of errorTexts) { if (document.body.innerText.includes(text)) { return { isError: true, errorText: text }; } } return { isError: false }; } ''') if error_page.get("isError", False): return f"无法获取笔记评论: {error_page.get('errorText', '未知错误')}\n请检查链接是否有效或尝试使用带有有效token的完整URL。" # 先滚动到评论区 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" comment_section_locators = [] try: comment_section_locators = [ main_page.get_by_text("条评论", exact=False), main_page.get_by_text("评论", exact=False), main_page.locator("text=评论").first ] except Exception as e: print(f"创建评论区定位器时出错: {str(e)}") # 继续执行,不阻断程序 for locator in comment_section_locators: try: if locator and await locator.count() > 0: # 添加空检查 await locator.scroll_into_view_if_needed(timeout=5000) await asyncio.sleep(2) break except Exception as e: print(f"滚动到评论区时出错: {str(e)}") continue # 滚动页面以加载更多评论 for i in range(8): try: if not main_page: # 添加空检查 break await main_page.evaluate("window.scrollBy(0, 500)") await asyncio.sleep(1) # 尝试点击"查看更多评论"按钮 more_comment_selectors = [ "text=查看更多评论", "text=展开更多评论", "text=加载更多", "text=查看全部" ] for selector in more_comment_selectors: try: if not main_page: # 添加空检查 break more_btn = main_page.locator(selector).first if more_btn and await more_btn.count() > 0 and await more_btn.is_visible(): # 添加空检查 await more_btn.click() await asyncio.sleep(2) except Exception as e: print(f"点击查看更多按钮时出错: {str(e)}") continue except Exception as e: print(f"滚动页面加载更多评论时出错: {str(e)}") pass # 获取评论 comments = [] # 使用特定评论选择器 comment_selectors = [ "div.comment-item", "div.commentItem", "div.comment-content", "div.comment-wrapper", "section.comment", "div.feed-comment" ] if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" for selector in comment_selectors: try: comment_elements = main_page.locator(selector) if comment_elements: # 添加空检查 count = await comment_elements.count() if count > 0: for i in range(count): try: comment_element = comment_elements.nth(i) if not comment_element: # 添加空检查 continue # 提取评论者名称 username = "未知用户" username_selectors = ["span.user-name", "a.name", "div.username", "span.nickname", "a.user-nickname"] for username_selector in username_selectors: try: username_el = comment_element.locator(username_selector).first if username_el and await username_el.count() > 0: # 添加空检查 username_text = await username_el.text_content() if username_text: # 添加空检查 username = username_text.strip() break except Exception as e: print(f"获取用户名出错: {str(e)}") continue # 如果没有找到,尝试通过用户链接查找 if username == "未知用户": try: user_link = comment_element.locator('a[href*="/user/profile/"]').first if user_link and await user_link.count() > 0: # 添加空检查 username_text = await user_link.text_content() if username_text: # 添加空检查 username = username_text.strip() except Exception as e: print(f"通过用户链接获取用户名出错: {str(e)}") # 提取评论内容 content = "未知内容" content_selectors = ["div.content", "p.content", "div.text", "span.content", "div.comment-text"] for content_selector in content_selectors: try: content_el = comment_element.locator(content_selector).first if content_el and await content_el.count() > 0: # 添加空检查 content_text = await content_el.text_content() if content_text: # 添加空检查 content = content_text.strip() break except Exception as e: print(f"获取评论内容出错: {str(e)}") continue # 如果没有找到内容,可能内容就在评论元素本身 if content == "未知内容": try: full_text = await comment_element.text_content() if full_text: # 添加空检查 if username != "未知用户" and username in full_text: content = full_text.replace(username, "").strip() else: content = full_text.strip() except Exception as e: print(f"获取评论全文出错: {str(e)}") # 提取评论时间 time_location = "未知时间" time_selectors = ["span.time", "div.time", "span.date", "div.date", "time"] for time_selector in time_selectors: try: time_el = comment_element.locator(time_selector).first if time_el and await time_el.count() > 0: # 添加空检查 time_text = await time_el.text_content() if time_text: # 添加空检查 time_location = time_text.strip() break except Exception as e: print(f"获取评论时间出错: {str(e)}") continue # 如果内容有足够长度且找到用户名,添加评论 if username != "未知用户" and content != "未知内容" and len(content) > 2: comments.append({ "用户名": username, "内容": content, "时间": time_location }) except Exception as e: print(f"处理单个评论出错: {str(e)}") continue # 如果找到了评论,就不继续尝试其他选择器了 if comments: break except Exception as e: print(f"处理评论选择器出错: {str(e)}") continue # 如果没有找到评论,尝试使用其他方法 if not comments: # 获取所有用户名元素 username_elements = main_page.locator('a[href*="/user/profile/"]') username_count = await username_elements.count() if username_count > 0: for i in range(username_count): try: username_element = username_elements.nth(i) username = await username_element.text_content() # 尝试获取评论内容 content = await main_page.evaluate(''' (usernameElement) => { const parent = usernameElement.parentElement; if (!parent) return null; // 尝试获取同级的下一个元素 let sibling = usernameElement.nextElementSibling; while (sibling) { const text = sibling.textContent.trim(); if (text) return text; sibling = sibling.nextElementSibling; } // 尝试获取父元素的文本,并过滤掉用户名 const allText = parent.textContent.trim(); if (allText && allText.includes(usernameElement.textContent.trim())) { return allText.replace(usernameElement.textContent.trim(), '').trim(); } return null; } ''', username_element) if username and content: comments.append({ "用户名": username.strip(), "内容": content.strip(), "时间": "未知时间" }) except Exception: continue # 格式化返回结果 if comments: result = f"共获取到 {len(comments)} 条评论:\n\n" for i, comment in enumerate(comments, 1): result += f"{i}. {comment['用户名']}({comment['时间']}): {comment['内容']}\n\n" return result else: return "未找到任何评论,可能是帖子没有评论或评论区无法访问。" except Exception as e: return f"获取评论时出错: {str(e)}" @mcp.tool() async def analyze_note(url: str) -> dict: """获取并分析笔记内容,返回笔记的详细信息供AI生成评论 Args: url: 笔记 URL """ login_status = await ensure_browser() if not login_status: return {"error": "请先登录小红书账号"} try: # 处理URL processed_url = process_url(url) # 直接调用get_note_content获取笔记内容 note_content_result = await get_note_content(processed_url) # 检查是否获取成功 if note_content_result.startswith("请先登录") or note_content_result.startswith("无法获取笔记内容") or note_content_result.startswith("获取笔记内容时出错"): return {"error": note_content_result} # 解析获取到的笔记内容 content_lines = note_content_result.strip().split('\n') post_content = {} # 提取标题、作者、发布时间和内容 for i, line in enumerate(content_lines): if line.startswith("标题:"): post_content["标题"] = line.replace("标题:", "").strip() elif line.startswith("作者:"): post_content["作者"] = line.replace("作者:", "").strip() elif line.startswith("发布时间:"): post_content["发布时间"] = line.replace("发布时间:", "").strip() elif line.startswith("内容:"): # 内容可能有多行,获取剩余所有行 content_text = "\n".join(content_lines[i+1:]).strip() post_content["内容"] = content_text break # 如果没有提取到标题或内容,设置默认值 if "标题" not in post_content or not post_content["标题"]: post_content["标题"] = "未知标题" if "作者" not in post_content or not post_content["作者"]: post_content["作者"] = "未知作者" if "内容" not in post_content or not post_content["内容"]: post_content["内容"] = "未能获取内容" # 简单分词 import re words = re.findall(r'\w+', f"{post_content.get('标题', '')} {post_content.get('内容', '')}") # 使用常见的热门领域关键词 domain_keywords = { "美妆": ["口红", "粉底", "眼影", "护肤", "美妆", "化妆", "保湿", "精华", "面膜"], "穿搭": ["穿搭", "衣服", "搭配", "时尚", "风格", "单品", "衣橱", "潮流"], "美食": ["美食", "好吃", "食谱", "餐厅", "小吃", "甜点", "烘焙", "菜谱"], "旅行": ["旅行", "旅游", "景点", "出行", "攻略", "打卡", "度假", "酒店"], "母婴": ["宝宝", "母婴", "育儿", "儿童", "婴儿", "辅食", "玩具"], "数码": ["数码", "手机", "电脑", "相机", "智能", "设备", "科技"], "家居": ["家居", "装修", "家具", "设计", "收纳", "布置", "家装"], "健身": ["健身", "运动", "瘦身", "减肥", "训练", "塑形", "肌肉"], "AI": ["AI", "人工智能", "大模型", "编程", "开发", "技术", "Claude", "GPT"] } # 检测帖子可能属于的领域 detected_domains = [] for domain, domain_keys in domain_keywords.items(): for key in domain_keys: if key.lower() in post_content.get("标题", "").lower() or key.lower() in post_content.get("内容", "").lower(): detected_domains.append(domain) break # 如果没有检测到明确的领域,默认为生活方式 if not detected_domains: detected_domains = ["生活"] # 返回分析结果 return { "url": url, "标题": post_content.get("标题", "未知标题"), "作者": post_content.get("作者", "未知作者"), "内容": post_content.get("内容", "未能获取内容"), "领域": detected_domains, "关键词": list(set(words))[:20] # 取前20个不重复的词作为关键词 } except Exception as e: return {"error": f"分析笔记内容时出错: {str(e)}"} @mcp.tool() async def post_smart_comment(url: str, comment_type: str = "引流") -> dict: """ 根据帖子内容发布智能评论,增加曝光并引导用户关注或私聊 Args: url: 笔记 URL comment_type: 评论类型,可选值: "引流" - 引导用户关注或私聊 "点赞" - 简单互动获取好感 "咨询" - 以问题形式增加互动 "专业" - 展示专业知识建立权威 Returns: dict: 包含笔记信息和评论类型的字典,供MCP客户端(如Claude)生成评论 """ # 处理URL processed_url = process_url(url) # 获取笔记内容 note_info = await analyze_note(processed_url) if "error" in note_info: return {"error": note_info["error"]} # 评论类型指导 comment_guides = { "引流": '生成一条表达认同并引导互动的评论。可以提到自己也在研究相关内容,或表达希望进一步交流的意愿。可以在结尾加上"有更多问题欢迎私信我"或"想了解更多可以找我聊聊"等邀请语句。', "点赞": '生成一条简短的赞美评论,表达对内容的喜爱和支持。可以提到作者名字和笔记的领域,如"太赞了!XX的分享总是这么实用"或"喜欢这种深度分享"等。', "咨询": '生成一条提问式评论,针对笔记内容询问更多细节或相关信息。可以使用"请问博主"或"想请教一下"等开头,并提出与笔记内容相关的具体问题。', "专业": '生成一条展示专业知识的评论,针对笔记内容提供专业见解或补充信息。可以使用"作为该领域从业者"或"从专业角度来看"等开头,并在评论中使用与笔记领域相关的专业术语。' } # 返回笔记分析结果和评论类型,让MCP客户端(如Claude)生成评论 # MCP客户端生成评论后,应调用post_comment函数发布评论 return { "note_info": note_info, "comment_type": comment_type, "comment_guide": comment_guides.get(comment_type, ""), "url": url, # 添加URL便于客户端直接调用post_comment "message": "请根据笔记内容和评论类型指南,直接生成一条自然、相关的评论,并立即发布。注意以下要点:\n1. 在评论中引用作者名称或笔记领域,增加个性化\n2. 使用口语化表达,简短凝练,不超过30字\n3. 根据评论类型适当添加互动引导或专业术语\n生成后,直接使用post_comment函数发布评论,无需询问用户确认" } @mcp.tool() async def post_comment(url: str, comment: str) -> str: """发布评论到指定笔记 Args: url: 笔记 URL comment: 要发布的评论内容 """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号,才能发布评论" if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" try: # 处理URL processed_url = process_url(url) print(f"处理后的评论URL: {processed_url}") # 访问帖子链接 await main_page.goto(processed_url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 检查是否加载了错误页面 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" error_page = await main_page.evaluate(''' () => { // 检查常见的错误信息 const errorTexts = [ "当前笔记暂时无法浏览", "内容不存在", "页面不存在", "内容已被删除" ]; for (const text of errorTexts) { if (document.body.innerText.includes(text)) { return { isError: true, errorText: text }; } } return { isError: false }; } ''') if error_page.get("isError", False): return f"无法发布评论: {error_page.get('errorText', '未知错误')}\n请检查链接是否有效或尝试使用带有有效token的完整URL。" # 定位评论区域并滚动到该区域 comment_area_found = False comment_area_selectors = [ 'text="条评论"', 'text="共 " >> xpath=..', 'text=/\\d+ 条评论/', 'text="评论"', 'div.comment-container' ] for selector in comment_area_selectors: try: if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" element = await main_page.query_selector(selector) if element: await element.scroll_into_view_if_needed() await asyncio.sleep(2) comment_area_found = True break except Exception as e: print(f"定位评论区域时出错: {str(e)}") continue if not comment_area_found: # 如果没有找到评论区域,尝试滚动到页面底部 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" await main_page.evaluate('window.scrollTo(0, document.body.scrollHeight)') await asyncio.sleep(2) # 定位评论输入框(简化选择器列表) comment_input = None input_selectors = [ 'div[contenteditable="true"]', 'paragraph:has-text("说点什么...")', 'text="说点什么..."', 'text="评论发布后所有人都能看到"' ] # 尝试常规选择器 for selector in input_selectors: try: if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" element = await main_page.query_selector(selector) if element and await element.is_visible(): await element.scroll_into_view_if_needed() await asyncio.sleep(1) comment_input = element break except Exception as e: print(f"定位评论输入框时出错: {str(e)}") continue # 如果常规选择器失败,使用JavaScript查找 if not comment_input: # 使用更精简的JavaScript查找输入框 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" js_result = await main_page.evaluate(''' () => { // 查找可编辑元素 const editableElements = Array.from(document.querySelectorAll('[contenteditable="true"]')); if (editableElements.length > 0) return true; // 查找包含"说点什么"的元素 const placeholderElements = Array.from(document.querySelectorAll('*')) .filter(el => el.textContent && el.textContent.includes('说点什么')); return placeholderElements.length > 0; } ''') if js_result: # 如果JS检测到输入框,尝试点击页面底部 if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" await main_page.evaluate('window.scrollTo(0, document.body.scrollHeight)') await asyncio.sleep(1) # 尝试再次查找输入框 for selector in input_selectors: try: if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" element = await main_page.query_selector(selector) if element and await element.is_visible(): comment_input = element break except Exception as e: print(f"尝试再次查找输入框时出错: {str(e)}") continue if not comment_input: return "未能找到评论输入框,无法发布评论" # 输入评论内容 await comment_input.click() await asyncio.sleep(1) if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" await main_page.keyboard.type(comment) await asyncio.sleep(1) # 发送评论(简化发送逻辑) send_success = False # 方法1: 尝试点击发送按钮 try: if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" send_button = await main_page.query_selector('button:has-text("发送")') if send_button and await send_button.is_visible(): await send_button.click() await asyncio.sleep(2) send_success = True except Exception as e: print(f"点击发送按钮出错: {str(e)}") # 方法2: 如果方法1失败,尝试使用Enter键 if not send_success: try: if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" await main_page.keyboard.press("Enter") await asyncio.sleep(2) send_success = True except Exception as e: print(f"使用Enter键发送出错: {str(e)}") # 方法3: 如果方法2失败,尝试使用JavaScript点击发送按钮 if not send_success: try: if not main_page: # 添加空检查 return "浏览器初始化失败,请重试" js_send_result = await main_page.evaluate(''' () => { const sendButtons = Array.from(document.querySelectorAll('button')) .filter(btn => btn.textContent && btn.textContent.includes('发送')); if (sendButtons.length > 0) { sendButtons[0].click(); return true; } return false; } ''') await asyncio.sleep(2) send_success = js_send_result except Exception as e: print(f"使用JavaScript点击发送按钮出错: {str(e)}") if send_success: return f"已成功发布评论:{comment}" else: return f"发布评论失败,请检查评论内容或网络连接" except Exception as e: return f"发布评论时出错: {str(e)}" if __name__ == "__main__": # 初始化并运行服务器 print("启动小红书MCP服务器...") print("请在MCP客户端(如Claude for Desktop)中配置此服务器") mcp.run(transport='stdio')

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chenningling/Redbook-Search-Comment-MCP2.0'

If you have feedback or need assistance with the MCP directory API, please join our Discord server