Skip to main content
Glama

Xiaohongshu Search & Comment MCP Server

by chenningling
xiaohongshu_mcp.py34.9 kB
from typing import Any, List, Dict, Optional import asyncio import json import os import pandas as pd from datetime import datetime from playwright.async_api import async_playwright from fastmcp import FastMCP # 初始化 FastMCP 服务器 mcp = FastMCP("xiaohongshu_scraper") # 全局变量 BROWSER_DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "browser_data") DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S") # 确保目录存在 os.makedirs(BROWSER_DATA_DIR, exist_ok=True) os.makedirs(DATA_DIR, exist_ok=True) # 用于存储浏览器上下文,以便在不同方法之间共享 browser_context = None main_page = None is_logged_in = False async def ensure_browser(): """确保浏览器已启动并登录""" global browser_context, main_page, is_logged_in if browser_context is None: # 启动浏览器 playwright_instance = await async_playwright().start() # 使用持久化上下文来保存用户状态 browser_context = await playwright_instance.chromium.launch_persistent_context( user_data_dir=BROWSER_DATA_DIR, headless=False, # 非隐藏模式,方便用户登录 viewport={"width": 1280, "height": 800}, timeout=60000 ) # 创建一个新页面 if browser_context.pages: main_page = browser_context.pages[0] else: main_page = await browser_context.new_page() # 设置页面级别的超时时间 main_page.set_default_timeout(60000) # 检查登录状态 if not is_logged_in: # 访问小红书首页 await main_page.goto("https://www.xiaohongshu.com", timeout=60000) await asyncio.sleep(3) # 检查是否已登录 login_elements = await main_page.query_selector_all('text="登录"') if login_elements: return False # 需要登录 else: is_logged_in = True return True # 已登录 return True @mcp.tool() async def login() -> str: """登录小红书账号""" global is_logged_in await ensure_browser() if is_logged_in: return "已登录小红书账号" # 访问小红书登录页面 await main_page.goto("https://www.xiaohongshu.com", timeout=60000) await asyncio.sleep(3) # 查找登录按钮并点击 login_elements = await main_page.query_selector_all('text="登录"') if login_elements: await login_elements[0].click() # 提示用户手动登录 message = "请在打开的浏览器窗口中完成登录操作。登录成功后,系统将自动继续。" # 等待用户登录成功 max_wait_time = 180 # 等待3分钟 wait_interval = 5 waited_time = 0 while waited_time < max_wait_time: # 检查是否已登录成功 still_login = await main_page.query_selector_all('text="登录"') if not still_login: is_logged_in = True await asyncio.sleep(2) # 等待页面加载 return "登录成功!" # 继续等待 await asyncio.sleep(wait_interval) waited_time += wait_interval return "登录等待超时。请重试或手动登录后再使用其他功能。" else: is_logged_in = True return "已登录小红书账号" @mcp.tool() async def search_notes(keywords: str, limit: int = 5) -> str: """根据关键词搜索笔记 Args: keywords: 搜索关键词 limit: 返回结果数量限制 """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号" # 构建搜索URL并访问 search_url = f"https://www.xiaohongshu.com/search_result?keyword={keywords}" try: await main_page.goto(search_url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 获取所有帖子卡片 post_elements = await main_page.query_selector_all('a[href*="/search_result/"]') post_links = [] post_titles = [] for element in post_elements: href = await element.get_attribute('href') if href and '/search_result/' in href: full_url = f"https://www.xiaohongshu.com{href}" post_links.append(full_url) # 尝试获取帖子标题 try: title_element = await element.query_selector('div.title') if title_element: title = await title_element.text_content() else: title = "未知标题" except: title = "未知标题" post_titles.append(title) # 去重 unique_posts = [] seen_urls = set() for url, title in zip(post_links, post_titles): if url not in seen_urls: seen_urls.add(url) unique_posts.append({"url": url, "title": title}) # 限制返回数量 unique_posts = unique_posts[:limit] # 格式化返回结果 if unique_posts: result = "搜索结果:\n\n" for i, post in enumerate(unique_posts, 1): result += f"{i}. {post['title']}\n 链接: {post['url']}\n\n" return result else: return f"未找到与\"{keywords}\"相关的笔记" except Exception as e: return f"搜索笔记时出错: {str(e)}" @mcp.tool() async def get_note_content(url: str) -> str: """获取笔记内容 Args: url: 笔记 URL """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号" try: # 访问帖子链接 await main_page.goto(url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 获取帖子内容 post_content = {} # 获取帖子标题 try: title_element = await main_page.query_selector('text="编辑于"') if title_element: title = await title_element.evaluate('(el) => el.previousElementSibling ? el.previousElementSibling.textContent : ""') post_content["标题"] = title.strip() if title else "未知标题" else: post_content["标题"] = "未知标题" except Exception as e: post_content["标题"] = "未知标题" # 获取作者 try: author_element = await main_page.query_selector('a[href*="/user/profile/"]') if author_element: author = await author_element.text_content() post_content["作者"] = author.strip() if author else "未知作者" else: post_content["作者"] = "未知作者" except Exception as e: post_content["作者"] = "未知作者" # 获取发布时间 try: time_selectors = [ 'text=/\\d{4}-\\d{2}-\\d{2}/', 'text=/\\d+月\\d+日/', 'text=/\\d+天前/', 'text=/\\d+小时前/', 'text=/今天/', 'text=/昨天/' ] post_content["发布时间"] = "未知" for selector in time_selectors: time_element = await main_page.query_selector(selector) if time_element: post_content["发布时间"] = await time_element.text_content() break except Exception as e: post_content["发布时间"] = "未知" # 获取帖子正文内容 try: content_selectors = [ 'div.content', 'div.note-content', 'article', 'div.desc' ] post_content["内容"] = "未能获取内容" for selector in content_selectors: content_element = await main_page.query_selector(selector) if content_element: content_text = await content_element.text_content() if content_text and len(content_text.strip()) > 10: post_content["内容"] = content_text.strip() break # 使用JavaScript提取主要文本内容 if post_content["内容"] == "未能获取内容": content_text = await main_page.evaluate(''' () => { const contentElements = Array.from(document.querySelectorAll('div, p, article')) .filter(el => { const text = el.textContent.trim(); return text.length > 50 && text.length < 5000 && el.querySelectorAll('a, button').length < 5 && el.children.length < 10; }) .sort((a, b) => b.textContent.length - a.textContent.length); if (contentElements.length > 0) { return contentElements[0].textContent.trim(); } return null; } ''') if content_text: post_content["内容"] = content_text except Exception as e: post_content["内容"] = f"获取内容时出错: {str(e)}" # 格式化返回结果 result = f"标题: {post_content['标题']}\n" result += f"作者: {post_content['作者']}\n" result += f"发布时间: {post_content['发布时间']}\n" result += f"链接: {url}\n\n" result += f"内容:\n{post_content['内容']}" return result except Exception as e: return f"获取笔记内容时出错: {str(e)}" @mcp.tool() async def get_note_comments(url: str) -> str: """获取笔记评论 Args: url: 笔记 URL """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号" try: # 访问帖子链接 await main_page.goto(url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 先滚动到评论区 comment_section_locators = [ main_page.get_by_text("条评论", exact=False), main_page.get_by_text("评论", exact=False), main_page.locator("text=评论").first ] for locator in comment_section_locators: try: if await locator.count() > 0: await locator.scroll_into_view_if_needed(timeout=5000) await asyncio.sleep(2) break except Exception: continue # 滚动页面以加载更多评论 for i in range(8): try: await main_page.evaluate("window.scrollBy(0, 500)") await asyncio.sleep(1) # 尝试点击"查看更多评论"按钮 more_comment_selectors = [ "text=查看更多评论", "text=展开更多评论", "text=加载更多", "text=查看全部" ] for selector in more_comment_selectors: try: more_btn = main_page.locator(selector).first if await more_btn.count() > 0 and await more_btn.is_visible(): await more_btn.click() await asyncio.sleep(2) except Exception: continue except Exception: pass # 获取评论 comments = [] # 使用特定评论选择器 comment_selectors = [ "div.comment-item", "div.commentItem", "div.comment-content", "div.comment-wrapper", "section.comment", "div.feed-comment" ] for selector in comment_selectors: comment_elements = main_page.locator(selector) count = await comment_elements.count() if count > 0: for i in range(count): try: comment_element = comment_elements.nth(i) # 提取评论者名称 username = "未知用户" username_selectors = ["span.user-name", "a.name", "div.username", "span.nickname", "a.user-nickname"] for username_selector in username_selectors: username_el = comment_element.locator(username_selector).first if await username_el.count() > 0: username = await username_el.text_content() username = username.strip() break # 如果没有找到,尝试通过用户链接查找 if username == "未知用户": user_link = comment_element.locator('a[href*="/user/profile/"]').first if await user_link.count() > 0: username = await user_link.text_content() username = username.strip() # 提取评论内容 content = "未知内容" content_selectors = ["div.content", "p.content", "div.text", "span.content", "div.comment-text"] for content_selector in content_selectors: content_el = comment_element.locator(content_selector).first if await content_el.count() > 0: content = await content_el.text_content() content = content.strip() break # 如果没有找到内容,可能内容就在评论元素本身 if content == "未知内容": full_text = await comment_element.text_content() if username != "未知用户" and username in full_text: content = full_text.replace(username, "").strip() else: content = full_text.strip() # 提取评论时间 time_location = "未知时间" time_selectors = ["span.time", "div.time", "span.date", "div.date", "time"] for time_selector in time_selectors: time_el = comment_element.locator(time_selector).first if await time_el.count() > 0: time_location = await time_el.text_content() time_location = time_location.strip() break # 如果内容有足够长度且找到用户名,添加评论 if username != "未知用户" and content != "未知内容" and len(content) > 2: comments.append({ "用户名": username, "内容": content, "时间": time_location }) except Exception: continue # 如果找到了评论,就不继续尝试其他选择器了 if comments: break # 如果没有找到评论,尝试使用其他方法 if not comments: # 获取所有用户名元素 username_elements = main_page.locator('a[href*="/user/profile/"]') username_count = await username_elements.count() if username_count > 0: for i in range(username_count): try: username_element = username_elements.nth(i) username = await username_element.text_content() # 尝试获取评论内容 content = await main_page.evaluate(''' (usernameElement) => { const parent = usernameElement.parentElement; if (!parent) return null; // 尝试获取同级的下一个元素 let sibling = usernameElement.nextElementSibling; while (sibling) { const text = sibling.textContent.trim(); if (text) return text; sibling = sibling.nextElementSibling; } // 尝试获取父元素的文本,并过滤掉用户名 const allText = parent.textContent.trim(); if (allText && allText.includes(usernameElement.textContent.trim())) { return allText.replace(usernameElement.textContent.trim(), '').trim(); } return null; } ''', username_element) if username and content: comments.append({ "用户名": username.strip(), "内容": content.strip(), "时间": "未知时间" }) except Exception: continue # 格式化返回结果 if comments: result = f"共获取到 {len(comments)} 条评论:\n\n" for i, comment in enumerate(comments, 1): result += f"{i}. {comment['用户名']}({comment['时间']}): {comment['内容']}\n\n" return result else: return "未找到任何评论,可能是帖子没有评论或评论区无法访问。" except Exception as e: return f"获取评论时出错: {str(e)}" @mcp.tool() async def post_smart_comment(url: str, comment_type: str = "引流") -> str: """根据帖子内容发布智能评论,增加曝光并引导用户关注或私聊 Args: url: 笔记 URL comment_type: 评论类型,可选值: "引流" - 引导用户关注或私聊 "点赞" - 简单互动获取好感 "咨询" - 以问题形式增加互动 "专业" - 展示专业知识建立权威 """ login_status = await ensure_browser() if not login_status: return "请先登录小红书账号,才能发布评论" try: # 访问帖子链接 await main_page.goto(url, timeout=60000) await asyncio.sleep(5) # 等待页面加载 # 获取帖子内容进行分析 post_content = {} # 获取帖子标题 try: title_element = await main_page.query_selector('text="编辑于"') if title_element: title = await title_element.evaluate('(el) => el.previousElementSibling ? el.previousElementSibling.textContent : ""') post_content["标题"] = title.strip() if title else "未知标题" else: post_content["标题"] = "未知标题" except Exception: post_content["标题"] = "未知标题" # 获取作者 try: author_element = await main_page.query_selector('a[href*="/user/profile/"]') if author_element: author = await author_element.text_content() post_content["作者"] = author.strip() if author else "未知作者" else: post_content["作者"] = "未知作者" except Exception: post_content["作者"] = "未知作者" # 获取帖子正文内容 try: content_selectors = [ 'div.content', 'div.note-content', 'article', 'div.desc' ] post_content["内容"] = "未能获取内容" for selector in content_selectors: content_element = await main_page.query_selector(selector) if content_element: content_text = await content_element.text_content() if content_text and len(content_text.strip()) > 10: post_content["内容"] = content_text.strip() break # 使用JavaScript提取主要文本内容 if post_content["内容"] == "未能获取内容": content_text = await main_page.evaluate(''' () => { const contentElements = Array.from(document.querySelectorAll('div, p, article')) .filter(el => { const text = el.textContent.trim(); return text.length > 50 && text.length < 5000 && el.querySelectorAll('a, button').length < 5 && el.children.length < 10; }) .sort((a, b) => b.textContent.length - a.textContent.length); if (contentElements.length > 0) { return contentElements[0].textContent.trim(); } return null; } ''') if content_text: post_content["内容"] = content_text except Exception: post_content["内容"] = "未能获取内容" # 根据帖子内容和评论类型生成智能评论 comment_text = await _generate_smart_comment(post_content, comment_type) # 基于页面快照定位评论输入框 # 先定位评论区域 try: # 先尝试找到评论数量区域,它通常包含"条评论"文本 comment_count_selectors = [ 'text="条评论"', 'text="共 " >> xpath=..', 'text=/\\d+ 条评论/', ] for selector in comment_count_selectors: try: comment_count_element = await main_page.query_selector(selector) if comment_count_element: await comment_count_element.scroll_into_view_if_needed() await asyncio.sleep(2) break except Exception: continue # 定位评论输入框 input_box_selectors = [ 'paragraph:has-text("说点什么...")', 'text="说点什么..."', 'text="评论发布后所有人都能看到"', 'div[contenteditable="true"]' ] comment_input = None for selector in input_box_selectors: try: input_element = await main_page.query_selector(selector) if input_element and await input_element.is_visible(): await input_element.scroll_into_view_if_needed() await asyncio.sleep(1) comment_input = input_element break except Exception: continue # 如果上面的方法都失败了,尝试使用JavaScript查找 if not comment_input: comment_input = await main_page.evaluate(''' () => { // 查找包含"说点什么..."的元素 const elements = Array.from(document.querySelectorAll('*')) .filter(el => el.textContent && el.textContent.includes('说点什么...')); if (elements.length > 0) return elements[0]; // 查找可编辑的div元素 const editableDivs = Array.from(document.querySelectorAll('div[contenteditable="true"]')); if (editableDivs.length > 0) return editableDivs[0]; return null; } ''') if comment_input: comment_input = await main_page.query_selector_all('*')[-1] # 使用最后一个元素作为占位符 if not comment_input: return "未能找到评论输入框,无法发布评论" # 点击评论输入框 await comment_input.click() await asyncio.sleep(1) # 使用键盘输入评论内容 await main_page.keyboard.type(comment_text) await asyncio.sleep(1) # 尝试使用Enter键发送 try: # 寻找发送按钮 send_button_selectors = [ 'button:has-text("发送")', 'button[aria-ref^="s"]' ] send_button = None for selector in send_button_selectors: elements = await main_page.query_selector_all(selector) for element in elements: text_content = await element.text_content() if text_content and '发送' in text_content: send_button = element break if send_button: break if send_button: await send_button.click() else: # 如果找不到发送按钮,使用Enter键 await main_page.keyboard.press('Enter') await asyncio.sleep(3) # 等待评论发送完成 return f"已成功发布评论:{comment_text}" except Exception as e: # 如果点击发送按钮失败,尝试通过Enter键发送 try: await main_page.keyboard.press('Enter') await asyncio.sleep(3) return f"已通过Enter键发送评论:{comment_text}" except Exception as press_error: return f"尝试发送评论时出错: {str(e)},尝试Enter键也失败: {str(press_error)}" except Exception as e: return f"操作评论区时出错: {str(e)}" except Exception as e: return f"发布评论时出错: {str(e)}" async def _generate_smart_comment(post_content, comment_type): """根据帖子内容和评论类型生成智能评论""" title = post_content.get("标题", "") content = post_content.get("内容", "") author = post_content.get("作者", "") # 提取帖子关键词 keywords = [] # 简单分词 import re words = re.findall(r'\w+', f"{title} {content}") # 使用常见的热门领域关键词 domain_keywords = { "美妆": ["口红", "粉底", "眼影", "护肤", "美妆", "化妆", "保湿", "精华", "面膜"], "穿搭": ["穿搭", "衣服", "搭配", "时尚", "风格", "单品", "衣橱", "潮流"], "美食": ["美食", "好吃", "食谱", "餐厅", "小吃", "甜点", "烘焙", "菜谱"], "旅行": ["旅行", "旅游", "景点", "出行", "攻略", "打卡", "度假", "酒店"], "母婴": ["宝宝", "母婴", "育儿", "儿童", "婴儿", "辅食", "玩具"], "数码": ["数码", "手机", "电脑", "相机", "智能", "设备", "科技"], "家居": ["家居", "装修", "家具", "设计", "收纳", "布置", "家装"], "健身": ["健身", "运动", "瘦身", "减肥", "训练", "塑形", "肌肉"] } # 检测帖子可能属于的领域 detected_domains = [] for domain, domain_keys in domain_keywords.items(): for key in domain_keys: if key in title.lower() or key in content.lower(): detected_domains.append(domain) break # 如果没有检测到明确的领域,默认为生活方式 if not detected_domains: detected_domains = ["生活"] # 根据评论类型生成评论模板 templates = { "引流": [ f"这个{detected_domains[0]}分享真不错!我也在研究相关内容,欢迎私信交流~", f"谢谢分享,{author}的见解很独到!我也整理了一些相关资料,感兴趣可以找我聊聊", f"看了你的分享受益匪浅,我在公号上也写过类似的内容,感兴趣可以来找我", f"非常喜欢你的分享风格!我也做{detected_domains[0]}相关内容,可以互相关注交流", f"太有共鸣了!我也遇到过类似情况,想了解更多可以私信我", f"这篇帖子信息量好大!我收藏了,有问题可以一起讨论哦~" ], "点赞": [ f"太赞了!{author}的分享总是这么实用", f"每次看到{author}的分享都很有收获,继续关注了", f"这篇内容整理得超详细,学到了很多,感谢分享!", f"喜欢这种深度分享,比一般的{detected_domains[0]}帖子有内涵多了", f"已经收藏并点赞了,很有参考价值", f"这种高质量内容太少见了,感谢博主分享" ], "咨询": [ f"请问博主,对于{detected_domains[0]}有什么入门建议吗?", f"这个{detected_domains[0]}技巧看起来很实用,适合新手尝试吗?", f"博主分享的经验太宝贵了,能详细说说你是怎么开始的吗?", f"看完很受启发,想请教一下{author},如何能做到这么专业的水平?", f"对这个领域很感兴趣,有没有推荐的学习资源可以分享?", f"博主的见解很独到,能不能分享一下你的学习路径?" ], "专业": [ f"作为{detected_domains[0]}领域从业者,很认同博主的观点,特别是关于{title[:10]}这部分", f"从专业角度来看,这篇分享涵盖了关键点,我还想补充的是...", f"这个内容分析很到位,我在实践中也发现了类似规律,非常认同", f"很专业的分享!我从事相关工作多年,这些方法确实有效", f"这篇内容的深度令人印象深刻,展现了博主的专业素养", f"从技术角度看,博主分享的方法非常具有可行性,值得一试" ] } import random # 确保评论类型有效 if comment_type not in templates: comment_type = "引流" # 默认使用引流模板 # 随机选择一个模板 selected_template = random.choice(templates[comment_type]) # 如果是专业类型的评论,可以添加一些行业术语 if comment_type == "专业": domain_terms = { "美妆": ["妆效", "质地", "显色度", "持妆力", "上妆", "哑光", "珠光"], "穿搭": ["版型", "剪裁", "廓形", "叠穿", "层次感", "色调", "质感"], "美食": ["口感", "层次", "搭配", "技法", "火候", "调味", "食材"], "旅行": ["行程", "攻略", "体验", "风土人情", "当地特色", "隐藏景点"], "母婴": ["早教", "成长", "发育", "营养", "互动", "认知"], "数码": ["性能", "体验", "配置", "系统", "兼容性", "功耗", "散热"], "家居": ["空间规划", "采光", "色彩搭配", "功能区", "动线"], "健身": ["训练计划", "组数", "强度", "恢复", "代谢", "肌肉增长"] } # 为检测到的每个领域随机添加一个术语 for domain in detected_domains: if domain in domain_terms: terms = domain_terms[domain] selected_term = random.choice(terms) if random.random() > 0.5: # 50%的概率添加术语 selected_template += f",特别是在{selected_term}方面的见解很独到" # 在部分评论中添加吸引私信的结尾 if comment_type == "引流" or (comment_type == "咨询" and random.random() > 0.7): endings = [ "有更多问题欢迎私信我~", "感兴趣的话可以来我主页看看哦", "想了解更多可以找我聊聊", "关注我了解更多相关内容", "私信我有惊喜哦~" ] selected_template += " " + random.choice(endings) return selected_template if __name__ == "__main__": # 初始化并运行服务器 print("启动小红书MCP服务器...") print("请在MCP客户端(如Claude for Desktop)中配置此服务器") mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chenningling/RedBook-Search-Comment-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server