Skip to main content
Glama
cli.py (25.3 kB)
"""
CLI for Scout

Provides commands for managing the repository stack and triggering reindexing.
"""

import argparse
import logging
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path

from scout.embeddings import EmbeddingStore
from scout.indexer import MultiRepoIndexer, RepoIndexer
from scout.stack_config import IndexingStatus, StackConfig

# Global logger
logger = logging.getLogger(__name__)

# Prefixes that mark ``scout add <source>`` as a remote to clone rather than
# a local path.
_URL_PREFIXES = ("http://", "https://", "git@", "git://", "ssh://")

# Last path component of a remote, minus an optional ".git" suffix. Accepts
# both "/" and ":" as the separator so scp-style "git@host:repo.git" works.
_REPO_NAME_RE = re.compile(r"[/:]([^/:]+?)(?:\.git)?$")

# Process exit statuses wrap modulo 256; clamp counts used as exit codes so a
# large count can never be reported as 0 (success).
_MAX_EXIT_CODE = 255

# Shell script written by ``install-hook``.
_POST_MERGE_HOOK = """#!/bin/sh
# MCP Indexer post-merge hook
# Automatically reindex when pulling changes

echo "Checking for indexing updates..."
python3 -m scout.cli reindex-changed

exit 0
"""


def setup_logging(verbose=False, debug=False):
    """Configure logging based on verbosity flags.

    Args:
        verbose: Log at INFO level to stderr.
        debug: Log at DEBUG level to stderr and additionally to
            ``~/.scout/logs/scout.log``. Takes precedence over ``verbose``.

    With neither flag set, only WARNING and above are emitted.
    """
    if debug:
        level = logging.DEBUG
        log_format = (
            "%(asctime)s - %(name)s - %(levelname)s - "
            "%(filename)s:%(lineno)d - %(message)s"
        )
    elif verbose:
        level = logging.INFO
        log_format = "%(asctime)s - %(levelname)s - %(message)s"
    else:
        level = logging.WARNING
        log_format = "%(levelname)s - %(message)s"

    # Configure root logger
    logging.basicConfig(
        level=level, format=log_format, handlers=[logging.StreamHandler(sys.stderr)]
    )

    # Also log to file if debug mode
    if debug:
        log_dir = os.path.expanduser("~/.scout/logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, "scout.log")

        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(logging.Formatter(log_format))
        logging.getLogger().addHandler(file_handler)

        logger.debug("Debug logging enabled. Logs: %s", log_file)


def get_indexer():
    """Create the multi-repo indexer backed by the configured database.

    The database location is taken from ``SCOUT_DB_PATH`` and defaults to
    ``~/.scout/db``. ``~`` is expanded in either case so that this agrees
    with the path reported by ``cmd_check``.
    """
    db_path = os.path.expanduser(os.getenv("SCOUT_DB_PATH", "~/.scout/db"))
    embedding_store = EmbeddingStore(
        db_path=db_path, collection_name="scout_code_index"
    )
    return MultiRepoIndexer(embedding_store=embedding_store)


def _prompt(message):
    """Read one line from stdin, returning "" when stdin is closed.

    Lets the interactive commands fall through to their "no"/"invalid"
    branches instead of crashing with EOFError when run non-interactively.
    """
    try:
        return input(message)
    except EOFError:
        print()
        return ""


def _repo_name_from_url(url):
    """Return the repository name implied by a clone URL, or None.

    ``https://github.com/user/repo.git`` -> ``repo``. A trailing slash is
    ignored.
    """
    match = _REPO_NAME_RE.search(url.rstrip("/"))
    return match.group(1) if match else None


def _find_stuck_repos(indexer):
    """Return the repo configs whose status is still INDEXING."""
    return [
        repo
        for repo in indexer.stack_config.list_repos()
        if repo.status == IndexingStatus.INDEXING
    ]


def _repos_needing_reindex(indexer):
    """Yield ``(repo_name, repo_indexer, current_commit)`` for stale repos.

    A repo is stale when the stack config's ``needs_reindex`` says so for the
    commit currently reported by its indexer.
    """
    for repo_name, repo_indexer in indexer.repo_indexers.items():
        current_commit = repo_indexer._get_git_commit()
        if indexer.stack_config.needs_reindex(repo_name, current_commit or ""):
            yield repo_name, repo_indexer, current_commit


def _reindex_and_record(indexer, repo_name, repo_indexer):
    """Force a reindex of one repo and record the result as INDEXED.

    Returns the result object produced by ``repo_indexer.reindex``. Any
    exception propagates to the caller, which decides how to report it.
    """
    result = repo_indexer.reindex(force=True)
    indexer.stack_config.update_repo_status(
        repo_name,
        IndexingStatus.INDEXED,
        last_commit=result.git_commit,
        files_indexed=result.files_processed,
        chunks_indexed=result.chunks_indexed,
    )
    return result


def _clone_repo(source, repo_path):
    """Clone ``source`` into ``repo_path`` with git.

    Returns ``repo_path`` on success and None on failure (after printing an
    explanation). A partially created clone directory is removed on failure.
    """
    logger.debug("Target clone path: %s", repo_path)

    if repo_path.exists():
        logger.error("Directory already exists: %s", repo_path)
        print(f"Error: Directory already exists: {repo_path}")
        print("Use a different --name or remove the existing directory")
        return None

    print(f"Cloning {source} to {repo_path}...")
    logger.info("Starting git clone: %s -> %s", source, repo_path)

    try:
        subprocess.run(["git", "clone", source, str(repo_path)], check=True)
    except FileNotFoundError:
        # subprocess raises this when the git executable itself is missing.
        logger.error("git executable not found")
        print("Error: Failed to clone repository: git executable not found")
        print("\nTroubleshooting:")
        print(" - Ensure git is installed: git --version")
        return None
    except subprocess.CalledProcessError as e:
        # Cleanup partial clone if it exists
        if repo_path.exists():
            try:
                shutil.rmtree(repo_path)
                print(f"Cleaned up partial clone at {repo_path}")
            except Exception as cleanup_error:
                print(f"Warning: Could not cleanup {repo_path}: {cleanup_error}")

        print(f"Error: Failed to clone repository: {e}")
        print("\nTroubleshooting:")
        print(" - Verify the URL is correct and accessible")
        print(" - Check your network connection")
        print(" - Ensure git is installed: git --version")
        print(" - For private repos: check SSH keys or credentials")
        print(" - Try cloning manually: git clone <url>")
        return None

    print("✓ Cloned successfully")
    logger.info("Git clone completed successfully")
    return repo_path


def _resolve_local_repo(source):
    """Resolve a local ``source`` path, returning it or None if missing.

    A directory without ``.git`` only produces a warning; indexing proceeds.
    """
    repo_path = Path(source).resolve()
    logger.info("Using local repository path: %s", repo_path)

    if not repo_path.exists():
        logger.error("Path does not exist: %s", repo_path)
        print(f"Error: Path does not exist: {repo_path}")
        print("\nSuggestions:")
        print(f" - Check the path is correct: ls {repo_path.parent}")
        print(" - Use absolute path or ensure relative path is correct")
        print(f" - Current directory: {Path.cwd()}")
        return None

    if not (repo_path / ".git").exists():
        logger.warning("Not a git repository: %s", repo_path)
        print(f"Warning: Not a git repository: {repo_path}")
        print("Proceeding anyway...")

    return repo_path


def _index_repo(repo_path, name):
    """Register ``repo_path`` under ``name`` and index it. Returns exit code."""
    print(f"\nIndexing repository '{name}' at {repo_path}...")
    logger.info("Starting indexing for '%s' at %s", name, repo_path)

    indexer = get_indexer()

    try:
        logger.debug("Calling indexer.add_repo()")
        indexer.add_repo(repo_path=str(repo_path), repo_name=name, auto_index=True)
        logger.info("Indexing completed successfully for '%s'", name)

        # Get stats from stack config
        repo_config = indexer.stack_config.get_repo(name)
        if repo_config:
            logger.debug(
                "Stats: %s files, %s chunks",
                repo_config.files_indexed,
                repo_config.chunks_indexed,
            )
            print(f"✓ Successfully indexed '{name}'")
            print(f" Files: {repo_config.files_indexed}")
            print(f" Chunks: {repo_config.chunks_indexed}")
        else:
            print(f"✓ Successfully added '{name}'")

        return 0
    except Exception as e:
        # Top-level CLI boundary: report, log the traceback, exit non-zero.
        logger.exception("Failed to index repository: %s", e)
        print(f"Error: Failed to index repository: {e}")
        print("\nTroubleshooting:")
        print(" - Ensure repository path exists and is readable")
        print(" - Check disk space (indexing creates embeddings)")
        print(" - For git repos: verify git is installed")
        print(" - Try: scout check # Verify installation")
        print(" - Check logs for more details: ~/.scout/logs/scout.log")
        print(" - Run with --debug flag for detailed logging")
        return 1


def cmd_add(args):
    """Add a repository to the index (local path or GitHub URL).

    Reads ``args.source``, ``args.name`` and ``args.clone_dir``. A URL source
    is cloned into ``<clone_dir>/<name>`` (default clone dir ``~/Code``)
    before indexing; a local path is indexed in place. When no name is given
    it is derived from the URL or the directory name.

    Returns:
        0 on success, 1 on any failure.
    """
    source = args.source
    name = args.name
    clone_dir = os.path.expanduser(args.clone_dir or "~/Code")

    logger.info("Adding repository from source: %s", source)
    logger.debug("Clone directory: %s, Name: %s", clone_dir, name)

    # Determine if source is a URL or local path
    is_url = source.startswith(_URL_PREFIXES)
    logger.debug("Source is URL: %s", is_url)

    if is_url:
        # Extract repo name from URL if not provided
        if not name:
            name = _repo_name_from_url(source)
            if not name:
                logger.error("Could not extract repo name from URL")
                print(
                    "Error: Could not extract repo name from URL. "
                    "Please provide --name"
                )
                print("\nExample:")
                print(f" scout add {source} --name my-repo-name")
                return 1
            logger.info("Extracted repo name from URL: %s", name)

        repo_path = _clone_repo(source, Path(clone_dir) / name)
        if repo_path is None:
            return 1
    else:
        repo_path = _resolve_local_repo(source)
        if repo_path is None:
            return 1

        # Extract repo name from path if not provided
        if not name:
            name = repo_path.name
            logger.info("Extracted repo name from path: %s", name)

    return _index_repo(repo_path, name)


def cmd_check_updates(args):
    """Check for repos that need reindexing.

    Returns:
        0 when everything is up to date, otherwise the number of repos
        needing a reindex, capped at 255 so the exit status cannot wrap
        around to 0.
    """
    indexer = get_indexer()

    needs_update = []
    for repo_name, _repo_indexer, current_commit in _repos_needing_reindex(indexer):
        repo_config = indexer.stack_config.get_repo(repo_name)
        needs_update.append(
            {
                "name": repo_name,
                "path": repo_config.path if repo_config else "?",
                "old_commit": repo_config.last_commit if repo_config else None,
                "new_commit": current_commit,
            }
        )

    if not needs_update:
        print("All repositories are up to date.")
        return 0

    print(f"Found {len(needs_update)} repository(ies) needing reindex:\n")
    for repo in needs_update:
        print(f" • {repo['name']}")
        print(f" Path: {repo['path']}")
        print(f" Old commit: {repo['old_commit'] or 'never indexed'}")
        print(f" New commit: {repo['new_commit']}\n")

    return min(len(needs_update), _MAX_EXIT_CODE)


def cmd_reindex_changed(args):
    """Reindex repositories that have changed.

    Each stale repo is marked INDEXING, reindexed, then marked INDEXED (or
    ERROR with the exception text if reindexing fails).

    Returns:
        1 if any repo failed to reindex, otherwise 0.
    """
    indexer = get_indexer()

    reindexed = []
    errors = []

    # Materialise first: the loop body updates the stack config.
    for repo_name, repo_indexer, _commit in list(_repos_needing_reindex(indexer)):
        print(f"Reindexing {repo_name}...")

        try:
            # Update status to indexing
            indexer.stack_config.update_repo_status(
                repo_name, IndexingStatus.INDEXING
            )

            result = _reindex_and_record(indexer, repo_name, repo_indexer)

            reindexed.append(
                {
                    "name": repo_name,
                    "files": result.files_processed,
                    "chunks": result.chunks_indexed,
                }
            )
            print(
                f" ✓ Indexed {result.files_processed} files, "
                f"{result.chunks_indexed} chunks\n"
            )
        except Exception as e:
            errors.append({"name": repo_name, "error": str(e)})

            # Update status to error
            indexer.stack_config.update_repo_status(
                repo_name, IndexingStatus.ERROR, error_message=str(e)
            )
            print(f" ✗ Error: {e}\n")

    # Print summary
    if reindexed:
        print(f"\nSuccessfully reindexed {len(reindexed)} repository(ies)")

    if errors:
        print(f"\nFailed to reindex {len(errors)} repository(ies)")
        return 1

    if not reindexed:
        print("All repositories are up to date.")

    return 0


def cmd_install_hook(args):
    """Install git post-merge hook in a repository.

    Writes an executable ``.git/hooks/post-merge`` script under
    ``args.repo_path`` that runs ``reindex-changed``. An existing hook is only
    overwritten when ``args.force`` is set.

    Returns:
        0 on success, 1 if the path is missing, is not a git repository, or
        a hook already exists and ``--force`` was not given.
    """
    repo_path = Path(args.repo_path).resolve()

    if not repo_path.exists():
        print(f"Error: Repository path does not exist: {repo_path}")
        return 1

    git_dir = repo_path / ".git"
    if not git_dir.exists():
        print(f"Error: Not a git repository: {repo_path}")
        return 1

    hooks_dir = git_dir / "hooks"
    hooks_dir.mkdir(exist_ok=True)

    hook_path = hooks_dir / "post-merge"

    # Check if hook already exists
    if hook_path.exists() and not args.force:
        print(f"Error: Hook already exists at {hook_path}")
        print("Use --force to overwrite")
        return 1

    hook_path.write_text(_POST_MERGE_HOOK)
    hook_path.chmod(0o755)  # Make executable

    print(f"✓ Installed post-merge hook at {hook_path}")
    print("\nThe repository will now automatically reindex after git pull.")

    return 0


def cmd_status(args):
    """Show stack status.

    Prints totals, the per-status breakdown, and a warning listing any repos
    still marked INDEXING. Always returns 0.
    """
    indexer = get_indexer()
    stats = indexer.get_stack_status()

    print("Repository Stack Status:\n")
    print(f" Total Repositories: {stats['total_repos']}")
    print(f" Total Files Indexed: {stats['total_files_indexed']}")
    print(f" Total Chunks Indexed: {stats['total_chunks_indexed']}")
    print(f" Database: {indexer.embedding_store.db_path}\n")

    print(" Status Breakdown:")
    for status, count in stats["by_status"].items():
        if count > 0:
            print(f" {status}: {count}")

    # Check for stuck indexing
    stuck_repos = _find_stuck_repos(indexer)
    if stuck_repos:
        print("\n ⚠ Warning: Repos stuck in 'indexing' status:")
        for repo in stuck_repos:
            print(f" - {repo.name}")
        print("\n These may have been interrupted. To recover:")
        print(" scout recover")

    return 0


def cmd_recover(args):
    """Recover from interrupted indexing.

    Reindexes every repo still marked INDEXING. Asks for confirmation unless
    ``args.force`` is set; a closed stdin counts as declining.

    Returns:
        1 if any repo failed to recover, otherwise 0 (including when there
        was nothing to do or the user declined).
    """
    indexer = get_indexer()

    # Find repos stuck in INDEXING status
    stuck_repos = _find_stuck_repos(indexer)

    if not stuck_repos:
        print("No repositories need recovery.")
        print("All repositories are in a stable state.")
        return 0

    print(f"Found {len(stuck_repos)} repository(ies) with interrupted indexing:\n")
    for repo in stuck_repos:
        print(f" • {repo.name}")
        print(f" Path: {repo.path}")
        print(f" Last indexed: {repo.last_indexed or 'never'}")
        print()

    if not args.force:
        response = _prompt(
            "Attempt to recover by re-indexing these repos? (y/n): "
        ).lower()
        if response != "y":
            print("\nCancelled. To force recovery without prompt:")
            print(" scout recover --force")
            return 0

    print("\nRecovering repositories...\n")

    recovered = 0
    failed = 0

    for repo in stuck_repos:
        print(f"Recovering {repo.name}...")

        try:
            repo_indexer = indexer.repo_indexers.get(repo.name)
            if not repo_indexer:
                print(" ✗ Repository not found in indexer")
                failed += 1
                continue

            # Reindex from scratch
            result = _reindex_and_record(indexer, repo.name, repo_indexer)

            print(
                f" ✓ Recovered: {result.files_processed} files, "
                f"{result.chunks_indexed} chunks\n"
            )
            recovered += 1
        except Exception as e:
            print(f" ✗ Failed: {e}\n")
            indexer.stack_config.update_repo_status(
                repo.name, IndexingStatus.ERROR, error_message=str(e)
            )
            failed += 1

    # Summary
    print("Recovery complete:")
    print(f" ✓ Recovered: {recovered}")
    if failed > 0:
        print(f" ✗ Failed: {failed}")
        return 1

    return 0


def cmd_init(args):
    """Interactive setup wizard for first-time users.

    Verifies the install, lists existing repos, optionally adds one via
    ``cmd_add``, and optionally runs a demo semantic search.

    Returns:
        0 when the wizard finishes or the user opts out, 1 on an import
        error, empty/invalid input, or a failed add.
    """
    print("======================================")
    print(" Scout Interactive Setup")
    print("======================================\n")

    # Step 1: Verify installation
    print("Step 1: Verifying installation...\n")
    try:
        from scout.embeddings import EmbeddingStore  # noqa: F401
        from scout.indexer import MultiRepoIndexer  # noqa: F401

        print("✓ Scout is installed correctly\n")
    except ImportError as e:
        print(f"✗ Installation error: {e}")
        print("Please run setup.sh first or install with: pip install -e .")
        return 1

    # Step 2: Check for existing repos
    indexer = get_indexer()
    repos = indexer.stack_config.list_repos()

    if repos:
        print(f"✓ Found {len(repos)} existing repository(ies):\n")
        for repo in repos:
            print(f" - {repo.name} ({repo.path})")
        print()

        response = _prompt("Would you like to add another repository? (y/n): ").lower()
        if response != "y":
            print("\nGreat! You're all set. Run 'scout status' to see your repos.")
            return 0
        print()
    else:
        print("No repositories indexed yet. Let's add your first one!\n")

    # Step 3: Add a repository
    print("Step 2: Add a repository\n")
    print("You can add:")
    print(" 1. A local repository (provide a path)")
    print(" 2. A GitHub repository (provide a URL)")
    print(" 3. Skip for now")
    print()

    choice = _prompt("Enter choice (1/2/3): ").strip()

    if choice == "3":
        print("\nNo problem! You can add repositories later with:")
        print(" scout add /path/to/repo")
        print(" scout add https://github.com/user/repo")
        return 0

    if choice == "1":
        repo_source = _prompt("\nEnter the path to your local repository: ").strip()
        if not repo_source:
            print("No path provided. Exiting.")
            return 1
    elif choice == "2":
        repo_source = _prompt("\nEnter the GitHub URL: ").strip()
        if not repo_source:
            print("No URL provided. Exiting.")
            return 1
    else:
        print("Invalid choice. Exiting.")
        return 1

    repo_name = _prompt(
        "Enter a name for this repository (or press Enter for auto-detect): "
    ).strip()

    # Add the repository using the existing cmd_add logic
    print("\nIndexing repository...")

    add_args = argparse.Namespace(
        source=repo_source, name=repo_name or None, clone_dir=None
    )

    if cmd_add(add_args) != 0:
        print("\nFailed to add repository. Please try manually with:")
        print(f" scout add {repo_source}")
        return 1

    # Step 4: Demo search (optional)
    print("\n======================================")
    print("Step 3: Try a search (optional)")
    print("======================================\n")

    response = _prompt("Would you like to try a semantic search? (y/n): ").lower()
    if response == "y":
        query = _prompt(
            "\nEnter a search query (e.g., 'authentication logic'): "
        ).strip()
        if query:
            print(f"\nSearching for: '{query}'...\n")
            try:
                results = indexer.embedding_store.semantic_search(query, n_results=3)
                if results:
                    print(f"Found {len(results)} results:\n")
                    for i, result in enumerate(results, 1):
                        print(f"{i}. {result.file_path}")
                        print(f" Symbol: {result.symbol_name}")
                        print(f" Score: {result.score:.4f}")
                        print()
                else:
                    print(
                        "No results found. Try a different query or add more "
                        "repositories."
                    )
            except Exception as e:
                # The demo search is optional; never fail the wizard over it.
                print(f"Search error: {e}")

    # Step 5: Show next steps
    print("\n======================================")
    print(" Setup Complete!")
    print("======================================\n")
    print("Next steps:\n")
    print("1. Check your stack status:")
    print(" scout status\n")
    print("2. Add more repositories:")
    print(" scout add /path/to/repo\n")
    print("3. Search your code:")
    print(" Use the MCP tools with your AI assistant\n")
    print("4. Set up git hooks for auto-reindexing:")
    print(" scout install-hook /path/to/repo\n")
    print("For more help: scout --help")
    print()

    return 0


def cmd_check(args):
    """Verify Scout installation.

    Checks the Python version, the importability of core dependencies and
    scout's own modules, and reports on the database directory and stack
    config. Missing database/stack files only produce warnings.

    Returns:
        0 if every hard requirement is met, otherwise 1.
    """
    print("Scout Installation Check\n")

    all_ok = True

    # Check Python version
    py_version = sys.version_info
    version_text = f"{py_version.major}.{py_version.minor}.{py_version.micro}"
    if py_version >= (3, 10):
        print(f"✓ Python {version_text}")
    else:
        print(f"✗ Python {version_text} (3.10+ required)")
        all_ok = False

    # Check core dependencies
    try:
        import chromadb  # noqa: F401

        print("✓ ChromaDB available")
    except ImportError:
        print("✗ ChromaDB not found - run: pip install chromadb>=0.4.22")
        all_ok = False

    try:
        import sentence_transformers  # noqa: F401

        print("✓ sentence-transformers available")
    except ImportError:
        print(
            "✗ sentence-transformers not found - run: "
            "pip install sentence-transformers>=2.2.2"
        )
        all_ok = False

    try:
        from scout.embeddings import EmbeddingStore  # noqa: F401
        from scout.indexer import MultiRepoIndexer  # noqa: F401

        print("✓ scout modules available")
    except ImportError as e:
        print(f"✗ scout not properly installed: {e}")
        print(" Try: pip install -e .")
        all_ok = False

    # Check database directory
    db_path = os.getenv("SCOUT_DB_PATH", "~/.scout/db")
    db_path_expanded = os.path.expanduser(db_path)
    if os.path.exists(db_path_expanded):
        print(f"✓ Database directory: {db_path_expanded}")
    else:
        print("⚠ Database directory does not exist (will be created on first use)")
        print(f" Location: {db_path_expanded}")

    # Check stack config
    stack_path = os.path.expanduser("~/.scout/stack.json")
    if os.path.exists(stack_path):
        try:
            config = StackConfig()
            repos = config.list_repos()
            print(f"✓ Stack config: {len(repos)} repo(s) configured")
        except Exception as e:
            print(f"⚠ Stack config exists but could not be read: {e}")
    else:
        print("⚠ No repositories indexed yet")
        print(" Run: scout add /path/to/repo")

    print()
    if all_ok:
        print("✓ Installation verified! Ready to use.")
        print("\nNext steps:")
        print(" scout add /path/to/repo # Index your first repository")
        print(" scout status # Check indexing status")
        return 0

    print("✗ Installation incomplete. Please fix the errors above.")
    return 1


def main():
    """Main CLI entry point.

    Parses the command line, configures logging, and dispatches to the
    selected ``cmd_*`` function.

    Returns:
        The exit code of the selected command, or 1 (after printing help)
        when no command was given.
    """
    parser = argparse.ArgumentParser(
        description="MCP Indexer CLI - Manage repository indexing"
    )
    parser.add_argument("--version", action="version", version="scout 0.1.0")
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose output (INFO level logging)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug output (DEBUG level logging, logs to file)",
    )

    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # init command
    parser_init = subparsers.add_parser(
        "init", help="Interactive setup wizard for first-time users"
    )
    parser_init.set_defaults(func=cmd_init)

    # add command
    parser_add = subparsers.add_parser(
        "add", help="Add a repository to index (local path or GitHub URL)"
    )
    parser_add.add_argument("source", help="GitHub URL or local path to repository")
    parser_add.add_argument(
        "--name", help="Name for the repository (auto-detected if not provided)"
    )
    parser_add.add_argument(
        "--clone-dir", help="Directory to clone into (default: ~/Code)"
    )
    parser_add.set_defaults(func=cmd_add)

    # check-updates command
    parser_check_updates = subparsers.add_parser(
        "check-updates", help="Check which repositories need reindexing"
    )
    parser_check_updates.set_defaults(func=cmd_check_updates)

    # reindex-changed command
    parser_reindex = subparsers.add_parser(
        "reindex-changed", help="Reindex repositories that have changed"
    )
    parser_reindex.set_defaults(func=cmd_reindex_changed)

    # install-hook command
    parser_hook = subparsers.add_parser(
        "install-hook", help="Install git post-merge hook in a repository"
    )
    parser_hook.add_argument("repo_path", help="Path to git repository")
    parser_hook.add_argument(
        "--force", action="store_true", help="Overwrite existing hook"
    )
    parser_hook.set_defaults(func=cmd_install_hook)

    # status command
    parser_status = subparsers.add_parser("status", help="Show stack status")
    parser_status.set_defaults(func=cmd_status)

    # recover command
    parser_recover = subparsers.add_parser(
        "recover", help="Recover from interrupted indexing"
    )
    parser_recover.add_argument(
        "--force", action="store_true", help="Skip confirmation prompt"
    )
    parser_recover.set_defaults(func=cmd_recover)

    # check command
    parser_check = subparsers.add_parser(
        "check", help="Verify installation and configuration"
    )
    parser_check.set_defaults(func=cmd_check)

    args = parser.parse_args()

    # Setup logging based on verbosity flags
    setup_logging(verbose=args.verbose, debug=args.debug)
    logger.debug("CLI invoked with command: %s", args.command)
    logger.debug("Arguments: %s", vars(args))

    if not args.command:
        parser.print_help()
        return 1

    return args.func(args)


if __name__ == "__main__":
    sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gkatechis/mcpIndexer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.