"""Directory operations business logic."""
import errno
import logging
import shutil
from pathlib import Path

from ..schemas.directory import DirectoryEntry, DirectoryListResult, DirectoryMetadata
from ..schemas.common import OperationResult
from ..core.safety import validate_path, get_relative_path
logger = logging.getLogger(__name__)
class DirectoryService:
    """Handles all directory operations.

    Every caller-supplied path is passed through ``validate_path`` together
    with ``self.root`` before the filesystem is touched, and the paths placed
    in results are produced by ``get_relative_path`` against the same root.
    """

    def __init__(self, root: Path):
        """
        Initialize directory service.

        Args:
            root: Root directory for all operations
        """
        self.root = root

    @staticmethod
    def _creation_time(stat_result) -> float:
        """Return the best available creation timestamp from a stat result.

        ``st_birthtime`` is the true creation time but only exists on some
        platforms. ``st_ctime`` is the fallback; on Unix that value is the
        last metadata change rather than the creation time.
        """
        return getattr(stat_result, "st_birthtime", stat_result.st_ctime)

    def _is_root(self, candidate: Path) -> bool:
        """Return True when *candidate* refers to the service root itself."""
        try:
            return candidate.samefile(self.root)
        except OSError:
            # One side could not be stat'ed; fall back to comparing the paths.
            return candidate == self.root

    def list_directories(self, path: str) -> DirectoryListResult:
        """
        List all subdirectories in a given path.

        Entries that disappear or become unreadable while the listing is being
        built are skipped instead of failing the whole call.

        Args:
            path: Relative path from root directory

        Returns:
            DirectoryListResult with list of directories, sorted by
            case-insensitive name

        Raises:
            FileNotFoundError: If directory not found
            NotADirectoryError: If path is not a directory
        """
        logger.debug("list_directories called: path=%s, root=%s", path, self.root)

        # Validate and resolve path
        target_path = validate_path(path, self.root)
        logger.debug("Resolved path: %s", target_path)

        if not target_path.exists():
            raise FileNotFoundError(f"Directory not found: {path}")
        if not target_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        directories = []
        for item in target_path.iterdir():
            try:
                if not item.is_dir():
                    continue
                stat = item.stat()
            except OSError as exc:
                # A child removed (or made unreadable) between iterdir() and
                # stat() must not surface as "directory not found" for the
                # directory being listed.
                logger.debug("Skipping unreadable entry %s: %s", item, exc)
                continue

            # Count items in directory; best effort, 0 when it cannot be read.
            try:
                item_count = sum(1 for _ in item.iterdir())
            except OSError:
                item_count = 0

            directories.append(
                DirectoryEntry(
                    name=item.name,
                    path=get_relative_path(item, self.root),
                    item_count=item_count,
                    created=self._creation_time(stat),
                    modified=stat.st_mtime,
                )
            )

        # Sort by name
        directories.sort(key=lambda d: d.name.lower())
        logger.debug("Found %s directories in %s", len(directories), path)

        return DirectoryListResult(
            directories=directories,
            total_count=len(directories),
        )

    def create_directory(self, path: str, create_parents: bool = False) -> OperationResult:
        """
        Create a new directory.

        Args:
            path: Relative path for the new directory
            create_parents: If True, create parent directories as needed

        Returns:
            OperationResult indicating success or failure. Filesystem errors
            are reported through the result rather than raised.
        """
        logger.debug(
            "create_directory called: path=%s, create_parents=%s", path, create_parents
        )

        # Validate path
        target_path = validate_path(path, self.root)
        logger.debug("Resolved path: %s", target_path)
        relative = get_relative_path(target_path, self.root)

        # Check if already exists
        if target_path.exists():
            logger.warning("Directory already exists: %s", target_path)
            return OperationResult(
                success=False,
                message=f"Directory already exists: {path}",
                path=relative,
            )

        try:
            target_path.mkdir(parents=create_parents, exist_ok=False)
        except FileExistsError:
            # Something created the path between the exists() check and mkdir().
            logger.warning("Directory already exists: %s", target_path)
            return OperationResult(
                success=False,
                message=f"Directory already exists: {path}",
                path=relative,
            )
        except FileNotFoundError:
            logger.warning("Parent directory missing for: %s", target_path)
            return OperationResult(
                success=False,
                message="Parent directory does not exist. Use create_parents=True to create it.",
                path=relative,
            )
        except Exception as e:
            # Service boundary: report the failure through the result object.
            logger.exception("Failed to create directory: %s", target_path)
            return OperationResult(
                success=False,
                message=f"Failed to create directory: {str(e)}",
                path=relative,
            )

        logger.info("Directory created successfully: %s", target_path)
        return OperationResult(
            success=True,
            message=f"Directory created successfully: {path}",
            path=relative,
        )

    def delete_directory(self, path: str, recursive: bool = False) -> OperationResult:
        """
        Delete a directory.

        The service root itself is never deleted.

        Args:
            path: Relative path to the directory to delete
            recursive: If True, delete directory and all contents

        Returns:
            OperationResult indicating success or failure
        """
        logger.debug("delete_directory called: path=%s, recursive=%s", path, recursive)

        # Validate and resolve path
        target_path = validate_path(path, self.root)
        logger.debug("Resolved path: %s", target_path)
        relative = get_relative_path(target_path, self.root)

        if not target_path.exists():
            return OperationResult(
                success=False,
                message=f"Directory not found: {path}",
                path=relative,
            )
        if not target_path.is_dir():
            return OperationResult(
                success=False,
                message=f"Not a directory: {path}",
                path=relative,
            )
        if self._is_root(target_path):
            # Removing the root would take every managed directory with it.
            logger.warning("Refusing to delete root directory: %s", target_path)
            return OperationResult(
                success=False,
                message="Cannot delete the root directory",
                path=relative,
            )

        try:
            if recursive:
                logger.warning("Recursive delete: %s", target_path)
                shutil.rmtree(target_path)
            else:
                target_path.rmdir()
        except OSError as e:
            # Check the error code, not the message text: the wording differs
            # between platforms and locales. Some systems report EEXIST
            # instead of ENOTEMPTY for a non-empty directory.
            if not recursive and e.errno in (errno.ENOTEMPTY, errno.EEXIST):
                return OperationResult(
                    success=False,
                    message="Directory not empty. Use recursive=True to delete with contents.",
                    path=relative,
                )
            logger.error("Failed to delete directory %s: %s", target_path, e)
            return OperationResult(
                success=False,
                message=f"Failed to delete directory: {str(e)}",
                path=relative,
            )

        logger.info("Directory deleted successfully: %s", target_path)
        return OperationResult(
            success=True,
            message=f"Directory deleted successfully: {path}",
            path=relative,
        )

    def move_directory(self, source_path: str, destination_path: str) -> OperationResult:
        """
        Move or rename a directory.

        Args:
            source_path: Relative path to the source directory
            destination_path: Relative path to the destination

        Returns:
            OperationResult indicating success or failure. An existing
            destination is never overwritten.
        """
        logger.debug(
            "move_directory called: source=%s, destination=%s",
            source_path,
            destination_path,
        )

        # Validate both paths
        source = validate_path(source_path, self.root)
        destination = validate_path(destination_path, self.root)

        if not source.exists():
            return OperationResult(
                success=False,
                message=f"Source directory not found: {source_path}",
                path=get_relative_path(source, self.root),
            )
        if not source.is_dir():
            return OperationResult(
                success=False,
                message=f"Source is not a directory: {source_path}",
                path=get_relative_path(source, self.root),
            )
        if destination.exists():
            return OperationResult(
                success=False,
                message=f"Destination already exists: {destination_path}",
                path=get_relative_path(destination, self.root),
            )

        try:
            shutil.move(str(source), str(destination))
        except Exception as e:
            # Service boundary: report the failure through the result object.
            logger.exception("Failed to move directory %s -> %s", source, destination)
            return OperationResult(
                success=False,
                message=f"Failed to move directory: {str(e)}",
                path=get_relative_path(source, self.root),
            )

        logger.info("Directory moved successfully: %s -> %s", source, destination)
        return OperationResult(
            success=True,
            message=f"Directory moved from '{source_path}' to '{destination_path}'",
            path=get_relative_path(destination, self.root),
        )

    def get_directory_metadata(self, path: str) -> DirectoryMetadata:
        """
        Get detailed metadata about a directory.

        Args:
            path: Relative path to the directory

        Returns:
            DirectoryMetadata with detailed information. When the path does
            not exist, ``exists`` is False and all counts and timestamps are 0.

        Raises:
            NotADirectoryError: If the path exists but is not a directory
        """
        # Validate and resolve path
        target_path = validate_path(path, self.root)

        if not target_path.exists():
            return DirectoryMetadata(
                path=get_relative_path(target_path, self.root),
                exists=False,
                item_count=0,
                file_count=0,
                directory_count=0,
                created=0,
                modified=0,
                accessed=0,
            )
        if not target_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        stat = target_path.stat()

        # Count items in a single pass; best effort, all counts fall back to 0
        # when the directory cannot be read.
        item_count = file_count = directory_count = 0
        try:
            for entry in target_path.iterdir():
                item_count += 1
                if entry.is_file():
                    file_count += 1
                elif entry.is_dir():
                    directory_count += 1
        except OSError:
            item_count = file_count = directory_count = 0

        return DirectoryMetadata(
            path=get_relative_path(target_path, self.root),
            exists=True,
            item_count=item_count,
            file_count=file_count,
            directory_count=directory_count,
            created=self._creation_time(stat),
            modified=stat.st_mtime,
            accessed=stat.st_atime,
        )