# file_utils.py (7 kB)
"""Utilities for file operations."""
import hashlib
from pathlib import Path
import re
from typing import Any, Dict, Union
import aiofiles
import yaml
import frontmatter
from loguru import logger
from basic_memory.utils import FilePath
class FileError(Exception):
    """Common base class for the file-utility exceptions defined in this module."""
class FileWriteError(FileError):
    """Raised when writing a file fails."""
class ParseError(FileError):
    """Raised when file content (such as YAML frontmatter) cannot be parsed."""
async def compute_checksum(content: Union[str, bytes]) -> str:
    """
    Compute the SHA-256 checksum of text or binary content.

    The function is a coroutine so callers await it, but the hashing itself
    runs synchronously inside the call.

    Args:
        content: Data to hash. Strings are encoded as UTF-8 first; bytes are
            hashed as-is.

    Returns:
        SHA-256 digest as a lowercase hexadecimal string.

    Raises:
        FileError: If the content cannot be encoded or hashed. The underlying
            exception is attached as ``__cause__``.
    """
    try:
        data = content.encode("utf-8") if isinstance(content, str) else content
        return hashlib.sha256(data).hexdigest()
    except Exception as e:  # pragma: no cover
        logger.error(f"Failed to compute checksum: {e}")
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise FileError(f"Failed to compute checksum: {e}") from e
async def write_file_atomic(path: FilePath, content: str) -> None:
    """
    Write text to a file atomically by way of a sibling temporary file.

    The content is first written to ``<file name>.tmp`` in the target's
    directory using aiofiles, then moved over the target with ``Path.replace``.

    Args:
        path: Target file path (Path or string)
        content: Text to write; encoded as UTF-8

    Raises:
        FileWriteError: If writing or renaming fails. The temporary file is
            removed (if present) and the underlying exception is attached as
            ``__cause__``.
    """
    # Convert string to Path if needed
    path_obj = Path(path) if isinstance(path, str) else path
    # Append ".tmp" to the whole file name instead of swapping the extension.
    # with_suffix(".tmp") would map both "note.md" and "note.txt" to "note.tmp"
    # (so writes to sibling files share one temp file) and would map "x.tmp"
    # onto itself. The temporary name still ends in ".tmp".
    temp_path = path_obj.with_name(f"{path_obj.name}.tmp")
    try:
        # aiofiles keeps the write itself non-blocking
        async with aiofiles.open(temp_path, mode="w", encoding="utf-8") as f:
            await f.write(content)
        # The rename is quick, so it is done synchronously
        temp_path.replace(path_obj)
        logger.debug("Wrote file atomically", path=str(path_obj), content_length=len(content))
    except Exception as e:  # pragma: no cover
        # Do not leave a partial temp file behind
        temp_path.unlink(missing_ok=True)
        logger.error("Failed to write file", path=str(path_obj), error=str(e))
        raise FileWriteError(f"Failed to write file {path}: {e}") from e
def has_frontmatter(content: str) -> bool:
    """
    Report whether content appears to start with a frontmatter block.

    This is a marker check only; the YAML between the markers is not parsed.

    Args:
        content: Text to inspect (surrounding whitespace is ignored)

    Returns:
        True if the stripped text starts with ``---`` and contains a second
        ``---`` somewhere after it, False otherwise (including empty input)
    """
    stripped = content.strip() if content else ""
    # Opening marker first, then any later occurrence counts as the closing one
    return stripped.startswith("---") and "---" in stripped[3:]
def parse_frontmatter(content: str) -> Dict[str, Any]:
    """
    Parse YAML frontmatter from content.

    The text between the first two ``---`` occurrences is loaded with
    ``yaml.safe_load``.

    Args:
        content: Content with YAML frontmatter

    Returns:
        Dictionary of frontmatter values; empty dict if the frontmatter block
        is empty

    Raises:
        ParseError: If the content has no frontmatter, the closing marker is
            missing, the YAML is invalid, or the YAML is not a mapping. When
            the failure wraps another exception, that exception is attached
            as ``__cause__``.
    """
    try:
        if not content.strip().startswith("---"):
            raise ParseError("Content has no frontmatter")

        # Split on first two occurrences of ---
        parts = content.split("---", 2)
        if len(parts) < 3:
            raise ParseError("Invalid frontmatter format")

        try:
            # Named `metadata` so the imported `frontmatter` module is not shadowed
            metadata = yaml.safe_load(parts[1])
        except yaml.YAMLError as e:
            raise ParseError(f"Invalid YAML in frontmatter: {e}") from e

        # Empty frontmatter loads as None
        if metadata is None:
            return {}
        if not isinstance(metadata, dict):
            raise ParseError("Frontmatter must be a YAML dictionary")
        return metadata
    except ParseError:
        # Already the documented error type; let it propagate untouched
        raise
    except Exception as e:  # pragma: no cover
        logger.error(f"Failed to parse frontmatter: {e}")
        raise ParseError(f"Failed to parse frontmatter: {e}") from e
