import logging
import uuid
from typing import Any
from ...clients.repo import RelaceRepoClient
from ..core import (
build_cloud_error_details,
clear_sync_state,
get_repo_identity,
load_sync_state,
log_cloud_event,
)
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _listed_repo_name(repo: dict[str, Any]) -> Any:
    """Return the name of one entry returned by ``client.list_repos``.

    Prefers ``metadata.name`` and falls back to the top-level ``name`` key,
    so that either response layout is handled.
    """
    metadata = repo.get("metadata") or {}
    return metadata.get("name") or repo.get("name")


def _exception_result(trace_id: str, exc: Exception, **context: Any) -> dict[str, Any]:
    """Build the error result for an unexpected exception and emit ``cloud_clear_error``.

    ``context`` keys are placed before the details from
    ``build_cloud_error_details`` so those details win on a key collision.
    """
    text = str(exc)
    result = {
        "trace_id": trace_id,
        "status": "error",
        "message": f"Operation failed: {text}",
        "error": text,
        **context,
        **build_cloud_error_details(exc),
    }
    log_cloud_event("cloud_clear_error", trace_id, result=result)
    return result


def _clear_by_repo_id(
    client: RelaceRepoClient,
    repo_id: str,
    trace_id: str,
) -> dict[str, Any]:
    """Delete the cloud repo identified by ``repo_id``; local sync state is not touched."""
    logger.debug("[%s] Direct delete mode with repo_id=%s", trace_id, repo_id)
    log_cloud_event(
        "cloud_clear_start",
        trace_id,
        repo_name=None,
        cloud_repo_name=None,
        confirm=True,
        repo_id=repo_id,
    )
    try:
        if client.delete_repo(repo_id, trace_id=trace_id):
            status = "deleted"
            message = f"Repository ({repo_id}) deleted successfully."
        else:
            status = "error"
            message = f"Failed to delete repository ({repo_id})."
        result = {
            "trace_id": trace_id,
            "status": status,
            "message": message,
            "repo_id": repo_id,
        }
        log_cloud_event("cloud_clear_complete", trace_id, status=status)
        return result
    except Exception as exc:
        # logger.exception keeps the traceback, which a plain error() call drops.
        logger.exception("[%s] Cloud clear failed: %s", trace_id, exc)
        return _exception_result(trace_id, exc, repo_id=repo_id)


