# src/codegraphcontext/cli/main.py
"""
This module defines the command-line interface (CLI) for the CodeGraphContext application.
It uses the Typer library to create a user-friendly and well-documented CLI.
Commands:
- mcp setup: Runs an interactive wizard to configure the MCP client.
- mcp start: Launches the main MCP server.
- help: Displays help information.
- version: Show the installed version.
"""
import typer
from rich.console import Console
from rich.table import Table
from rich import box
from typing import Optional
import asyncio
import logging
import json
import os
from pathlib import Path
from dotenv import load_dotenv, find_dotenv, set_key
from importlib.metadata import version as pkg_version, PackageNotFoundError
from codegraphcontext.server import MCPServer
from codegraphcontext.core.database import DatabaseManager
from .setup_wizard import run_neo4j_setup_wizard, configure_mcp_client
from . import config_manager
# Import the new helper functions
from .cli_helpers import (
index_helper,
add_package_helper,
list_repos_helper,
delete_helper,
cypher_helper,
visualize_helper,
reindex_helper,
clean_helper,
stats_helper,
_initialize_services,
watch_helper,
unwatch_helper,
list_watching_helper,
)
# Set the log level for the noisy neo4j and asyncio logger to WARNING to keep the output clean.
logging.getLogger("neo4j").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
# Initialize the Typer app and Rich console for formatted output.
app = typer.Typer(
name="cgc",
help="CodeGraphContext: An MCP server for AI-powered code analysis.\n\n[DEPRECATED] 'cgc start' is deprecated. Use 'cgc mcp start' instead.",
add_completion=True,
)
console = Console(stderr=True)
# Configure basic logging for the application.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
def get_version() -> str:
"""
Try to read version from the installed package metadata.
Fallback to a dev version if not installed.
"""
try:
return pkg_version("codegraphcontext") # must match [project].name in pyproject.toml
except PackageNotFoundError:
return "0.0.0 (dev)"
# Create MCP command group
mcp_app = typer.Typer(help="MCP client configuration commands")
app.add_typer(mcp_app, name="mcp")
@mcp_app.command("setup")
def mcp_setup():
"""
Configure MCP Client (IDE/CLI Integration).
Sets up CodeGraphContext integration with your IDE or CLI tool:
- VS Code, Cursor, Windsurf
- Claude Desktop, Gemini CLI
- Cline, RooCode, Amazon Q Developer
Works with FalkorDB by default (no database setup needed).
"""
console.print("\n[bold cyan]MCP Client Setup[/bold cyan]")
console.print("Configure your IDE or CLI tool to use CodeGraphContext.\n")
configure_mcp_client()
@mcp_app.command("start")
def mcp_start():
"""
Start the CodeGraphContext MCP server.
Starts the server which listens for JSON-RPC requests from stdin.
This is used by IDE integrations (VS Code, Cursor, etc.).
"""
console.print("[bold green]Starting CodeGraphContext Server...[/bold green]")
_load_credentials()
server = None
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Initialize and run the main server.
server = MCPServer(loop=loop)
loop.run_until_complete(server.run())
except ValueError as e:
# This typically happens if credentials are still not found after all checks.
console.print(f"[bold red]Configuration Error:[/bold red] {e}")
console.print("Please run `cgc neo4j setup` or use FalkorDB (default).")
except KeyboardInterrupt:
# Handle graceful shutdown on Ctrl+C.
console.print("\n[bold yellow]Server stopped by user.[/bold yellow]")
finally:
# Ensure server and event loop are properly closed.
if server:
server.shutdown()
loop.close()
@mcp_app.command("tools")
def mcp_tools():
"""
List all available MCP tools.
Shows all tools that can be called by AI assistants through the MCP interface.
"""
_load_credentials()
console.print("[bold green]Available MCP Tools:[/bold green]")
try:
# Instantiate the server to access the tool definitions.
server = MCPServer()
tools = server.tools.values()
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Tool Name", style="dim", width=30)
table.add_column("Description")
for tool in sorted(tools, key=lambda t: t['name']):
table.add_row(tool['name'], tool['description'])
console.print(table)
except ValueError as e:
console.print(f"[bold red]Error loading tools:[/bold red] {e}")
console.print("Please ensure your database is configured correctly.")
except Exception as e:
console.print(f"[bold red]An unexpected error occurred:[/bold red] {e}")
# Abbreviation for mcp setup
@app.command("m", rich_help_panel="Shortcuts")
def mcp_setup_alias():
"""Shortcut for 'cgc mcp setup'"""
mcp_setup()
# Create Neo4j command group
neo4j_app = typer.Typer(help="Neo4j database configuration commands")
app.add_typer(neo4j_app, name="neo4j")
@neo4j_app.command("setup")
def neo4j_setup():
"""
Configure Neo4j Database Connection.
Choose from multiple setup options:
- Local (Docker-based, recommended)
- Local (Binary installation on Linux)
- Hosted (Neo4j AuraDB or remote instance)
- Connect to existing Neo4j instance
Note: This is optional. CodeGraphContext works with FalkorDB by default.
"""
console.print("\n[bold cyan]Neo4j Database Setup[/bold cyan]")
console.print("Configure Neo4j database connection for CodeGraphContext.\n")
run_neo4j_setup_wizard()
# Abbreviation for neo4j setup
@app.command("n", rich_help_panel="Shortcuts")
def neo4j_setup_alias():
"""Shortcut for 'cgc neo4j setup'"""
neo4j_setup()
def _load_credentials():
"""
Loads configuration and credentials from various sources into environment variables.
Uses per-variable precedence - each variable is loaded from the highest priority source.
Priority order (highest to lowest):
1. Local `mcp.json` env vars (highest - explicit MCP server config)
2. Local `.env` in project directory (high - project-specific overrides)
3. Global `~/.codegraphcontext/.env` (lowest - user defaults)
"""
from dotenv import dotenv_values
# Collect all config sources in reverse priority order (lowest to highest)
config_sources = []
config_source_names = []
# 3. Global .env file (lowest priority - user defaults)
global_env_path = Path.home() / ".codegraphcontext" / ".env"
if global_env_path.exists():
try:
config_sources.append(dotenv_values(str(global_env_path)))
config_source_names.append(str(global_env_path))
except Exception as e:
console.print(f"[yellow]Warning: Could not load global .env: {e}[/yellow]")
# 2. Local project .env (higher priority - project-specific overrides)
try:
dotenv_path = find_dotenv(usecwd=True, raise_error_if_not_found=False)
if dotenv_path:
config_sources.append(dotenv_values(dotenv_path))
config_source_names.append(str(dotenv_path))
except Exception as e:
console.print(f"[yellow]Warning: Could not load .env from current directory: {e}[/yellow]")
# 1. Local mcp.json (highest priority - explicit MCP server config)
mcp_file_path = Path.cwd() / "mcp.json"
if mcp_file_path.exists():
try:
with open(mcp_file_path, "r") as f:
mcp_config = json.load(f)
server_env = mcp_config.get("mcpServers", {}).get("CodeGraphContext", {}).get("env", {})
if server_env:
config_sources.append(server_env)
config_source_names.append("mcp.json")
except Exception as e:
console.print(f"[yellow]Warning: Could not load mcp.json: {e}[/yellow]")
# Merge all configs with proper precedence (later sources override earlier ones)
merged_config = {}
for config in config_sources:
merged_config.update(config)
# Apply merged config to environment
for key, value in merged_config.items():
if value is not None: # Only set non-None values
os.environ[key] = str(value)
# Report what was loaded
if config_source_names:
if len(config_source_names) == 1:
console.print(f"[dim]Loaded configuration from: {config_source_names[-1]}[/dim]")
else:
console.print(f"[dim]Loaded configuration from: {', '.join(config_source_names)} (highest priority: {config_source_names[-1]})[/dim]")
else:
console.print("[yellow]No configuration file found. Using defaults.[/yellow]")
# Show which database is actually being used
# Check for runtime override first (from -db/--database flag)
runtime_db = os.environ.get("CGC_RUNTIME_DB_TYPE")
if runtime_db:
default_db = runtime_db.lower()
else:
default_db = os.environ.get("DEFAULT_DATABASE", "falkordb").lower()
if default_db == "neo4j":
has_neo4j_creds = all([
os.environ.get("NEO4J_URI"),
os.environ.get("NEO4J_USERNAME"),
os.environ.get("NEO4J_PASSWORD")
])
if has_neo4j_creds:
console.print("[cyan]Using database: Neo4j[/cyan]")
else:
console.print("[yellow]⚠ DEFAULT_DATABASE=neo4j but credentials not found. Falling back to FalkorDB.[/yellow]")
else:
console.print("[cyan]Using database: FalkorDB[/cyan]")
# ============================================================================
# CONFIG COMMAND GROUP
# ============================================================================
config_app = typer.Typer(help="Manage configuration settings")
app.add_typer(config_app, name="config")
@config_app.command("show")
def config_show():
"""
Display current configuration settings.
Shows all configuration values including database, indexing options,
logging settings, and performance tuning parameters.
"""
config_manager.show_config()
@config_app.command("set")
def config_set(
key: str = typer.Argument(..., help="Configuration key to set"),
value: str = typer.Argument(..., help="Value to set")
):
"""
Set a configuration value.
Examples:
cgc config set DEFAULT_DATABASE neo4j
cgc config set INDEX_VARIABLES false
cgc config set MAX_FILE_SIZE_MB 20
cgc config set DEBUG_LOGS true
"""
config_manager.set_config_value(key, value)
@config_app.command("reset")
def config_reset():
"""
Reset all configuration to default values.
This will restore all settings to their defaults.
Your current configuration will be backed up.
"""
if typer.confirm("Are you sure you want to reset all configuration to defaults?", default=False):
config_manager.reset_config()
else:
console.print("[yellow]Reset cancelled[/yellow]")
@config_app.command("db")
def config_db(backend: str = typer.Argument(..., help="Database backend: 'neo4j' or 'falkordb'")):
"""
Quickly switch the default database backend.
Shortcut for 'cgc config set DEFAULT_DATABASE <backend>'.
Examples:
cgc config db neo4j
cgc config db falkordb
"""
backend = backend.lower()
if backend not in ['falkordb', 'neo4j']:
console.print(f"[bold red]Invalid backend: {backend}[/bold red]")
console.print("Must be 'falkordb' or 'neo4j'")
raise typer.Exit(code=1)
config_manager.set_config_value("DEFAULT_DATABASE", backend)
console.print(f"[green]✔ Default database switched to {backend}[/green]")
# ============================================================================
# BUNDLE COMMAND GROUP - Pre-indexed Graph Snapshots
# ============================================================================
bundle_app = typer.Typer(help="Create and load pre-indexed graph bundles")
app.add_typer(bundle_app, name="bundle")
@bundle_app.command("export")
def bundle_export(
output: str = typer.Argument(..., help="Output path for the .cgc bundle file"),
repo: Optional[str] = typer.Option(None, "--repo", "-r", help="Specific repository path to export (default: export all)"),
no_stats: bool = typer.Option(False, "--no-stats", help="Skip statistics generation")
):
"""
Export the current graph to a portable .cgc bundle.
Creates a pre-indexed graph snapshot that can be distributed and loaded
instantly without re-indexing. Perfect for sharing famous repositories.
Examples:
cgc bundle export numpy.cgc --repo /path/to/numpy
cgc bundle export my-project.cgc
cgc bundle export all-repos.cgc --no-stats
"""
_load_credentials()
from codegraphcontext.core.cgc_bundle import CGCBundle
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
output_path = Path(output)
repo_path = Path(repo).resolve() if repo else None
console.print(f"[cyan]Exporting graph to {output_path}...[/cyan]")
if repo_path:
console.print(f"[dim]Repository: {repo_path}[/dim]")
else:
console.print(f"[dim]Exporting all repositories[/dim]")
bundle = CGCBundle(db_manager)
success, message = bundle.export_to_bundle(
output_path,
repo_path=repo_path,
include_stats=not no_stats
)
if success:
console.print(f"[bold green]{message}[/bold green]")
else:
console.print(f"[bold red]Export failed: {message}[/bold red]")
raise typer.Exit(code=1)
finally:
db_manager.close_driver()
@bundle_app.command("import")
def bundle_import(
bundle_file: str = typer.Argument(..., help="Path to the .cgc bundle file to import"),
clear: bool = typer.Option(False, "--clear", help="Clear existing graph data before importing")
):
"""
Import a .cgc bundle into the current database.
Loads a pre-indexed graph snapshot into your database. Use --clear to
replace all existing data with the bundle contents.
Examples:
cgc bundle import numpy.cgc
cgc bundle import my-project.cgc --clear
"""
_load_credentials()
from codegraphcontext.core.cgc_bundle import CGCBundle
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
bundle_path = Path(bundle_file)
if not bundle_path.exists():
console.print(f"[bold red]Bundle file not found: {bundle_path}[/bold red]")
raise typer.Exit(code=1)
if clear:
console.print("[yellow]⚠️ Warning: This will clear all existing graph data![/yellow]")
if not typer.confirm("Are you sure you want to continue?", default=False):
console.print("[yellow]Import cancelled[/yellow]")
return
console.print(f"[cyan]Importing bundle from {bundle_path}...[/cyan]")
bundle = CGCBundle(db_manager)
success, message = bundle.import_from_bundle(
bundle_path,
clear_existing=clear
)
if success:
console.print(f"[bold green]{message}[/bold green]")
else:
console.print(f"[bold red]Import failed: {message}[/bold red]")
raise typer.Exit(code=1)
finally:
db_manager.close_driver()
@bundle_app.command("load")
def bundle_load(
bundle_name: str = typer.Argument(..., help="Bundle name or path to load (e.g., 'numpy' or 'numpy.cgc')"),
clear: bool = typer.Option(False, "--clear", help="Clear existing graph data before loading")
):
"""
Load a pre-indexed bundle (download if needed, then import).
This is a convenience command that will:
1. Check if the bundle exists locally
2. Download from registry if not found (future feature)
3. Import the bundle into the database
Examples:
cgc load numpy
cgc load numpy.cgc --clear
"""
_load_credentials()
# For now, this is just an alias for import
# In the future, this will support downloading from a registry
bundle_path = Path(bundle_name)
if not bundle_path.suffix:
bundle_path = Path(f"{bundle_name}.cgc")
if not bundle_path.exists():
console.print(f"[yellow]Bundle '{bundle_name}' not found locally.[/yellow]")
console.print("[dim]Registry download not yet implemented. Please provide a local .cgc file.[/dim]")
console.print(f"[dim]Usage: cgc bundle load /path/to/{bundle_path.name}[/dim]")
raise typer.Exit(code=1)
# Call import
bundle_import(str(bundle_path), clear=clear)
# Shortcut commands at root level
@app.command("export", rich_help_panel="Bundle Shortcuts")
def export_shortcut(
output: str = typer.Argument(..., help="Output path for the .cgc bundle file"),
repo: Optional[str] = typer.Option(None, "--repo", "-r", help="Specific repository path to export")
):
"""Shortcut for 'cgc bundle export'"""
bundle_export(output, repo, False)
@app.command("load", rich_help_panel="Bundle Shortcuts")
def load_shortcut(
bundle_name: str = typer.Argument(..., help="Bundle name or path to load"),
clear: bool = typer.Option(False, "--clear", help="Clear existing graph data before loading")
):
"""Shortcut for 'cgc bundle load'"""
bundle_load(bundle_name, clear)
# ============================================================================
# DOCTOR DIAGNOSTIC COMMAND
# ============================================================================
@app.command()
def doctor():
"""
Run diagnostics to check system health and configuration.
Checks:
- Configuration validity
- Database connectivity
- Tree-sitter installation
- Required dependencies
- File permissions
"""
console.print("[bold cyan]🏥 Running CodeGraphContext Diagnostics...[/bold cyan]\n")
all_checks_passed = True
# 1. Check configuration
console.print("[bold]1. Checking Configuration...[/bold]")
try:
config = config_manager.load_config()
console.print(f" [green]✓[/green] Configuration loaded from {config_manager.CONFIG_FILE}")
# Validate each config value
invalid_configs = []
for key, value in config.items():
is_valid, error_msg = config_manager.validate_config_value(key, value)
if not is_valid:
invalid_configs.append(f"{key}: {error_msg}")
if invalid_configs:
console.print(f" [red]✗[/red] Invalid configuration values found:")
for err in invalid_configs:
console.print(f" - {err}")
all_checks_passed = False
else:
console.print(f" [green]✓[/green] All configuration values are valid")
except Exception as e:
console.print(f" [red]✗[/red] Configuration error: {e}")
all_checks_passed = False
# 2. Check database connectivity
console.print("\n[bold]2. Checking Database Connection...[/bold]")
try:
_load_credentials()
default_db = config.get("DEFAULT_DATABASE", "falkordb")
console.print(f" Default database: {default_db}")
if default_db == "neo4j":
uri = os.environ.get("NEO4J_URI")
username = os.environ.get("NEO4J_USERNAME")
password = os.environ.get("NEO4J_PASSWORD")
if uri and username and password:
console.print(f" [cyan]Testing Neo4j connection to {uri}...[/cyan]")
is_connected, error_msg = DatabaseManager.test_connection(uri, username, password)
if is_connected:
console.print(f" [green]✓[/green] Neo4j connection successful")
else:
console.print(f" [red]✗[/red] Neo4j connection failed: {error_msg}")
all_checks_passed = False
else:
console.print(f" [yellow]⚠[/yellow] Neo4j credentials not set. Run 'cgc neo4j setup'")
else:
# FalkorDB
try:
import falkordb
console.print(f" [green]✓[/green] FalkorDB Lite is installed")
except ImportError:
console.print(f" [yellow]⚠[/yellow] FalkorDB Lite not installed (Python 3.12+ only)")
console.print(f" Run: pip install falkordblite")
except Exception as e:
console.print(f" [red]✗[/red] Database check error: {e}")
all_checks_passed = False
# 3. Check tree-sitter installation
console.print("\n[bold]3. Checking Tree-Sitter Installation...[/bold]")
try:
from tree_sitter import Language, Parser
console.print(f" [green]✓[/green] tree-sitter is installed")
try:
from tree_sitter_language_pack import get_language
console.print(f" [green]✓[/green] tree-sitter-language-pack is installed")
# Test a few languages
test_langs = ["python", "javascript", "typescript"]
for lang in test_langs:
try:
get_language(lang)
console.print(f" [green]✓[/green] {lang} parser available")
except Exception:
console.print(f" [yellow]⚠[/yellow] {lang} parser not available")
except ImportError:
console.print(f" [red]✗[/red] tree-sitter-language-pack not installed")
all_checks_passed = False
except ImportError as e:
console.print(f" [red]✗[/red] tree-sitter not installed: {e}")
all_checks_passed = False
# 4. Check file permissions
console.print("\n[bold]4. Checking File Permissions...[/bold]")
try:
config_dir = config_manager.CONFIG_DIR
if config_dir.exists():
console.print(f" [green]✓[/green] Config directory exists: {config_dir}")
# Check if writable
test_file = config_dir / ".test_write"
try:
test_file.touch()
test_file.unlink()
console.print(f" [green]✓[/green] Config directory is writable")
except Exception as e:
console.print(f" [red]✗[/red] Config directory not writable: {e}")
all_checks_passed = False
else:
console.print(f" [yellow]⚠[/yellow] Config directory doesn't exist, will be created on first use")
except Exception as e:
console.print(f" [red]✗[/red] Permission check error: {e}")
all_checks_passed = False
# 5. Check cgc command availability
console.print("\n[bold]5. Checking CGC Command...[/bold]")
import shutil
cgc_path = shutil.which("cgc")
if cgc_path:
console.print(f" [green]✓[/green] cgc command found at: {cgc_path}")
else:
console.print(f" [yellow]⚠[/yellow] cgc command not in PATH (using python -m cgc)")
# Final summary
console.print("\n" + "=" * 60)
if all_checks_passed:
console.print("[bold green]✅ All diagnostics passed! System is healthy.[/bold green]")
else:
console.print("[bold yellow]⚠️ Some issues detected. Please review the output above.[/bold yellow]")
console.print("\n[cyan]Common fixes:[/cyan]")
console.print(" • For Neo4j issues: Run 'cgc neo4j setup'")
console.print(" • For missing packages: pip install codegraphcontext")
console.print(" • For config issues: Run 'cgc config reset'")
console.print("=" * 60 + "\n")
@app.command()
def start():
"""
[DEPRECATED] Use 'cgc mcp start' instead. This command will be removed in a future version.
"""
console.print("[yellow]⚠️ 'cgc start' is deprecated. Use 'cgc mcp start' instead.[/yellow]")
mcp_start()
@app.command()
def index(
path: Optional[str] = typer.Argument(None, help="Path to the directory or file to index. Defaults to the current directory."),
force: bool = typer.Option(False, "--force", "-f", help="Force re-index (delete existing and rebuild)")
):
"""
Indexes a directory or file by adding it to the code graph.
If no path is provided, it indexes the current directory.
Use --force to delete the existing index and rebuild from scratch.
"""
_load_credentials()
if path is None:
path = str(Path.cwd())
if force:
console.print("[yellow]Force re-indexing (--force flag detected)[/yellow]")
reindex_helper(path)
else:
index_helper(path)
@app.command()
def clean():
"""
Remove orphaned nodes and relationships from the database.
This will clean up nodes that are not connected to any repository,
helping to keep your database tidy and performant.
"""
_load_credentials()
clean_helper()
@app.command()
def stats(path: Optional[str] = typer.Argument(None, help="Path to show stats for. Omit for overall stats.")):
"""
Show indexing statistics.
If a path is provided, shows stats for that specific repository.
Otherwise, shows overall database statistics.
"""
_load_credentials()
if path:
path = str(Path(path).resolve())
stats_helper(path)
@app.command()
def delete(
path: Optional[str] = typer.Argument(None, help="Path of the repository to delete from the code graph."),
all_repos: bool = typer.Option(False, "--all", help="Delete all indexed repositories")
):
"""
Deletes a repository from the code graph.
Use --all to delete all repositories at once (requires confirmation).
Examples:
cgc delete ./my-project # Delete specific repository
cgc delete --all # Delete all repositories
"""
_load_credentials()
if all_repos:
# Delete all repositories
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
# Get list of repositories
repos = code_finder.list_indexed_repositories()
if not repos:
console.print("[yellow]No repositories to delete.[/yellow]")
return
# Show what will be deleted
console.print(f"\n[bold red]⚠️ WARNING: You are about to delete ALL {len(repos)} repositories![/bold red]\n")
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Name", style="cyan")
table.add_column("Path", style="dim")
for repo in repos:
table.add_row(repo.get("name", ""), repo.get("path", ""))
console.print(table)
console.print()
# Double confirmation
if not typer.confirm("Are you sure you want to delete ALL repositories?", default=False):
console.print("[yellow]Deletion cancelled.[/yellow]")
return
console.print("[yellow]Please type 'delete all' to confirm:[/yellow] ", end="")
confirmation = input()
if confirmation.strip().lower() != "delete all":
console.print("[yellow]Deletion cancelled. Confirmation text did not match.[/yellow]")
return
# Delete all repositories
console.print("\n[cyan]Deleting all repositories...[/cyan]")
deleted_count = 0
for repo in repos:
repo_path = repo.get("path", "")
try:
graph_builder.delete_repository_from_graph(repo_path)
console.print(f"[green]✓[/green] Deleted: {repo.get('name', '')}")
deleted_count += 1
except Exception as e:
console.print(f"[red]✗[/red] Failed to delete {repo.get('name', '')}: {e}")
console.print(f"\n[bold green]Successfully deleted {deleted_count}/{len(repos)} repositories![/bold green]")
finally:
db_manager.close_driver()
else:
# Delete specific repository
if not path:
console.print("[red]Error: Please provide a path or use --all to delete all repositories[/red]")
console.print("Usage: cgc delete <path> or cgc delete --all")
raise typer.Exit(code=1)
delete_helper(path)
@app.command()
def visualize(query: Optional[str] = typer.Argument(None, help="The Cypher query to visualize.")):
"""
Generates a URL to visualize a Cypher query in the Neo4j Browser.
If no query is provided, a default query will be used.
"""
if query is None:
query = "MATCH p=()-->() RETURN p"
_load_credentials()
visualize_helper(query)
@app.command("list")
def list_repositories():
"""
List all indexed repositories.
Shows all projects and packages that have been indexed in the code graph.
"""
_load_credentials()
list_repos_helper()
@app.command(name="add-package")
def add_package(package_name: str = typer.Argument(..., help="Name of the package to add."), language: str = typer.Argument(..., help="Language of the package." )):
"""
Adds a package to the code graph.
"""
_load_credentials()
add_package_helper(package_name, language)
# ============================================================================
# WATCH COMMAND GROUP - Live File Monitoring
# ============================================================================
@app.command()
def watch(
path: str = typer.Argument(".", help="Path to the directory to watch. Defaults to current directory.")
):
"""
Watch a directory for file changes and automatically update the code graph.
This command runs in the foreground and monitors the specified directory
for any file changes. When changes are detected, the code graph is
automatically updated.
The watcher will:
- Perform an initial scan if the directory is not yet indexed
- Monitor for file creation, modification, deletion, and moves
- Automatically re-index affected files and update relationships
Press Ctrl+C to stop watching.
Examples:
cgc watch . # Watch current directory
cgc watch /path/to/project # Watch specific directory
cgc w . # Using shortcut alias
"""
_load_credentials()
watch_helper(path)
@app.command()
def unwatch(
path: str = typer.Argument(..., help="Path to stop watching")
):
"""
Stop watching a directory for changes.
Note: This command is primarily for MCP server mode.
For CLI watch mode, simply press Ctrl+C in the watch terminal.
Examples:
cgc unwatch /path/to/project
"""
_load_credentials()
unwatch_helper(path)
@app.command()
def watching():
"""
List all directories currently being watched for changes.
Note: This command is primarily for MCP server mode.
For CLI watch mode, check the terminal where you ran 'cgc watch'.
Examples:
cgc watching
"""
_load_credentials()
list_watching_helper()
# ============================================================================
# FIND COMMAND GROUP - Code Search & Discovery
# ============================================================================
find_app = typer.Typer(help="Find and search code elements")
app.add_typer(find_app, name="find")
@find_app.command("name")
def find_by_name(
name: str = typer.Argument(..., help="Exact name to search for"),
type: Optional[str] = typer.Option(None, "--type", "-t", help="Filter by type (function, class, file, module)")
):
"""
Find code elements by exact name.
Examples:
cgc find name MyClass
cgc find name calculate --type function
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = []
# Search based on type filter
if type is None or type.lower() == 'all':
funcs = code_finder.find_by_function_name(name, fuzzy_search=False)
classes = code_finder.find_by_class_name(name, fuzzy_search=False)
variables = code_finder.find_by_variable_name(name)
modules = code_finder.find_by_module_name(name)
imports = code_finder.find_imports(name)
for f in funcs: f['type'] = 'Function'
for c in classes: c['type'] = 'Class'
for v in variables: v['type'] = 'Variable'
for m in modules: m['type'] = 'Module'; m['file_path'] = m.get('name', 'External') # Modules might differ
for i in imports:
i['type'] = 'Import'
i['name'] = i.get('alias') or i.get('imported_name')
results.extend(funcs)
results.extend(classes)
results.extend(variables)
results.extend(modules)
results.extend(imports)
elif type.lower() == 'function':
results = code_finder.find_by_function_name(name, fuzzy_search=False)
for r in results: r['type'] = 'Function'
elif type.lower() == 'class':
results = code_finder.find_by_class_name(name, fuzzy_search=False)
for r in results: r['type'] = 'Class'
elif type.lower() == 'variable':
results = code_finder.find_by_variable_name(name)
for r in results: r['type'] = 'Variable'
elif type.lower() == 'module':
results = code_finder.find_by_module_name(name)
for r in results:
r['type'] = 'Module'
r['file_path'] = r.get('name')
elif type.lower() == 'file':
# Quick query for file
with db_manager.get_driver().session() as session:
res = session.run("MATCH (n:File) WHERE n.name = $name RETURN n.name as name, n.path as file_path, n.is_dependency as is_dependency", name=name)
results = [dict(record) for record in res]
for r in results: r['type'] = 'File'
if not results:
console.print(f"[yellow]No code elements found with name '{name}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Name", style="cyan")
table.add_column("Type", style="bold blue")
table.add_column("Location", style="dim", overflow="fold")
for res in results:
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('name', ''),
res.get('type', 'Unknown'),
location_str
)
console.print(f"[cyan]Found {len(results)} matches for '{name}':[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
@find_app.command("pattern")
def find_by_pattern(
pattern: str = typer.Argument(..., help="Substring pattern to search (fuzzy search fallback)"),
case_sensitive: bool = typer.Option(False, "--case-sensitive", "-c", help="Case-sensitive search")
):
"""
Find code elements using substring matching.
Examples:
cgc find pattern "Auth" # Finds Auth, Authentication, Authorize...
cgc find pattern "process_" # Finds process_data, process_request...
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
with db_manager.get_driver().session() as session:
# Search Functions, Classes, and Modules
# Note: FalkorDB Lite might not support regex, using CONTAINS
if not case_sensitive:
query = """
MATCH (n)
WHERE (n:Function OR n:Class OR n:Module OR n:Variable) AND toLower(n.name) CONTAINS toLower($pattern)
RETURN
labels(n)[0] as type,
n.name as name,
n.file_path as file_path,
n.line_number as line_number,
n.is_dependency as is_dependency
ORDER BY n.is_dependency ASC, n.name
LIMIT 50
"""
else:
query = """
MATCH (n)
WHERE (n:Function OR n:Class OR n:Module OR n:Variable) AND n.name CONTAINS $pattern
RETURN
labels(n)[0] as type,
n.name as name,
n.file_path as file_path,
n.line_number as line_number,
n.is_dependency as is_dependency
ORDER BY n.is_dependency ASC, n.name
LIMIT 50
"""
result = session.run(query, pattern=pattern)
results = [dict(record) for record in result]
if not results:
console.print(f"[yellow]No matches found for pattern '{pattern}'[/yellow]")
return
if not case_sensitive and any(c in pattern for c in "*?["):
console.print("[yellow]Note: Wildcards/Regex are not fully supported in this mode. Performing substring search.[/yellow]")
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Name", style="cyan")
table.add_column("Type", style="blue")
table.add_column("Location", style="dim", overflow="fold")
table.add_column("Source", style="yellow")
for res in results:
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', '') if res.get('line_number') is not None else '')
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('name', ''),
res.get('type', 'Unknown'),
location_str,
"📦 Dependency" if res.get('is_dependency') else "📝 Project"
)
console.print(f"[cyan]Found {len(results)} matches for pattern '{pattern}':[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
@find_app.command("type")
def find_by_type(
element_type: str = typer.Argument(..., help="Type to search for (function, class, file, module)"),
limit: int = typer.Option(50, "--limit", "-l", help="Maximum results to return")
):
"""
Find all elements of a specific type.
Examples:
cgc find type class
cgc find type function --limit 100
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_by_type(element_type, limit)
if not results:
console.print(f"[yellow]No elements found of type '{element_type}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Name", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
table.add_column("Source", style="yellow")
for res in results:
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('name', ''),
location_str,
"📦 Dependency" if res.get('is_dependency') else "📝 Project"
)
console.print(f"[cyan]Found {len(results)} {element_type}s:[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
@find_app.command("variable")
def find_by_variable(
name: str = typer.Argument(..., help="Variable name to search for")
):
"""
Find variables by name.
Examples:
cgc find variable MAX_RETRIES
cgc find variable config
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_by_variable_name(name)
if not results:
console.print(f"[yellow]No variables found with name '{name}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Name", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
table.add_column("Context", style="yellow")
for res in results:
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('name', ''),
location_str,
res.get('context', '') or 'module'
)
console.print(f"[cyan]Found {len(results)} variable(s) named '{name}':[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
@find_app.command("content")
def find_by_content_search(
query: str = typer.Argument(..., help="Text to search for in source code and docstrings")
):
"""
Search code content (source and docstrings) using full-text index.
Examples:
cgc find content "error 503"
cgc find content "TODO: refactor"
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
try:
results = code_finder.find_by_content(query)
except Exception as e:
error_msg = str(e).lower()
if 'fulltext' in error_msg or 'db.index.fulltext' in error_msg:
console.print("\n[bold red]❌ Full-text search is not supported on FalkorDB[/bold red]\n")
console.print("[yellow]💡 You have two options:[/yellow]\n")
console.print(" 1. [cyan]Switch to Neo4j:[/cyan]")
console.print(f" [dim]cgc --database neo4j find content \"{query}\"[/dim]\n")
console.print(" 2. [cyan]Use pattern search instead:[/cyan]")
console.print(f" [dim]cgc find pattern \"{query}\"[/dim]")
console.print(" [dim](searches in names only, not source code)[/dim]\n")
return
else:
# Re-raise if it's a different error
raise
if not results:
console.print(f"[yellow]No content matches found for '{query}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Name", style="cyan")
table.add_column("Type", style="blue")
table.add_column("Location", style="dim", overflow="fold")
for res in results:
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('name', ''),
res.get('type', 'Unknown'),
location_str
)
console.print(f"[cyan]Found {len(results)} content match(es) for '{query}':[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
@find_app.command("decorator")
def find_by_decorator_search(
decorator: str = typer.Argument(..., help="Decorator name to search for"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path")
):
"""
Find functions with a specific decorator.
Examples:
cgc find decorator app.route
cgc find decorator test --file tests/test_main.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_functions_by_decorator(decorator, file)
if not results:
console.print(f"[yellow]No functions found with decorator '@{decorator}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Function", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
table.add_column("Decorators", style="yellow")
for res in results:
decorators_str = ", ".join(res.get('decorators', []))
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('function_name', ''),
location_str,
decorators_str
)
console.print(f"[cyan]Found {len(results)} function(s) with decorator '@{decorator}':[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
@find_app.command("argument")
def find_by_argument_search(
argument: str = typer.Argument(..., help="Argument/parameter name to search for"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path")
):
"""
Find functions that take a specific argument/parameter.
Examples:
cgc find argument password
cgc find argument user_id --file src/auth.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_functions_by_argument(argument, file)
if not results:
console.print(f"[yellow]No functions found with argument '{argument}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Function", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
for res in results:
file_path = res.get('file_path', '') or ''
line_str = str(res.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('function_name', ''),
location_str
)
console.print(f"[cyan]Found {len(results)} function(s) with argument '{argument}':[/cyan]")
console.print(table)
finally:
db_manager.close_driver()
# ============================================================================
# ANALYZE COMMAND GROUP - Code Analysis & Relationships
# ============================================================================
analyze_app = typer.Typer(help="Analyze code relationships, dependencies, and quality")
app.add_typer(analyze_app, name="analyze")
@analyze_app.command("calls")
def analyze_calls(
function: str = typer.Argument(..., help="Function name to analyze"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path")
):
"""
Show what functions this function calls (callees).
Example:
cgc analyze calls process_data
cgc analyze calls process_data --file src/main.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.what_does_function_call(function, file)
if not results:
console.print(f"[yellow]No function calls found for '{function}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Called Function", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
table.add_column("Type", style="yellow")
for result in results:
file_path = result.get("called_file_path", "")
line_str = str(result.get("called_line_number", ""))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
result.get("called_function", ""),
location_str,
"📦 Dependency" if result.get("called_is_dependency") else "📝 Project"
)
console.print(f"\n[bold cyan]Function '{function}' calls:[/bold cyan]")
console.print(table)
console.print(f"\n[dim]Total: {len(results)} function(s)[/dim]")
finally:
db_manager.close_driver()
@analyze_app.command("callers")
def analyze_callers(
function: str = typer.Argument(..., help="Function name to analyze"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path")
):
"""
Show what functions call this function.
Example:
cgc analyze callers process_data
cgc analyze callers process_data --file src/main.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.who_calls_function(function, file)
if not results:
console.print(f"[yellow]No callers found for '{function}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Caller Function", style="cyan")
table.add_column("Location", style="green")
table.add_column("Call Type", style="yellow")
for result in results:
file_path = result.get("caller_file_path", "")
line_number = result.get("caller_line_number")
location = f"{file_path}:{line_number}" if line_number else file_path
table.add_row(
result.get("caller_function", ""),
location,
"📦 Dependency" if result.get("caller_is_dependency") else "📝 Project"
)
console.print(f"\n[bold cyan]Functions that call '{function}':[/bold cyan]")
console.print(table)
console.print(f"\n[dim]Total: {len(results)} caller(s)[/dim]")
finally:
db_manager.close_driver()
@analyze_app.command("chain")
def analyze_chain(
from_func: str = typer.Argument(..., help="Starting function"),
to_func: str = typer.Argument(..., help="Target function"),
max_depth: int = typer.Option(5, "--depth", "-d", help="Maximum call chain depth"),
from_file: Optional[str] = typer.Option(None, "--from-file", help="File for starting function"),
to_file: Optional[str] = typer.Option(None, "--to-file", help="File for target function")
):
"""
Show call chain between two functions.
Example:
cgc analyze chain main process_data --depth 10
cgc analyze chain main process --from-file main.py --to-file utils.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_function_call_chain(from_func, to_func, max_depth, from_file, to_file)
if not results:
console.print(f"[yellow]No call chain found between '{from_func}' and '{to_func}' within depth {max_depth}[/yellow]")
return
for idx, chain in enumerate(results, 1):
console.print(f"\n[bold cyan]Call Chain #{idx} (length: {chain.get('chain_length', 0)}):[/bold cyan]")
functions = chain.get('function_chain', [])
call_details = chain.get('call_details', [])
for i, func in enumerate(functions):
indent = " " * i
# Print function
console.print(f"{indent}[cyan]{func.get('name', 'Unknown')}[/cyan] [dim]({func.get('file_path', '')}:{func.get('line_number', '')})[/dim]")
# If there is a next step, print the connecting call detail
if i < len(functions) - 1 and i < len(call_details):
detail = call_details[i]
line = detail.get('call_line', '?')
# Format args for display
args_info = ""
args_val = detail.get('args', [])
if args_val:
if isinstance(args_val, list):
# Filter legacy punctuation just in case
clean_args = [str(a) for a in args_val if str(a) not in ('(', ')', ',')]
args_str = ", ".join(clean_args)
else:
args_str = str(args_val)
# Truncate if too long
if len(args_str) > 50:
args_str = args_str[:47] + "..."
args_info = f" [dim]({args_str})[/dim]"
console.print(f"{indent} ⬇ [dim]calls at line {line}[/dim]{args_info}")
finally:
db_manager.close_driver()
@analyze_app.command("deps")
def analyze_dependencies(
target: str = typer.Argument(..., help="Module name"),
show_external: bool = typer.Option(True, "--external/--no-external", help="Show external dependencies")
):
"""
Show dependencies and imports for a module.
Example:
cgc analyze deps numpy
cgc analyze deps mymodule --no-external
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_module_dependencies(target)
if not results.get('importers') and not results.get('imports'):
console.print(f"[yellow]No dependency information found for '{target}'[/yellow]")
return
# Show who imports this module
if results.get('importers'):
console.print(f"\n[bold cyan]Files that import '{target}':[/bold cyan]")
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Location", style="cyan", overflow="fold")
for imp in results['importers']:
file_path = imp.get('importer_file_path', '')
line_str = str(imp.get('import_line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
location_str
)
console.print(table)
finally:
db_manager.close_driver()
@analyze_app.command("tree")
def analyze_inheritance_tree(
class_name: str = typer.Argument(..., help="Class name"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path")
):
"""
Show inheritance hierarchy for a class.
Example:
cgc analyze tree MyClass
cgc analyze tree MyClass --file src/models.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_class_hierarchy(class_name, file)
console.print(f"\n[bold cyan]Class Hierarchy for '{class_name}':[/bold cyan]\n")
# Show parent classes
if results.get('parent_classes'):
console.print("[bold yellow]Parents (inherits from):[/bold yellow]")
for parent in results['parent_classes']:
console.print(f" ⬆ [cyan]{parent.get('parent_class', '')}[/cyan] [dim]({parent.get('parent_file_path', '')}:{parent.get('parent_line_number', '')})[/dim]")
else:
console.print("[dim]No parent classes found[/dim]")
console.print()
# Show child classes
if results.get('child_classes'):
console.print("[bold yellow]Children (classes that inherit from this):[/bold yellow]")
for child in results['child_classes']:
console.print(f" ⬇ [cyan]{child.get('child_class', '')}[/cyan] [dim]({child.get('child_file_path', '')}:{child.get('child_line_number', '')})[/dim]")
else:
console.print("[dim]No child classes found[/dim]")
console.print()
# Show methods
if results.get('methods'):
console.print(f"[bold yellow]Methods ({len(results['methods'])}):[/bold yellow]")
for method in results['methods'][:10]: # Limit to 10
console.print(f" • [green]{method.get('method_name', '')}[/green]({method.get('method_args', '')})")
if len(results['methods']) > 10:
console.print(f" [dim]... and {len(results['methods']) - 10} more[/dim]")
finally:
db_manager.close_driver()
@analyze_app.command("complexity")
def analyze_complexity(
path: Optional[str] = typer.Argument(None, help="Specific function name to analyze"),
threshold: int = typer.Option(10, "--threshold", "-t", help="Complexity threshold for warnings"),
limit: int = typer.Option(20, "--limit", "-l", help="Maximum results to show"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path (only used when function name is provided)")
):
"""
Show cyclomatic complexity for functions.
Example:
cgc analyze complexity # Most complex functions
cgc analyze complexity --threshold 15 # Functions over threshold
cgc analyze complexity my_function # Specific function
cgc analyze complexity my_function -f file.py # Specific function in file
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
if path:
# Specific function
result = code_finder.get_cyclomatic_complexity(path, file)
if result:
console.print(f"\n[bold cyan]Complexity for '{path}':[/bold cyan]")
console.print(f" Cyclomatic Complexity: [yellow]{result.get('complexity', 'N/A')}[/yellow]")
console.print(f" File: [dim]{result.get('file_path', '')}[/dim]")
console.print(f" Line: [dim]{result.get('line_number', '')}[/dim]")
else:
console.print(f"[yellow]Function '{path}' not found or has no complexity data[/yellow]")
else:
# Most complex functions
results = code_finder.find_most_complex_functions(limit)
if not results:
console.print("[yellow]No complexity data available[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Function", style="cyan")
table.add_column("Complexity", style="yellow", justify="right")
table.add_column("Location", style="dim", overflow="fold")
for func in results:
complexity = func.get('complexity', 0)
color = "red" if complexity > threshold else "yellow" if complexity > threshold/2 else "green"
file_path = func.get('file_path', '')
line_str = str(func.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
func.get('function_name', ''),
f"[{color}]{complexity}[/{color}]",
location_str
)
console.print(f"\n[bold cyan]Most Complex Functions (threshold: {threshold}):[/bold cyan]")
console.print(table)
console.print(f"\n[dim]{len([f for f in results if f.get('complexity', 0) > threshold])} function(s) exceed threshold[/dim]")
finally:
db_manager.close_driver()
@analyze_app.command("dead-code")
def analyze_dead_code(
path: Optional[str] = typer.Argument(None, help="Path to analyze (not yet implemented)"),
exclude_decorators: Optional[str] = typer.Option(None, "--exclude", "-e", help="Comma-separated decorators to exclude")
):
"""
Find potentially unused functions and classes.
Example:
cgc analyze dead-code
cgc analyze dead-code --exclude route,task,api
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
exclude_list = exclude_decorators.split(',') if exclude_decorators else []
results = code_finder.find_dead_code(exclude_list)
unused_funcs = results.get('potentially_unused_functions', [])
if not unused_funcs:
console.print("[green]✓ No dead code found![/green]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Function", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
for func in unused_funcs:
file_path = func.get('file_path', '')
line_str = str(func.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
func.get('function_name', ''),
location_str
)
console.print(f"\n[bold yellow]⚠️ Potentially Unused Functions:[/bold yellow]")
console.print(table)
console.print(f"\n[dim]Total: {len(unused_funcs)} function(s)[/dim]")
console.print(f"[dim]Note: {results.get('note', '')}[/dim]")
finally:
db_manager.close_driver()
@analyze_app.command("overrides")
def analyze_overrides(
function_name: str = typer.Argument(..., help="Function/method name to find implementations of")
):
"""
Find all implementations of a function across different classes.
Useful for finding polymorphic implementations and method overrides.
Example:
cgc analyze overrides area
cgc analyze overrides process
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
results = code_finder.find_function_overrides(function_name)
if not results:
console.print(f"[yellow]No implementations found for function '{function_name}'[/yellow]")
return
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Class", style="cyan")
table.add_column("Function", style="green")
table.add_column("Location", style="dim", overflow="fold")
for res in results:
file_path = res.get('class_file_path', '')
line_str = str(res.get('function_line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
res.get('class_name', ''),
res.get('function_name', ''),
location_str
)
console.print(f"\n[bold cyan]Found {len(results)} implementation(s) of '{function_name}':[/bold cyan]")
console.print(table)
finally:
db_manager.close_driver()
@analyze_app.command("variable")
def analyze_variable_usage(
variable_name: str = typer.Argument(..., help="Variable name to analyze"),
file: Optional[str] = typer.Option(None, "--file", "-f", help="Specific file path")
):
"""
Analyze where a variable is defined and used across the codebase.
Shows all instances of the variable and their scope (function, class, module).
Example:
cgc analyze variable MAX_RETRIES
cgc analyze variable config
cgc analyze variable x --file math_utils.py
"""
_load_credentials()
services = _initialize_services()
if not all(services):
return
db_manager, graph_builder, code_finder = services
try:
# Get variable usage scope
scope_results = code_finder.find_variable_usage_scope(variable_name, file)
instances = scope_results.get('instances', [])
if not instances:
console.print(f"[yellow]No instances found for variable '{variable_name}'[/yellow]")
return
console.print(f"\n[bold cyan]Variable '{variable_name}' Usage Analysis:[/bold cyan]\n")
# Group by scope type
by_scope = {}
for inst in instances:
scope_type = inst.get('scope_type', 'unknown')
if scope_type not in by_scope:
by_scope[scope_type] = []
by_scope[scope_type].append(inst)
# Display by scope
for scope_type, items in by_scope.items():
console.print(f"[bold yellow]{scope_type.upper()} Scope ({len(items)} instance(s)):[/bold yellow]")
table = Table(show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Scope Name", style="cyan")
table.add_column("Location", style="dim", overflow="fold")
table.add_column("Value", style="yellow")
for item in items:
file_path = item.get('file_path', '')
line_str = str(item.get('line_number', ''))
location_str = f"{file_path}:{line_str}" if line_str else file_path
table.add_row(
item.get('scope_name', ''),
location_str,
str(item.get('variable_value', ''))[:50] if item.get('variable_value') else '-'
)
console.print(table)
console.print()
console.print(f"[dim]Total: {len(instances)} instance(s) across {len(by_scope)} scope type(s)[/dim]")
finally:
db_manager.close_driver()
# ============================================================================
# QUERY COMMAND - Raw Cypher Queries
# ============================================================================
@app.command("query")
def query_graph(query: str = typer.Argument(..., help="Cypher query to execute (read-only)")):
"""
Execute a custom Cypher query on the code graph.
Examples:
cgc query "MATCH (f:Function) RETURN f.name LIMIT 10"
cgc query "MATCH (c:Class)-[:CONTAINS]->(m) RETURN c.name, count(m)"
"""
_load_credentials()
cypher_helper(query)
# Keep old 'cypher' as alias for backward compatibility
@app.command("cypher", hidden=True)
def cypher_legacy(query: str = typer.Argument(..., help="The read-only Cypher query to execute.")):
"""[Deprecated] Use 'cgc query' instead."""
console.print("[yellow]⚠️ 'cgc cypher' is deprecated. Use 'cgc query' instead.[/yellow]")
cypher_helper(query)
# ============================================================================
# ABBREVIATIONS / SHORTCUTS for common commands
# ============================================================================
@app.command("i", rich_help_panel="Shortcuts")
def index_abbrev(path: Optional[str] = typer.Argument(None, help="Path to index")):
"""Shortcut for 'cgc index'"""
index(path)
@app.command("ls", rich_help_panel="Shortcuts")
def list_abbrev():
"""Shortcut for 'cgc list'"""
list_repositories()
@app.command("rm", rich_help_panel="Shortcuts")
def delete_abbrev(
path: Optional[str] = typer.Argument(None, help="Path to delete"),
all_repos: bool = typer.Option(False, "--all", help="Delete all indexed repositories")
):
"""Shortcut for 'cgc delete'"""
delete(path, all_repos)
@app.command("v", rich_help_panel="Shortcuts")
def visualize_abbrev(query: Optional[str] = typer.Argument(None, help="Cypher query")):
"""Shortcut for 'cgc visualize'"""
visualize(query)
@app.command("w", rich_help_panel="Shortcuts")
def watch_abbrev(path: str = typer.Argument(".", help="Path to watch")):
"""Shortcut for 'cgc watch'"""
watch(path)
# ============================================================================
@app.command()
def help(ctx: typer.Context):
"""Show the main help message and exit."""
root_ctx = ctx.parent or ctx
typer.echo(root_ctx.get_help())
@app.command("version")
def version_cmd():
"""Show the application version."""
console.print(f"CodeGraphContext [bold cyan]{get_version()}[/bold cyan]")
@app.callback(invoke_without_command=True)
def main(
ctx: typer.Context,
database: Optional[str] = typer.Option(
None,
"--database",
"-db",
help="[Global] Temporarily override database backend (falkordb or neo4j) for any command"
),
version_: bool = typer.Option(
None,
"--version",
"-v",
help="[Root-level only] Show version and exit",
is_eager=True,
),
help_: bool = typer.Option(
None,
"--help",
"-h",
help="[Root-level only] Show help and exit",
is_eager=True,
),
):
"""
Main entry point for the cgc CLI application.
If no subcommand is provided, it displays a welcome message with instructions.
"""
if database:
os.environ["CGC_RUNTIME_DB_TYPE"] = database
if version_:
console.print(f"CodeGraphContext [bold cyan]{get_version()}[/bold cyan]")
raise typer.Exit()
if ctx.invoked_subcommand is None:
console.print("[bold green]👋 Welcome to CodeGraphContext (cgc)![/bold green]\n")
console.print("CodeGraphContext is both an [bold cyan]MCP server[/bold cyan] and a [bold cyan]CLI toolkit[/bold cyan] for code analysis.\n")
console.print("🤖 [bold]For MCP Server Mode (AI assistants):[/bold]")
console.print(" 1. Run [cyan]cgc mcp setup[/cyan] (or [cyan]cgc m[/cyan]) to configure your IDE")
console.print(" 2. Run [cyan]cgc mcp start[/cyan] to launch the server\n")
console.print("🛠️ [bold]For CLI Toolkit Mode (direct usage):[/bold]")
console.print(" • [cyan]cgc index .[/cyan] - Index your current directory")
console.print(" • [cyan]cgc list[/cyan] - List indexed repositories\n")
console.print("📊 [bold]Using Neo4j instead of FalkorDB?[/bold]")
console.print(" Run [cyan]cgc neo4j setup[/cyan] (or [cyan]cgc n[/cyan]) to configure Neo4j\n")
console.print("👉 Run [cyan]cgc help[/cyan] to see all available commands")
console.print("👉 Run [cyan]cgc --version[/cyan] to check the version\n")
console.print("👉 Running [green]codegraphcontext[/green] works the same as using [green]cgc[/green]")
if __name__ == "__main__":
app()