def remove_frontmatter(content: str) -> str:
    """
    Strip a leading YAML frontmatter block from content.

    Args:
        content: Text that may begin with frontmatter

    Returns:
        The body after the closing ``---`` with surrounding whitespace
        removed, or the whitespace-stripped input if it does not start with
        ``---``

    Raises:
        ParseError: If the text starts with ``---`` but has no closing marker
    """
    text = content.strip()
    if not text.startswith("---"):
        # Nothing to remove
        return text

    # Look for the closing marker in whatever follows the opening one
    _yaml_part, closing, body = text[3:].partition("---")
    if not closing:
        raise ParseError("Invalid frontmatter format")
    return body.strip()
def dump_frontmatter(post: frontmatter.Post) -> str:
    """
    Render a frontmatter.Post as markdown with block-style YAML frontmatter.

    Lists such as tags are written one item per line, which is the form
    Obsidian expects:

        ---
        tags:
        - system
        - overview
        ---

    rather than the inline (flow) form:

        ---
        tags: ["system", "overview"]
        ---

    Args:
        post: frontmatter.Post object to serialize

    Returns:
        The post content alone when there is no metadata; otherwise the YAML
        frontmatter block, followed by a blank line and the content when the
        content is non-empty
    """
    metadata = post.metadata
    if not metadata:
        # Nothing to put in a frontmatter block
        return post.content

    # default_flow_style=False forces block style; key order is preserved
    header = yaml.dump(
        metadata,
        sort_keys=False,
        allow_unicode=True,
        default_flow_style=False,
    )
    document = f"---\n{header}---\n"

    body = post.content
    if body:
        # Blank line between the frontmatter block and the body
        document += f"\n{body}"
    return document
def sanitize_for_filename(text: str, replacement: str = "-") -> str:
    """
    Sanitize a string so it is safe to use as a file name / note title.

    Path separators (forward slash and backslash) and the characters
    ``< > : " | ? *`` are replaced, runs of the replacement are collapsed to a
    single occurrence, and leading/trailing occurrences are trimmed.

    Args:
        text: String to sanitize
        replacement: Text substituted for each problematic character. It is
            inserted literally; an empty string simply removes the characters.

    Returns:
        The sanitized string
    """
    # A callable inserts the replacement verbatim instead of having re.sub
    # interpret it as a template (backslashes, group references).
    def _insert(_match: "re.Match[str]") -> str:
        return replacement

    # Replace POSIX/Windows path separators and other problematic characters
    text = re.sub(r'[/\\<>:"|?*]', _insert, text)

    if not replacement:
        # Nothing to compress or trim; an empty pattern followed by "+" would
        # also be an invalid regex.
        return text

    # Group the escaped replacement so "+" repeats the whole string, not just
    # its last character.
    unit = f"(?:{re.escape(replacement)})"
    text = re.sub(f"{unit}+", _insert, text)

    # Trim whole occurrences at either end. \A and \Z anchor to the true string
    # boundaries, so a trailing newline is not skipped over.
    return re.sub(rf"\A{unit}+|{unit}+\Z", "", text)
def sanitize_for_folder(folder: str) -> str:
    """
    Sanitize a folder path for use in file system paths.

    Surrounding whitespace and one leading "./" are removed. Only
    alphanumerics, ".", space, "-", "_" and path separators are kept. Runs of
    separators (forward slash or backslash) collapse to a single "/", and
    separators at either end are trimmed.

    Args:
        folder: Folder path to sanitize

    Returns:
        The sanitized, "/"-separated path; "" for empty input
    """
    if not folder:
        return ""

    allowed_punctuation = (".", " ", "-", "_", "\\", "/")

    candidate = folder.strip()
    if candidate.startswith("./"):
        candidate = candidate[2:]

    # Drop every character that is neither alphanumeric nor explicitly allowed.
    # NOTE(review): "." is allowed, so ".." segments pass through unchanged -
    # confirm callers guard against path traversal.
    kept = [ch for ch in candidate if ch.isalnum() or ch in allowed_punctuation]
    candidate = "".join(kept).rstrip()

    # Normalize any run of either separator to a single forward slash
    collapsed = re.sub(r"[\\/]+", "/", candidate)
    return collapsed.strip("\\/")