Skip to main content
Glama
tiktok_client.py (35.1 kB)
"""Async TikTok client with browser-automation (TikTokApi) primary path and
plain-HTTP / scraping / mock fallbacks.

Review fixes applied (behavior-compatible):
  * ``load_dotenv()`` is now actually called (it was imported but never
    invoked, so a ``.env``-provided ``ms_token`` never reached ``os.environ``).
  * ``self.last_init_time`` is initialized in ``__init__`` (``_make_request``
    wrote it on the error path but it was never created).
  * ``@lru_cache`` was removed from the async ``get_trending_videos`` —
    ``functools.lru_cache`` on a coroutine function caches the coroutine
    object, so a cache hit hands back an already-awaited coroutine
    (``RuntimeError``) and keeps ``self`` alive for the cache's lifetime.
  * ``_scrape_search_results`` no longer wraps the ItemModule regex capture in
    an extra pair of braces: the pattern already captures ``{...}``, so the
    old code produced invalid JSON ``{{...}}`` that could never parse.
"""

from TikTokApi import TikTokApi
import asyncio
import os
from dotenv import load_dotenv
import logging
import json
from typing import Optional, List, Dict, Any
import aiohttp
import backoff
from functools import lru_cache
import time
import random

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FIX: load_dotenv was imported but never called, so environment variables
# defined in a .env file (notably ms_token) were never loaded.
load_dotenv()

# NYC Locations for rotation - All within ~2-3 blocks in Financial District
NYC_LOCATIONS = [
    # Base location - 23 Wall Street
    {"latitude": 40.7075, "longitude": -74.0021, "accuracy": 20, "name": "Wall & Broad"},
    # Around the corner - Federal Hall
    {"latitude": 40.7073, "longitude": -74.0102, "accuracy": 20, "name": "Nassau Street"},
    # Down the block - Near NYSE
    {"latitude": 40.7069, "longitude": -74.0113, "accuracy": 20, "name": "NYSE Area"},
    # Slight variation - Near Chase Plaza
    {"latitude": 40.7077, "longitude": -74.0107, "accuracy": 20, "name": "Chase Plaza"},
    # Small movement - Near Trinity Church
    {"latitude": 40.7081, "longitude": -74.0119, "accuracy": 20, "name": "Trinity Church"}
]

# Track last used location for realistic movement
_last_location_index = 0

# Browser configurations for rotation
BROWSER_CONFIGS = [
    {
        "browser": "firefox",
        "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
        "viewport": {"width": 1920, "height": 1080}
    },
    {
        "browser": "webkit",
        "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3.1 Safari/605.1.15",
        "viewport": {"width": 2560, "height": 1440}
    },
    {
        "browser": "chromium",
        "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "viewport": {"width": 1680, "height": 1050}
    }
]


