import shutil
import logging
from git import Repo, GitCommandError
from pathlib import Path
from typing import Optional
# Assuming StatusResponse will be imported from .models if this were a class method
# For standalone functions, they might return simpler dicts or need direct import.
# from .models import StatusResponse # This would be for internal use if part of the package
logger = logging.getLogger(__name__)
REPO_URL = "https://github.com/pydantic/pydantic-ai.git"
# Use a local directory within the project for simplicity, e.g., 'docs_repo'
# instead of Path.home() / ".pydantic_docs"
# This makes the project more self-contained.
PROJECT_ROOT = (
Path(__file__).resolve().parent.parent
) # Assuming this file is in pydantic_docs_server/
LOCAL_DOCS_STORAGE_DIR = PROJECT_ROOT / "docs"
REPO_DIR = LOCAL_DOCS_STORAGE_DIR / "pydantic-ai_repo"
def _ensure_repo_directory_exists():
"""Ensure the base directory for storing the cloned repo exists."""
LOCAL_DOCS_STORAGE_DIR.mkdir(parents=True, exist_ok=True)
logger.info(
f"Ensured repository storage directory exists: {LOCAL_DOCS_STORAGE_DIR}"
)
def get_repo_path() -> Path:
"""Returns the path to the cloned repository."""
return REPO_DIR
def clone_or_pull_repository(force_clone: bool = False) -> dict:
"""Clones the repository if it doesn't exist, or pulls updates.
If force_clone is True, deletes and re-clones.
Returns a dictionary with status and message.
"""
_ensure_repo_directory_exists()
try:
if force_clone and REPO_DIR.exists():
logger.info(f"Force clone: Removing existing repository at {REPO_DIR}")
shutil.rmtree(REPO_DIR)
if not REPO_DIR.exists() or not (REPO_DIR / ".git").exists():
logger.info(f"Cloning repository from {REPO_URL} to {REPO_DIR}")
Repo.clone_from(REPO_URL, str(REPO_DIR))
logger.info("Repository cloned successfully.")
return {"status": "success", "message": "Repository cloned successfully."}
else:
logger.info(f"Repository exists at {REPO_DIR}. Pulling latest changes.")
repo = Repo(str(REPO_DIR))
origin = repo.remotes.origin
origin.pull()
logger.info("Repository updated successfully.")
return {"status": "success", "message": "Repository updated successfully."}
except GitCommandError as e:
logger.error(f"Git command error: {e}")
return {"status": "error", "message": f"Git command failed: {str(e)}"}
except Exception as e:
logger.error(
f"Unexpected error during repository operation: {e}", exc_info=True
)
return {"status": "error", "message": f"An unexpected error occurred: {str(e)}"}
def get_current_commit_hash() -> Optional[str]:
"""Get the current commit hash of the cloned repository."""
if REPO_DIR.exists() and (REPO_DIR / ".git").exists():
try:
repo = Repo(str(REPO_DIR))
return repo.head.commit.hexsha
except Exception as e:
logger.error(f"Could not get commit hash: {e}")
return None
return None