Skip to main content
Glama

MCP RAG Server

search_and_index.py12.1 kB
#!/usr/bin/env python """ Improved script to search GitHub for Move files containing specific keywords, download them, and index them for the MCP Server. """ import os import argparse import logging import time import datetime from typing import List, Dict, Any, Tuple from dotenv import load_dotenv from tqdm import tqdm from github import Github, RateLimitExceededException from mcp_server.utils.document_processor import DocumentProcessor from mcp_server.models.vector_store import FAISSVectorStore from mcp_server.index_move_files import index_move_files # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Load environment variables load_dotenv() def search_github_repositories(token: str, keywords: List[str], max_repos: int = 20) -> Dict[str, List[List[str]]]: """ Search GitHub for repositories using input keywords. Args: token: GitHub access token keywords: List of keywords to search for max_repos: Maximum number of repositories to retrieve per keyword Returns: Dictionary mapping keywords to lists of repository information """ # Initialize GitHub API client github = Github(token) results = {} for keyword in keywords: logger.info(f"Searching GitHub for keyword: {keyword}") # Set up query query = keyword + '+path:*.move' search_results = github.search_repositories(query, 'stars', 'desc') logger.info(f"Found {search_results.totalCount} repositories for '{keyword}'") count = min(search_results.totalCount, max_repos) # Process results results[keyword] = [] for i in tqdm(range(count), desc=f"Processing repos for '{keyword}'"): try: repo = search_results[i] results[keyword].append([repo.name, repo.html_url, repo.description, repo.full_name]) # Respect rate limits time.sleep(1) except RateLimitExceededException: logger.warning("Rate limit exceeded. Sleeping for 60 seconds.") time.sleep(60) try: repo = search_results[i] results[keyword].append([repo.name, repo.html_url, repo.description, repo.full_name]) except Exception as e: logger.error(f"Error retrieving repository {i}: {str(e)}") except Exception as e: logger.error(f"Error processing repository {i}: {str(e)}") return results def find_move_files_in_repos(token: str, repos: List[List[str]], output_dir: str) -> List[str]: """ Find and download Move files from the provided repositories. Args: token: GitHub access token repos: List of repository information [name, url, description, full_name] output_dir: Directory to save downloaded files Returns: List of paths to downloaded Move files """ github = Github(token) downloaded_files = [] logger.info(f"Searching for Move files in {len(repos)} repositories") for repo_info in tqdm(repos, desc="Processing repositories"): try: repo_name = repo_info[3] # full_name from search results logger.info(f"Checking repository: {repo_name}") # Get repository from GitHub API repo = github.get_repo(repo_name) # Create repository-specific directory to avoid filename conflicts repo_dir = os.path.join(output_dir, repo_name.replace('/', '_')) os.makedirs(repo_dir, exist_ok=True) # Walk through repository contents recursively contents = [(repo.get_contents(""), "")] # Stack of (content, path) while contents: current_contents, path_prefix = contents.pop(0) # Handle content list or single item if not isinstance(current_contents, list): current_contents = [current_contents] for content in current_contents: try: if content.type == "dir": # Add directory contents to the stack with updated prefix new_prefix = os.path.join(path_prefix, content.name) try: dir_contents = repo.get_contents(content.path) contents.append((dir_contents, new_prefix)) except Exception as e: logger.warning(f"Error accessing directory {content.path}: {str(e)}") elif content.type == "file" and content.name.endswith(".move"): # Download Move file try: file_content = content.decoded_content.decode("utf-8") # Only keep files with "use sui" reference if "use sui" in file_content.lower(): # Create local path rel_path = os.path.join(path_prefix, content.name) if path_prefix else content.name local_path = os.path.join(repo_dir, rel_path) # Ensure directory exists os.makedirs(os.path.dirname(local_path), exist_ok=True) # Write file with open(local_path, "w", encoding="utf-8") as f: f.write(file_content) downloaded_files.append(local_path) logger.debug(f"Downloaded: {local_path}") except Exception as e: logger.warning(f"Error downloading file {content.path}: {str(e)}") except Exception as e: logger.warning(f"Error processing content: {str(e)}") # Respect rate limits time.sleep(0.5) except RateLimitExceededException: logger.warning("Rate limit exceeded. Sleeping for 60 seconds.") time.sleep(60) except Exception as e: logger.error(f"Error processing repository {repo_info[0]}: {str(e)}") logger.info(f"Downloaded {len(downloaded_files)} Move files") return downloaded_files def main(): """Entry point for the script.""" parser = argparse.ArgumentParser( description="Search GitHub for Move files with keywords and index them for MCP Server" ) parser.add_argument( "--keywords", default="sui move,move framework", help="Comma-separated list of keywords to search for (default: 'sui move,move framework')" ) parser.add_argument( "--repo", help="Specific repository to search (format: owner/repo, e.g., MystenLabs/sui)" ) parser.add_argument( "--token", default=os.getenv("GITHUB_TOKEN"), help="GitHub personal access token (default: from GITHUB_TOKEN env var)" ) parser.add_argument( "--output-dir", default="docs/move_files", help="Directory to save downloaded files (default: docs/move_files)" ) parser.add_argument( "--index-file", default="data/faiss_index.bin", help="Path to save/load FAISS index (default: data/faiss_index.bin)" ) parser.add_argument( "--max-repos", type=int, default=20, help="Maximum number of repositories to process per keyword (default: 20)" ) parser.add_argument( "--new-index", action="store_true", help="Create a new index instead of merging with existing one" ) parser.add_argument( "--verbose", action="store_true", help="Enable verbose logging" ) parser.add_argument( "--output-results", action="store_true", help="Save search results to a file" ) args = parser.parse_args() # Set logging level if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Check if GitHub token is provided if not args.token: logger.error("GitHub token is required. Provide with --token or set GITHUB_TOKEN environment variable.") return 1 # Create output directory os.makedirs(args.output_dir, exist_ok=True) # Initialize GitHub API client github = Github(args.token) # Process specific repository if provided if args.repo: try: logger.info(f"Fetching specified repository: {args.repo}") repo = github.get_repo(args.repo) repo_info = [repo.name, repo.html_url, repo.description, repo.full_name] all_repos = [repo_info] logger.info(f"Using specified repository: {repo.full_name} with {repo.stargazers_count} stars") except Exception as e: logger.error(f"Error fetching repository {args.repo}: {str(e)}") return 1 else: # Parse keywords for search keywords = [k.strip() for k in args.keywords.split(",") if k.strip()] if not keywords: logger.error("No valid keywords provided. Use --keywords with comma-separated values.") return 1 # Search GitHub repositories search_results = search_github_repositories(args.token, keywords, args.max_repos) # Flatten repository list (combine results from all keywords) all_repos = [] for keyword, repos in search_results.items(): for repo in repos: # Only add if not already in the list (avoid duplicates across keywords) if not any(r[3] == repo[3] for r in all_repos): # Compare by full_name all_repos.append(repo) logger.info(f"Found {len(all_repos)} unique repositories across all keywords") # Save search results if requested if args.output_results: timestamp = datetime.datetime.now().strftime('%d%b%Y') results_file = f"github_search_results_{timestamp}.txt" with open(results_file, "w") as f_out: for keyword, repos in search_results.items(): for repo in repos: f_out.write(f"{keyword}\t{repo[0]}\t{repo[1]}\t{repo[2]}\n") logger.info(f"Saved search results to {results_file}") # Download Move files from repositories downloaded_files = find_move_files_in_repos(args.token, all_repos, args.output_dir) if not downloaded_files: if args.repo: logger.warning(f"No Move files found in repository {args.repo} with 'use sui' references.") else: logger.warning("No Move files found or downloaded. Try different keywords.") return 1 # Index the downloaded files logger.info("Indexing downloaded files...") num_indexed = index_move_files( docs_dir=args.output_dir, index_file=args.index_file ) logger.info(f"Successfully indexed {num_indexed} document chunks") # Verify index try: vector_store = FAISSVectorStore() vector_store.load(args.index_file) logger.info(f"Index verification: loaded {len(vector_store.documents)} total documents") except Exception as e: logger.error(f"Error verifying index: {str(e)}") return 0 if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ProbonoBonobo/sui-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server