Skip to main content
Glama

Convex MCP server

Official
by get-convex
memory_index.rs7.15 kB
use std::{ collections::BTreeSet, mem, }; use common::types::{ Timestamp, WriteTimestamp, }; use imbl::{ OrdMap, OrdSet, Vector, }; use qdrant_segment::spaces::{ metric::Metric, simple::CosineMetric, }; use value::InternalId; use crate::{ qdrant_index::{ NormalizedQdrantDocument, QdrantDocument, }, query::{ CompiledVectorFilter, CompiledVectorSearch, VectorSearchQueryResult, }, }; #[derive(Clone)] pub struct MemoryVectorIndex { min_ts: WriteTimestamp, max_ts: WriteTimestamp, documents: OrdMap<InternalId, Revision>, documents_size: usize, tombstones: Vector<(WriteTimestamp, NormalizedQdrantDocument)>, tombstones_size: usize, transactions: OrdSet<WriteTimestamp>, } impl MemoryVectorIndex { pub fn new(base_ts: WriteTimestamp) -> Self { Self { min_ts: base_ts, max_ts: base_ts, documents: OrdMap::new(), documents_size: 0, tombstones: Vector::new(), tombstones_size: 0, transactions: OrdSet::new(), } } pub fn size(&self) -> usize { let mut size = 0; size += self.documents.len() * mem::size_of::<(InternalId, Revision)>(); size += self.documents_size; size += self.tombstones.len() * mem::size_of::<(WriteTimestamp, NormalizedQdrantDocument)>(); size += self.tombstones_size; size += self.transactions.len() * mem::size_of::<WriteTimestamp>(); size } pub fn min_ts(&self) -> WriteTimestamp { self.min_ts } pub fn num_transactions(&self) -> usize { self.transactions.len() } pub fn update( &mut self, id: InternalId, ts: WriteTimestamp, old_value: Option<QdrantDocument>, new_value: Option<QdrantDocument>, ) -> anyhow::Result<()> { anyhow::ensure!( self.min_ts <= ts, "Expected min_ts:{:?} <= ts:{ts:?} ", self.min_ts ); anyhow::ensure!( self.max_ts <= ts, "Expected max_ts:{:?} <= ts:{ts:?} ", self.max_ts ); self.max_ts = ts; { if !self.transactions.contains(&ts) { if let Some(prev_ts) = self.transactions.get_max() { anyhow::ensure!(*prev_ts < ts); } self.transactions.insert(ts); } } if let Some(old_value) = old_value { let normalized = NormalizedQdrantDocument::from(old_value); self.tombstones_size += normalized.size(); self.tombstones.push_back((ts, normalized)); } if self.documents.contains_key(&id) { let old_value = self.documents.remove(&id).unwrap(); self.documents_size -= old_value.document.size(); } if let Some(new_value) = new_value { let normalized = NormalizedQdrantDocument::from(new_value); self.documents_size += normalized.size(); let revision = Revision { ts, document: normalized, }; self.documents.insert(id, revision); } Ok(()) } pub fn truncate(&mut self, new_min_ts: Timestamp) -> anyhow::Result<()> { let new_min_ts = WriteTimestamp::Committed(new_min_ts); anyhow::ensure!( new_min_ts >= self.min_ts, "Expected new_min_ts:{new_min_ts:?} >= min_ts:{:?} ", self.min_ts ); let to_remove = self .documents .iter() .filter(|(_, document)| document.ts < new_min_ts) .map(|(id, _)| *id) .collect::<Vec<_>>(); for id in to_remove { let revision = self.documents.remove(&id).unwrap(); self.documents_size -= revision.document.size(); } while let Some((ts, _)) = self.tombstones.front() && *ts < new_min_ts { let (_, tombstone) = self.tombstones.pop_front().unwrap(); self.tombstones_size -= tombstone.size(); } while let Some(ts) = self.transactions.get_min() && *ts < new_min_ts { let ts = *ts; self.transactions.remove(&ts); } self.min_ts = new_min_ts; self.min_ts = new_min_ts; self.max_ts = self.max_ts.max(new_min_ts); Ok(()) } pub fn updated_matches( &self, snapshot_ts: Timestamp, query: &CompiledVectorSearch, ) -> anyhow::Result<BTreeSet<InternalId>> { anyhow::ensure!( self.min_ts <= WriteTimestamp::Committed(snapshot_ts.succ()?), "Timestamps are out of order! min ts:{:?} snapshot_ts:{snapshot_ts}", self.min_ts, ); let mut updated = BTreeSet::new(); for (ts, document) in &self.tombstones { if *ts <= WriteTimestamp::Committed(snapshot_ts) { continue; } if document.matches(query) { updated.insert(document.internal_id); } } Ok(updated) } pub fn query( &self, snapshot_ts: Timestamp, query: &CompiledVectorSearch, ) -> anyhow::Result<Vec<VectorSearchQueryResult>> { anyhow::ensure!( self.min_ts <= WriteTimestamp::Committed(snapshot_ts.succ()?), "Timestamps are out of order! min ts:{:?} snapshot_ts:{snapshot_ts}", self.min_ts, ); let query_vector = Vec::from(query.vector.clone()); let query_vector = CosineMetric::preprocess(query_vector); let mut candidates = vec![]; for (&id, revision) in &self.documents { if revision.document.matches(query) { let distance = CosineMetric::similarity(&query_vector, &revision.document.vector); candidates.push(VectorSearchQueryResult { score: distance, id, ts: revision.ts, }); } } candidates.sort_by(|a, b| a.cmp(b).reverse()); candidates.truncate(query.limit as usize); Ok(candidates) } } #[derive(Clone)] pub struct Revision { ts: WriteTimestamp, document: NormalizedQdrantDocument, } impl NormalizedQdrantDocument { fn matches(&self, query: &CompiledVectorSearch) -> bool { if query.filter_conditions.is_empty() { return true; } for (field_path, filter_condition) in &query.filter_conditions { let Some(value) = self.filter_fields.get(field_path) else { return false; }; let condition_result = match filter_condition { CompiledVectorFilter::Eq(ref term) => term == value, CompiledVectorFilter::In(ref terms) => terms.iter().any(|t| t == value), }; if condition_result { return true; } } false } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server