Skip to main content
Glama
fancyboi999

Xiaohongshu (Little Red Book) MCP Server

by fancyboi999
write_xiaohongshu.py19.8 kB
# 小红书的自动发稿 from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.keys import Keys from xhs_auto_mcp.tools.log_utils import logger import time import json import os class XiaohongshuPoster: # 添加静态变量来保存实例 _instance = None def __init__(self,path=os.path.dirname(os.path.abspath(__file__))): # 如果已经有实例,直接返回 if XiaohongshuPoster._instance is not None: self.driver = XiaohongshuPoster._instance.driver self.wait = XiaohongshuPoster._instance.wait self.token_file = XiaohongshuPoster._instance.token_file self.cookies_file = XiaohongshuPoster._instance.cookies_file self.token = XiaohongshuPoster._instance.token return #self.driver = webdriver.ChromiumEdge()#.Chrome() self.driver = webdriver.Chrome() self.wait = WebDriverWait(self.driver, 10) # 获取当前执行文件所在目录 current_dir = path self.token_file = os.path.join(current_dir, "xiaohongshu_token.json") self.cookies_file = os.path.join(current_dir, "xiaohongshu_cookies.json") self.token = self._load_token() self._load_cookies() # 保存实例 XiaohongshuPoster._instance = self def _load_token(self): """从文件加载token""" if os.path.exists(self.token_file): try: with open(self.token_file, 'r') as f: token_data = json.load(f) # 检查token是否过期 if token_data.get('expire_time', 0) > time.time(): return token_data.get('token') except: pass return None def _save_token(self, token): """保存token到文件""" token_data = { 'token': token, # token有效期设为30天 'expire_time': time.time() + 30 * 24 * 3600 } with open(self.token_file, 'w') as f: json.dump(token_data, f) def _load_cookies(self): """从文件加载cookies""" if os.path.exists(self.cookies_file): logger.info(f"加载cookies文件: {self.cookies_file}") try: with open(self.cookies_file, 'r') as f: cookies = json.load(f) self.driver.get("https://creator.xiaohongshu.com") for cookie in cookies: self.driver.add_cookie(cookie) except: pass def _save_cookies(self): """保存cookies到文件""" cookies = self.driver.get_cookies() with open(self.cookies_file, 'w') as f: json.dump(cookies, f) def login_to_publish(self,title, content, images=None,slow_mode=False): #self.driver.get("https://creator.xiaohongshu.com/publish/publish?from=menu") self._load_cookies() self.driver.refresh() self.driver.get("https://creator.xiaohongshu.com/publish/publish?from=menu") time.sleep(3) if self.driver.current_url != "https://creator.xiaohongshu.com/publish/publish?from=menu": return False, "登录失败" #time.sleep(1) # 如果是发布视频,则不操作这一步 # 切换到上传图文 tabs = self.driver.find_elements(By.CSS_SELECTOR, ".creator-tab") if len(tabs) > 1: tabs[2].click() time.sleep(1) # # 输入标题和内容 # 上传图片 if images: logger.info(f"准备上传图片,路径: {images}") # 检查图片文件是否存在 for img_path in images: if not os.path.exists(img_path): logger.error(f"图片文件不存在: {img_path}") return False, f"图片文件不存在: {img_path}" else: logger.info(f"图片文件存在: {img_path}") try: upload_input = self.driver.find_element(By.CSS_SELECTOR, ".upload-input") # 将所有图片路径用\n连接成一个字符串一次性上传 upload_input.send_keys('\n'.join(images)) logger.info("图片上传成功") time.sleep(2) # 增加等待时间,确保图片上传完成 except Exception as e: logger.error(f"图片上传失败: {str(e)}") return False, f"图片上传失败: {str(e)}" time.sleep(1) title = title[:20] title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".d-text"))) title_input.send_keys(title) # Start of Selection # Start of Selection logger.info(content) content_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ql-editor"))) content_input.send_keys(content) # 发布 if slow_mode: time.sleep(5) time.sleep(2) submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".publishBtn") submit_btn.click() logger.info('发布成功') time.sleep(2) return True, "发布成功" def login_to_publish_video(self,title, content, videos=None,slow_mode=False): #self.driver.get("https://creator.xiaohongshu.com/publish/publish?from=menu") self._load_cookies() self.driver.refresh() self.driver.get("https://creator.xiaohongshu.com/publish/publish?from=menu") time.sleep(3) if self.driver.current_url != "https://creator.xiaohongshu.com/publish/publish?from=menu": return False, "登录失败" # # 输入标题和内容 if videos: logger.info(f"准备上传视频,路径: {videos}") # 检查视频文件是否存在 for video_path in videos: if not os.path.exists(video_path): logger.error(f"视频文件不存在: {video_path}") return False, f"视频文件不存在: {video_path}" else: logger.info(f"视频文件存在: {video_path}") try: upload_input = self.driver.find_element(By.CSS_SELECTOR, ".upload-input") # 将所有视频路径用\n连接成一个字符串一次性上传 upload_input.send_keys('\n'.join(videos)) logger.info("视频上传成功") time.sleep(3) # 增加等待时间,确保视频上传完成 except Exception as e: logger.error(f"视频上传失败: {str(e)}") return False, f"视频上传失败: {str(e)}" time.sleep(3) title = title[:20] title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".d-text"))) title_input.send_keys(title) # Start of Selection logger.info(content) content_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ql-editor"))) content_input.send_keys(content) # 发布 if slow_mode: time.sleep(5) time.sleep(2) submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".publishBtn") submit_btn.click() logger.info('发布成功') time.sleep(2) return True, "发布成功" def login(self, phone, country_code="+86"): """登录小红书""" # 如果token有效则直接返回 if self.token: return True, "已有有效token,无需重新登录" # 尝试加载cookies进行登录 self.driver.get("https://creator.xiaohongshu.com/login") self._load_cookies() self.driver.refresh() time.sleep(3) # 检查是否已经登录 if self.driver.current_url != "https://creator.xiaohongshu.com/login": logger.info("使用cookies登录成功") self.token = self._load_token() self._save_cookies() time.sleep(2) return True, "使用cookies登录成功" else: # 清理无效的cookies self.driver.delete_all_cookies() logger.info("无效的cookies,已清理") # 如果cookies登录失败,则进行手动登录 self.driver.get("https://creator.xiaohongshu.com/login") # 等待登录页面加载完成 time.sleep(5) # 点击国家区号输入框 skip = True if not skip: country_input = self.wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='请选择选项']"))) country_input.click() time.sleep(30) # 等待区号列表出现并点击+886 # 等待区号列表出现并点击+86 try: self.driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div/div[2]/div[1]/div[2]/div/div/div/div/div/div[2]/div[1]/div[1]/div/div/div[1]/input").click() time.sleep(2) self.driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div/div[2]/div[1]/div[2]/div/div/div/div/div/div[2]/div[1]/div[1]/div/div/div[1]/input").send_keys( country_code) time.sleep(2) self.driver.find_element(By.XPATH, "/html/body/div[6]/div/div").click() # china_option = self.wait.until(EC.element_to_be_clickable((By.XPATH, "//div[contains(@class, 'css-cqcgee')]//div[contains(text(), '+86')]"))) time.sleep(2) except Exception as e: logger.info("无法找到国家区号选项") logger.info(e) # 定位手机号输入框 phone_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='手机号']"))) phone_input.clear() phone_input.send_keys(phone) # 点击发送验证码按钮 try: send_code_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".css-uyobdj"))) send_code_btn.click() except: # 尝试其他可能的选择器 try: send_code_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".css-1vfl29"))) send_code_btn.click() except: try: send_code_btn = self.wait.until( EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'发送验证码')]"))) send_code_btn.click() except: logger.info("无法找到发送验证码按钮") return False, "无法找到发送验证码按钮" return True, "成功打开浏览器,发送验证码成功" def wait_for_verify_code(self,verification_code): """等待验证码输入并登录 Args: verification_code: 验证码 Returns: tuple: (成功状态, 消息) """ try: # 输入验证码 code_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='验证码']"))) code_input.clear() code_input.send_keys(verification_code) # 点击登录按钮 login_button = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".beer-login-btn"))) login_button.click() # 等待登录成功,获取token time.sleep(3) # 保存cookies self._save_cookies() return True, "验证码通过,登录成功" except Exception as e: logger.info(f"验证码输入失败: {str(e)}") return False, f"验证码输入失败: {str(e)}" def post_article(self, title, content, images=None): """发布文章 Args: title: 文章标题 content: 文章内容 images: 图片路径列表 """ # 如果token失效则重新登录 # 设置token # self.driver.execute_script(f'localStorage.setItem("token", "{self.token}")') #time.sleep(3) #logger.info("点击发布按钮") # 点击发布按钮 publish_btn = self.wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, ".btn.el-tooltip__trigger.el-tooltip__trigger"))) publish_btn.click() # 如果是发布视频,则不操作这一步 # 切换到上传图文 time.sleep(3) tabs = self.driver.find_elements(By.CSS_SELECTOR, ".creator-tab") if len(tabs) > 1: tabs[1].click() time.sleep(2) # # 输入标题和内容 # title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".title-input"))) # content_input = self.driver.find_element(By.CSS_SELECTOR, ".content-input") # title_input.send_keys(title) # content_input.send_keys(content) # 上传图片 if images: upload_input = self.driver.find_element(By.CSS_SELECTOR, ".upload-input") # 将所有图片路径用\n连接成一个字符串一次性上传 upload_input.send_keys('\n'.join(images)) time.sleep(1) time.sleep(2) title=title[:20] title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".d-text"))) title_input.send_keys(title) # Start of Selection # Start of Selection logger.info(content) content_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ql-editor"))) content_input.send_keys(content) # 发布 time.sleep(2) submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".publishBtn") submit_btn.click() logger.info('发布成功') time.sleep(2) def post_video_article(self, title, content, videos=None): """发布文章 Args: title: 文章标题 content: 文章内容 videos: 视频路径列表 """ # 如果token失效则重新登录 # 设置token # self.driver.execute_script(f'localStorage.setItem("token", "{self.token}")') time.sleep(3) #logger.info("点击发布按钮") # 点击发布按钮 publish_btn = self.wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, ".btn.el-tooltip__trigger.el-tooltip__trigger"))) publish_btn.click() # 如果是发布视频,则不操作这一步 # 切换到上传图文 time.sleep(3) # # 输入标题和内容 # title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".title-input"))) # content_input = self.driver.find_element(By.CSS_SELECTOR, ".content-input") # title_input.send_keys(title) # content_input.send_keys(content) # 上传图片 if videos: upload_input = self.driver.find_element(By.CSS_SELECTOR, ".upload-input") # 将所有图片路径用\n连接成一个字符串一次性上传 upload_input.send_keys('\n'.join(videos)) time.sleep(1) time.sleep(3) title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".d-text"))) title_input.send_keys(title) # Start of Selection # Start of Selection logger.info(content) content_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ql-editor"))) content_input.send_keys(content) # 发布 time.sleep(6) submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".publishBtn") submit_btn.click() logger.info('发布成功') time.sleep(3) def close(self): """关闭浏览器""" self.driver.quit() XiaohongshuPoster._instance = None @classmethod def reset_instance(cls): """重置实例,用于关闭浏览器后重新创建""" if cls._instance is not None: try: cls._instance.driver.quit() except: pass cls._instance = None if __name__ == "__main__": # 测试XiaohongshuPoster类的功能 import argparse parser = argparse.ArgumentParser(description='测试小红书发布功能') parser.add_argument('--mode', type=str, default='login', choices=['login', 'post_image', 'post_video'], help='测试模式: login-仅登录, post_image-发布图文, post_video-发布视频') parser.add_argument('--title', type=str, default='测试标题', help='发布内容的标题') parser.add_argument('--content', type=str, default='这是一条测试内容', help='发布的正文内容') parser.add_argument('--images', type=str, nargs='+', help='要上传的图片路径列表') parser.add_argument('--videos', type=str, nargs='+', help='要上传的视频路径列表') parser.add_argument('--phone', type=str, help='登录用的手机号') parser.add_argument('--country_code', type=str, default='+86', help='国家区号') parser.add_argument('--slow_mode', action='store_true', help='慢速模式,增加等待时间') parser.add_argument('--verification_code', type=str, help='验证码,用于登录后的验证') args = parser.parse_args() poster = None try: logger.info("初始化XiaohongshuPoster...") poster = XiaohongshuPoster() logger.info("初始化完成") if args.mode == 'login': if not args.phone: logger.info("登录模式需要提供手机号,请使用 --phone 参数") exit(1) logger.info(f"尝试登录,手机号: {args.phone}") success, message = poster.login(args.phone, args.country_code) logger.info(f"登录结果: {success}, {message}") # 如果提供了验证码,则进行验证 if args.verification_code: success, message = poster.wait_for_verify_code(args.verification_code) logger.info(f"验证结果: {success}, {message}") elif args.mode == 'post_image': logger.info(f"尝试发布图文,标题: {args.title}") success, message = poster.login_to_publish(args.title, args.content, args.images, args.slow_mode) logger.info(f"图文发布结果: {success}, {message}") elif args.mode == 'post_video': logger.info(f"尝试发布视频,标题: {args.title}") success, message = poster.login_to_publish_video(args.title, args.content, args.videos, args.slow_mode) logger.info(f"视频发布结果: {success}, {message}") logger.info("测试完成") except Exception as e: import traceback logger.info(f"测试过程中发生错误: {e}") logger.info(traceback.format_exc()) finally: if poster: logger.info("关闭浏览器...") try: poster.close() logger.info("浏览器已关闭") except Exception as e: logger.info(f"关闭浏览器时发生错误: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fancyboi999/xhs-auto-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server