Skip to main content
Glama

rapidapi

by SecurFi
twitter_crawler.py7.88 kB
import csv import json import os import time import logging import requests from typing import List, Dict, Optional from datetime import datetime # # 配置日志 # logging.basicConfig( # level=logging.INFO, # format='%(asctime)s - %(levelname)s - %(message)s', # handlers=[ # logging.FileHandler('twitter_crawler.log'), # logging.StreamHandler() # ] # ) class TwitterCrawler: def __init__(self): self.api_host = 'twitter-api45.p.rapidapi.com' self.api_key = '66dc774248msh5470cb9692d299cp1ac178jsn3f1836abf93f' self.headers = { 'x-rapidapi-host': self.api_host, 'x-rapidapi-key': self.api_key } # self.data_dir = 'data' # if not os.path.exists(self.data_dir): # os.makedirs(self.data_dir) def read_kols(self, limit: int = 3) -> List[Dict]: """读取KOL信息""" kols = [] try: with open('kols.csv', 'r', encoding='utf-8') as f: reader = csv.DictReader(f) for i, row in enumerate(reader): if i >= limit: break kols.append({ 'Username': row['Username'], 'TwitterUserURL': row['TwitterUserURL'] }) # logging.info(f'成功读取{len(kols)}条KOL信息') return kols except Exception as e: # logging.error(f'读取KOL信息失败: {str(e)}') return [] def get_timeline(self, username: str, cursor: Optional[str] = None) -> Dict: """获取用户时间线数据""" # logging.info(f'getTimeline params: {username} {cursor}') try: params = {'screenname': username} if cursor: params['cursor'] = cursor response = requests.get( 'https://twitter-api45.p.rapidapi.com/timeline.php', headers=self.headers, params=params ) data = response.json() if data.get('timeline'): timeline_info = 'empty array' if not data['timeline'] else \ [tweet.get('tweet_id') for tweet in data['timeline']] # logging.info(f'接口返回结果: {timeline_info}') return data except Exception as e: logging.error(f'获取{username}时间线数据失败: {str(e)}') return {} def clean_tweet_data(self, timeline_data: List[Dict], prev_cursor: str, next_cursor: str) -> List[Dict]: """清理推文数据,提取所需字段""" return [{ 'tweet_id': tweet.get('tweet_id', ''), 'created_at': tweet.get('created_at', ''), 'favorites': tweet.get('favorites', 0), 'bookmarks': tweet.get('bookmarks', 0), 'text': tweet.get('text', ''), 'views': tweet.get('views', '0'), 'quotes': tweet.get('quotes', 0), 'prev_cursor': prev_cursor, 'next_cursor': next_cursor } for tweet in timeline_data] def save_to_csv(self, username: str, data: List[Dict], mode: str = 'w') -> None: """保存数据到CSV文件""" file_path = os.path.join(self.data_dir, f'{username}.csv') try: if not data: return write_header = mode == 'w' or not os.path.exists(file_path) with open(file_path, mode, newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=list(data[0].keys())) if write_header: writer.writeheader() writer.writerows(data) logging.info(f'成功保存{len(data)}条数据到{file_path}') except Exception as e: logging.error(f'保存数据到CSV失败: {str(e)}') def get_base_tweet(self, username: str) -> Optional[Dict]: """获取基准推文数据""" file_path = os.path.join(self.data_dir, f'{username}.csv') try: if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) return next(reader, None) except Exception as e: logging.error(f'读取基准推文数据失败: {str(e)}') return None def filter_duplicate_data(self, existing_data: List[Dict], new_data: List[Dict]) -> List[Dict]: """过滤重复的推文数据""" existing_ids = set(tweet['tweet_id'] for tweet in existing_data) return [tweet for tweet in new_data if tweet['tweet_id'] not in existing_ids] def crawl_user_timeline(self, username: str, mode: str = 'init') -> None: """抓取用户时间线数据""" all_tweets = [] base_tweet = None next_cursor = None # 如果是更新模式,获取基准推文 if mode == 'update': base_tweet = self.get_base_tweet(username) if not base_tweet: logging.error(f'无法获取{username}的基准推文数据') return while True: data = None new_tweets = [] if mode == 'init': data = self.get_timeline(username, next_cursor) if not data or not data.get('timeline'): logging.info(f'没有更多数据可获取,停止抓取{username}的时间线数据') break # 清理推文数据 new_tweets = self.clean_tweet_data( data['timeline'], data.get('prev_cursor'), data.get('next_cursor') ) all_tweets.extend(new_tweets) # 保存数据 self.save_to_csv( username, new_tweets, 'w' if len(all_tweets) == len(new_tweets) else 'a' ) if len(all_tweets) >= 100: break if mode == 'update': data = self.get_timeline(username, base_tweet['prev_cursor']) if not data or not data.get('timeline'): logging.info(f'没有更多数据可获取,停止抓取{username}的时间线数据') break # 清理推文数据 new_tweets = self.clean_tweet_data( data['timeline'], data.get('prev_cursor'), data.get('next_cursor') ) # 过滤重复数据 existing_data = [self.get_base_tweet( username)] if self.get_base_tweet(username) else [] unique_tweets = self.filter_duplicate_data( existing_data, new_tweets) if unique_tweets: self.save_to_csv(username, unique_tweets, 'a') # 更新游标 next_cursor = data.get('next_cursor') # 添加延迟避免频繁请求 time.sleep(1) logging.info(f'完成{username}的时间线数据处理') def run(self) -> None: """运行爬虫""" # 获取kol信息 kols = self.read_kols(1) if not kols: logging.error('没有找到KOL信息') return for kol in kols: username = kol['Username'] logging.info(f'******处理kol {username} 信息******') # 判断模式 mode = 'update' if self.get_base_tweet(username) else 'init' logging.info(f'采用{mode}模式') # 开始爬取 self.crawl_user_timeline(username, mode) time.sleep(2) # 处理下一个KOL前暂停

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SecurFi/RapidAPI_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server