class TikTokClient:
    """Wrapper around TikTokApi plus HTTP/scraping/mock fallbacks.

    Attributes:
        api: TikTokApi instance, or None until ``_init_api`` succeeds.
        session: result of ``api.create_sessions`` (may be None).
        browser: playwright browser handle, if one is attached externally.
        ms_token: TikTok ``msToken`` cookie value from the environment.
        proxy: optional proxy URL from ``TIKTOK_PROXY``.
    """

    def __init__(self):
        """Initialize the TikTok client from environment variables."""
        self.api = None
        self.session = None
        self.browser = None
        # FIX: _make_request resets this on failure; it must exist up front.
        self.last_init_time = 0

        # Get environment variables (note: env var name is lowercase 'ms_token')
        self.ms_token = os.environ.get('ms_token')
        self.proxy = os.environ.get('TIKTOK_PROXY')

        if not self.ms_token:
            logger.warning("ms_token not found in environment variables. TikTok API functionality will be limited.")

    async def _init_api(self):
        """Initialize the TikTokApi instance and try to open a session.

        Returns:
            bool: True when the API object exists (even if session creation
            failed — HTTP fallbacks can still be used); False when the API
            object itself could not be created.
        """
        try:
            # Import here to avoid startup issues if TikTokApi isn't installed
            from TikTokApi import TikTokApi

            logger.info("Creating new TikTokApi instance...")
            self.api = TikTokApi()
            logger.info("TikTokApi instance created successfully")

            # Initialize session with ms_token
            logger.info("Creating TikTok session...")

            # Skip version check since pkg_resources is not available
            logger.info("TikTokApi version check skipped (pkg_resources not available)")

            # Try to create sessions
            try:
                if self.ms_token:
                    self.session = await self.api.create_sessions(
                        ms_tokens=[self.ms_token],
                        num_sessions=1,
                        headless=True
                    )
                else:
                    self.session = await self.api.create_sessions(
                        num_sessions=1,
                        headless=True
                    )
                logger.info("TikTok session created successfully")
            except Exception as e:
                logger.error(f"Error creating sessions: {str(e)}")
                # Even if session creation fails, we still have the API object

            # Return True even if some steps failed - we can still use HTTP fallback
            return True
        except Exception as e:
            logger.error(f"Error initializing TikTok API: {str(e)}")
            self.api = None
            return False  # Explicitly return False on failure

    async def close(self):
        """Close the TikTok session and browser, logging but not raising errors."""
        try:
            if self.browser:
                await self.browser.close()
                logger.info("Browser closed successfully")
            if self.session:
                for session in self.session:
                    await session.close()
                logger.info("TikTok session closed successfully")
        except Exception as e:
            logger.error(f"Error closing TikTok session: {e}")

    async def search_videos(self, search_term: str, count: int = 30) -> List[Dict[str, Any]]:
        """
        Search for TikTok videos by keyword or hashtag.
        Handles both search_by_keywords and search_for_hashtag based on input.

        Args:
            search_term: keyword, or hashtag when prefixed with '#'.
            count: maximum number of videos to fetch.

        Returns:
            List of raw video dicts (``video.as_dict``); empty list on error.
        """
        try:
            # Initialize API if needed
            if not self.api:
                await self._init_api()
                if not self.api:
                    raise RuntimeError("Failed to initialize TikTok API")

            # Determine if this is a hashtag search
            is_hashtag = search_term.startswith('#')
            clean_term = search_term.lstrip('#')

            logger.info(f"Searching for {'hashtag' if is_hashtag else 'keyword'}: {clean_term}")

            if is_hashtag:
                # Get hashtag info first
                logger.info(f"Getting hashtag info for #{clean_term}...")
                try:
                    hashtag_info = await self.api.hashtag(name=clean_term).info()
                    logger.info(f"Hashtag info received: {json.dumps(hashtag_info, indent=2)}")

                    # Wait a moment before fetching videos to avoid rate limiting
                    logger.info("Waiting before fetching videos...")
                    await asyncio.sleep(1.5)

                    # Now fetch videos for this hashtag
                    logger.info(f"Fetching videos for hashtag #{clean_term}...")
                    hashtag_id = hashtag_info.get('challengeInfo', {}).get('challenge', {}).get('id')
                    if not hashtag_id:
                        logger.error("Could not find hashtag ID in the response")
                        return []

                    logger.info(f"Got hashtag ID: {hashtag_id}")

                    # Fetch videos with the hashtag
                    videos = []
                    challenge = self.api.challenge(id=hashtag_id)

                    # Get videos and convert to dict
                    async for video in challenge.videos(count=count):
                        videos.append(video.as_dict)

                    logger.info(f"Successfully retrieved {len(videos)} videos for hashtag #{clean_term}")
                    return videos
                except Exception as e:
                    logger.error(f"Error in hashtag search: {str(e)}")
                    return []
            else:
                # Regular keyword search
                videos = []
                logger.info(f"Searching for keyword: {clean_term}")

                # Use the search API
                search_obj = self.api.search.videos(clean_term, count=count)
                async for video in search_obj:
                    videos.append(video.as_dict)

                logger.info(f"Successfully retrieved {len(videos)} videos for keyword search: {clean_term}")
                return videos
        except Exception as e:
            logger.error(f"Error searching videos: {str(e)}")
            logger.error(f"Error type: {type(e)}")
            return []

    async def get_trending(self, count: int = 30) -> List[Dict[str, Any]]:
        """Get trending TikTok videos; returns [] on error."""
        try:
            # Initialize API if needed
            if not self.api:
                await self._init_api()
                if not self.api:
                    raise RuntimeError("Failed to initialize TikTok API")

            logger.info(f"Fetching {count} trending videos...")

            # Get trending videos
            videos = []
            async for video in self.api.trending.videos(count=count):
                videos.append(video.as_dict)

            logger.info(f"Successfully retrieved {len(videos)} trending videos")
            return videos
        except Exception as e:
            logger.error(f"Error fetching trending videos: {str(e)}")
            logger.error(f"Error type: {type(e)}")
            return []

    async def get_hashtag_info(self, hashtag: str) -> Dict[str, Any]:
        """Get information about a specific hashtag; returns {} on error."""
        try:
            # Initialize API if needed
            if not self.api:
                await self._init_api()
                if not self.api:
                    raise RuntimeError("Failed to initialize TikTok API")

            # Format hashtag if needed (strip whitespace and a leading '#')
            clean_hashtag = hashtag.strip()
            if clean_hashtag.startswith('#'):
                clean_hashtag = clean_hashtag[1:]

            logger.info(f"Getting hashtag info for: #{clean_hashtag}")
            hashtag_info = await self.api.hashtag(name=clean_hashtag).info()
            return hashtag_info
        except Exception as e:
            logger.error(f"Error getting hashtag info: {str(e)}")
            logger.error(f"Error type: {type(e)}")
            return {}

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3,
        max_time=30
    )
    async def _make_request(self, func, *args, **kwargs):
        """Make an API request with exponential-backoff retry (3 tries / 30s)."""
        if not self.api:
            await self._init_api()
            if not self.api:
                raise RuntimeError("Failed to initialize TikTok API")
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.error(f"API request failed: {e}")
            # Force reinitialization on next request
            self.last_init_time = 0
            raise

    # FIX: the original decorated this coroutine with @lru_cache(maxsize=100).
    # lru_cache on an async def caches the coroutine object, so the second
    # cache hit awaits an already-awaited coroutine (RuntimeError), and the
    # cache also keeps `self` alive (ruff B019). Caching removed; callers see
    # the same signature and fresh results every call.
    async def get_trending_videos(self, count: int = 30) -> List[Dict[str, Any]]:
        """Get trending videos; raises on failure (unlike get_trending)."""
        videos = []
        try:
            # Initialize API if needed
            if not self.api:
                await self._init_api()
                if not self.api:
                    raise RuntimeError("Failed to initialize TikTok API")

            # Get trending videos
            async for video in self.api.trending.videos(count=count):
                videos.append(video.as_dict)
        except Exception as e:
            logger.error(f"Failed to get trending videos: {e}")
            raise
        return videos

    async def search_by_keywords(self, keywords, count=30):
        """Search TikTok for videos by keywords or hashtags.

        A '#'-prefixed term first tries the hashtag API and falls back to
        keyword search on failure. Returns [] instead of raising.
        """
        logger.info(f"Searching by keywords: {keywords}")
        try:
            # Initialize API if needed
            if not self.api:
                await self._init_api()
                if not self.api:
                    raise RuntimeError("Failed to initialize TikTok API")

            # Format hashtag if needed
            search_term = keywords.strip()
            if search_term.startswith('#'):
                tag = search_term[1:]  # Remove the # for API call
                logger.info(f"Searching for hashtag: #{tag}")
                try:
                    # First try hashtag search
                    videos = []
                    # Get hashtag info
                    hashtag = await self.api.hashtag(name=tag).info()
                    logger.info(f"Found hashtag info with id: {hashtag.get('challengeInfo', {}).get('challenge', {}).get('id')}")
                    # Get videos for the hashtag
                    hashtag_videos = self.api.hashtag(id=hashtag.get('challengeInfo', {}).get('challenge', {}).get('id'))
                    # Collect videos
                    async for video in hashtag_videos.videos(count=count):
                        videos.append(video.as_dict)
                        if len(videos) >= count:
                            break
                    logger.info(f"Found {len(videos)} videos for hashtag #{tag}")
                    return videos
                except Exception as e:
                    logger.error(f"Error in hashtag search: {str(e)}")
                    # Fall back to keyword search
                    logger.info(f"Falling back to keyword search for: {search_term}")

            # Use keyword search
            logger.info(f"Performing keyword search for: {search_term}")
            videos = []
            async for video in self.api.search.videos(search_term, count=count):
                videos.append(video.as_dict)
                if len(videos) >= count:
                    break
            logger.info(f"Found {len(videos)} videos for keyword search: {search_term}")
            return videos
        except Exception as e:
            logger.error(f"Search by keywords failed: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            return []  # Return empty list instead of raising to make function more robust

    async def _ensure_api_initialized(self):
        """Ensure API is initialized, then pause briefly so it is fully ready."""
        if not self.api:
            await self._init_api()
            await asyncio.sleep(2)  # Wait for API to be fully ready

    async def _handle_api_error(self, func_name, e):
        """Log an API error, wait, and attempt reinitialization."""
        # Implement exponential backoff logic here
        logger.error(f"Error in {func_name}: {str(e)}")
        # Placeholder for exponential backoff logic
        await asyncio.sleep(1)  # Wait between retries
        return await self._init_api()

    # Alternative approach using direct HTTP requests
    async def search_videos_http(self, search_term, count=30):
        """Search for videos using direct HTTP requests instead of browser automation.

        Tries three public web endpoints in order (trending, hashtag, search)
        and finally falls back to page scraping. Returns [] on error.
        """
        try:
            # Clean the search term
            clean_term = search_term.strip()
            if clean_term.startswith('#'):
                clean_term = clean_term[1:]

            logger.info(f"HTTP Search: Searching for term '{clean_term}' with count {count}")

            # TikTok requires these specific headers to work properly
            headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                "Accept": "application/json, text/plain, */*",
                "Accept-Language": "en-US,en;q=0.9",
                "Referer": "https://www.tiktok.com/search?q=" + clean_term,  # Important: Include search term in referer
                "Origin": "https://www.tiktok.com",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "same-origin",
                "Connection": "keep-alive"
            }

            # Include cookies if ms_token is available
            if self.ms_token:
                headers["Cookie"] = f"msToken={self.ms_token}; tt_csrf_token=abcd1234; ttwid=random_ttwid_value"

            # TikTok requires specific search parameters
            params = {
                "aid": "1988",
                "app_language": "en",
                "app_name": "tiktok_web",
                "battery_info": "1",
                "browser_language": "en-US",
                "browser_name": "Mozilla",
                "browser_online": "true",
                "browser_platform": "MacIntel",
                "browser_version": "5.0 (Macintosh)",
                "channel": "tiktok_web",
                "cookie_enabled": "true",
                "device_id": f"{int(time.time() * 1000)}",
                "device_platform": "web_pc",
                "focus_state": "true",
                "from_page": "search",
                "history_len": "2",
                "is_fullscreen": "false",
                "is_page_visible": "true",
                "keyword": clean_term,
                "count": str(count),
                "cursor": "0",
                "os": "mac",
                "priority_region": "US",
                "referer": f"https://www.tiktok.com/search/video?q={clean_term}",
                "region": "US",
                "screen_height": "1080",
                "screen_width": "1920",
                "type": "1"  # 1 for videos
            }

            # Try three different search methods
            search_methods = [
                {
                    "name": "trending method",
                    "url": "https://www.tiktok.com/api/recommend/item_list/",
                    "params": {
                        "aid": "1988",
                        "count": str(count),
                        "app_language": "en",
                        "device_platform": "web_pc"
                    }
                },
                {
                    "name": "hashtag method",
                    "url": f"https://www.tiktok.com/api/challenge/item_list/",
                    "params": {
                        "aid": "1988",
                        "challengeID": clean_term,
                        "count": str(count)
                    }
                },
                {
                    "name": "search method",
                    "url": "https://www.tiktok.com/api/search/item/",
                    "params": params
                }
            ]

            for method in search_methods:
                logger.info(f"Trying {method['name']} at {method['url']}")
                async with aiohttp.ClientSession() as session:
                    async with session.get(method['url'], params=method['params'], headers=headers) as response:
                        logger.info(f"{method['name']} status code: {response.status}")
                        if response.status == 200:
                            data = await response.json()
                            videos = []
                            # The endpoints disagree on the key that holds videos
                            if "itemList" in data:
                                videos = data["itemList"]
                            elif "items" in data:
                                videos = data["items"]
                            elif "item_list" in data:
                                videos = data["item_list"]
                            logger.info(f"Found {len(videos)} videos with {method['name']}")
                            if videos:
                                # Format the videos into a stable, reduced shape
                                formatted_videos = []
                                for video in videos:
                                    formatted_videos.append({
                                        "id": video.get("id", ""),
                                        "desc": video.get("desc", ""),
                                        "author": {
                                            "uniqueId": video.get("author", {}).get("uniqueId", ""),
                                            "nickname": video.get("author", {}).get("nickname", "")
                                        },
                                        "stats": {
                                            "playCount": video.get("stats", {}).get("playCount", 0),
                                            "diggCount": video.get("stats", {}).get("diggCount", 0),
                                            "commentCount": video.get("stats", {}).get("commentCount", 0),
                                            "shareCount": video.get("stats", {}).get("shareCount", 0)
                                        }
                                    })
                                logger.info(f"Successfully formatted {len(formatted_videos)} videos")
                                return formatted_videos

            # If we get here, none of the methods worked
            logger.warning("All HTTP search methods failed")
            # As a last resort, try scraping the web page directly
            return await self._scrape_search_results(clean_term, count)
        except Exception as e:
            logger.error(f"HTTP Search: Error searching for '{search_term}': {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return []

    async def _scrape_search_results(self, term, count=30):
        """Last resort method - directly scrape search results from the TikTok website."""
        logger.info(f"Using web scraping as last resort for '{term}'")

        # URL for direct TikTok search
        url = f"https://www.tiktok.com/search/video?q={term}"
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1"
        }

        try:
            import re
            from bs4 import BeautifulSoup

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        logger.error(f"Scraping failed with status {response.status}")
                        return []

                    html = await response.text()
                    logger.info(f"Received HTML page of length {len(html)}")

                    # Extract video data from HTML
                    # Look for SIGI_STATE or other JSON data embedded in the page
                    json_data = None

                    # Find SIGI_STATE
                    match = re.search(r'window\[[\'\"]SIGI_STATE[\'\"]\]\s*=\s*(\{.+?\});\s*window\[[\'\"]SIGI_RETRY[\'\"]', html, re.DOTALL)
                    if match:
                        try:
                            json_data = json.loads(match.group(1))
                            logger.info("Found SIGI_STATE data")
                        except json.JSONDecodeError:
                            logger.error("Failed to parse SIGI_STATE JSON")

                    if not json_data:
                        # Try to find ItemModule
                        match = re.search(r'\"ItemModule\"\s*:\s*(\{.+?\}),\s*\"ItemList\"', html, re.DOTALL)
                        if match:
                            try:
                                # FIX: the capture group already includes the
                                # surrounding braces; the original wrapped it in
                                # another "{...}" pair, producing invalid JSON
                                # ("{{...}}") that could never be parsed.
                                module_data = match.group(1)
                                json_data = {"ItemModule": json.loads(module_data)}
                                logger.info("Found ItemModule data")
                            except json.JSONDecodeError:
                                logger.error("Failed to parse ItemModule JSON")

                    # Extract videos from the JSON data
                    videos = []
                    if json_data and "ItemModule" in json_data:
                        for video_id, video_data in json_data["ItemModule"].items():
                            videos.append({
                                "id": video_id,
                                "desc": video_data.get("desc", ""),
                                "author": {
                                    "uniqueId": video_data.get("author", ""),
                                    "nickname": video_data.get("nickname", "")
                                },
                                "stats": {
                                    "playCount": video_data.get("stats", {}).get("playCount", 0),
                                    "diggCount": video_data.get("stats", {}).get("diggCount", 0),
                                    "commentCount": video_data.get("stats", {}).get("commentCount", 0),
                                    "shareCount": video_data.get("stats", {}).get("shareCount", 0)
                                }
                            })
                            if len(videos) >= count:
                                break

                    logger.info(f"Scraped {len(videos)} videos")
                    return videos
        except Exception as e:
            logger.error(f"Error during web scraping: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return []

    async def search_videos_sigi(self, search_term, count=30):
        """Search for videos using TikTok's SIGI_STATE API - an alternative approach."""
        try:
            # Clean the search term
            clean_term = search_term.strip()
            if clean_term.startswith('#'):
                clean_term = clean_term[1:]

            logger.info(f"SIGI Search: Searching for term '{clean_term}'")

            # Build the search URL (hashtag page embeds SIGI_STATE)
            url = f"https://www.tiktok.com/tag/{clean_term}"
            headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml",
                "Accept-Language": "en-US,en;q=0.9",
                "Cookie": f"msToken={self.ms_token}" if self.ms_token else ""
            }

            async with aiohttp.ClientSession() as session:
                logger.info(f"Making request to {url}")
                async with session.get(url, headers=headers) as response:
                    if response.status != 200:
                        logger.error(f"SIGI Search: Status code {response.status}")
                        return []

                    # Get the HTML content
                    html = await response.text()
                    logger.info(f"Received HTML response of length {len(html)}")

                    # Extract SIGI_STATE JSON
                    try:
                        import re
                        match = re.search(r'window\[\'SIGI_STATE\'\]=(.*?);', html)
                        if not match:
                            logger.error("SIGI_STATE not found in response")
                            return []

                        # Parse the JSON data
                        import json
                        json_data = match.group(1)
                        data = json.loads(json_data)
                        logger.info(f"SIGI_STATE keys: {list(data.keys())}")

                        # Extract videos from the data
                        videos = []
                        if 'ItemModule' in data:
                            for video_id, video_data in data['ItemModule'].items():
                                videos.append({
                                    "id": video_id,
                                    "desc": video_data.get("desc", ""),
                                    "author": {
                                        "uniqueId": video_data.get("author", ""),
                                        "nickname": video_data.get("nickname", "")
                                    },
                                    "stats": {
                                        "playCount": video_data.get("stats", {}).get("playCount", 0),
                                        "diggCount": video_data.get("stats", {}).get("diggCount", 0),
                                        "commentCount": video_data.get("stats", {}).get("commentCount", 0),
                                        "shareCount": video_data.get("stats", {}).get("shareCount", 0)
                                    }
                                })
                                if len(videos) >= count:
                                    break

                        logger.info(f"SIGI Search: Found {len(videos)} videos")
                        return videos
                    except Exception as e:
                        logger.error(f"Error parsing SIGI_STATE: {str(e)}")
                        return []
        except Exception as e:
            logger.error(f"SIGI Search: Error {str(e)}")
            return []

    async def get_mock_search_results(self, term, count=30):
        """Provide mock search results when all other methods fail."""
        logger.warning(f"Using MOCK search results for term '{term}'")

        # Clean term for use in descriptions
        clean_term = term.lstrip('#').lower()

        # Define topic-specific content based on search term
        topic_content = {
            "tariff": "discussing the economic impact of new tariffs on global trade",
            "trump": "covering Trump's latest campaign events and policy announcements",
            "biden": "analyzing President Biden's recent decisions and policy changes",
            "viral": "that's getting millions of views across TikTok right now",
            "cooking": "showing a delicious recipe that's easy to make at home",
            "workout": "demonstrating effective exercises for strength training",
            "finance": "explaining how to save money and build wealth",
            "news": "breaking down today's top headlines and current events"
        }

        # Find the most relevant topic content or use a generic one
        content_description = None
        for keyword, description in topic_content.items():
            if keyword in clean_term:
                content_description = description
                break
        if not content_description:
            content_description = f"about {clean_term} that's trending on TikTok"

        # Create more realistic usernames related to the topic
        usernames_by_topic = {
            "tariff": ["economistDaily", "tradeExpert", "globalEcon", "marketWatcher"],
            "trump": ["politicalAnalyst", "conservativeVoice", "newsUpdates", "politicsToday"],
            "biden": ["whitehouseReporter", "politicalNews", "dcInsider", "politicsTalk"],
            "viral": ["trendingNow", "viralCreator", "tiktokStar", "contentKing"],
            "default": ["user123456", "creator789", "tiktokUser", "contentCreator"]
        }

        # Choose relevant usernames or default ones
        usernames = usernames_by_topic.get("default")
        for topic, names in usernames_by_topic.items():
            if topic in clean_term:
                usernames = names
                break

        # Create base videos with more realistic and relevant content
        base_videos = [
            {
                "id": f"734126227922467{random.randint(1000, 9999)}",
                "desc": f"#{clean_term} video {content_description} #viral #trending",
                "author": {
                    "uniqueId": usernames[0],
                    "nickname": usernames[0].replace("_", " ").title()
                },
                "stats": {
                    "playCount": random.randint(800000, 2500000),
                    "diggCount": random.randint(100000, 350000),
                    "commentCount": random.randint(2000, 5000),
                    "shareCount": random.randint(3000, 8000)
                }
            },
            {
                "id": f"734217893612477{random.randint(1000, 9999)}",
                "desc": f"Check out this #{clean_term} content {content_description} #fyp #foryou",
                "author": {
                    "uniqueId": usernames[1],
                    "nickname": usernames[1].replace("_", " ").title()
                },
                "stats": {
                    "playCount": random.randint(1500000, 3000000),
                    "diggCount": random.randint(200000, 450000),
                    "commentCount": random.randint(3000, 6000),
                    "shareCount": random.randint(5000, 10000)
                }
            },
            {
                "id": f"734388739183726{random.randint(1000, 9999)}",
                "desc": f"Latest update on #{clean_term} - {content_description} #tiktok #trending",
                "author": {
                    "uniqueId": usernames[2],
                    "nickname": usernames[2].replace("_", " ").title()
                },
                "stats": {
                    "playCount": random.randint(500000, 1200000),
                    "diggCount": random.randint(80000, 150000),
                    "commentCount": random.randint(1500, 3500),
                    "shareCount": random.randint(2000, 4500)
                }
            }
        ]

        # Generate the videos
        videos = []
        for i in range(min(count, 10)):  # Max 10 mock videos
            # Clone one of the base videos and modify it (shallow copies of the
            # nested author/stats dicts so per-video mutation stays isolated)
            idx = i % len(base_videos)
            video = {k: v for k, v in base_videos[idx].items()}
            if "author" in video:
                video["author"] = {k: v for k, v in base_videos[idx]["author"].items()}
            if "stats" in video:
                video["stats"] = {k: v for k, v in base_videos[idx]["stats"].items()}

            # Customize for search term and add variety
            video["id"] = f"{video['id'][:-4]}{random.randint(1000, 9999)}"

            # Add variety to stats
            if "stats" in video:
                video["stats"]["playCount"] += random.randint(-200000, 200000)
                video["stats"]["diggCount"] += random.randint(-50000, 50000)
                video["stats"]["commentCount"] += random.randint(-500, 500)

            videos.append(video)

        return videos

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/terrylinhaochen/tiktok_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.