//! Indexing of SPARQL endpoints, to be run when the server starts
use std::sync::Arc;
use std::time::Instant;
use crate::config::{EndpointsConfig, SparqlEndpointConfig};
use crate::db::{SearchDB, SearchDocument};
use crate::error::AppResult;
use crate::sparql_client::SparqlClient;
use crate::void_schema::{SchemasMap, VoidSchema};
/// Name of the table in the search database that holds the indexed documents.
const TABLE_NAME: &str = "sparql_docs";
/// Initialize the search database and index every configured SPARQL endpoint.
///
/// This function should be called when the server starts. For each endpoint it
/// retrieves the VoID schema, the example SPARQL queries and a short description
/// (see [`init_endpoint`]), then adds a summary document listing all endpoints
/// and builds the vector index once at the end.
///
/// Indexing is skipped when the table already contains documents, unless
/// `force_index` is set, in which case the table is cleared first.
///
/// # Arguments
/// * `mcp_conf` - configuration listing the SPARQL endpoints to index
/// * `db_path` - path of the database to open
/// * `force_index` - clear the existing data and re-index all endpoints
///
/// # Returns
/// The database, wrapped in an `Arc` for reuse across the application, and the
/// map of VoID schemas keyed by endpoint URL.
///
/// # Errors
/// Returns an error if opening, clearing, initializing, counting, inserting into
/// or indexing the database fails, or if indexing any endpoint fails.
pub async fn init_db(
    mcp_conf: &EndpointsConfig,
    db_path: &str,
    force_index: bool,
) -> AppResult<(Arc<SearchDB>, SchemasMap)> {
    let start_time = Instant::now();
    let db = SearchDB::new(db_path, TABLE_NAME).await?;
    if force_index {
        tracing::info!("🔄 Re-indexing, clearing existing database");
        db.clear_table().await?;
    }
    db.init_table().await?;
    let mut schemas_map = db.get_schemas().await;

    // Check if the table already has data, to avoid reindexing on every start
    let existing_count = db.count_docs().await?;
    if existing_count > 0 && !force_index {
        tracing::info!(
            "⏭️ Database contains {existing_count} documents, skipping indexing (use -f to force reindexing)"
        );
        return Ok((Arc::new(db), schemas_map));
    }

    // Load documents for each endpoint; the vector index is built once below
    let endpoints = mcp_conf.get_endpoints();
    tracing::info!("🗂️ Indexing {} SPARQL endpoints", endpoints.len());
    for config in endpoints {
        let endpoint_url = config.endpoint_url.clone();
        let void_schema = init_endpoint(config, &db, false).await?;
        schemas_map.insert(endpoint_url, void_schema);
    }

    // Load shared documents that are not tied to a single endpoint
    let summary_docs = get_docs_summary_endpoints(endpoints).await;
    if !summary_docs.is_empty() {
        db.insert_docs(summary_docs).await?;
    }
    db.create_vector_index().await?;

    // Report the total number of stored documents, not just the shared ones
    // inserted in this function.
    let total_docs = db.count_docs().await?;
    let secs = start_time.elapsed().as_secs();
    tracing::info!(
        "✅ Database initialized with {} documents in {} min {} sec",
        total_docs,
        secs / 60,
        secs % 60
    );
    Ok((Arc::new(db), schemas_map))
}
/// Initialize a SPARQL endpoint, retrieve its VoID schema and example queries, and store them in the database
pub async fn init_endpoint(
config: &SparqlEndpointConfig,
db: &SearchDB,
create_index: bool,
) -> AppResult<VoidSchema> {
let void_schema = VoidSchema::from_endpoint(&config.endpoint_url).await;
// Store in LanceDB for future use
let _ = db.store_void_schemas(&void_schema).await;
let mut docs = Vec::new();
docs.extend(get_docs_sparql_examples(config).await);
docs.extend(get_docs_classes_schema(&void_schema).await);
docs.extend(get_docs_endpoint_info(config).await);
// Insert documents in LanceDB
if !docs.is_empty() {
db.insert_docs(docs.clone()).await?;
if create_index {
db.create_vector_index().await?;
}
}
Ok(void_schema)
}
/// SPARQL query retrieving the example queries published by an endpoint.
///
/// Selects every `sh:SPARQLExecutable` together with its `rdfs:comment`, which is
/// used as the document question. It also selects its query text, which is used
/// as the answer. The text is taken from whichever of `sh:select`, `sh:ask`,
/// `sh:construct` or `sh:describe` the executable has.
const SPARQL_QUERY_EXAMPLES: &str = r#"
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT DISTINCT ?comment ?query
WHERE {
?sq a sh:SPARQLExecutable ;
rdfs:comment ?comment ;
sh:select|sh:ask|sh:construct|sh:describe ?query .
} ORDER BY ?sq
"#;
/// Fetch the example SPARQL queries published by an endpoint as documents.
///
/// Runs [`SPARQL_QUERY_EXAMPLES`] against the endpoint. Each result row that has
/// both a `comment` and a `query` binding becomes one document, with the comment
/// as question and the query as answer.
///
/// This is best-effort: if the client cannot be built or the query fails, a
/// warning is logged and an empty list is returned. One misbehaving endpoint
/// therefore does not abort indexing.
async fn get_docs_sparql_examples(config: &SparqlEndpointConfig) -> Vec<SearchDocument> {
    // Handle a client construction failure like a query failure instead of
    // panicking the whole indexing run.
    let sparql_client = match SparqlClient::builder().build() {
        Ok(client) => client,
        Err(e) => {
            tracing::warn!(
                "⚠️ Error building SPARQL client for {} ({}): {e}",
                config.label,
                config.endpoint_url
            );
            return Vec::new();
        }
    };

    let res = match sparql_client
        .query_select(&config.endpoint_url, SPARQL_QUERY_EXAMPLES)
        .await
    {
        Ok(res) => res,
        Err(e) => {
            tracing::warn!(
                "⚠️ Error executing examples SPARQL query for {} ({}): {e}",
                config.label,
                config.endpoint_url
            );
            return Vec::new();
        }
    };

    let bindings = &res.results.bindings;
    tracing::info!(
        "{} SPARQL query examples for {} ({}):",
        bindings.len(),
        config.label,
        config.endpoint_url
    );
    if bindings.is_empty() {
        tracing::warn!("⚠️ No SPARQL query examples found in {}", config.label);
    }

    bindings
        .iter()
        .filter_map(|binding| {
            // Rows missing either variable are skipped
            let comment = binding.values.get("comment")?;
            let query = binding.values.get("query")?;
            Some(SearchDocument {
                question: comment.value.to_string(),
                answer: query.value.to_string(),
                endpoint_url: config.endpoint_url.clone(),
                doc_type: "SPARQL endpoints query examples".to_string(),
                vector: None, // Will be generated auto in insert_docs
            })
        })
        .collect()
}
/// Collect the documents describing the classes of an endpoint's VoID schema.
///
/// The documents themselves are produced by `VoidSchema::to_docs`; this
/// function gathers them into a vector.
async fn get_docs_classes_schema(void_schema: &VoidSchema) -> Vec<SearchDocument> {
    let class_docs = void_schema.to_docs().await;
    class_docs.into_iter().collect()
}
/// Build the "General information" document describing a single endpoint.
///
/// The question asks what the endpoint is about, using its label. The answer is
/// the endpoint's configured description.
async fn get_docs_endpoint_info(config: &SparqlEndpointConfig) -> Vec<SearchDocument> {
    let question = format!("What is the SPARQL endpoint {} about?", config.label);
    let info_doc = SearchDocument {
        question,
        answer: config.description.clone(),
        endpoint_url: config.endpoint_url.clone(),
        doc_type: "General information".to_string(),
        vector: None,
    };
    vec![info_doc]
}
/// Build the "General information" document summarizing all indexed endpoints.
///
/// The answer lists each endpoint as a `- label (url): description` bullet under
/// a header line. The document is not tied to a single endpoint, so its
/// `endpoint_url` is empty.
///
/// When `configs` is empty, the answer contains only the header line, with no
/// dangling empty bullet.
async fn get_docs_summary_endpoints(configs: &[SparqlEndpointConfig]) -> Vec<SearchDocument> {
    // Each entry carries its own "\n- " prefix, so an empty list adds nothing.
    // For a non-empty list the text is identical to joining with "\n- ".
    let endpoint_list: String = configs
        .iter()
        .map(|endpoint| {
            format!(
                "\n- {} ({}): {}",
                endpoint.label, endpoint.endpoint_url, endpoint.description
            )
        })
        .collect();
    vec![SearchDocument {
        question: "Which resources are supported by this assistant?".to_string(),
        answer: format!(
            "This system helps to access the following SPARQL endpoints:{endpoint_list}"
        ),
        endpoint_url: String::new(),
        doc_type: "General information".to_string(),
        vector: None,
    }]
}