def _clear_by_base_dir(
    client: RelaceRepoClient,
    base_dir: str,
    trace_id: str,
) -> dict[str, Any]:
    """Resolve the repo for ``base_dir``, delete it remotely, then clear local sync state."""
    local_repo_name: str | None = None
    cloud_repo_name: str | None = None
    target_repo_id: str | None = None
    try:
        local_repo_name, cloud_repo_name, _project_fingerprint = get_repo_identity(base_dir)
        if not local_repo_name or not cloud_repo_name:
            result = {
                "trace_id": trace_id,
                "status": "error",
                "message": "Invalid base_dir: cannot derive repository name from root, current directory, or empty path.",
                "repo_id": None,
            }
            log_cloud_event(
                "cloud_clear_start", trace_id, repo_name=None, cloud_repo_name=None, confirm=True
            )
            log_cloud_event("cloud_clear_error", trace_id, result=result)
            return result

        log_cloud_event(
            "cloud_clear_start",
            trace_id,
            repo_name=local_repo_name,
            cloud_repo_name=cloud_repo_name,
            confirm=True,
        )

        # 1. Prefer the repo_id recorded in local sync state (safest).
        sync_state = load_sync_state(base_dir)
        if sync_state:
            target_repo_id = sync_state.repo_id

        if target_repo_id:
            logger.debug(
                "[%s] Found repo_id from local sync state: %s", trace_id, target_repo_id
            )
        else:
            # 2. Fallback: search by name (riskier, but needed if local state is missing).
            # The lookup matches on the cloud name, so that is the name reported here.
            logger.warning(
                "[%s] No local sync state found; searching API for %s", trace_id, cloud_repo_name
            )
            matching_repos = [
                repo
                for repo in client.list_repos(trace_id=trace_id)
                if _listed_repo_name(repo) == cloud_repo_name
            ]
            if len(matching_repos) > 1:
                # Refuse to guess which of several same-named repos to delete.
                logger.error(
                    "[%s] Multiple repos (%d) found for %s; aborting unsafe delete",
                    trace_id,
                    len(matching_repos),
                    cloud_repo_name,
                )
                result = {
                    "trace_id": trace_id,
                    "status": "error",
                    "message": f"Multiple repositories found with name '{cloud_repo_name}'. Cannot safely delete unambiguously.",
                    "repo_name": local_repo_name,
                    "cloud_repo_name": cloud_repo_name,
                    "repo_id": None,
                }
                log_cloud_event("cloud_clear_complete", trace_id, status="error")
                return result
            if matching_repos:
                match = matching_repos[0]
                target_repo_id = match.get("repo_id") or match.get("id")

        if not target_repo_id:
            # NOTE(review): a name match whose entry has neither "repo_id" nor "id"
            # also lands here and is reported as not found.
            logger.debug("[%s] No repository found on cloud for %s", trace_id, cloud_repo_name)
            # Even if the repo is not found remotely, ensure local state is clean.
            clear_sync_state(base_dir)
            result = {
                "trace_id": trace_id,
                "status": "not_found",
                "message": f"Repository '{cloud_repo_name}' not found on cloud.",
                "repo_name": local_repo_name,
                "cloud_repo_name": cloud_repo_name,
                "repo_id": None,
            }
            log_cloud_event("cloud_clear_complete", trace_id, status="not_found")
            return result

        # 3. Delete the specific repo.
        logger.debug(
            "[%s] Deleting cloud repo %s (repo_id=%s)", trace_id, cloud_repo_name, target_repo_id
        )
        if client.delete_repo(target_repo_id, trace_id=trace_id):
            # 4. Clear local state only after successful remote deletion.
            clear_sync_state(base_dir)
            status = "deleted"
            message = f"Repository '{cloud_repo_name}' ({target_repo_id}) and local sync state deleted successfully."
        else:
            status = "error"
            message = f"Failed to delete repository '{cloud_repo_name}' ({target_repo_id})."
        result = {
            "trace_id": trace_id,
            "status": status,
            "message": message,
            "repo_name": local_repo_name,
            "cloud_repo_name": cloud_repo_name,
            "repo_id": target_repo_id,
        }
        log_cloud_event("cloud_clear_complete", trace_id, status=status)
        return result
    except Exception as exc:
        # logger.exception keeps the traceback, which a plain error() call drops.
        logger.exception("[%s] Cloud clear failed: %s", trace_id, exc)
        return _exception_result(
            trace_id,
            exc,
            repo_name=local_repo_name,
            cloud_repo_name=cloud_repo_name,
            repo_id=target_repo_id,
        )


def cloud_clear_logic(
    client: RelaceRepoClient,
    base_dir: str,
    confirm: bool = False,
    repo_id: str | None = None,
) -> dict[str, Any]:
    """Clear (delete) the cloud repository and optionally local sync state.

    Nothing is deleted unless ``confirm`` is true. With a truthy ``repo_id``
    the repo is deleted directly. Otherwise the repo is resolved from
    ``base_dir``: first from local sync state, then by searching the repo
    list for the derived cloud name (aborting if several repos share it).

    Args:
        client: RelaceRepoClient instance.
        base_dir: Base directory of the repository.
        confirm: Confirmation flag to proceed with deletion.
        repo_id: Optional repo ID to delete directly (bypasses base_dir lookup).
            Note: Direct repo_id mode only deletes the cloud repo; local sync
            state is not cleared since no base_dir is used for lookup.

    Returns:
        Dict containing the operation result. It always carries ``trace_id``,
        ``status`` (``"cancelled"``, ``"deleted"``, ``"not_found"`` or
        ``"error"``), ``message`` and ``repo_id`` (``None`` when no repo was
        resolved). Results produced after the repo identity was derived from
        ``base_dir`` also carry ``repo_name`` and ``cloud_repo_name``.
        Exceptions raised while clearing are caught and reported as an
        ``"error"`` result with an extra ``error`` key plus the details from
        ``build_cloud_error_details``.
    """
    trace_id = str(uuid.uuid4())[:8]
    logger.debug("[%s] Starting cloud clear", trace_id)

    if not confirm:
        result = {
            "trace_id": trace_id,
            "status": "cancelled",
            "message": "Operation cancelled. Access to this tool requires 'confirm=True' to proceed with irreversible deletion.",
            "repo_id": None,
        }
        log_cloud_event(
            "cloud_clear_start", trace_id, repo_name=None, cloud_repo_name=None, confirm=False
        )
        log_cloud_event("cloud_clear_complete", trace_id, status="cancelled")
        return result

    # Direct repo_id mode: bypass base_dir lookup entirely.
    if repo_id:
        return _clear_by_repo_id(client, repo_id, trace_id)

    return _clear_by_base_dir(client, base_dir, trace_id)