Skip to main content
Glama

MCP RAG Server

github_extractor.py16.7 kB
import os
import re
import time
import logging
from collections import deque
from datetime import timezone
from typing import List, Dict, Any, Optional, Tuple

import requests
from bs4 import BeautifulSoup
from github import Github, RateLimitExceededException
from github.ContentFile import ContentFile
from dotenv import load_dotenv
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()


class GitHubExtractor:
    """
    Collect source files (Move files by default) from GitHub and save them locally.

    Two discovery strategies are implemented: a repository search through the
    GitHub API followed by a traversal of each repository, and an HTML scrape
    of the GitHub search page. Results from both are lists of dictionaries
    with the keys 'name', 'path', 'repo', 'url' and 'content' (scraped results
    additionally carry 'raw_url').
    """

    # Seconds to wait for any single HTTP response before giving up, so that a
    # stalled connection cannot hang the whole extraction run.
    REQUEST_TIMEOUT = 30

    def __init__(self, token: Optional[str] = None, output_dir: str = "docs/move_files"):
        """
        Initialize GitHub extractor.

        Args:
            token: GitHub personal access token (from env var GITHUB_TOKEN if not provided)
            output_dir: Directory to save extracted files (created if missing)
        """
        # Use provided token or get from environment
        self.token = token or os.getenv("GITHUB_TOKEN")
        if not self.token:
            logger.warning("No GitHub token provided. Rate limits will be restricted.")

        # Initialize GitHub API (unauthenticated client when there is no token)
        self.github = Github(self.token) if self.token else Github()

        # Set output directory
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

        # Base GitHub search URL
        self.base_url = "https://github.com/search"

    def search_code_with_api(self,
                             query: str,
                             language: Optional[str] = None,
                             extension: Optional[str] = None,
                             max_results: int = 100) -> List[Dict[str, Any]]:
        """
        Search GitHub for code using the GitHub API.

        Repositories matching the query are fetched (sorted by stars,
        descending) and each one is traversed for files with the requested
        extension.

        Args:
            query: Search query string
            language: Filter by programming language
            extension: Filter by file extension; when omitted, "move" is used
                for the per-repository file scan
            max_results: Maximum number of results to return (also caps the
                number of repositories scanned)

        Returns:
            List of code search results with metadata. An empty list is
            returned when the search itself fails or is rate limited.
        """
        try:
            # Build query string
            search_query = query
            if language:
                search_query += f" language:{language}"
            if extension:
                search_query += f" extension:{extension}"

            logger.info(f"Searching GitHub API with query: {search_query}")

            # Use the search repositories approach from search_github.py
            full_query = search_query + ' in:readme+in:description'
            search_results = self.github.search_repositories(full_query, 'stars', 'desc')

            logger.info(f"Found {search_results.totalCount} repositories")

            # Without this fallback a missing extension would make the scan
            # look for files ending in ".None" and never match anything.
            file_extension = extension or "move"

            results: List[Dict[str, Any]] = []
            repo_count = min(search_results.totalCount, max_results)

            for index in tqdm(range(repo_count)):
                # Stop scanning further repositories once the quota is filled.
                if len(results) >= max_results:
                    break

                repo_label: Any = index
                try:
                    repo_data = search_results[index]
                    repo_label = repo_data.full_name

                    # Get repository contents
                    logger.info(f"Scanning repository: {repo_label}")

                    # Look for Move files in the repository
                    move_files = self._find_move_files_in_repo(repo_data, extension=file_extension)
                    results.extend(move_files[:max_results - len(results)])

                    # Respect rate limits
                    time.sleep(2)

                except RateLimitExceededException:
                    logger.warning("Rate limit exceeded. Sleeping for 60 seconds.")
                    time.sleep(60)
                    continue
                except Exception as e:
                    logger.warning(f"Error processing repository {repo_label}: {str(e)}")
                    continue

            logger.info(f"Found {len(results)} code results")
            return results

        except RateLimitExceededException:
            logger.error("GitHub API rate limit exceeded. Try again later or use a token with higher limits.")
            return []
        except Exception as e:
            logger.error(f"Error searching GitHub API: {str(e)}")
            return []

    def _find_move_files_in_repo(self, repo, extension: str = "move") -> List[Dict[str, Any]]:
        """
        Find all Move files in a repository.

        Args:
            repo: GitHub repository object
            extension: File extension to search for (without the leading dot)

        Returns:
            List of file data dictionaries with the keys 'name', 'path',
            'repo', 'url' and 'content'. When the extension is "move", only
            files whose text contains "use sui" (case-insensitive) are kept.

        Raises:
            RateLimitExceededException: Propagated so the caller can back off
                instead of silently getting a truncated file list.
        """
        results: List[Dict[str, Any]] = []
        suffix = f".{extension}"
        require_sui_import = extension == "move"

        try:
            # A deque gives O(1) pops from the front while keeping the same
            # first-in-first-out order as the directory listing.
            pending = deque(repo.get_contents(""))

            while pending:
                file_content = pending.popleft()

                if file_content.type == "dir":
                    # Queue the directory's entries for a later visit
                    pending.extend(repo.get_contents(file_content.path))
                    continue

                if not file_content.name.endswith(suffix):
                    continue

                try:
                    content = file_content.decoded_content.decode('utf-8')

                    # Check if file contains "use sui" for Move files
                    if require_sui_import and "use sui" not in content.lower():
                        continue

                    results.append({
                        'name': file_content.name,
                        'path': file_content.path,
                        'repo': repo.full_name,
                        'url': file_content.html_url,
                        'content': content
                    })

                    logger.debug(f"Found {extension} file: {file_content.path}")
                except RateLimitExceededException:
                    raise
                except Exception as e:
                    logger.warning(f"Error getting content for {file_content.html_url}: {str(e)}")

        except RateLimitExceededException:
            # Let the caller's rate-limit handling (sleep and continue) run.
            raise
        except Exception as e:
            logger.warning(f"Error traversing repository {repo.full_name}: {str(e)}")

        return results

    def search_code_with_scraping(self,
                                  query: str,
                                  path_pattern: Optional[str] = None,
                                  max_pages: int = 5) -> List[Dict[str, Any]]:
        """
        Search GitHub for code by scraping the GitHub search results page.
        This is a fallback when API doesn't work or limits are exceeded.

        Args:
            query: Search query string
            path_pattern: Path pattern to filter results (e.g., "*.move")
            max_pages: Maximum number of pages to scrape

        Returns:
            List of code search results with metadata. 'content' is filled in
            by a follow-up fetch of each result's 'raw_url' and stays None
            when that fetch fails.
        """
        results: List[Dict[str, Any]] = []

        # Build query parameters
        params: Dict[str, Any] = {
            'q': query,
            'type': 'code',
        }

        if path_pattern:
            params['q'] += f" path:{path_pattern}"

        logger.info(f"Scraping GitHub search with query: {params['q']}")

        # Browser-like headers; identical for every page, so built once.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml',
            'Accept-Language': 'en-US,en;q=0.9',
        }

        # Start with page 1
        page = 1

        while page <= max_pages:
            try:
                # Add page parameter
                params['p'] = page

                response = requests.get(self.base_url, params=params, headers=headers,
                                        timeout=self.REQUEST_TIMEOUT)

                if response.status_code != 200:
                    logger.error(f"Failed to retrieve search results. Status code: {response.status_code}")
                    break

                # Parse HTML
                soup = BeautifulSoup(response.text, 'html.parser')

                # Find code results
                # NOTE(review): these CSS selectors depend on GitHub's page
                # markup - confirm they still match the live search page.
                code_blocks = soup.select('div.code-list-item')

                if not code_blocks:
                    logger.info(f"No more results found on page {page}")
                    break

                # Process each code block
                for block in code_blocks:
                    try:
                        # Extract repository info
                        repo_element = block.select_one('a.Link--secondary')
                        if not repo_element:
                            continue
                        repo_name = repo_element.text.strip()

                        # Extract file path
                        file_element = block.select_one('a.Link--primary')
                        if not file_element:
                            continue
                        file_path = file_element.text.strip()

                        # NOTE(review): the branch is hard-coded to "master";
                        # repositories with a different default branch will
                        # fail the later content fetch and be filtered out.
                        file_url = f"https://github.com/{repo_name}/blob/master/{file_path}"

                        # Extract the raw URL for content
                        raw_url = f"https://raw.githubusercontent.com/{repo_name}/master/{file_path}"

                        results.append({
                            'name': os.path.basename(file_path),
                            'path': file_path,
                            'repo': repo_name,
                            'url': file_url,
                            'raw_url': raw_url,
                            'content': None  # Will be fetched later
                        })
                    except Exception as e:
                        logger.warning(f"Error parsing code block: {str(e)}")

                # Go to next page
                page += 1

                # Respect rate limits with a delay
                time.sleep(2)

            except Exception as e:
                logger.error(f"Error scraping GitHub search page {page}: {str(e)}")
                break

        logger.info(f"Found {len(results)} code results through scraping")

        # Fetch content for each result
        self._fetch_contents_for_scraped_results(results)

        return results

    def _fetch_contents_for_scraped_results(self, results: List[Dict[str, Any]]) -> None:
        """
        Fetch file contents for results obtained through scraping.

        Each result's 'content' entry is updated in place with the response
        text when the download of its 'raw_url' returns HTTP 200.

        Args:
            results: List of search results from scraping
        """
        logger.info(f"Fetching content for {len(results)} files...")

        for result in tqdm(results):
            try:
                raw_url = result.get('raw_url')
                if not raw_url:
                    continue

                response = requests.get(raw_url, timeout=self.REQUEST_TIMEOUT)
                if response.status_code == 200:
                    result['content'] = response.text
                else:
                    logger.warning(f"Failed to fetch content for {raw_url}. Status: {response.status_code}")

                # Respect rate limits
                time.sleep(0.5)

            except Exception as e:
                logger.warning(f"Error fetching content for {result.get('raw_url', 'unknown URL')}: {str(e)}")

    def extract_move_files(self,
                           query: str = "use sui",
                           use_scraping: bool = True,
                           max_results: int = 100) -> List[Dict[str, Any]]:
        """
        Extract .move files from GitHub based on search query.

        The API is tried first whenever a token is available, or when scraping
        is disabled and the API is therefore the only option. Scraping is used
        as a fallback when the API produced nothing and scraping is allowed.

        Args:
            query: Search query
            use_scraping: Whether to use web scraping as fallback
            max_results: Maximum number of results to extract

        Returns:
            List of extracted file data; entries without content are dropped
            and at most max_results entries are returned.
        """
        results: List[Dict[str, Any]] = []

        # Try API first if token is available (or if it is the only method allowed)
        if self.token or not use_scraping:
            logger.info("Attempting to use GitHub API...")
            results = self.search_code_with_api(query, extension="move", max_results=max_results)

        # Fall back to scraping if API didn't work
        if not results and use_scraping:
            logger.info("Using web scraping method...")
            results = self.search_code_with_scraping(query, path_pattern="*.move", max_pages=5)

        # Filter out results without content
        valid_results = [r for r in results if r.get('content')]

        if len(valid_results) < len(results):
            logger.warning(f"Filtered out {len(results) - len(valid_results)} results without content")

        # The scraping path has no result cap of its own, so enforce it here.
        return valid_results[:max_results]

    def download_move_files(self, files: List[Dict[str, Any]]) -> List[str]:
        """
        Download .move files to the output directory.

        Each file is written to <output_dir>/<owner>_<repo>/<name>. When two
        files from the same repository share a name, the later one is saved
        under its repository path with '/' replaced by '_' so it does not
        overwrite the earlier one.

        Args:
            files: List of file data from extract_move_files

        Returns:
            List of paths to downloaded files
        """
        downloaded_paths: List[str] = []
        written = set()

        logger.info(f"Downloading {len(files)} .move files to {self.output_dir}")

        for file_data in tqdm(files):
            try:
                # Create subdirectory based on repo name to avoid name conflicts
                repo_dir = file_data['repo'].replace('/', '_')
                file_dir = os.path.join(self.output_dir, repo_dir)
                os.makedirs(file_dir, exist_ok=True)

                # Create file path
                file_name = file_data['name']
                file_path = os.path.join(file_dir, file_name)

                if file_path in written:
                    # Same file name already saved for this repository during
                    # this call: disambiguate using the full repository path.
                    flattened = (file_data.get('path') or file_name).replace('/', '_')
                    file_path = os.path.join(file_dir, flattened)

                # Write content to file
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(file_data['content'])

                written.add(file_path)
                downloaded_paths.append(file_path)

            except Exception as e:
                logger.error(f"Error saving file {file_data.get('name', 'unknown')}: {str(e)}")

        logger.info(f"Successfully downloaded {len(downloaded_paths)} files")
        return downloaded_paths

    def _check_rate_limit(self) -> None:
        """Check remaining rate limit and pause if necessary"""
        if not self.token:
            # Be conservative with unauthenticated requests
            time.sleep(2)
            return

        rate_limit = self.github.get_rate_limit()
        remaining = rate_limit.search.remaining

        if remaining < 10:
            reset_at = rate_limit.search.reset
            # A naive datetime would be interpreted as local time by
            # timestamp(); treat it as UTC so the wait is not skewed by the
            # local UTC offset.
            # NOTE(review): assumes naive reset values are UTC - confirm for
            # the installed PyGithub version.
            if reset_at.tzinfo is None:
                reset_at = reset_at.replace(tzinfo=timezone.utc)

            sleep_time = max(1, reset_at.timestamp() - time.time())

            logger.warning(f"Rate limit low ({remaining} remaining). Sleeping for {sleep_time:.1f} seconds")
            time.sleep(sleep_time)


def extract_and_index_move_files(query: str = "use sui",
                                 output_dir: str = "docs/move_files",
                                 github_token: Optional[str] = None,
                                 use_scraping: bool = True,
                                 max_results: int = 100) -> Tuple[int, List[str]]:
    """
    Extract Move files from GitHub and return paths for indexing.

    Args:
        query: GitHub search query
        output_dir: Directory to save files
        github_token: GitHub API token
        use_scraping: Whether to use web scraping fallback
        max_results: Maximum files to extract

    Returns:
        Tuple of (number of files, list of file paths)
    """
    # Initialize extractor
    extractor = GitHubExtractor(token=github_token, output_dir=output_dir)

    # Extract files
    files = extractor.extract_move_files(query, use_scraping, max_results)

    # Download files
    file_paths = extractor.download_move_files(files)

    return len(file_paths), file_paths

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ProbonoBonobo/sui-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.