use crate::index::{IndexConfig, Indexer};
use crate::libs;
use crate::memory::MemoryStore;
use crate::ollama::OllamaEmbedder;
use crate::repo_manager;
use crate::search::MemoryState;
use crate::watcher;
use anyhow::Result;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{info, warn};
/// Everything needed to serve queries against one mounted repository.
///
/// Cloning is cheap: the heavyweight members are behind `Arc`s.
#[derive(Clone)]
pub struct RepoRuntime {
    /// Canonical repo identifier (produced by `repo_fingerprint_sha256`).
    pub repo_id: String,
    /// Older identifier format, kept so existing clients can still address
    /// this repo by its legacy id.
    pub legacy_repo_id: String,
    /// Repository root path (canonicalized when possible at mount time).
    pub repo_root: PathBuf,
    /// Document indexer for this repo.
    pub indexer: Arc<Indexer>,
    /// Optional read-only index over libraries, present only when one could
    /// be opened from the repo's state dir.
    pub libs_indexer: Option<Arc<libs::LibsIndexer>>,
    /// Per-repo memory (store + embedder); `None` when the manager was
    /// built without a memory embedder.
    pub memory: Option<MemoryState>,
}
/// Whether a freshly mounted repo can be queried immediately or is still
/// being indexed in the background.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoMountStatus {
    /// An index already existed on disk (or the repo was already mounted).
    Ready,
    /// No prior index was found; a background reindex may be in progress.
    Indexing,
}
impl RepoMountStatus {
pub fn as_str(self) -> &'static str {
match self {
RepoMountStatus::Ready => "ready",
RepoMountStatus::Indexing => "indexing",
}
}
}
/// Result of `RepoManager::mount_repo`.
pub struct RepoMount {
    /// The (possibly pre-existing) runtime for the mounted repo.
    pub repo: Arc<RepoRuntime>,
    /// Whether the repo's index is ready or still being built.
    pub status: RepoMountStatus,
}
/// Internal bookkeeping record for one mounted repo.
struct RepoEntry {
    runtime: Arc<RepoRuntime>,
    /// File watcher handle; `None` for read-only mounts, after a watcher
    /// spawn failure, or once the idle sweep has stopped it.
    watcher: Option<watcher::WatcherHandle>,
    /// Updated on every access; drives the idle/hibernate decisions.
    last_access: Instant,
}
/// Registry of mounted repositories with idle watcher shutdown and full
/// hibernation (unmount) of long-unused repos.
pub struct RepoManager {
    /// Entries keyed by canonical repo id. Values are the same
    /// `Arc<Mutex<RepoEntry>>` instances stored in `legacy_repos`.
    repos: RwLock<HashMap<String, Arc<Mutex<RepoEntry>>>>,
    /// The same entries keyed by the legacy id format.
    legacy_repos: RwLock<HashMap<String, Arc<Mutex<RepoEntry>>>>,
    /// Embedder cloned into each repo's `MemoryState`; `None` disables memory.
    memory_embedder: Option<OllamaEmbedder>,
    /// When set, index state is configured via `IndexConfig::with_overrides`
    /// under this base directory.
    shared_state_dir: Option<PathBuf>,
    /// Repo id exempt from hibernation (its watcher may still be stopped).
    pinned_repo_id: RwLock<Option<String>>,
    /// Idle time after which a repo's file watcher is stopped.
    idle_timeout: Duration,
    /// Idle time after which a non-pinned repo is unmounted entirely.
    hibernate_timeout: Duration,
    /// Period of the background housekeeping task.
    cleanup_interval: Duration,
}
impl RepoManager {
    /// Builds a manager with timeouts taken from the environment:
    /// `DOCDEX_REPO_IDLE_SECONDS` (default 2h, watcher shutdown),
    /// `DOCDEX_REPO_HIBERNATE_SECONDS` (default 24h, full unmount; clamped
    /// to at least idle + 60s), and
    /// `DOCDEX_REPO_CLEANUP_INTERVAL_SECONDS` (default 600s, sweep period).
    pub fn new(memory_embedder: Option<OllamaEmbedder>, shared_state_dir: Option<PathBuf>) -> Self {
        // Reads `var` as whole seconds; unset, non-numeric, or zero values
        // all fall back to `default`.
        fn duration_from_env(var: &str, default: Duration) -> Duration {
            let Ok(value) = std::env::var(var) else {
                return default;
            };
            let Ok(seconds) = value.trim().parse::<u64>() else {
                return default;
            };
            if seconds == 0 {
                return default;
            }
            Duration::from_secs(seconds)
        }
        let idle_timeout =
            duration_from_env("DOCDEX_REPO_IDLE_SECONDS", Duration::from_secs(2 * 60 * 60));
        let mut hibernate_timeout = duration_from_env(
            "DOCDEX_REPO_HIBERNATE_SECONDS",
            Duration::from_secs(24 * 60 * 60),
        );
        // Hibernation must come strictly after the idle threshold, or a repo
        // could be unmounted before its watcher was ever considered idle.
        if hibernate_timeout < idle_timeout {
            hibernate_timeout = idle_timeout + Duration::from_secs(60);
        }
        let cleanup_interval = duration_from_env(
            "DOCDEX_REPO_CLEANUP_INTERVAL_SECONDS",
            Duration::from_secs(600),
        );
        Self {
            repos: RwLock::new(HashMap::new()),
            legacy_repos: RwLock::new(HashMap::new()),
            memory_embedder,
            shared_state_dir,
            pinned_repo_id: RwLock::new(None),
            idle_timeout,
            hibernate_timeout,
            cleanup_interval,
        }
    }
    /// Test-only constructor taking explicit timeouts (no env lookups and
    /// no clamping of `hibernate_timeout`).
    #[cfg(test)]
    pub(crate) fn new_with_timeouts(
        memory_embedder: Option<OllamaEmbedder>,
        shared_state_dir: Option<PathBuf>,
        idle_timeout: Duration,
        hibernate_timeout: Duration,
        cleanup_interval: Duration,
    ) -> Self {
        Self {
            repos: RwLock::new(HashMap::new()),
            legacy_repos: RwLock::new(HashMap::new()),
            memory_embedder,
            shared_state_dir,
            pinned_repo_id: RwLock::new(None),
            idle_timeout,
            hibernate_timeout,
            cleanup_interval,
        }
    }
    /// Pins `repo_id`: `sweep_idle` may still stop its watcher, but will
    /// never hibernate (unmount) it. Only one repo is pinned at a time;
    /// a later call replaces the previous pin.
    pub fn pin_repo(&self, repo_id: String) {
        *self.pinned_repo_id.write() = Some(repo_id);
    }
    /// Registers a runtime under both its canonical and legacy ids. Both
    /// maps share one `Arc<Mutex<RepoEntry>>`, so access times and watcher
    /// state stay in sync regardless of which id is used for lookup.
    pub fn insert_repo(&self, repo: Arc<RepoRuntime>, watcher: Option<watcher::WatcherHandle>) {
        let entry = Arc::new(Mutex::new(RepoEntry {
            runtime: repo.clone(),
            watcher,
            last_access: Instant::now(),
        }));
        self.repos
            .write()
            .insert(repo.repo_id.clone(), entry.clone());
        self.legacy_repos
            .write()
            .insert(repo.legacy_repo_id.clone(), entry);
    }
    /// Looks up an entry by canonical id first, then falls back to the
    /// legacy id map.
    fn get_entry(&self, repo_id: &str) -> Option<Arc<Mutex<RepoEntry>>> {
        if let Some(entry) = self.repos.read().get(repo_id) {
            return Some(entry.clone());
        }
        self.legacy_repos.read().get(repo_id).cloned()
    }
    /// Refreshes the entry's access time and lazily restarts the file
    /// watcher if an earlier idle sweep stopped it. Watcher restart
    /// failures are logged and otherwise ignored.
    ///
    /// NOTE(review): repos mounted read-only also start with
    /// `watcher == None`, so touching them spawns a watcher even though
    /// `mount_repo` deliberately skipped one — confirm this is intended.
    fn touch_entry(&self, entry: &Arc<Mutex<RepoEntry>>) {
        let mut entry = entry.lock();
        entry.last_access = Instant::now();
        if entry.watcher.is_none() {
            match watcher::spawn(entry.runtime.indexer.clone()) {
                Ok(handle) => {
                    entry.watcher = Some(handle);
                }
                Err(err) => {
                    warn!(
                        target: "docdexd",
                        error = ?err,
                        repo = %entry.runtime.repo_root.display(),
                        "failed to restart file watcher"
                    );
                }
            }
        }
    }
    /// Returns the runtime for `repo_id` (canonical or legacy), marking the
    /// entry as freshly accessed.
    pub fn get_by_id(&self, repo_id: &str) -> Option<Arc<RepoRuntime>> {
        let entry = self.get_entry(repo_id)?;
        self.touch_entry(&entry);
        let runtime = entry.lock().runtime.clone();
        Some(runtime)
    }
    /// Number of currently mounted repos (counted by canonical id).
    pub fn repo_count(&self) -> usize {
        self.repos.read().len()
    }
    /// Spawns the background housekeeping task that runs `sweep_idle` every
    /// `cleanup_interval`. Must be called from within a tokio runtime.
    pub fn start_housekeeping(self: &Arc<Self>) {
        let manager = Arc::clone(self);
        let interval = manager.cleanup_interval;
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            loop {
                ticker.tick().await;
                manager.sweep_idle(Instant::now());
            }
        });
    }
    /// One housekeeping pass; `now` is injected for testability.
    ///
    /// Repos idle past `idle_timeout` get their watcher stopped but stay
    /// mounted. Repos idle past `hibernate_timeout` are removed from both
    /// id maps entirely — except the pinned repo, which is only demoted to
    /// watcher-stopped.
    pub(crate) fn sweep_idle(&self, now: Instant) {
        let mut idle_entries: Vec<Arc<Mutex<RepoEntry>>> = Vec::new();
        let mut hibernate_ids: Vec<String> = Vec::new();
        let pinned_repo_id = self.pinned_repo_id.read().clone();
        // Phase 1: classify under the read lock only; mutation is deferred so
        // the repos map is never write-locked while entries are inspected.
        {
            let repos = self.repos.read();
            for (repo_id, entry) in repos.iter() {
                let last_access = entry.lock().last_access;
                let idle_for = now.duration_since(last_access);
                if idle_for >= self.hibernate_timeout {
                    let is_pinned = pinned_repo_id
                        .as_ref()
                        .map(|pinned| pinned == repo_id)
                        .unwrap_or(false);
                    if !is_pinned {
                        hibernate_ids.push(repo_id.clone());
                    } else {
                        // Pinned repo past hibernation: never unmounted, but
                        // its watcher is stopped like any idle repo.
                        idle_entries.push(entry.clone());
                    }
                } else if idle_for >= self.idle_timeout {
                    idle_entries.push(entry.clone());
                }
            }
        }
        // Phase 2: stop watchers for idle (or pinned-but-expired) repos.
        for entry in idle_entries {
            let mut entry = entry.lock();
            if let Some(mut watcher) = entry.watcher.take() {
                watcher.stop();
            }
        }
        // Phase 3: fully unmount hibernated repos from both id maps.
        if !hibernate_ids.is_empty() {
            let mut repos = self.repos.write();
            let mut legacy = self.legacy_repos.write();
            for repo_id in hibernate_ids {
                if let Some(entry) = repos.remove(&repo_id) {
                    let mut entry = entry.lock();
                    if let Some(mut watcher) = entry.watcher.take() {
                        watcher.stop();
                    }
                    let legacy_id = entry.runtime.legacy_repo_id.clone();
                    legacy.remove(&legacy_id);
                }
            }
        }
    }
    /// Mounts the repository at `repo_root`, reusing an existing entry when
    /// one is already registered under either id form.
    ///
    /// On a fresh mount this opens the index (falling back to read-only if
    /// another process holds the writer lock), attaches the optional libs
    /// index and memory state, starts a file watcher (writable mounts only),
    /// and — when no on-disk index exists yet — kicks off a background
    /// reindex and reports `RepoMountStatus::Indexing`.
    ///
    /// # Errors
    /// Propagates fingerprinting, index-config, and index-open failures
    /// (other than the writer-busy case, which degrades to read-only).
    pub fn mount_repo(&self, repo_root: &Path) -> Result<RepoMount> {
        // Canonicalize for stable fingerprints; fall back to the given path
        // when canonicalization fails.
        let repo_root = repo_root
            .canonicalize()
            .unwrap_or_else(|_| repo_root.to_path_buf());
        let repo_id = repo_manager::repo_fingerprint_sha256(&repo_root)?;
        if let Some(existing) = self.get_entry(&repo_id) {
            self.touch_entry(&existing);
            return Ok(RepoMount {
                repo: existing.lock().runtime.clone(),
                status: RepoMountStatus::Ready,
            });
        }
        let legacy_repo_id = repo_manager::fingerprint::legacy_repo_id_for_root(&repo_root);
        if let Some(existing) = self.get_entry(&legacy_repo_id) {
            self.touch_entry(&existing);
            return Ok(RepoMount {
                repo: existing.lock().runtime.clone(),
                status: RepoMountStatus::Ready,
            });
        }
        // Index state lives under the shared base dir when one is configured,
        // otherwise wherever `IndexConfig::for_repo` places it.
        let config = match self.shared_state_dir.clone() {
            Some(base) => {
                IndexConfig::with_overrides(&repo_root, Some(base), Vec::new(), Vec::new(), true)?
            }
            None => IndexConfig::for_repo(&repo_root)?,
        };
        // An existing meta.json marks a previously built index; its absence
        // triggers the background reindex below.
        let has_index = config.state_dir().join("meta.json").exists();
        let (indexer, read_only) = match Indexer::with_config(repo_root.clone(), config.clone()) {
            Ok(indexer) => (Arc::new(indexer), false),
            // Another process holds the index writer: degrade to a read-only
            // view instead of failing the mount.
            Err(err) if is_lock_busy_error(&err) => {
                warn!(
                    target: "docdexd",
                    repo = %repo_root.display(),
                    error = %err,
                    "index writer busy; opening read-only"
                );
                let readonly = Indexer::with_config_read_only(repo_root.clone(), config)?;
                (Arc::new(readonly), true)
            }
            Err(err) => return Err(err),
        };
        // Best-effort: a missing or unreadable libs index is simply skipped.
        let libs_indexer = {
            let libs_dir = libs::libs_state_dir_from_index_state_dir(indexer.state_dir());
            libs::LibsIndexer::open_read_only(libs_dir)
                .ok()
                .flatten()
                .map(Arc::new)
        };
        // Memory support is enabled only when the manager has an embedder.
        let memory = self.memory_embedder.clone().map(|embedder| MemoryState {
            store: MemoryStore::new(indexer.state_dir()),
            embedder,
            repo_id: repo_id.clone(),
        });
        let repo = Arc::new(RepoRuntime {
            repo_id: repo_id.clone(),
            legacy_repo_id,
            repo_root: repo_root.clone(),
            indexer: indexer.clone(),
            libs_indexer,
            memory,
        });
        // Read-only mounts skip the watcher; watcher startup failures are
        // logged but do not fail the mount.
        let watcher = if read_only {
            None
        } else {
            match watcher::spawn(indexer.clone()) {
                Ok(handle) => Some(handle),
                Err(err) => {
                    warn!(
                        target: "docdexd",
                        error = ?err,
                        repo = %repo_root.display(),
                        "failed to start file watcher"
                    );
                    None
                }
            }
        };
        self.insert_repo(repo.clone(), watcher);
        let status = if has_index {
            RepoMountStatus::Ready
        } else {
            RepoMountStatus::Indexing
        };
        // Fresh, writable mounts are indexed in the background so this call
        // returns immediately.
        if status == RepoMountStatus::Indexing && !read_only {
            let repo_id_clone = repo.repo_id.clone();
            tokio::spawn(async move {
                if let Err(err) = indexer.reindex_all().await {
                    warn!(repo_id = %repo_id_clone, error = ?err, "background reindex failed");
                } else {
                    info!(repo_id = %repo_id_clone, "background reindex complete");
                }
            });
        }
        Ok(RepoMount { repo, status })
    }
}
/// Heuristic: decides from an error's rendered message whether the failure
/// looks like "another process holds the index writer lock". Message-based
/// because the underlying error type is erased by `anyhow`.
fn is_lock_busy_error(err: &anyhow::Error) -> bool {
    const LOCK_BUSY_MARKERS: [&str; 4] = [
        "LockBusy",
        "Failed to acquire",
        "failed to acquire",
        "index writer",
    ];
    let rendered = err.to_string();
    LOCK_BUSY_MARKERS
        .iter()
        .any(|marker| rendered.contains(marker))
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn repo_manager_stops_watcher_after_idle() {
        // One-file repository plus a separate shared-state directory.
        let repo_dir = TempDir::new().expect("repo dir");
        std::fs::write(repo_dir.path().join("README.md"), "# test\n").expect("write file");
        let shared_state = TempDir::new().expect("state dir");
        // Watcher idles out after 1s; hibernation (10s) stays out of reach.
        let manager = RepoManager::new_with_timeouts(
            None,
            Some(shared_state.path().to_path_buf()),
            Duration::from_secs(1),
            Duration::from_secs(10),
            Duration::from_secs(1),
        );
        let mount = manager.mount_repo(repo_dir.path()).expect("mount repo");
        let entry = {
            let repos = manager.repos.read();
            repos.get(&mount.repo.repo_id).expect("entry").clone()
        };
        assert!(entry.lock().watcher.is_some());
        // Backdate the last access past the idle window and sweep: the
        // watcher must be gone while the entry itself survives.
        let sweep_at = Instant::now();
        entry.lock().last_access = sweep_at - Duration::from_secs(2);
        manager.sweep_idle(sweep_at);
        assert!(entry.lock().watcher.is_none());
    }

    #[tokio::test]
    async fn repo_manager_hibernates_idle_repo() {
        // One-file repository plus a separate shared-state directory.
        let repo_dir = TempDir::new().expect("repo dir");
        std::fs::write(repo_dir.path().join("README.md"), "# test\n").expect("write file");
        let shared_state = TempDir::new().expect("state dir");
        // Hibernation threshold of 3s so a 4s-old access evicts the repo.
        let manager = RepoManager::new_with_timeouts(
            None,
            Some(shared_state.path().to_path_buf()),
            Duration::from_secs(1),
            Duration::from_secs(3),
            Duration::from_secs(1),
        );
        let mount = manager.mount_repo(repo_dir.path()).expect("mount repo");
        let entry = {
            let repos = manager.repos.read();
            repos.get(&mount.repo.repo_id).expect("entry").clone()
        };
        let sweep_at = Instant::now();
        entry.lock().last_access = sweep_at - Duration::from_secs(4);
        manager.sweep_idle(sweep_at);
        assert!(manager.repos.read().is_empty());
    }
}