CodeGraph CLI MCP Server

by Jakedismo
pipeline.rs (14.3 kB)
//! CodeGraph AI Pipeline
//!
//! This module composes the ML building blocks in `codegraph-vector` (feature extraction,
//! training, inference, and A/B testing) and adds:
//! - Model versioning/registry with zero-downtime deployments (hot-swap)
//! - High-throughput feature extraction helpers from graph nodes
//! - Simple API for training/inference/experiments
//!
//! Target outcomes:
//! - Feature extraction at scale (aim 1000 fn/s via concurrency)
//! - Incremental learning support (reuse existing models when training)
//! - A/B testing to compare objective performance
//! - Inference optimization (quantization + caching handled by codegraph-vector)
//! - Model versioning with zero-downtime hot swap

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use arc_swap::ArcSwap;
use parking_lot::RwLock as PLRwLock;
use tokio::sync::RwLock;
use uuid::Uuid;

use codegraph_core::{CodeNode, NodeId, Result};
use codegraph_vector::ml as vml;
use codegraph_vector::EmbeddingGenerator;

/// Versioned model metadata
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ModelVersionMeta {
    pub name: String,
    pub version: String,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub metrics: HashMap<String, f32>,
    pub path: PathBuf,
}

/// Registry for trained models and versions (filesystem-backed)
pub struct ModelRegistry {
    root: PathBuf,
    // in-memory index: model_name -> version -> metadata
    index: PLRwLock<HashMap<String, HashMap<String, ModelVersionMeta>>>,
}

impl ModelRegistry {
    pub fn new<P: Into<PathBuf>>(root: P) -> Self {
        Self {
            root: root.into(),
            index: PLRwLock::new(HashMap::new()),
        }
    }

    pub async fn register(
        &self,
        model_name: &str,
        version: &str,
        metrics: HashMap<String, f32>,
    ) -> Result<ModelVersionMeta> {
        let dir = self.root.join(model_name).join(version);
        tokio::fs::create_dir_all(&dir).await.ok();
        let meta = ModelVersionMeta {
            name: model_name.to_string(),
            version: version.to_string(),
            created_at: chrono::Utc::now(),
            metrics,
            path: dir.clone(),
        };
        self.index
            .write()
            .entry(model_name.to_string())
            .or_default()
            .insert(version.to_string(), meta.clone());
        // persist metadata
        let meta_path = dir.join("metadata.json");
        let ser = serde_json::to_string_pretty(&meta).unwrap_or_else(|_| "{}".to_string());
        let _ = tokio::fs::write(meta_path, ser).await;
        Ok(meta)
    }

    pub fn latest(&self, model_name: &str) -> Option<ModelVersionMeta> {
        self.index.read().get(model_name).and_then(|m| {
            // pick latest by created_at
            m.values().max_by_key(|mm| mm.created_at).cloned()
        })
    }

    pub fn get(&self, model_name: &str, version: &str) -> Option<ModelVersionMeta> {
        self.index
            .read()
            .get(model_name)
            .and_then(|m| m.get(version))
            .cloned()
    }
}

/// Active model handle with hot-swap (zero-downtime)
pub struct HotSwapModel {
    active_name: String,
    active_version: ArcSwap<String>,
}

impl HotSwapModel {
    pub fn new<S: Into<String>>(name: S, initial_version: S) -> Self {
        Self {
            active_name: name.into(),
            active_version: ArcSwap::from_pointee(initial_version.into()),
        }
    }

    pub fn active(&self) -> (String, String) {
        (
            self.active_name.clone(),
            (*self.active_version.load()).to_string(),
        )
    }

    pub fn swap_version<S: Into<String>>(&self, new_version: S) {
        self.active_version.store(Arc::new(new_version.into()));
    }
}
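
// A minimal sketch of the hot-swap semantics: readers always observe a
// consistent (name, version) pair while `swap_version` publishes a new
// version atomically through `ArcSwap`. The model name and version labels
// below are illustrative, not fixtures from this crate.
#[cfg(test)]
mod hot_swap_sketch {
    use super::*;

    #[test]
    fn readers_see_swapped_version() {
        let model = HotSwapModel::new("default", "v1");
        assert_eq!(model.active(), ("default".to_string(), "v1".to_string()));
        // Publish "v2"; subsequent readers observe the new version immediately.
        model.swap_version("v2");
        assert_eq!(model.active().1, "v2");
    }
}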
/// End-to-end AI pipeline that wraps the `codegraph-vector` MLPipeline and adds versioning + hot-swap.
pub struct AiPipeline {
    inner: vml::MLPipeline,
    registry: Arc<ModelRegistry>,
    active: Arc<HotSwapModel>,
}

pub struct AiPipelineBuilder {
    pub feature: vml::FeatureConfig,
    pub training: vml::TrainingConfig,
    pub inference: vml::InferenceConfig,
    pub registry_root: PathBuf,
    pub model_name: String,
    pub initial_version: String,
}

impl Default for AiPipelineBuilder {
    fn default() -> Self {
        Self {
            feature: vml::FeatureConfig::default(),
            training: vml::TrainingConfig {
                model_type: vml::ModelType::QualityClassifier,
                hyperparameters: vml::TrainingHyperparameters::default(),
                data_config: vml::DataConfig::default(),
                validation_config: vml::ValidationConfig::default(),
                output_config: vml::OutputConfig {
                    model_path: "models".into(),
                    save_checkpoints: true,
                    checkpoint_frequency: 10,
                    export_for_inference: true,
                },
            },
            inference: vml::InferenceConfig::default(),
            registry_root: PathBuf::from("models"),
            model_name: "default".into(),
            initial_version: "v1".into(),
        }
    }
}

impl AiPipelineBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn feature_config(mut self, cfg: vml::FeatureConfig) -> Self {
        self.feature = cfg;
        self
    }

    pub fn training_config(mut self, cfg: vml::TrainingConfig) -> Self {
        self.training = cfg;
        self
    }

    pub fn inference_config(mut self, cfg: vml::InferenceConfig) -> Self {
        self.inference = cfg;
        self
    }

    pub fn registry_root<P: Into<PathBuf>>(mut self, root: P) -> Self {
        self.registry_root = root.into();
        self
    }

    pub fn model_name<S: Into<String>>(mut self, name: S) -> Self {
        self.model_name = name.into();
        self
    }

    pub fn initial_version<S: Into<String>>(mut self, v: S) -> Self {
        self.initial_version = v.into();
        self
    }

    pub fn build(self) -> Result<AiPipeline> {
        let embedding_generator = Arc::new(EmbeddingGenerator::default());
        let inner = vml::MLPipeline::builder()
            .with_feature_config(self.feature)
            .with_training_config(self.training)
            .with_inference_config(self.inference)
            .with_pipeline_settings(vml::PipelineSettings::default())
            .with_embedding_generator(embedding_generator)
            .build()?;
        let registry = Arc::new(ModelRegistry::new(&self.registry_root));
        let active = Arc::new(HotSwapModel::new(&self.model_name, &self.initial_version));
        Ok(AiPipeline {
            inner,
            registry,
            active,
        })
    }
}

impl AiPipeline {
    pub fn builder() -> AiPipelineBuilder {
        AiPipelineBuilder::new()
    }

    /// Initialize the inner pipeline.
    pub async fn initialize(&self) -> Result<()> {
        self.inner.initialize().await
    }

    /// Train and register a versioned model, then hot-swap it in as the active version if requested.
    pub async fn train_and_deploy(
        &self,
        dataset: &str,
        nodes: &[CodeNode],
        targets: Vec<vml::TrainingTarget>,
        version: &str,
        set_active: bool,
    ) -> Result<vml::TrainingResults> {
        let results = self
            .inner
            .train_model(dataset, nodes, targets, &self.active_model_name())
            .await?;

        // Register the new version
        let meta = self
            .registry
            .register(
                &self.active_model_name(),
                version,
                results.validation_metrics.clone(),
            )
            .await?;

        // Save the model artifact (best-effort)
        let path = meta.path.join("model.json");
        let _ = self
            .inner
            .save_model(&self.active_model_name(), &path)
            .await;

        // Hot swap
        if set_active {
            self.active.swap_version(version.to_string());
        }
        Ok(results)
    }

    /// Start an A/B test between two versions.
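    ///
    /// A minimal usage sketch (assumes `pipeline` is an initialized
    /// `AiPipeline` and that versions "v1" and "v2" are already registered):
    ///
    /// ```ignore
    /// let experiment_id = pipeline
    ///     .start_ab_test("v1-vs-v2", "v1", "v2", Duration::from_secs(24 * 3600))
    ///     .await?;
    /// println!("A/B test started: {experiment_id}");
    /// ```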
    pub async fn start_ab_test(
        &self,
        experiment: &str,
        version_a: &str,
        version_b: &str,
        duration: Duration,
    ) -> Result<String> {
        // Ensure both versions exist before configuring the experiment.
        if self
            .registry
            .get(&self.active_model_name(), version_a)
            .is_none()
            || self
                .registry
                .get(&self.active_model_name(), version_b)
                .is_none()
        {
            return Err(codegraph_core::CodeGraphError::Training(
                "Model versions not found for A/B test".into(),
            ));
        }

        // 50/50 weighted-random traffic split with sticky sessions.
        let mut alloc = HashMap::new();
        alloc.insert("A".to_string(), 0.5);
        alloc.insert("B".to_string(), 0.5);
        let traffic = vml::TrafficAllocation {
            allocations: alloc,
            strategy: vml::AllocationStrategy::WeightedRandom,
            sticky_sessions: true,
        };
        let stats = vml::StatisticalConfig::default();
        let metrics = vec![
            vml::ExperimentMetric::Accuracy,
            vml::ExperimentMetric::Latency,
            vml::ExperimentMetric::Throughput,
        ];
        let early = vml::EarlyStoppingConfig {
            enabled: true,
            check_interval: Duration::from_secs(60),
            min_samples: 500,
            futility_boundary: 0.01,
            efficacy_boundary: 0.01,
        };
        let sample = vml::SampleSizeConfig {
            min_sample_size: 1000,
            max_sample_size: 100_000,
            early_stopping: early,
            calculation_method: vml::SampleSizeMethod::Sequential,
        };
        let cfg = vml::ABTestConfig {
            name: experiment.into(),
            description: "Model A/B comparison".into(),
            traffic_allocation: traffic,
            duration,
            statistical_config: stats,
            metrics,
            sample_size: sample,
        };
        let id = self.inner.start_ab_test(cfg).await?;
        Ok(id)
    }

    /// Run inference against the currently active version (benefits from inner caching/quantization).
    pub async fn infer(&self, node: &CodeNode) -> Result<vml::InferenceResult> {
        let (model_name, _version) = self.active();
        self.inner.predict(&model_name, node).await
    }

    /// High-throughput batch feature extraction (concurrent); returns features in input order.
    pub async fn extract_features_batch_fast(
        &self,
        nodes: &[CodeNode],
    ) -> Result<Vec<vml::CodeFeatures>> {
        // Shard the input across tasks so the inner feature extractor runs concurrently.
        let chunk = std::cmp::max(64, nodes.len() / std::cmp::max(1, num_cpus::get()));
        let mut tasks = Vec::new();
        for chunk_nodes in nodes.chunks(chunk) {
            let part = chunk_nodes.to_vec();
            let inner = self.inner.clone();
            tasks.push(tokio::spawn(async move {
                inner.extract_features_batch(&part).await
            }));
        }
        // Chunks were spawned in input order, so collecting in order preserves it.
        let mut out = Vec::with_capacity(nodes.len());
        for t in tasks {
            out.extend(t.await.unwrap()?);
        }
        Ok(out)
    }

    /// Active model name and version tuple.
    pub fn active(&self) -> (String, String) {
        self.active.active()
    }

    pub fn active_model_name(&self) -> String {
        self.active.active().0
    }

    /// Zero-downtime deploy of a new version: warm up, then hot-swap.
    pub async fn deploy_version(&self, version: &str, warmup_samples: &[CodeNode]) -> Result<()> {
        // Load the model artifact if needed (inner keeps in-memory models; ensure it is present).
        if let Some(meta) = self.registry.get(&self.active_model_name(), version) {
            let path = meta.path.join("model.json");
            let _ = self
                .inner
                .load_model(&self.active_model_name(), &path)
                .await; // best-effort
        }
        // Warm-up inference to prime caches and JIT paths
        for n in warmup_samples.iter().take(16) {
            let _ = self.infer(n).await;
        }
        // Hot swap
        self.active.swap_version(version.to_string());
        Ok(())
    }

    /// Expose inner metrics for monitoring SLAs (latency, throughput, cache hit rate).
    pub async fn metrics(&self) -> vml::InferenceMetrics {
        self.inner.get_inference_metrics().await
    }

    /// Proxy helpers to the inner pipeline, for convenience.
    pub async fn save_config(&self, path: &Path) -> Result<()> {
        self.inner.save_config(path).await
    }

    pub async fn load_config(&self, path: &Path) -> Result<()> {
        self.inner.load_config(path).await
    }
}
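
// A minimal end-to-end sketch of the rollout flow built from the methods
// above: train and register a candidate version without activating it, then
// warm it up and hot-swap it in. The dataset name, version label, and empty
// target list are illustrative assumptions, not fixtures from this crate.
#[allow(dead_code)]
async fn example_rollout(pipeline: &AiPipeline, nodes: &[CodeNode]) -> Result<()> {
    // Train and register "v2"; `set_active = false` keeps the current version serving traffic.
    let _results = pipeline
        .train_and_deploy("example-dataset", nodes, Vec::new(), "v2", false)
        .await?;
    // Warm the new version's caches, then atomically make it active.
    pipeline.deploy_version("v2", nodes).await?;
    Ok(())
}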
// Lightweight proxy "clone" for the inner MLPipeline: rather than sharing
// internals, it rebuilds a pipeline from a snapshot of the current config.
trait CloneablePipeline {
    fn clone(&self) -> Self;
}

impl CloneablePipeline for vml::MLPipeline {
    fn clone(&self) -> Self {
        // Shallow rebuild via the saved config snapshot; the `build` path is
        // read-only, so this is acceptable for a proxy clone.
        // NOTE: `block_on` parks the calling thread; on a single-threaded
        // async runtime this can deadlock, so prefer a multi-threaded runtime.
        let cfg = futures::executor::block_on(async { self.get_context().await.config.clone() });
        vml::MLPipeline::builder()
            .with_feature_config(cfg.feature_config)
            .with_training_config(cfg.training_config)
            .with_inference_config(cfg.inference_config)
            .with_pipeline_settings(cfg.pipeline_settings)
            .build()
            .expect("rebuild pipeline")
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use codegraph_core::{Language, NodeType};

    #[tokio::test]
    async fn builds_and_infers() {
        let p = AiPipeline::builder().build().unwrap();
        p.initialize().await.unwrap();
        let node = CodeNode {
            id: "n1".into(),
            name: "foo".into(),
            language: Some(Language::Rust),
            node_type: Some(NodeType::Function),
            content: Some("fn foo() { 1 }".into()),
            children: None,
        };
        let _ = p.infer(&node).await.unwrap();
    }
}
