Skip to main content
Glama
xiaochang0303

Chinese Tourism Spots MCP Server

liulanqi.py12.6 kB
import datetime from operator import index import traceback from selenium import webdriver from time import sleep from selenium.webdriver.common.desired_capabilities import DesiredCapabilities import selenium from selenium import webdriver import pathlib import time from selenium.webdriver.common.keys import Keys from selenium.webdriver.chrome.options import Options import json import os from selenium.webdriver.chromium.remote_connection import ChromiumRemoteConnection # Use local project directory by default ROOT_PATH = os.getenv( "ROOT_PATH", os.path.join(os.getcwd(), "data_storage")) VIDEO_PATH = os.path.join(ROOT_PATH, "output") COOKING_PATH = os.path.join(ROOT_PATH, "cooking") # Ensure directories exist if not os.path.exists(COOKING_PATH): os.makedirs(COOKING_PATH) if not os.path.exists(VIDEO_PATH): os.makedirs(VIDEO_PATH) COOKING_TXT = os.path.join(COOKING_PATH, "douyin.txt") agent = 'Mozilla/5.0 (Macintosh; Linux) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36' isDingShi = os.getenv("IS_DINGSHI", True) def get_driver(): chrome_options = webdriver.ChromeOptions() chrome_options.add_argument('--no-sandbox') # 解决DevToolsActivePort文件不存在的报错 chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_experimental_option( 'excludeSwitches', ['enable-automation']) chrome_options.add_argument(f'user-agent={agent}') chrome_options.add_experimental_option('useAutomationExtension', False) chrome_options.add_argument( "--disable-blink-features=AutomationControlled") # driver = webdriver.Remote( # command_executor="http://101.43.210.78:50000", # options=chrome_options # ) # Use local driver if remote fails or for easier testing driver = webdriver.Chrome(options=chrome_options) driver.maximize_window() # driver = webdriver.Remote( # command_executor= ChromiumRemoteConnection(remote_server_addr='http://101.43.210.78:50000',vendor_prefix='-webkit-',browser_name="CHROME"), # desired_capabilities=chrome_options.to_capabilities() # ) # with open('/workspaces/notes/python/douyin/stealth.min.js') as f: # js = f.read() # driver.execute("executeCdpCommand", { # 'cmd': "Page.addScriptToEvaluateOnNewDocument", 'params': { # "source": js # }}) print("链接上") return driver def wait_login(driver): driver.get("https://creator.douyin.com/") time.sleep(2) driver.find_element("xpath", '//*[text()="登录"]').click() time.sleep(2) driver.find_element("xpath", '//*[text()="确认"]').click() time.sleep(2) # 手机登录 # driver.find_element( # "xpath", '//*[text()="手机号登录"]').click() # time.sleep(10) # driver.find_element( # "xpath", '//*[@id="dialog-0"]/div/div[2]/div/div[2]/div[2]/div/form/div[3]/span').click() # # driver.find_element("xpath",'//input[@type="file"]').send_keys(path_mp4) # driver.find_element( # "xpath", '//*[@placeholder="请输入手机号"]').send_keys("") # time.sleep(3) # # driver.find_element( # # "xpath", '//*[@placeholder="请输入密码"]').send_keys("") # time.sleep(3) # driver.find_element( # "xpath", '//*[@id="dialog-0"]/div/div[2]/div/div[2]/div[2]/div/form/div[4]/img').click() # time.sleep(3) # driver.find_element( # "xpath", '//*[@id="dialog-0"]/div/div[2]/div/div[2]/div[2]/div/form/button').click() print("等待登录") # 延迟一会,此时你需要登录,60秒应该是够操作了 time.sleep(60) # 读取cook cook = driver.get_cookies() # 保存cook,我是写到txt文件的,后期可以写成http的,收集大量的,然后就可以*****(你懂的)*** with open(COOKING_TXT, 'w') as f: f.write(json.dumps(cook, ensure_ascii=True)) # 格式转化,这不管你是学的那种语言都必须要知道的 def login(driver): if os.path.exists(COOKING_TXT): get_cookie(driver) else: try: wait_login(driver) except Exception as e: traceback.print_exc() wait_login(driver) def get_map4(): # 基本信息 # 视频存放路径 mp4_result = [] catalog_mp4 = VIDEO_PATH # 视频描述 path = pathlib.Path(catalog_mp4) # 视频地址获取 for path_mp4 in path.iterdir(): if(".mp4" in str(path_mp4)): map4_path = str(path_mp4) mp4_result.append((map4_path, path_mp4.name)) if(len(mp4_result) > 0): print("检查到视频路径:", mp4_result) else: print("未检查到视频路径,程序终止!") exit() mp4_result.sort() return mp4_result # 封面地址获取 # path_cover = "" # for i in path.iterdir(): # if(".png" in str(i) or ".jpg" in str(i)): # path_cover = str(i); # break; # if(path_cover != ""): # print("检查到封面路径:" + path_cover) # else: # print("未检查到封面路径,程序终止!") # exit() def get_cookie(driver): with open(COOKING_TXT) as f: data = json.loads(f.read()) # 打开浏览器 time.sleep(2) # 打开网址 driver.get("https://creator.douyin.com/creator-micro/home") driver.implicitly_wait(10) # 清楚cook driver.delete_all_cookies() time.sleep(8) # 遍历cook print("加载cookie") for cookie in data: if 'expiry' in cookie: del cookie["expiry"] # 添加cook driver.add_cookie(cookie) time.sleep(5) # 刷新 print("开始刷新") driver.refresh() def get_publish_date(title, index): # 代表的是 加一天时间 time_long = int(index/3) * 24 now = datetime.datetime.today() if(now.hour > 20): time_long = 24 tomorrowemp = now + datetime.timedelta(hours=time_long) print("title:", title) # 暂时注释掉+ datetime.timedelta(hours = 24) if title.find("(1)") > 0 or title.find("(4)") > 0 or title.find("(7)") > 0: tomorrow = tomorrowemp.replace(hour=8, minute=0, second=0) elif title.find("(2)") > 0 or title.find("(5)") > 0: tomorrow = tomorrowemp.replace(hour=12, minute=0, second=0) elif title.find("(3)") > 0 or title.find("(6)") > 0: tomorrow = tomorrowemp.replace(hour=18, minute=0, second=0) print("准备写入的时间是:", tomorrow) if(tomorrow <= now): tomorrow = now + \ datetime.timedelta(hours=2) + datetime.timedelta(hours=1*index) print("输出的时间是:", tomorrow.strftime("%Y-%m-%d %H:%M")) return tomorrow.strftime("%Y-%m-%d %H:%M") def publish_douyin(driver, mp4, index): ''' 作用:发布抖音视频 ''' # 进入创作者页面,并上传视频 time.sleep(2) print("开始点击体验") try: driver.find_element("xpath", '//*[text()="开始体验"]').click() time.sleep(3) driver.find_element("xpath", '//*[text()="下一步"]').click() time.sleep(3) driver.find_element("xpath", '//*[text()="完成"]').click() time.sleep(3) except Exception as e: traceback.print_exc() print("开始点击发布视频") driver.find_element("xpath", '//*[text()="发布视频"]').click() time.sleep(5) print("加载视频", mp4[1]) driver.find_element("xpath", '//input[@type="file"]').send_keys(mp4[0]) time.sleep(3) print("开始输入描述") try: driver.find_element("xpath", '//*[text()="我知道了"]').click() time.sleep(3) except Exception as e: traceback.print_exc() # 添加封面 # driver.find_element("xpath", '//*[text()="编辑封面"]').click() # time.sleep(1) # driver.find_element("xpath", '//div[text()="上传封面"]').click() # time.sleep(1) # driver.find_element("xpath",'//input[@type="file"]').send_keys(path_cover) # time.sleep(3) # driver.find_element( # "xpath", '//*[text()="裁剪封面"]/..//*[text()="确定"]').click() # time.sleep(3) # driver.find_element( # "xpath", '//*[text()="设置封面"]/..//*[contains(@class,"upload")]//*[text()="确定"]').click() time.sleep(5) # 输入视频描述 input_text = driver.find_element( "xpath", "//div[@data-placeholder='写一个合适的标题,能让更多人看到']") title = mp4[1].replace(".mp4", "") input_text.send_keys(title + " #小说推荐 ") time.sleep(4) input_text.send_keys(" #日常推文 ") time.sleep(4) input_text.send_keys(" #甜文 ") # time.sleep(4) # input_text.send_keys(" #虐文虐心 ") time.sleep(4) input_text.send_keys(" #言情 ") # 设置选项 time.sleep(4) driver.find_element("xpath", '//*[@class="radio--4Gpx6"]').click() time.sleep(1) driver.find_element( "xpath", '//*[@class="semi-select-selection"]//span[contains(text(),"输入")]').click() time.sleep(1) driver.find_element( "xpath", '//*[@class="semi-select-selection"]//input').send_keys("小南庄25号") time.sleep(1) driver.find_element( "xpath", '//*[@class="semi-select-selection"]//input').send_keys("院") time.sleep(3) try: driver.find_element( "xpath", '//*[@class="detail--2prVy"]').click() except Exception as e: traceback.print_exc() # 同步到西瓜视频 # time.sleep(1) # # driver.find_element("xpath",'//div[@class="preview--27Xrt"]//input').click() # 默认启用一次后,后面默认启用了。 # time.sleep(1) # driver.find_element("xpath", '//*[@class="card-pen--2P8rh"]').click() # time.sleep(1) # driver.find_element( # "xpath", '//*[@class="DraftEditor-root"]//br').send_keys("测试下" + " #上热门") # time.sleep(1) # driver.find_element("xpath", '//button[text()="确定"]').click() # 定时发布radio--4Gpx6 one-line--2rHu9 if isDingShi: print("定时发布") time.sleep(5) dingshi = driver.find_elements( "xpath", '//*[@class="radio--4Gpx6 one-line--2rHu9"]') time.sleep(4) print("点击定时发布") dingshi[1].click() #driver.find_elements("xpath", '//*[@class="radio--4Gpx6 one-line--2rHu9"]')[1].click() time.sleep(3) input_data = driver.find_element("xpath", '//*[@placeholder="日期和时间"]') input_data.send_keys(Keys.CONTROL, 'a') # 全选 # input_data.send_keys(Keys.DELETE) time.sleep(3) input_data.send_keys(get_publish_date(title, index)) # 等待视频上传完成,放到最后,这一步是最慢的. times = 10 while True: time.sleep(10) try: driver.find_element("xpath", '//*[text()="重新上传"]') times -= 1 break except Exception as e: if times == 0: raise ValueError("需要重新重试") print("视频还在上传中···") print("视频已上传完成!") # 点击发布 driver.find_element("xpath", '//button[text()="发布"]').click() time.sleep(3) try: driver.find_element("xpath", '//*[text()="暂不同步"]').click() except Exception as e: traceback.print_exc() print("上传结束") time.sleep(10) # 开始执行视频发布 # publish_douyin(driver=driver) def run(driver): try: login(driver=driver) mp4s = get_map4() mp4s_len = len(mp4s) index = 0 while index < mp4s_len: try: publish_douyin(driver, mp4s[index], index) except Exception as e: traceback.print_exc() index -= 1 finally: print(f"mp4长度是{mp4s_len},当前是:{index}") index += 1 time.sleep(10) finally: driver.quit() if __name__ == "__main__": try: driver = get_driver() login(driver=driver) mp4s = get_map4() mp4s_len = len(mp4s) index = 0 while index < mp4s_len: try: publish_douyin(driver, mp4s[index], index) except Exception as e: traceback.print_exc() index -= 1 finally: index += 1 print(f"mp4长度是{mp4s_len},当前是:{index}") time.sleep(10) finally: driver.quit()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xiaochang0303/MCPProject'

If you have feedback or need assistance with the MCP directory API, please join our Discord server