import json
import os
import time
from pathlib import Path
from typing import Any, Dict, Optional
import portalocker
from .config import ConfigManager
# Initialize global config
_config = ConfigManager()
def find_project_root(start_path: Path) -> Path:
"""
Find the project root by looking for .ai, .git, or .amicus directories.
"""
current = start_path.resolve()
for parent in [current, *current.parents]:
if (parent / ".ai").exists():
return parent
if (parent / ".git").exists():
return parent
if (parent / ".amicus").exists():
return parent
return start_path
def get_context_bus_dir() -> Path:
"""Get the context bus directory from environment or default."""
env_dir = os.environ.get("CONTEXT_BUS_DIR")
if env_dir:
return Path(env_dir)
# Try to find existing one or default to cwd
root = find_project_root(Path.cwd())
return root / ".ai"
def ensure_directory_exists(directory: Path) -> None:
"""Ensure the directory exists, creating it if necessary."""
directory.mkdir(parents=True, exist_ok=True)
def ensure_gitignore(directory: Path) -> None:
"""Add .ai/ to .gitignore in the parent directory if not already present."""
gitignore_path = directory.parent / ".gitignore"
ai_pattern = ".ai/"
# Read existing .gitignore if it exists
existing_content = ""
if gitignore_path.exists():
with open(gitignore_path, "r") as f:
existing_content = f.read()
# Check if .ai/ is already in .gitignore
if ai_pattern not in existing_content:
# Add .ai/ to .gitignore
with open(gitignore_path, "a") as f:
# Add newline before if file doesn't end with one
if existing_content and not existing_content.endswith("\n"):
f.write("\n")
f.write(f"{ai_pattern}\n")
def get_state_file() -> Path:
"""Get the path to the state.json file."""
context_dir = get_context_bus_dir()
ensure_directory_exists(context_dir)
ensure_gitignore(context_dir)
return context_dir / "state.json"
def get_lock_file() -> Path:
"""Get the path to the lock file."""
return get_state_file().with_suffix(".lock")
def get_tracking_file() -> Path:
"""Get the path to the tracking state file."""
context_dir = get_context_bus_dir()
ensure_directory_exists(context_dir)
return context_dir / "tracking.json"
def atomic_write(file_path: Path, data: Dict[str, Any]) -> None:
"""
Atomically write data to a file using a temporary file.
Args:
file_path: Path to the target file
data: Data to write
"""
temp_path = file_path.with_suffix(".tmp")
# Write to temporary file
with open(temp_path, "w") as f:
json.dump(data, f, indent=2)
# Atomically replace the target file
os.replace(temp_path, file_path)
def read_with_lock(file_path: Path) -> Dict[str, Any]:
"""
Read data from a file with locking.
Args:
file_path: Path to the file to read
Returns:
The data read from the file, or an empty dict if file doesn't exist
"""
lock_path = get_lock_file()
timeout = _config.get("lock_timeout", 10.0)
if not file_path.exists():
return {}
try:
with portalocker.Lock(lock_path, "r", timeout=timeout, flags=portalocker.LOCK_SH | portalocker.LOCK_NB):
with open(file_path, "r") as f:
return json.load(f)
except portalocker.LockException:
return {}
def write_with_lock(file_path: Path, data: Dict[str, Any]) -> None:
"""
Write data to a file with locking and atomic operations.
Args:
file_path: Path to the file to write
data: Data to write
"""
lock_path = get_lock_file()
timeout = _config.get("lock_timeout", 10.0)
with portalocker.Lock(lock_path, "w", timeout=timeout, flags=portalocker.LOCK_EX | portalocker.LOCK_NB):
atomic_write(file_path, data)
def is_tracking_enabled() -> bool:
"""Check if tracking is currently enabled."""
tracking_file = get_tracking_file()
if tracking_file.exists():
try:
with open(tracking_file, "r") as f:
data = json.load(f)
return data.get("enabled", False)
except (json.JSONDecodeError, KeyError):
return False
return False
def set_tracking_enabled(enabled: bool) -> None:
"""Set the tracking enabled state."""
tracking_file = get_tracking_file()
with open(tracking_file, "w") as f:
json.dump({"enabled": enabled}, f)