Skip to main content
Glama

Convex MCP server

Official
by get-convex
qdrant_index.rs25.9 kB
use std::{ collections::{ BTreeMap, BTreeSet, }, fs, mem, ops::Deref, path::{ Path, PathBuf, }, str::FromStr, sync::{ atomic::AtomicBool, Arc, }, time::{ Duration, Instant, }, }; use atomic_refcell::AtomicRefCell; use common::{ bootstrap_model::index::vector_index::VectorIndexSpec, document::ResolvedDocument, knobs::VECTOR_INDEX_THREADS, persistence::DocumentStream, query::search_value_to_bytes, types::{ Timestamp, WriteTimestamp, }, }; use errors::ErrorMetadata; use futures::TryStreamExt; use pb::searchlight as proto; use qdrant_common::types::{ DetailsLevel, TelemetryDetail, }; use qdrant_segment::{ data_types::{ named_vectors::NamedVectors, vectors::{ VectorElementType, VectorRef, }, }, entry::entry_point::SegmentEntry, json_path::JsonPath, segment::Segment, spaces::{ metric::Metric, simple::CosineMetric, }, types::{ AnyVariants, Condition, ExtendedPointId, FieldCondition, Filter, Match, MatchAny, MatchValue, PayloadFieldSchema, PayloadSchemaType, PayloadSelector, PayloadSelectorInclude, PointIdType, SearchParams, ValueVariants, WithPayload, WithVector, VECTOR_ELEMENT_SIZE, }, }; use serde_json::Value as JsonValue; use tempfile::TempDir; use uuid::Uuid; use value::{ base64, ConvexValue, FieldPath, InternalDocumentId, InternalId, ResolvedDocumentId, }; use crate::{ id_tracker::VectorMemoryIdTracker, incorrect_vector_filter_field_error, metrics::{ self, }, qdrant_segments::{ build_disk_segment, create_mutable_segment, segment_config, snapshot_segment, VectorDiskSegmentValues, DEFAULT_VECTOR_NAME, }, query::{ CompiledVectorFilter, CompiledVectorSearch, InternalVectorSearch, VectorSearchExpression, }, vector_dimensions_mismatch_error, IndexedVector, VectorSearchQueryResult, DEFAULT_VECTOR_LIMIT, MAX_FILTER_LENGTH, MAX_VECTOR_RESULTS, }; const TIMESTAMP_FIELD: &str = "_ts"; #[derive(Clone, Debug)] pub struct QdrantSchema { dimension: usize, vector_field: FieldPath, filter_fields: BTreeSet<FieldPath>, } #[derive(Clone, Copy, Debug)] pub enum QdrantVectorIndexType { Plain, HNSW, } impl QdrantSchema { pub fn new(index_config: &VectorIndexSpec) -> Self { Self { dimension: u32::from(index_config.dimensions) as usize, vector_field: index_config.vector_field.clone(), filter_fields: index_config.filter_fields.clone(), } } pub fn index(&self, document: &ResolvedDocument) -> Option<QdrantDocument> { let object = document.value(); let Some(ConvexValue::Array(ref array)) = object.get_path(&self.vector_field) else { return None; }; let mut vector = Vec::with_capacity(self.dimension); if array.len() != self.dimension { tracing::debug!( "Ignoring mismatched vector length, expected: {}, actual: {}", self.dimension, array.len(), ); return None; } for value in array.iter() { let ConvexValue::Float64(f) = value else { return None; }; vector.push(*f as f32); } let vector = IndexedVector::try_from(vector).ok()?; let document = QdrantDocument { internal_id: document.internal_id(), vector, filter_fields: self .filter_fields .iter() .map(|f| (f.clone(), search_value_to_bytes(object.get_path(f)))) .collect(), }; Some(document) } pub fn estimate_vector_size(&self) -> usize { self.dimension * VECTOR_ELEMENT_SIZE } pub fn compile(&self, query: InternalVectorSearch) -> anyhow::Result<CompiledVectorSearch> { let timer = metrics::compile_timer(); let index_name = query.printable_index_name()?; let query_vector = IndexedVector::try_from(query.vector)?; let query_limit = query.limit.unwrap_or(DEFAULT_VECTOR_LIMIT); anyhow::ensure!( query_limit as usize <= MAX_VECTOR_RESULTS, ErrorMetadata::bad_request( "VectorLimitTooLargeError", format!( "Vector queries can fetch at most {} results, requested {}.", MAX_VECTOR_RESULTS, query_limit as usize, ) ) ); let mut filter_conditions = BTreeMap::new(); // Each equality expression contributes to this, so an `In` with N elements // increments this by N let mut filter_length = 0; for expresion in query.expressions { match expresion { VectorSearchExpression::Eq(field_path, value) => { if !self.filter_fields.contains(&field_path) { anyhow::bail!(incorrect_vector_filter_field_error( &index_name, &field_path )) } let value_bytes = search_value_to_bytes(value.as_ref()); if filter_conditions.contains_key(&field_path) { anyhow::bail!("Found multiple filters for the same field?") } filter_conditions.insert(field_path, CompiledVectorFilter::Eq(value_bytes)); filter_length += 1; }, VectorSearchExpression::In(field_path, values) => { if !self.filter_fields.contains(&field_path) { anyhow::bail!(incorrect_vector_filter_field_error( &index_name, &field_path )) } let values_bytes: Vec<_> = values .into_iter() .map(|v| search_value_to_bytes(v.as_ref())) .collect(); if filter_conditions.contains_key(&field_path) { anyhow::bail!("Found multiple filters for the same field?") } filter_length += values_bytes.len(); filter_conditions.insert(field_path, CompiledVectorFilter::In(values_bytes)); }, } } anyhow::ensure!( filter_length <= MAX_FILTER_LENGTH, ErrorMetadata::bad_request( "TooManyElementsInVectorQueryError", format!( "Vector query against {index_name} has too many conditions. Max: \ {MAX_FILTER_LENGTH} Actual: {filter_length}" ) ) ); anyhow::ensure!( query_vector.len() == self.dimension, vector_dimensions_mismatch_error(query_vector.len() as u32, self.dimension as u32) ); let result = CompiledVectorSearch { vector: query_vector, limit: query_limit, filter_conditions, }; metrics::log_compiled_query(&result); timer.finish(); Ok(result) } pub fn search( &self, segment: &Segment, query: CompiledVectorSearch, overfetch_delta: u32, slow_vector_query_threshold_millis: u64, require_exact: bool, ) -> anyhow::Result<Vec<VectorSearchQueryResult>> { let qdrant_conditions = query .filter_conditions .iter() .map(|(field_path, condition)| { let field_condition = FieldCondition::new_match( encode_user_field_path(field_path)?, qdrant_filter_condition(condition), ); Ok(Some(Condition::Field(field_condition))) }) .collect::<anyhow::Result<Option<Vec<_>>>>()?; let qdrant_filter = Filter { should: qdrant_conditions, min_should: None, must: None, must_not: None, }; let search_params = SearchParams { hnsw_ef: None, exact: require_exact, quantization: None, indexed_only: false, }; let payload_selector = PayloadSelectorInclude { include: vec![json_path_from_str(TIMESTAMP_FIELD)?], }; let start = Instant::now(); let qdrant_results = segment.search( DEFAULT_VECTOR_NAME, &query.vector.into(), &WithPayload { enable: true, payload_selector: Some(PayloadSelector::Include(payload_selector)), }, &WithVector::Bool(false), Some(&qdrant_filter), (query.limit + overfetch_delta) as usize, Some(&search_params), &AtomicBool::new(false), )?; let duration = Instant::now().duration_since(start); if duration > Duration::from_millis(slow_vector_query_threshold_millis) { let detail = TelemetryDetail { level: DetailsLevel::Level2, histograms: true, }; tracing::warn!( "Slow qdrant query, duration: {}ms, segment telemetry: {:?}", duration.as_millis(), segment.get_telemetry_data(detail), ) } let mut results = Vec::with_capacity(qdrant_results.len()); for qdrant_result in qdrant_results { let ExtendedPointId::Uuid(ref uuid) = qdrant_result.id else { anyhow::bail!("Received non-UUID ID from qdrant: {qdrant_result:?}"); }; let internal_id = InternalId::from(*uuid.as_bytes()); let Some(ref payload) = qdrant_result.payload else { anyhow::bail!("Received no payload from qdrant: {qdrant_result:?}"); }; let Some(JsonValue::String(ts_b64)) = payload.0.get(TIMESTAMP_FIELD) else { anyhow::bail!("Invalid timestamp from qdrant: {qdrant_result:?}"); }; let ts_bytes = base64::decode_urlsafe(ts_b64)?; let ts = u64::from_le_bytes(ts_bytes[..].try_into()?); let result = VectorSearchQueryResult { score: qdrant_result.score, id: internal_id, ts: WriteTimestamp::Committed(ts.try_into()?), }; results.push(result); } Ok(results) } pub async fn build_disk_index<T: PreviousVectorSegmentsHack>( &self, index_path: &Path, revision_stream: DocumentStream<'_>, hnsw_threshold_bytes: usize, previous_segments: &mut T, ) -> anyhow::Result<Option<VectorDiskSegmentValues>> { let tmpdir = TempDir::new()?; let memory_timer = metrics::qdrant_segment_memory_build_timer(); // With HNSW, we need to construct a temporary index, then do a one-time // non-appending write to the final disk index. Use a temp dir for the // temporary index. Since we don't know which index type we're going to use // upfront, always set up the more complex directory. let memory_dir: PathBuf = tmpdir.path().join("memory"); let id_tracker = Arc::new(AtomicRefCell::new(VectorMemoryIdTracker::new())); let mutable_config = segment_config(self.dimension, true, *VECTOR_INDEX_THREADS); let mut memory_segment = create_mutable_segment( &memory_dir, id_tracker.clone(), self.dimension, mutable_config, )?; let op_num = 1; futures::pin_mut!(revision_stream); while let Some(entry) = revision_stream.try_next().await? { let point_id = QdrantExternalId::try_from(&entry.id)?; if let Some(document) = entry.value { let Some(qdrant_doc) = self.index(&document) else { tracing::trace!("Skipping an invalid doc: {:?}", document); continue; }; memory_segment.upsert_point(op_num, *point_id, qdrant_doc.qdrant_vector())?; let payload = qdrant_doc.encode_payload(entry.ts)?; memory_segment.set_payload(op_num, *point_id, &payload.into(), &None)?; } else { // If the document was inserted and then deleted in this batch, // then we might need to remove a vector we just // added to this segment. if memory_segment.delete_point(op_num, *point_id)? { tracing::trace!("Delete a point"); } else { tracing::trace!("Point was neither added nor deleted!"); } } // Updates or deletes of documents need to clear out old versions of those docs // in previous segments. We could theoretically skip inserts here, // but we can't tell which documents are strictly new vs which are // updates based just on the stream. // Similarly we could theoretically use timestamps to determine exactly which // segment should have a given vector so long as merging retains // timestamp order. However to find the tombstoned document's // timestamps, we'd have to load previous revisions which // would require extra queries and logic, so instead we just try mutating each // segment in memory. This removes an opportunity to verify // consistency, but it's faster and simpler. previous_segments.maybe_delete_qdrant(*point_id)?; } // We encode all of our index values as strings. let field_schema = Some(&PayloadFieldSchema::FieldType(PayloadSchemaType::Keyword)); for field in self.filter_fields.iter() { memory_segment.create_field_index( op_num, &encode_user_field_path(field)?, field_schema, )?; } memory_timer.finish(); // Ideally we'd not even have created the memory segment, but because vectors // can be added and deleted from the segment in the same pass, it's not // trivial to tell up front if we're going to produce a useful segment or not. if memory_segment.available_point_count() == 0 { tracing::debug!("Skipping an empty vector index for {index_path:?}",); return Ok(None); } else { tracing::debug!( "Building segment with total vectors {}", memory_segment.total_point_count() ); } // Use total_point_count to include deleted vectors in this estimate. Qdrant // used to, but no longer actually drops the vector data so they do take // up space (at least until we rebuild the segment via compaction or to add an // HNSW index). let estimated_size_bytes = memory_segment.total_point_count() * self.dimension * VECTOR_ELEMENT_SIZE; let estmated_size_kb = estimated_size_bytes / 1024; let index_type = if estmated_size_kb >= hnsw_threshold_bytes { QdrantVectorIndexType::HNSW } else { QdrantVectorIndexType::Plain }; let disk_timer = metrics::qdrant_segment_disk_build_timer(index_type); let result = match index_type { QdrantVectorIndexType::Plain => { let num_vectors = memory_segment.total_point_count() as u32; let num_deleted = memory_segment.deleted_point_count() as u32; anyhow::ensure!( num_deleted + memory_segment.available_point_count() as u32 == num_vectors ); Ok::<VectorDiskSegmentValues, anyhow::Error>(VectorDiskSegmentValues { paths: snapshot_segment( &id_tracker, &memory_segment, tmpdir.path(), index_path, )?, num_vectors, num_deleted, }) }, QdrantVectorIndexType::HNSW => { let indexing_path = tmpdir.path().join("indexing"); fs::create_dir_all(&indexing_path)?; let disk_path = index_path.join("disk"); fs::create_dir_all(&disk_path)?; let disk_config = segment_config(self.dimension, false, *VECTOR_INDEX_THREADS); build_disk_segment(&memory_segment, &indexing_path, &disk_path, disk_config) }, }?; disk_timer.finish(); tracing::debug!("Built a {index_type:?} vector index for {index_path:?}",); Ok(Some(result)) } } #[derive(Clone, Debug)] pub struct QdrantDocument { pub internal_id: InternalId, pub vector: IndexedVector, pub filter_fields: BTreeMap<FieldPath, Vec<u8>>, } impl QdrantDocument { pub fn qdrant_vector(&self) -> NamedVectors<'_> { NamedVectors::from_ref(DEFAULT_VECTOR_NAME, VectorRef::Dense(&self.vector[..])) } pub fn encode_payload(&self, ts: Timestamp) -> anyhow::Result<JsonValue> { let mut map = serde_json::Map::new(); for (field_path, field_value) in &self.filter_fields { let mut current = &mut map; // The path should consist of nested json objects. for i in 0..field_path.fields().len() - 1 { let field: String = field_path.fields()[i].clone().into(); let JsonValue::Object(inner) = current .entry(field) .or_insert_with(|| JsonValue::Object(serde_json::Map::new())) else { // This means one filter field path is a prefix of another. We should // prevent the developer from defining such index. Throw a system error here. anyhow::bail!("Conflicting field path: {:?} {:?}", field_path, map); }; current = inner; } current.insert( field_path.last().clone().into(), JsonValue::String(base64::encode_urlsafe(&field_value[..])), ); } map.insert( TIMESTAMP_FIELD.to_string(), JsonValue::String(base64::encode_urlsafe(&u64::from(ts).to_le_bytes()[..])), ); Ok(map.into()) } /// Estimates size of `QdrantDocument` in bytes pub fn estimate_size(&self) -> usize { self.vector.len() * mem::size_of::<VectorElementType>() } } #[cfg(any(test, feature = "testing"))] pub fn cosine_similarity(v1: &[f32], v2: &[f32]) -> f32 { let v1 = CosineMetric::preprocess(v1.to_vec()); let v2 = CosineMetric::preprocess(v2.to_vec()); CosineMetric::similarity(&v1, &v2) } // NB: For cosine similarity, we need to normalize vectors before indexing them. #[derive(Clone, Debug)] pub struct NormalizedQdrantDocument { pub internal_id: InternalId, pub vector: Vec<f32>, pub filter_fields: BTreeMap<FieldPath, Vec<u8>>, } impl From<QdrantDocument> for NormalizedQdrantDocument { fn from(value: QdrantDocument) -> Self { let vector = Vec::from(value.vector); let vector = CosineMetric::preprocess(vector); Self { internal_id: value.internal_id, vector, filter_fields: value.filter_fields, } } } impl NormalizedQdrantDocument { pub fn size(&self) -> usize { let mut size = 0; size += self.vector.len() * mem::size_of::<f32>(); size += self.filter_fields.len() * mem::size_of::<(FieldPath, Vec<u8>)>(); for (field_path, maybe_value) in &self.filter_fields { size += field_path.fields().iter().map(|f| f.len()).sum::<usize>(); size += maybe_value.len(); } size } } fn encode_user_field_path(field_path: &FieldPath) -> anyhow::Result<JsonPath> { let key = String::from(field_path.clone()); json_path_from_str(key.as_str()) } fn qdrant_filter_condition(condition: &CompiledVectorFilter) -> Match { match condition { CompiledVectorFilter::Eq(value) => { let value_b64 = base64::encode_urlsafe(&value[..]); let match_value = MatchValue { value: ValueVariants::Keyword(value_b64), }; Match::Value(match_value) }, CompiledVectorFilter::In(values) => { let values_b64 = values .iter() .map(|v| base64::encode_urlsafe(&v[..])) .collect(); let match_value = MatchAny { any: AnyVariants::Keywords(values_b64), }; Match::Any(match_value) }, } } impl From<QdrantSchema> for proto::VectorIndexConfig { fn from(value: QdrantSchema) -> Self { proto::VectorIndexConfig { dimension: value.dimension as u32, vector_field_path: Some(value.vector_field.into()), filter_fields: value.filter_fields.into_iter().map(|f| f.into()).collect(), } } } impl TryFrom<proto::VectorIndexConfig> for QdrantSchema { type Error = anyhow::Error; fn try_from(value: proto::VectorIndexConfig) -> Result<Self, Self::Error> { let vector_field = value .vector_field_path .ok_or_else(|| anyhow::anyhow!("Missing vector field path in VectorIndexConfigProto"))? .try_into()?; let filter_fields = value .filter_fields .into_iter() .map(|f| f.try_into()) .collect::<Result<_, _>>()?; Ok(QdrantSchema { dimension: value.dimension as usize, vector_field, filter_fields, }) } } /// A workaround for circular dependencies between database /// (vector_index_worker) and qdrant. pub trait PreviousVectorSegmentsHack { /// Marks the id deleted, returning a failure of an invariant was violated /// (this should never happen!) fn maybe_delete_qdrant(&mut self, external_id: ExtendedPointId) -> anyhow::Result<()>; } pub struct QdrantExternalId(PointIdType); impl TryFrom<InternalId> for QdrantExternalId { type Error = anyhow::Error; fn try_from(value: InternalId) -> Result<Self, Self::Error> { let uuid = Uuid::from_bytes(value[..].try_into()?); Ok(Self(PointIdType::Uuid(uuid))) } } impl TryFrom<&InternalDocumentId> for QdrantExternalId { type Error = anyhow::Error; fn try_from(value: &InternalDocumentId) -> Result<Self, Self::Error> { let uuid = Uuid::from_bytes(value.internal_id()[..].try_into()?); Ok(Self(PointIdType::Uuid(uuid))) } } impl TryFrom<ResolvedDocumentId> for QdrantExternalId { type Error = anyhow::Error; fn try_from(value: ResolvedDocumentId) -> Result<Self, Self::Error> { let uuid = Uuid::from_bytes(value.internal_id()[..].try_into()?); Ok(Self(PointIdType::Uuid(uuid))) } } impl Deref for QdrantExternalId { type Target = PointIdType; fn deref(&self) -> &Self::Target { &self.0 } } fn json_path_from_str(s: &str) -> anyhow::Result<JsonPath> { match JsonPath::from_str(s) { Ok(path) => Ok(path), Err(()) => { anyhow::bail!("Unable to parse to JsonPath: {s}"); }, } } #[cfg(test)] mod tests { use common::types::Timestamp; use maplit::btreemap; use rand::Rng; use serde_json::json; use value::InternalId; use crate::QdrantDocument; #[test] fn test_encode_payload() -> anyhow::Result<()> { let mut rng = rand::rng(); let d = 1536; let document = QdrantDocument { internal_id: InternalId(1u128.to_le_bytes()), vector: (0..d) .map(|_| rng.random()) .collect::<Vec<_>>() .try_into() .unwrap(), filter_fields: btreemap!(), }; let payload = document.encode_payload(Timestamp::MIN)?; assert_eq!(payload, json!({ "_ts": "AAAAAAAAAAA"})); let document = QdrantDocument { internal_id: InternalId(1u128.to_le_bytes()), vector: (0..d) .map(|_| rng.random()) .collect::<Vec<_>>() .try_into() .unwrap(), filter_fields: btreemap!( "abc".parse()? => vec![97], "def.ghi".parse()? => vec![98], "def.xyz".parse()? => vec![99], ), }; let payload = document.encode_payload(Timestamp::MIN)?; assert_eq!( payload, json!({ "abc": "YQ", "def": { "ghi": "Yg", "xyz": "Yw"}, "_ts": "AAAAAAAAAAA"}) ); let document = QdrantDocument { internal_id: InternalId(1u128.to_le_bytes()), vector: (0..d) .map(|_| rng.random()) .collect::<Vec<_>>() .try_into() .unwrap(), filter_fields: btreemap!( "zzz".parse()? => vec![97], ), }; let payload = document.encode_payload(Timestamp::MIN)?; assert_eq!(payload, json!({ "zzz": "YQ", "_ts": "AAAAAAAAAAA"})); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server