Skip to main content
Glama
by cycodehq
commit_range_documents.py18.2 kB
import os import sys from typing import TYPE_CHECKING, Optional import typer from cycode.cli import consts from cycode.cli.files_collector.repository_documents import ( get_file_content_from_commit_path, ) from cycode.cli.models import Document from cycode.cli.utils.git_proxy import git_proxy from cycode.cli.utils.path_utils import get_file_content, get_path_by_os from cycode.cli.utils.progress_bar import ScanProgressBarSection from cycode.logger import get_logger if TYPE_CHECKING: from git import Diff, Repo from cycode.cli.utils.progress_bar import BaseProgressBar, ProgressBarSection logger = get_logger('Commit Range Collector') def get_safe_head_reference_for_diff(repo: 'Repo') -> str: """Get a safe reference to use for diffing against the current HEAD. In repositories with no commits, HEAD doesn't exist, so we return the empty tree hash. Args: repo: Git repository object Returns: Either "HEAD" string if commits exist, or empty tree hash if no commits exist """ try: repo.rev_parse(consts.GIT_HEAD_COMMIT_REV) return consts.GIT_HEAD_COMMIT_REV except Exception as e: # actually gitdb.exc.BadObject; no import because of lazy loading logger.debug( 'Repository has no commits, using empty tree hash for diffs, %s', {'repo_path': repo.working_tree_dir}, exc_info=e, ) # Repository has no commits, use the universal empty tree hash # This is the standard Git approach for initial commits return consts.GIT_EMPTY_TREE_OBJECT def _does_reach_to_max_commits_to_scan_limit(commit_ids: list[str], max_commits_count: Optional[int]) -> bool: if max_commits_count is None: return False return len(commit_ids) >= max_commits_count def collect_commit_range_diff_documents( ctx: typer.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None ) -> list[Document]: """Collects documents from a specified commit range in a Git repository. Return a list of Document objects containing the diffs of files changed in each commit. """ progress_bar = ctx.obj['progress_bar'] commit_ids_to_scan = [] commit_documents_to_scan = [] repo = git_proxy.get_repo(path) total_commits_count = int(repo.git.rev_list('--count', commit_range)) logger.debug('Calculating diffs for %s commits in the commit range %s', total_commits_count, commit_range) progress_bar.set_section_length(ScanProgressBarSection.PREPARE_LOCAL_FILES, total_commits_count) for scanned_commits_count, commit in enumerate(repo.iter_commits(rev=commit_range)): if _does_reach_to_max_commits_to_scan_limit(commit_ids_to_scan, max_commits_count): logger.debug('Reached to max commits to scan count. Going to scan only %s last commits', max_commits_count) progress_bar.update(ScanProgressBarSection.PREPARE_LOCAL_FILES, total_commits_count - scanned_commits_count) break progress_bar.update(ScanProgressBarSection.PREPARE_LOCAL_FILES) commit_id = commit.hexsha commit_ids_to_scan.append(commit_id) parent = commit.parents[0] if commit.parents else git_proxy.get_null_tree() diff_index = commit.diff(parent, create_patch=True, R=True) for diff in diff_index: commit_documents_to_scan.append( Document( path=get_path_by_os(get_diff_file_path(diff, repo=repo)), content=get_diff_file_content(diff), is_git_diff_format=True, unique_id=commit_id, ) ) logger.debug( 'Found all relevant files in commit %s', {'path': path, 'commit_range': commit_range, 'commit_id': commit_id}, ) logger.debug('List of commit ids to scan, %s', {'commit_ids': commit_ids_to_scan}) return commit_documents_to_scan def calculate_pre_receive_commit_range(repo_path: str, branch_update_details: str) -> Optional[str]: end_commit = _get_end_commit_from_branch_update_details(branch_update_details) # branch is deleted, no need to perform scan if end_commit == consts.EMPTY_COMMIT_SHA: return None repo = git_proxy.get_repo(repo_path) start_commit = _get_oldest_unupdated_commit_for_branch(repo, end_commit) # no new commit to update found if not start_commit: return None # If the oldest not-yet-updated commit has no parent (root commit or orphaned history), # using '~1' will fail. In that case, scan from the end commit, which effectively # includes the entire history reachable from it (which is exactly what we need here). if not bool(repo.commit(start_commit).parents): return f'{end_commit}' return f'{start_commit}~1...{end_commit}' def _get_end_commit_from_branch_update_details(update_details: str) -> str: # update details pattern: <start_commit> <end_commit> <ref> _, end_commit, _ = update_details.split() return end_commit def _get_oldest_unupdated_commit_for_branch(repo: 'Repo', commit: str) -> Optional[str]: # get a list of commits by chronological order that are not in the remote repository yet # more info about rev-list command: https://git-scm.com/docs/git-rev-list not_updated_commits = repo.git.rev_list(commit, '--topo-order', '--reverse', '--not', '--all') commits = not_updated_commits.splitlines() if not commits: return None return commits[0] def _get_file_content_from_commit_diff(repo: 'Repo', commit: str, diff: 'Diff') -> Optional[str]: file_path = get_diff_file_path(diff, relative=True) return get_file_content_from_commit_path(repo, commit, file_path) def get_commit_range_modified_documents( progress_bar: 'BaseProgressBar', progress_bar_section: 'ProgressBarSection', path: str, from_commit_rev: str, to_commit_rev: str, reverse_diff: bool = True, ) -> tuple[list[Document], list[Document], list[Document]]: from_commit_documents = [] to_commit_documents = [] diff_documents = [] repo = git_proxy.get_repo(path) diff_index = repo.commit(from_commit_rev).diff(to_commit_rev, create_patch=True, R=reverse_diff) modified_files_diff = [ diff for diff in diff_index if diff.change_type != consts.COMMIT_DIFF_DELETED_FILE_CHANGE_TYPE ] progress_bar.set_section_length(progress_bar_section, len(modified_files_diff)) for diff in modified_files_diff: progress_bar.update(progress_bar_section) file_path = get_path_by_os(get_diff_file_path(diff, repo=repo)) diff_documents.append( Document( path=file_path, content=get_diff_file_content(diff), is_git_diff_format=True, ) ) file_content = _get_file_content_from_commit_diff(repo, from_commit_rev, diff) if file_content is not None: from_commit_documents.append(Document(file_path, file_content)) file_content = _get_file_content_from_commit_diff(repo, to_commit_rev, diff) if file_content is not None: to_commit_documents.append(Document(file_path, file_content)) return from_commit_documents, to_commit_documents, diff_documents def parse_pre_receive_input() -> str: """Parse input to pushed branch update details. Example input: old_value new_value refname ----------------------------------------------- 0000000000000000000000000000000000000000 9cf90954ef26e7c58284f8ebf7dcd0fcf711152a refs/heads/main 973a96d3e925b65941f7c47fa16129f1577d499f 0000000000000000000000000000000000000000 refs/heads/feature-branch 59564ef68745bca38c42fc57a7822efd519a6bd9 3378e52dcfa47fb11ce3a4a520bea5f85d5d0bf3 refs/heads/develop :return: First branch update details (input's first line) """ pre_receive_input = _read_hook_input_from_stdin() if not pre_receive_input: raise ValueError( 'Pre receive input was not found. Make sure that you are using this command only in pre-receive hook' ) # each line represents a branch update request, handle the first one only # TODO(MichalBor): support case of multiple update branch requests return pre_receive_input.splitlines()[0] def parse_pre_push_input() -> str: """Parse input to pre-push hook details. Example input: local_ref local_object_name remote_ref remote_object_name --------------------------------------------------------- refs/heads/main 9cf90954ef26e7c58284f8ebf7dcd0fcf711152a refs/heads/main 973a96d3e925b65941f7c47fa16129f1577d499f refs/heads/feature-branch 3378e52dcfa47fb11ce3a4a520bea5f85d5d0bf3 refs/heads/feature-branch 59564ef68745bca38c42fc57a7822efd519a6bd9 :return: First, push update details (input's first line) """ # noqa: E501 pre_push_input = _read_hook_input_from_stdin() if not pre_push_input: raise ValueError( 'Pre push input was not found. Make sure that you are using this command only in pre-push hook' ) # each line represents a branch push request, handle the first one only return pre_push_input.splitlines()[0] def _read_hook_input_from_stdin() -> str: """Read input from stdin when called from a hook. If called manually from the command line, return an empty string so it doesn't block the main thread. Returns: Input from stdin """ if sys.stdin.isatty(): return '' return sys.stdin.read().strip() def _get_default_branches_for_merge_base(repo: 'Repo') -> list[str]: """Get a list of default branches to try for merge base calculation. Priority order: 1. Environment variable CYCODE_DEFAULT_BRANCH 2. Git remote HEAD (git symbolic-ref refs/remotes/origin/HEAD) 3. Fallback to common default branch names Args: repo: Git repository object Returns: List of branch names to try for merge base calculation """ default_branches = [] # 1. Check environment variable first env_default_branch = os.getenv(consts.CYCODE_DEFAULT_BRANCH_ENV_VAR_NAME) if env_default_branch: logger.debug('Using default branch from environment variable: %s', env_default_branch) default_branches.append(env_default_branch) # 2. Try to get the actual default branch from remote HEAD try: remote_head = repo.git.symbolic_ref('refs/remotes/origin/HEAD') # symbolic-ref returns something like "refs/remotes/origin/main" if remote_head.startswith('refs/remotes/origin/'): default_branch = remote_head.replace('refs/remotes/origin/', '') logger.debug('Found remote default branch: %s', default_branch) # Add both the remote tracking branch and local branch variants default_branches.extend([f'origin/{default_branch}', default_branch]) except Exception as e: logger.debug('Failed to get remote HEAD via symbolic-ref: %s', exc_info=e) # Try an alternative method: git remote show origin try: remote_info = repo.git.remote('show', 'origin') for line in remote_info.splitlines(): if 'HEAD branch:' in line: default_branch = line.split('HEAD branch:')[1].strip() logger.debug('Found default branch via remote show: %s', default_branch) default_branches.extend([f'origin/{default_branch}', default_branch]) break except Exception as e2: logger.debug('Failed to get remote info via remote show: %s', exc_info=e2) # 3. Add fallback branches (avoiding duplicates) fallback_branches = ['origin/main', 'origin/master', 'main', 'master'] for branch in fallback_branches: if branch not in default_branches: default_branches.append(branch) logger.debug('Default branches to try: %s', default_branches) return default_branches def calculate_pre_push_commit_range(push_update_details: str) -> Optional[str]: """Calculate the commit range for pre-push hook scanning. Args: push_update_details: String in format "local_ref local_object_name remote_ref remote_object_name" Returns: Commit range string for scanning, or None if no scanning is needed Environment Variables: CYCODE_DEFAULT_BRANCH: Override the default branch for merge base calculation """ local_ref, local_object_name, remote_ref, remote_object_name = push_update_details.split() if remote_object_name == consts.EMPTY_COMMIT_SHA: try: repo = git_proxy.get_repo(os.getcwd()) default_branches = _get_default_branches_for_merge_base(repo) merge_base = None for default_branch in default_branches: try: merge_base = repo.git.merge_base(local_object_name, default_branch) logger.debug('Found merge base %s with branch %s', merge_base, default_branch) break except Exception as e: logger.debug('Failed to find merge base with %s: %s', default_branch, exc_info=e) continue if merge_base: return f'{merge_base}..{local_object_name}' logger.debug('Failed to find merge base with any default branch') return '--all' except Exception as e: logger.debug('Failed to get repo for pre-push commit range calculation: %s', exc_info=e) return '--all' # If deleting a branch (local_object_name is all zeros), no need to scan if local_object_name == consts.EMPTY_COMMIT_SHA: return None # For updates to existing branches, scan from remote to local return f'{remote_object_name}..{local_object_name}' def get_diff_file_path(diff: 'Diff', relative: bool = False, repo: Optional['Repo'] = None) -> Optional[str]: """Get the file path from a git Diff object. Args: diff: Git Diff object representing the file change relative: If True, return the path relative to the repository root; otherwise, return an absolute path IF possible repo: Optional Git Repo object, used to resolve absolute paths Note: It tries to get the absolute path, falling back to relative paths. `relative` flag forces relative paths. One case of relative paths is when the repository is bare and does not have a working tree directory. """ # try blob-based paths first (most reliable when available) blob = diff.b_blob if diff.b_blob else diff.a_blob if blob: if relative: return blob.path if repo and repo.working_tree_dir: return blob.abspath path = diff.b_path if diff.b_path else diff.a_path # relative path within the repo if not relative and path and repo and repo.working_tree_dir: # convert to the absolute path using the repo's working tree directory path = os.path.join(repo.working_tree_dir, path) return path def get_diff_file_content(diff: 'Diff') -> str: return diff.diff.decode('UTF-8', errors='replace') def get_pre_commit_modified_documents( progress_bar: 'BaseProgressBar', progress_bar_section: 'ProgressBarSection', repo_path: str, ) -> tuple[list[Document], list[Document], list[Document]]: git_head_documents = [] pre_committed_documents = [] diff_documents = [] repo = git_proxy.get_repo(repo_path) head_reference = get_safe_head_reference_for_diff(repo) diff_index = repo.index.diff(head_reference, create_patch=True, R=True) progress_bar.set_section_length(progress_bar_section, len(diff_index)) for diff in diff_index: progress_bar.update(progress_bar_section) file_path = get_path_by_os(get_diff_file_path(diff, repo=repo)) diff_documents.append( Document( path=file_path, content=get_diff_file_content(diff), is_git_diff_format=True, ) ) # Only get file content from HEAD if HEAD exists (not the empty tree hash) if head_reference == consts.GIT_HEAD_COMMIT_REV: file_content = _get_file_content_from_commit_diff(repo, head_reference, diff) if file_content: git_head_documents.append(Document(file_path, file_content)) if os.path.exists(file_path): file_content = get_file_content(file_path) if file_content: pre_committed_documents.append(Document(file_path, file_content)) return git_head_documents, pre_committed_documents, diff_documents def parse_commit_range(commit_range: str, path: str) -> tuple[Optional[str], Optional[str]]: """Parses a git commit range string and returns the full SHAs for the 'from' and 'to' commits. Supports: - 'from..to' - 'from...to' - 'commit' (interpreted as 'commit..HEAD') - '..to' (interpreted as 'HEAD..to') - 'from..' (interpreted as 'from..HEAD') """ repo = git_proxy.get_repo(path) if '...' in commit_range: from_spec, to_spec = commit_range.split('...', 1) elif '..' in commit_range: from_spec, to_spec = commit_range.split('..', 1) else: # Git commands like 'git diff <commit>' compare against HEAD. from_spec = commit_range to_spec = consts.GIT_HEAD_COMMIT_REV # If a spec is empty (e.g., from '..master'), default it to 'HEAD' if not from_spec: from_spec = consts.GIT_HEAD_COMMIT_REV if not to_spec: to_spec = consts.GIT_HEAD_COMMIT_REV try: # Use rev_parse to resolve each specifier to its full commit SHA from_commit_rev = repo.rev_parse(from_spec).hexsha to_commit_rev = repo.rev_parse(to_spec).hexsha return from_commit_rev, to_commit_rev except git_proxy.get_git_command_error() as e: logger.warning("Failed to parse commit range '%s'", commit_range, exc_info=e) return None, None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cycodehq/cycode-cli'

If you have feedback or need assistance with the MCP directory API, please join our Discord server