pipeline.rs•1.97 kB
#![allow(dead_code, unused_variables, unused_imports)]
use crate::{
DeltaProcessorImpl, FileWatcherImpl, GraphUpdaterImpl, ProgressTrackerImpl, UpdateSchedulerImpl,
};
use codegraph_core::traits::{
DeltaProcessor, FileWatcher, GraphUpdater, ProgressTracker, UpdateScheduler,
};
use crossbeam_channel::unbounded;
use std::thread;
pub async fn run_pipeline(path: &str) -> codegraph_core::Result<()> {
let (change_tx, change_rx) = unbounded();
let (update_tx, update_rx) = unbounded();
let (delta_tx, delta_rx) = unbounded();
let file_watcher = FileWatcherImpl::new(path);
let update_scheduler = UpdateSchedulerImpl;
let delta_processor = DeltaProcessorImpl;
let graph_updater = GraphUpdaterImpl;
let progress_tracker = ProgressTrackerImpl;
let path_clone = path.to_string();
let watcher_thread = thread::spawn(move || {
file_watcher.watch(change_tx).unwrap();
});
let scheduler_thread = tokio::spawn(async move {
update_scheduler
.schedule(change_rx, update_tx)
.await
.unwrap();
});
let processor_thread = tokio::spawn(async move {
delta_processor.process(update_rx, delta_tx).await.unwrap();
});
let updater_thread = tokio::spawn(async move {
graph_updater.update(delta_rx).await.unwrap();
});
let tracker_thread = tokio::spawn(async move {
progress_tracker.track().await.unwrap();
});
watcher_thread.join().unwrap();
scheduler_thread
.await
.map_err(|e| codegraph_core::CodeGraphError::Threading(e.to_string()))?;
processor_thread
.await
.map_err(|e| codegraph_core::CodeGraphError::Threading(e.to_string()))?;
updater_thread
.await
.map_err(|e| codegraph_core::CodeGraphError::Threading(e.to_string()))?;
tracker_thread
.await
.map_err(|e| codegraph_core::CodeGraphError::Threading(e.to_string()))?;
Ok(())
}