Skip to main content
Glama
fancyboi999

Xiaohongshu (Little Red Book) MCP Server

by fancyboi999
main.py16.8 kB
import sys from typing import Annotated, Any, List, Dict, Optional import asyncio import concurrent import tempfile import time import json import os from datetime import datetime from fastmcp import FastMCP from dotenv import load_dotenv from pydantic import Field import requests from xhs_auto_mcp.tools.write_xiaohongshu import XiaohongshuPoster from mcp.types import TextContent from xhs_auto_mcp.tools.xhs_api import XhsApi from xhs_auto_mcp.tools.log_utils import logger, setup_logger from urllib.parse import urlparse, parse_qs import argparse # 配置日志 setup_logger(log_level="INFO") parser = argparse.ArgumentParser() parser.add_argument("--transport", type=str, default='http') parser.add_argument("--port", type=int, default=8809) parser.add_argument("--host", type=str, default='0.0.0.0') args = parser.parse_args() mcp = FastMCP("小红书内容获取及自动发布", port=args.port) # 加载.env文件中的环境变量 load_dotenv() xhs_cookie = os.getenv('XHS_COOKIE') # 默认缓存路径为当前目录 path= os.getenv("JSON_PATH","./") slow_mode=os.getenv("SLOW_MODE","False").lower() == "true" xhs_api = XhsApi(cookie=xhs_cookie) def get_nodeid_token(url=None, note_ids=None): if note_ids is not None: note_id = note_ids[0,24] xsec_token = note_ids[24:] return {"note_id": note_id, "xsec_token": xsec_token} parsed_url = urlparse(url) query_params = parse_qs(parsed_url.query) note_id = parsed_url.path.split('/')[-1] xsec_token = None xsec_token_list = query_params.get('xsec_token', [None]) if len(xsec_token_list) > 0: xsec_token = xsec_token_list[0] return {"note_id": note_id, "xsec_token": xsec_token} def download_image(url): local_filename = url.split('/')[-1] temp_dir = tempfile.gettempdir() local_path = os.path.join(temp_dir, local_filename) # 假设缓存地址为/tmp with requests.get(url, stream=True) as r: r.raise_for_status() with open(local_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return local_path def download_images_parallel(urls): """ 并行下载图片到本地缓存地址 """ with concurrent.futures.ThreadPoolExecutor() as executor: results = list(executor.map(download_image, urls)) return results @mcp.tool() async def check_content_cookie() -> str: """ 小红书内容平台工具 检测小红书内容平台cookie是否有效 Returns: str: "cookie有效" 或 "cookie已失效" """ try: data = await xhs_api.get_me() if 'success' in data and data['success'] == True: return "cookie有效" else: return "cookie已失效" except Exception as e: logger.error(e) return "cookie已失效" @mcp.tool() async def home_feed() -> str: """ 小红书内容平台工具 获取首页推荐笔记 Returns: str: 首页推荐笔记列表 """ data = await xhs_api.home_feed() result = "搜索结果:\n\n" if 'data' in data and 'items' in data['data'] and len(data['data']['items']) > 0: for i in range(0, len(data['data']['items'])): item = data['data']['items'][i] if 'note_card' in item and 'display_title' in item['note_card']: title = item['note_card']['display_title'] liked_count = item['note_card']['interact_info']['liked_count'] # cover=item['note_card']['cover']['url_default'] url = f'https://www.xiaohongshu.com/explore/{item["id"]}?xsec_token={item["xsec_token"]}' result += f"{i}. {title} \n 点赞数:{liked_count} \n 链接: {url} \n\n" else: result = await check_content_cookie() if "有效" in result: result = f"未找到相关的笔记" return result @mcp.tool() async def search_notes(keywords: Annotated[str, Field(description="搜索关键词")]) -> str: """ 小红书内容平台工具 根据关键词搜索笔记 Returns: str: 搜索结果 """ data = await xhs_api.search_notes(keywords) logger.info(f'keywords:{keywords},data:{data}') result = "搜索结果:\n\n" if 'data' in data and 'items' in data['data'] and len(data['data']['items']) > 0: for i in range(0, len(data['data']['items'])): item = data['data']['items'][i] if 'note_card' in item and 'display_title' in item['note_card']: title = item['note_card']['display_title'] liked_count = item['note_card']['interact_info']['liked_count'] # cover=item['note_card']['cover']['url_default'] url = f'https://www.xiaohongshu.com/explore/{item["id"]}?xsec_token={item["xsec_token"]}' result += f"{i}. {title} \n 点赞数:{liked_count} \n 链接: {url} \n\n" else: result = await check_content_cookie() if "有效" in result: result = f"未找到与\"{keywords}\"相关的笔记" return result @mcp.tool() async def get_note_content(url: Annotated[str, Field(description="笔记url,要带上xsec_token")]) -> str: """ 小红书内容平台工具 获取笔记详细内容 Returns: str: 笔记内容 """ params = get_nodeid_token(url=url) data = await xhs_api.get_note_content(**params) logger.info(f'url:{url},data:{data}') result = "" if 'data' in data and 'items' in data['data'] and len(data['data']['items']) > 0: for i in range(0, len(data['data']['items'])): item = data['data']['items'][i] if 'note_card' in item and 'user' in item['note_card']: note_card = item['note_card'] cover = '' if 'image_list' in note_card and len(note_card['image_list']) > 0 and note_card['image_list'][0][ 'url_pre']: cover = note_card['image_list'][0]['url_pre'] data_format = datetime.fromtimestamp(note_card.get('time', 0) / 1000) liked_count = item['note_card']['interact_info']['liked_count'] comment_count = item['note_card']['interact_info']['comment_count'] collected_count = item['note_card']['interact_info']['collected_count'] url = f'https://www.xiaohongshu.com/explore/{params["note_id"]}?xsec_token={params["xsec_token"]}' result = f"标题: {note_card.get('title', '')}\n" result += f"作者: {note_card['user'].get('nickname', '')}\n" result += f"发布时间: {data_format}\n" result += f"点赞数: {liked_count}\n" result += f"评论数: {comment_count}\n" result += f"收藏数: {collected_count}\n" result += f"链接: {url}\n\n" result += f"内容:\n{note_card.get('desc', '')}\n" result += f"封面:\n{cover}" break else: result = await check_content_cookie() if "有效" in result: result = "获取失败" return result @mcp.tool() async def get_note_comments(url: Annotated[str, Field(description="笔记url,要带上xsec_token")]) -> str: """ 小红书内容平台工具 获取笔记评论 Returns: str: 笔记评论 """ params = get_nodeid_token(url=url) data = await xhs_api.get_note_comments(**params) logger.info(f'url:{url},data:{data}') result = "" if 'data' in data and 'comments' in data['data'] and len(data['data']['comments']) > 0: for i in range(0, len(data['data']['comments'])): item = data['data']['comments'][i] data_format = datetime.fromtimestamp(item['create_time'] / 1000) result += f"{i}. {item['user_info']['nickname']}({data_format}): {item['content']}\n\n" else: result = await check_content_cookie() if "有效" in result: result = "暂无评论" return result @mcp.tool() async def post_comment(comment: Annotated[str, Field(description="评论内容")], note_id: Annotated[str, Field(description="笔记id")]) -> str: """ 小红书内容平台工具 进行评论到指定笔记 Returns: str: 发布结果 """ # params = get_nodeid_token(url) response = await xhs_api.post_comment(note_id, comment) if 'success' in response and response['success'] == True: return "回复成功" else: # 直接实现cookie检查逻辑,而不是调用check_content_cookie工具 try: data = await xhs_api.get_me() if 'success' in data and data['success'] == True: return "回复失败" else: return "cookie已失效" except Exception as e: logger.error(e) return "cookie已失效" @mcp.tool() def login(phone: Annotated[str, Field(description="手机号")], country_code: Annotated[Optional[str], Field(description="国家代码")] = "+86"): """ 小红书创作平台工具 打开浏览器,发送验证码,等待用户输入验证码,登录小红书创作平台 若用户未提供手机号,请告诉用户,请提供手机号 Returns: str: 打开浏览器及发送验证码结果 """ # 重置之前可能存在的实例 XiaohongshuPoster.reset_instance() # 创建新实例 poster = XiaohongshuPoster(path) success, message = poster.login(phone, country_code) # 不关闭浏览器,让它保持打开状态等待验证码 return message @mcp.tool() def wait_for_verify_code(verification_code: Annotated[str, Field(description="验证码")]): """ 小红书创作平台工具 等待用户输入验证码,登录小红书创作平台 Returns: str: 验证码输入结果,及登录结果 """ # 使用已存在的实例 poster = XiaohongshuPoster(path) success, message = poster.wait_for_verify_code(verification_code) return message @mcp.tool() def create_note_with_images(title: Annotated[str, Field(description="小红书笔记标题,不能超过20个字")], content: Annotated[str, Field(description="小红书笔记正文内容,不能超过1000个字")], image_paths: Annotated[Optional[str], Field(description="本地图片路径,多个路径用逗号分隔")] = None, image_urls: Annotated[Optional[str], Field(description="图片URL链接,多个URL用逗号分隔")] = None) -> list[TextContent]: """ 小红书创作平台工具 创建小红书笔记支持本地图片和图片URL,必须提供图片路径或URL。 如果提供图片URL,则忽略本地图片路径,如果提供本地图片路径则忽略图片URL,如果都提供,则优先使用本地图片路径 Returns: list[TextContent]: 发布结果 """ poster = XiaohongshuPoster(path) #poster.login(phone) res = "" try: images = [] # 处理本地图片路径 if image_paths: local_paths = [path.strip() for path in image_paths.split(',')] # 检查图片文件是否存在 for img_path in local_paths: if not os.path.exists(img_path): logger.error(f"图片文件不存在: {img_path}") return [TextContent(type="text", text=f"错误:图片文件不存在: {img_path}")] else: logger.info(f"图片文件存在: {img_path}") images.extend(local_paths) # 处理图片URL if not images and image_urls: urls = [url.strip() for url in image_urls.split(',')] if urls: # 使用并行下载图片 try: downloaded_images = download_images_parallel(urls) images.extend(downloaded_images) logger.info(f"成功下载图片: {downloaded_images}") except Exception as e: logger.error(f"下载图片失败: {str(e)}") return [TextContent(type="text", text=f"错误:下载图片失败: {str(e)}")] # 确保至少有一张图片 if not images: return [TextContent(type="text", text="错误:至少需要提供一张图片(本地路径或URL)")] logger.info(f"准备发布笔记,标题: {title}, 内容长度: {len(content)}, 图片数量: {len(images)}") code, info = poster.login_to_publish(title, content, images, slow_mode) logger.info(f"发布结果: 成功={code}, 信息={info}") res = info except Exception as e: logger.error(f"发布笔记异常: {str(e)}", exc_info=True) res = f"error: {str(e)}" finally: try: poster.close() logger.info("浏览器已关闭") except Exception as e: logger.error(f"关闭浏览器时发生错误: {str(e)}") return [TextContent(type="text", text=res)] @mcp.tool() def create_note_with_videos(title: Annotated[str, Field(description="小红书笔记标题,不能超过20个字")], content: Annotated[str, Field(description="小红书笔记正文内容,不能超过1000个字")], video_paths: Annotated[Optional[str], Field(description="本地视频路径,多个路径用逗号分隔")] = None, video_urls: Annotated[Optional[str], Field(description="视频URL链接,多个URL用逗号分隔")] = None) -> list[TextContent]: """ 小红书创作平台工具 创建小红书笔记支持本地视频和视频URL,必须提供视频路径或URL 。 如果提供视频URL,则忽略本地视频路径,如果提供本地视频路径则忽略视频URL,如果都提供,则优先使用本地视频路径 Returns: list[TextContent]: 发布结果 """ poster = XiaohongshuPoster(path) #poster.login(phone) res = "" try: videos = [] # 处理本地视频路径 if video_paths: local_paths = [path.strip() for path in video_paths.split(',')] # 检查视频文件是否存在 for video_path in local_paths: if not os.path.exists(video_path): logger.error(f"视频文件不存在: {video_path}") return [TextContent(type="text", text=f"错误:视频文件不存在: {video_path}")] else: logger.info(f"视频文件存在: {video_path}") videos.extend(local_paths) # 处理视频URL if not videos and video_urls: urls = [url.strip() for url in video_urls.split(',')] if urls: # 使用并行下载视频 try: downloaded_videos = download_images_parallel(urls) videos.extend(downloaded_videos) logger.info(f"成功下载视频: {downloaded_videos}") except Exception as e: logger.error(f"下载视频失败: {str(e)}") return [TextContent(type="text", text=f"错误:下载视频失败: {str(e)}")] # 确保至少有一个视频 if not videos: return [TextContent(type="text", text="错误:至少需要提供一个视频(本地路径或URL)")] logger.info(f"准备发布视频笔记,标题: {title}, 内容长度: {len(content)}, 视频数量: {len(videos)}") code, info = poster.login_to_publish_video(title, content, videos, slow_mode) logger.info(f"发布结果: 成功={code}, 信息={info}") res = info except Exception as e: logger.error(f"发布视频笔记异常: {str(e)}", exc_info=True) res = f"error: {str(e)}" finally: try: poster.close() logger.info("浏览器已关闭") except Exception as e: logger.error(f"关闭浏览器时发生错误: {str(e)}") return [TextContent(type="text", text=res)] def app(): host = args.host port = args.port transport = args.transport try: if transport == "http": mcp.run(host=host, port=port, transport=transport) elif transport == "stdio": mcp.run(transport=transport) else: raise ValueError("不支持的端口形式") except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Error running server: {str(e)}") sys.exit(1) if __name__ == "__main__": app()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fancyboi999/xhs-auto-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server