# Deskaid

#!/usr/bin/env python3

import asyncio
import logging
import os

from ..access import check_edit_permission
from ..git import commit_changes

__all__ = [
    "check_file_path_and_permissions",
    "check_git_tracking_for_existing_file",
    "ensure_directory_exists",
    "write_text_content",
]


async def check_file_path_and_permissions(file_path: str) -> tuple[bool, str | None]:
    """Check that the file path is absolute and that editing it is permitted.

    Args:
        file_path: The absolute path to the file

    Returns:
        A tuple of (is_valid, error_message).
        If is_valid is True, error_message will be None.
    """
    # Relative paths are rejected outright.
    if not os.path.isabs(file_path):
        return False, f"File path must be absolute, not relative: {file_path}"

    # Delegate the permission decision to the access module; its message is
    # passed through unchanged when permission is denied.
    is_permitted, permission_message = await check_edit_permission(file_path)
    if not is_permitted:
        return False, permission_message

    return True, None


async def check_git_tracking_for_existing_file(
    file_path: str,
    chat_id: str,
) -> tuple[bool, str | None]:
    """Check if an existing file is tracked by git.

    Files that do not exist yet are skipped and reported as success. For a
    tracked file, a snapshot commit of pending changes is attempted before
    returning.

    Args:
        file_path: The absolute path to the file
        chat_id: Passed through to ``commit_changes`` for the snapshot commit

    Returns:
        A tuple of (success, error_message).
        If success is True, error_message will be None.
    """
    # Nothing to verify for a file that does not exist yet.
    if not os.path.exists(file_path):
        return True, None

    directory = os.path.dirname(file_path)

    # Imported at call time, as in the original code.
    # NOTE(review): presumably this avoids a circular import -- confirm.
    from ..shell import run_command

    # `git ls-files --error-unmatch` exits non-zero for an untracked path.
    file_status = await run_command(
        ["git", "ls-files", "--error-unmatch", file_path],
        cwd=directory,
        capture_output=True,
        text=True,
        check=False,
    )

    if file_status.returncode != 0:
        error_msg = (
            "File is not tracked by git. "
            "Please add the file to git tracking first using 'git add <file>'"
        )
        return False, error_msg

    # If there are other uncommitted changes, commit them.
    commit_success, commit_message = await commit_changes(
        file_path,
        description="Snapshot before codemcp change",
        chat_id=chat_id,
    )
    # A failed snapshot commit is only logged; the check still succeeds.
    if not commit_success:
        logging.debug("Failed to commit pending changes: %s", commit_message)
    else:
        logging.debug("Pending changes status: %s", commit_message)

    return True, None


def ensure_directory_exists(file_path: str) -> None:
    """Ensure the directory for the file exists, creating it if necessary.

    Args:
        file_path: The path to the file. When it has no directory component
            (a bare filename) there is nothing to create.
    """
    directory = os.path.dirname(file_path)
    # dirname() returns "" for a bare filename and os.makedirs("") raises
    # FileNotFoundError, so the empty case must be skipped.
    if directory and not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)


async def write_text_content(
    file_path: str,
    content: str,
    encoding: str = "utf-8",
    line_endings: str | None = None,
) -> None:
    """Write text content to a file with specified encoding and line endings.

    Args:
        file_path: The path to the file
        content: The content to write
        encoding: The encoding to use
        line_endings: The line endings to use: the names 'CRLF' or 'LF'
            (case-insensitive), or the literal sequence itself such as
            "\\r\\n" or "\\n". None selects the platform default
            (``os.linesep``).
    """
    # Accept either a symbolic name or the actual character sequence.
    if isinstance(line_endings, str):
        symbolic = line_endings.upper()
        if symbolic == "CRLF":
            actual_line_endings = "\r\n"
        elif symbolic == "LF":
            actual_line_endings = "\n"
        else:
            # Assume it's already the character sequence.
            actual_line_endings = line_endings
    else:
        # Default to system line endings if None.
        actual_line_endings = os.linesep

    # Normalize CRLF to LF first so the replacement below cannot double up
    # carriage returns.
    final_content = content.replace("\r\n", "\n")
    if actual_line_endings != "\n":
        final_content = final_content.replace("\n", actual_line_endings)

    ensure_directory_exists(file_path)

    # Run the blocking write in the default executor so the event loop is
    # not stalled by file I/O.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, write_file_sync, file_path, final_content, encoding
    )


def write_file_sync(file_path: str, content: str, encoding: str = "utf-8") -> None:
    """Synchronous helper function to write file content.

    Args:
        file_path: The path to the file
        content: The content to write, with its final line endings in place
        encoding: The encoding to use
    """
    # newline="" disables newline translation on write. Without it, platforms
    # where os.linesep is "\r\n" would turn an explicit "\r\n" into "\r\r\n"
    # and a requested "\n" into "\r\n".
    with open(file_path, "w", encoding=encoding, newline="") as f:
        f.write(content)