Skip to main content
Glama
index.rs (6.67 kB)
//! Indexing of SPARQL endpoints, to be run when the server starts use std::sync::Arc; use std::time::Instant; use crate::config::{EndpointsConfig, SparqlEndpointConfig}; use crate::db::{SearchDB, SearchDocument}; use crate::error::AppResult; use crate::sparql_client::SparqlClient; use crate::void_schema::{SchemasMap, VoidSchema}; const TABLE_NAME: &str = "sparql_docs"; /// Initialize the database by running a SPARQL query to fetch example queries /// This function should be called when the server starts /// Returns the QueryDatabase instance for reuse across the application pub async fn init_db( mcp_conf: &EndpointsConfig, db_path: &str, force_index: bool, ) -> AppResult<(Arc<SearchDB>, SchemasMap)> { let start_time = Instant::now(); let db = SearchDB::new(db_path, TABLE_NAME).await?; // Check if table already has data to avoid reindexing if force_index { tracing::info!("🔄 Re-indexing, clearing existing database"); db.clear_table().await?; } db.init_table().await?; let mut schemas_map = db.get_schemas().await; let existing_count = db.count_docs().await?; if existing_count > 0 && !force_index { tracing::info!( "⏭️ Database contains {existing_count} documents, skipping indexing (use -f to force reindexing)" ); return Ok((Arc::new(db), schemas_map)); } // Load documents for each endpoint tracing::info!( "🗂️ Indexing {} SPARQL endpoints", mcp_conf.get_endpoints().len() ); for config in mcp_conf.get_endpoints() { schemas_map.insert( config.endpoint_url.clone(), init_endpoint(config, &db, false).await?, ); } // Load shared documents let mut docs = Vec::new(); docs.extend(get_docs_summary_endpoints(mcp_conf.get_endpoints()).await); if !docs.is_empty() { db.insert_docs(docs.clone()).await?; } db.create_vector_index().await?; let secs = start_time.elapsed().as_secs(); tracing::info!( "✅ Database initialized with {} documents in {} min {} sec", docs.len(), secs / 60, secs % 60 ); Ok((Arc::new(db), schemas_map)) } /// Initialize a SPARQL endpoint, retrieve its VoID schema 
and example queries, and store them in the database pub async fn init_endpoint( config: &SparqlEndpointConfig, db: &SearchDB, create_index: bool, ) -> AppResult<VoidSchema> { let void_schema = VoidSchema::from_endpoint(&config.endpoint_url).await; // Store in LanceDB for future use let _ = db.store_void_schemas(&void_schema).await; let mut docs = Vec::new(); docs.extend(get_docs_sparql_examples(config).await); docs.extend(get_docs_classes_schema(&void_schema).await); docs.extend(get_docs_endpoint_info(config).await); // Insert documents in LanceDB if !docs.is_empty() { db.insert_docs(docs.clone()).await?; if create_index { db.create_vector_index().await?; } } Ok(void_schema) } const SPARQL_QUERY_EXAMPLES: &str = r#" PREFIX sh: <http://www.w3.org/ns/shacl#> PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> SELECT DISTINCT ?comment ?query WHERE { ?sq a sh:SPARQLExecutable ; rdfs:comment ?comment ; sh:select|sh:ask|sh:construct|sh:describe ?query . } ORDER BY ?sq "#; /// Add docs for SPARQL query examples from the endpoint async fn get_docs_sparql_examples(config: &SparqlEndpointConfig) -> Vec<SearchDocument> { let mut docs = Vec::new(); let sparql_client = SparqlClient::builder().build().unwrap(); match sparql_client .query_select(&config.endpoint_url, SPARQL_QUERY_EXAMPLES) .await { Ok(res) => { tracing::info!( "{} SPARQL query examples for {} ({}):", res.results.bindings.len(), config.label, config.endpoint_url ); for binding in res.results.bindings.iter() { let comment = binding.values.get("comment").map(|v| &v.value); let query = binding.values.get("query").map(|v| &v.value); if let (Some(comment), Some(query)) = (comment, query) { docs.push(SearchDocument { question: comment.to_string(), answer: query.to_string(), endpoint_url: config.endpoint_url.clone(), doc_type: "SPARQL endpoints query examples".to_string(), vector: None, // Will be generated auto in insert_docs }); } } if res.results.bindings.is_empty() { tracing::warn!("⚠️ No SPARQL query examples found 
in {}", config.label); } } Err(e) => { tracing::warn!( "⚠️ Error executing examples SPARQL query for {} ({}): {e}", config.label, config.endpoint_url ); } } docs } /// Add docs for classes in a SPARQL endpoint schema async fn get_docs_classes_schema(void_schema: &VoidSchema) -> Vec<SearchDocument> { let mut docs = Vec::new(); docs.extend(void_schema.to_docs().await); docs } /// Add description docs for an endpoint async fn get_docs_endpoint_info(config: &SparqlEndpointConfig) -> Vec<SearchDocument> { let mut docs = Vec::new(); docs.push(SearchDocument { question: format!("What is the SPARQL endpoint {} about?", config.label), answer: config.description.clone(), endpoint_url: config.endpoint_url.clone(), doc_type: "General information".to_string(), vector: None, }); docs } /// Add description docs to summarize indexed endpoints async fn get_docs_summary_endpoints(configs: &[SparqlEndpointConfig]) -> Vec<SearchDocument> { let mut docs = Vec::new(); docs.push(SearchDocument { question: "Which resources are supported by this assistant?".to_string(), answer: format!( "This system helps to access the following SPARQL endpoints:\n- {}", configs .iter() .map(|endpoint| { format!( "{} ({}): {}", endpoint.label, endpoint.endpoint_url, endpoint.description ) }) .collect::<Vec<String>>() .join("\n- ") ), endpoint_url: "".to_string(), doc_type: "General information".to_string(), vector: None, }); docs }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sib-swiss/sparql-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.