"""
Semantic search functionality for Zotero MCP.
This module provides semantic search capabilities using PostgreSQL with pgvector
and OpenAI/Ollama embeddings for vector-based similarity search over research libraries.
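Example usage (a minimal sketch; the import path and config location are
illustrative and depend on how the package is installed):
    from zotero_mcp.semantic_search import create_semantic_search
    search = create_semantic_search(config_path="zotero_mcp_config.json")
    search.update_database(limit=10)   # index a small test batch first
    results = search.search("transformer attention mechanisms", limit=5)
    for hit in results["results"]:
        print(hit["item_key"], hit["similarity_score"])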
"""
import asyncio
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
import logging
from .logging_config import configure_logging_from_config, ensure_logging_configured
from .vector_client import PostgreSQLVectorClient, create_vector_client, EmbeddingItem
from .embedding_service import create_embedding_provider, detect_model_change, EmbeddingProvider
from .chunking_utils import create_document_chunks, extract_searchable_text, calculate_content_hash, validate_chunking_config
from .db_schema import DatabaseManager, create_database_manager
from .client import get_zotero_client
from .utils import format_creators
logger = logging.getLogger(__name__)
class ZoteroSemanticSearch:
"""Semantic search interface for Zotero libraries using PostgreSQL with pg-vector."""
def __init__(self,
vector_client: Optional[PostgreSQLVectorClient] = None,
embedding_provider: Optional[EmbeddingProvider] = None,
config_path: Optional[str] = None):
"""
Initialize semantic search.
Args:
vector_client: Optional PostgreSQLVectorClient instance
embedding_provider: Optional EmbeddingProvider instance
config_path: Path to configuration file
"""
self.config_path = config_path
self.config = self._load_config()
# Configure logging as early as possible
try:
configure_logging_from_config(config_path=self.config_path, config=self.config)
except Exception as e:
# Fallback to ensure some logging is configured
ensure_logging_configured(self.config_path)
logger.warning(f"Error configuring logging from config, using fallback: {e}")
# Initialize components
self.vector_client = vector_client or create_vector_client(self.config.get("database", {}))
self.embedding_provider = embedding_provider
self.zotero_client = get_zotero_client()
# Initialize embedding provider first if not provided
if not self.embedding_provider:
self._initialize_embedding_provider()
# Get embedding dimension for database manager
embedding_dimension = None
if self.embedding_provider:
try:
embedding_dimension = self.embedding_provider.get_embedding_dimension()
except Exception as e:
logger.warning(f"Could not get embedding dimension: {e}")
self.db_manager = create_database_manager(
self.config.get("database", {}),
embedding_dimension=embedding_dimension
)
# Validate chunking configuration
self.chunking_config = validate_chunking_config(self.config.get("chunking", {}))
# Load update configuration
self.update_config = self.config.get("semantic_search", {}).get("update_config", {})
self._ensure_default_update_config()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from file with defaults."""
default_config = {
"database": {
"host": "192.168.1.173",
"port": 5432,
"database": "zotero_mcp_test",
"username": "zotero_mcp_test",
"password": "jt24jtiowjeiofjoi",
"schema": "public",
"pool_size": 5
},
"embedding": {
"provider": "ollama",
"openai": {
"api_key": "",
"model": "text-embedding-3-small",
"base_url": "",
"batch_size": 100
},
"ollama": {
"host": "192.168.1.189:8182",
"model": "nomic-embed-text",
"timeout": 60
}
},
"chunking": {
"chunk_size": 1000,
"overlap": 100,
"min_chunk_size": 100,
"max_chunks_per_item": 10,
"chunking_strategy": "sentences"
},
"semantic_search": {
"similarity_threshold": 0.7,
"max_results": 50,
"update_config": {
"auto_update": False,
"update_frequency": "manual",
"batch_size": 50,
"parallel_workers": 4
}
}
}
# Load from file if exists
if self.config_path and os.path.exists(self.config_path):
try:
with open(self.config_path, 'r') as f:
file_config = json.load(f)
# Deep merge configurations
self._deep_merge_config(default_config, file_config)
except Exception as e:
logger.warning(f"Error loading config from {self.config_path}: {e}")
# Override with environment variables
self._load_env_overrides(default_config)
return default_config
def _deep_merge_config(self, default: Dict[str, Any], override: Dict[str, Any]) -> None:
"""Deep merge configuration dictionaries."""
for key, value in override.items():
if key in default and isinstance(default[key], dict) and isinstance(value, dict):
self._deep_merge_config(default[key], value)
else:
default[key] = value
def _load_env_overrides(self, config: Dict[str, Any]) -> None:
"""Load environment variable overrides."""
# Database overrides
if os.getenv("ZOTERO_DB_HOST"):
config["database"]["host"] = os.getenv("ZOTERO_DB_HOST")
if os.getenv("ZOTERO_DB_PORT"):
config["database"]["port"] = int(os.getenv("ZOTERO_DB_PORT"))
if os.getenv("ZOTERO_DB_NAME"):
config["database"]["database"] = os.getenv("ZOTERO_DB_NAME")
if os.getenv("ZOTERO_DB_USER"):
config["database"]["username"] = os.getenv("ZOTERO_DB_USER")
if os.getenv("ZOTERO_DB_PASSWORD"):
config["database"]["password"] = os.getenv("ZOTERO_DB_PASSWORD")
# Embedding provider overrides
if os.getenv("ZOTERO_EMBEDDING_PROVIDER"):
config["embedding"]["provider"] = os.getenv("ZOTERO_EMBEDDING_PROVIDER")
if os.getenv("OPENAI_API_KEY"):
config["embedding"]["openai"]["api_key"] = os.getenv("OPENAI_API_KEY")
if os.getenv("OPENAI_BASE_URL"):
config["embedding"]["openai"]["base_url"] = os.getenv("OPENAI_BASE_URL")
if os.getenv("OLLAMA_HOST"):
config["embedding"]["ollama"]["host"] = os.getenv("OLLAMA_HOST")
def _ensure_default_update_config(self) -> None:
"""Ensure update config has required defaults with proper type conversion."""
defaults = {
"auto_update": False,
"update_frequency": "manual",
"last_update": None,
"batch_size": 50,
"parallel_workers": 4
}
for key, default_value in defaults.items():
if key not in self.update_config:
self.update_config[key] = default_value
elif key in ["batch_size", "parallel_workers"]:
# Ensure numeric values are properly typed
try:
self.update_config[key] = int(self.update_config[key])
except (ValueError, TypeError):
logger.warning(f"Invalid {key} value {self.update_config[key]}, using default {default_value}")
self.update_config[key] = default_value
def _initialize_embedding_provider(self) -> None:
"""Initialize embedding provider from configuration."""
try:
embedding_config = self.config.get("embedding", {})
# Check if we're in an async context
try:
# Try to get current event loop to see if we're in async context
current_loop = asyncio.get_running_loop()
# We're in an async context, use thread-based initialization
logger.debug("Initializing embedding provider from async context using thread")
self._initialize_in_thread(embedding_config)
except RuntimeError:
# No event loop running, we can use synchronous initialization
logger.debug("Initializing embedding provider from sync context")
try:
from .embedding_service import create_embedding_provider_sync
self.embedding_provider = create_embedding_provider_sync(embedding_config)
logger.info(f"Initialized {self.embedding_provider.get_provider_name()} embedding provider")
except Exception as sync_error:
logger.warning(f"Sync initialization failed: {sync_error}, trying thread-based approach")
self._initialize_in_thread(embedding_config)
except Exception as e:
logger.error(f"Error initializing embedding provider: {e}")
import traceback
logger.error(f"Full initialization traceback: {traceback.format_exc()}")
self.embedding_provider = None
def _initialize_in_thread(self, embedding_config: Dict[str, Any]) -> None:
"""Initialize embedding provider in a separate thread with isolated event loop."""
        import threading
result = None
exception = None
def init_provider():
nonlocal result, exception
try:
# Create a completely isolated event loop
new_loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(new_loop)
result = new_loop.run_until_complete(
create_embedding_provider(embedding_config)
)
finally:
try:
# Clean up pending tasks
pending = asyncio.all_tasks(new_loop)
if pending:
for task in pending:
task.cancel()
new_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception as cleanup_error:
logger.debug(f"Error cleaning up tasks: {cleanup_error}")
finally:
new_loop.close()
# Clear the loop from the thread
asyncio.set_event_loop(None)
except Exception as e:
exception = e
import traceback
logger.debug(f"Thread initialization error: {traceback.format_exc()}")
thread = threading.Thread(target=init_provider, daemon=True)
thread.start()
thread.join(timeout=30) # Add timeout to prevent hanging
if thread.is_alive():
logger.error("Embedding provider initialization timed out")
self.embedding_provider = None
return
if exception:
logger.error(f"Thread initialization failed: {exception}")
self.embedding_provider = None
return
if result:
self.embedding_provider = result
logger.info(f"Initialized {self.embedding_provider.get_provider_name()} embedding provider")
else:
logger.error("Thread initialization returned no result")
self.embedding_provider = None
async def _ensure_embedding_provider(self) -> EmbeddingProvider:
"""Ensure embedding provider is initialized."""
if self.embedding_provider is None:
embedding_config = self.config.get("embedding", {})
self.embedding_provider = await create_embedding_provider(embedding_config)
return self.embedding_provider
def _save_update_config(self) -> None:
"""Save update configuration to file."""
if not self.config_path:
return
config_dir = Path(self.config_path).parent
config_dir.mkdir(parents=True, exist_ok=True)
try:
# Load existing config or create new one
full_config = {}
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
full_config = json.load(f)
# Update semantic search config
if "semantic_search" not in full_config:
full_config["semantic_search"] = {}
full_config["semantic_search"]["update_config"] = self.update_config
# Save config
with open(self.config_path, 'w') as f:
json.dump(full_config, f, indent=2)
except Exception as e:
logger.error(f"Error saving update config: {e}")
def should_update_database(self) -> bool:
"""Check if the database should be updated based on configuration."""
if not self.update_config.get("auto_update", False):
return False
frequency = self.update_config.get("update_frequency", "manual")
if frequency == "manual":
return False
elif frequency == "startup":
return True
elif frequency == "daily":
last_update = self.update_config.get("last_update")
if not last_update:
return True
last_update_date = datetime.fromisoformat(last_update)
return datetime.now() - last_update_date >= timedelta(days=1)
elif frequency.startswith("every_"):
try:
days = int(frequency.split("_")[1])
last_update = self.update_config.get("last_update")
if not last_update:
return True
last_update_date = datetime.fromisoformat(last_update)
return datetime.now() - last_update_date >= timedelta(days=days)
except (ValueError, IndexError):
return False
return False
def update_database(self,
force_full_rebuild: bool = False,
limit: Optional[int] = None,
start_fulltext_indexing: bool = True) -> Dict[str, Any]:
"""
Update the semantic search database with Zotero items.
Args:
force_full_rebuild: Whether to rebuild the entire database
limit: Limit number of items to process (for testing)
start_fulltext_indexing: Whether to start background full-text indexing
Returns:
Update statistics
"""
logger.info("Starting database update...")
start_time = datetime.now()
stats = {
"total_items": 0,
"processed_items": 0,
"added_items": 0,
"updated_items": 0,
"skipped_items": 0,
"errors": 0,
"start_time": start_time.isoformat(),
"duration": None
}
try:
# Check for model change
if not force_full_rebuild and detect_model_change(self.embedding_provider, self.config.get("embedding", {})):
logger.warning("Embedding model changed, forcing full rebuild")
force_full_rebuild = True
# Initialize database if needed
if force_full_rebuild:
logger.info("Initializing database schema...")
self.db_manager.initialize_database()
# Clear existing embeddings
self.vector_client.delete_items([]) # Delete all
# Get all items from Zotero
logger.info("Fetching items from Zotero...")
# Fetch items in batches to handle large libraries
batch_size = 100
start = 0
all_items = []
while True:
batch_params = {"start": start, "limit": batch_size}
if limit and len(all_items) >= limit:
break
items = self.zotero_client.items(**batch_params)
if not items:
break
# Filter out attachments and notes by default
filtered_items = [
item for item in items
if item.get("data", {}).get("itemType") not in ["attachment", "note"]
]
all_items.extend(filtered_items)
start += batch_size
if len(items) < batch_size:
break
if limit:
all_items = all_items[:limit]
stats["total_items"] = len(all_items)
logger.info(f"Found {stats['total_items']} items to process")
# Process items in batches
batch_size = self.update_config.get("batch_size", 50)
for i in range(0, len(all_items), batch_size):
batch = all_items[i:i + batch_size]
                # Run async batch processing. update_database is synchronous, so
                # reuse or create an event loop in this thread rather than relying
                # on asyncio.get_event_loop() raising when no loop exists.
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    # No current event loop in this thread (e.g. worker thread or
                    # newer Python); create one and register it for reuse.
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                batch_stats = loop.run_until_complete(
                    self._process_item_batch_async(batch, force_full_rebuild)
                )
stats["processed_items"] += batch_stats["processed"]
stats["added_items"] += batch_stats["added"]
stats["updated_items"] += batch_stats["updated"]
stats["skipped_items"] += batch_stats["skipped"]
stats["errors"] += batch_stats["errors"]
logger.info(f"Processed {stats['processed_items']}/{stats['total_items']} items")
# Update last update time
self.update_config["last_update"] = datetime.now().isoformat()
self._save_update_config()
end_time = datetime.now()
stats["duration"] = str(end_time - start_time)
stats["end_time"] = end_time.isoformat()
# Start full-text indexing if requested
if start_fulltext_indexing:
fulltext_stats = self._trigger_fulltext_indexing()
stats["fulltext_indexing"] = fulltext_stats
logger.info(f"Database update completed in {stats['duration']}")
return stats
except Exception as e:
logger.error(f"Error updating database: {e}")
stats["error"] = str(e)
end_time = datetime.now()
stats["duration"] = str(end_time - start_time)
return stats
async def _process_item_batch_async(self, items: List[Dict[str, Any]],
force_rebuild: bool = False) -> Dict[str, int]:
"""Process a batch of items asynchronously."""
stats = {"processed": 0, "added": 0, "updated": 0, "skipped": 0, "errors": 0}
embedding_items = []
provider = await self._ensure_embedding_provider()
for item in items:
try:
item_key = item.get("key", "")
if not item_key:
stats["skipped"] += 1
continue
# Extract searchable text and calculate hash
full_text = extract_searchable_text(item)
if not full_text.strip():
stats["skipped"] += 1
continue
content_hash = calculate_content_hash(full_text)
# Check if item exists and needs update
if not force_rebuild and self.vector_client.check_item_exists(item_key, content_hash):
stats["skipped"] += 1
continue
# Create text chunks
chunks = create_document_chunks(item, self.chunking_config)
if not chunks:
stats["skipped"] += 1
continue
                # Use the first chunk as the main content (could be extended to handle multiple chunks)
                try:
                    main_content = chunks[0]
                except (IndexError, TypeError):
                    logger.warning(f"Could not access first chunk for item {item_key}, using truncated text")
                    main_content = full_text[:1000] if full_text else "No content available"
# Create metadata
data = item.get("data", {})
metadata = {
"item_type": data.get("itemType", ""),
"title": data.get("title", ""),
"date": data.get("date", ""),
"date_added": data.get("dateAdded", ""),
"creators": format_creators(data.get("creators", [])),
"publication": data.get("publicationTitle", ""),
"url": data.get("url", ""),
"doi": data.get("DOI", ""),
"tags": " ".join([tag.get("tag", "") for tag in data.get("tags", [])]),
"num_chunks": len(chunks)
}
embedding_items.append({
"item_key": item_key,
"item_type": data.get("itemType", ""),
"title": data.get("title", ""),
"content": main_content,
"content_hash": content_hash,
"metadata": metadata
})
stats["processed"] += 1
except Exception as e:
logger.error(f"Error processing item {item.get('key', 'unknown')}: {e}")
stats["errors"] += 1
# Generate embeddings for all items at once
if embedding_items:
try:
texts = [item["content"] for item in embedding_items]
embeddings = await provider.embed_texts(texts)
# Validate embeddings length matches texts
if not embeddings or len(embeddings) != len(texts):
logger.error(f"Embedding count mismatch: {len(embeddings)} embeddings for {len(texts)} texts")
stats["errors"] += len(embedding_items)
return stats
# Create EmbeddingItem objects
final_items = []
for i, (item, embedding) in enumerate(zip(embedding_items, embeddings)):
# Validate embedding is not empty
if not embedding or not isinstance(embedding, list):
logger.warning(f"Invalid embedding for item {item['item_key']}, skipping")
stats["errors"] += 1
continue
final_items.append(EmbeddingItem(
item_key=item["item_key"],
item_type=item["item_type"],
title=item["title"],
content=item["content"],
content_hash=item["content_hash"],
embedding=embedding,
embedding_model=provider.get_model_name(),
embedding_provider=provider.get_provider_name(),
metadata=item["metadata"]
))
# Upsert to database
upsert_stats = self.vector_client.upsert_embeddings(final_items)
stats["added"] += upsert_stats["added"]
stats["updated"] += upsert_stats["updated"]
except Exception as e:
logger.error(f"Error generating embeddings or upserting: {e}")
stats["errors"] += len(embedding_items)
return stats
async def search_async(self,
query: str,
limit: int = 10,
filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Perform semantic search over the Zotero library (async version).
Args:
query: Search query text
limit: Maximum number of results to return
filters: Optional metadata filters
Returns:
Search results with Zotero item details
"""
try:
# Ensure embedding provider is initialized
try:
provider = await self._ensure_embedding_provider()
logger.debug("Embedding provider initialized successfully")
except Exception as e:
logger.error(f"Error initializing embedding provider: {e}")
raise
            # Generate query embedding; search_async always runs inside an event
            # loop, so the coroutine can simply be awaited here.
            try:
                logger.debug(f"About to generate embedding for query: '{query[:50]}...' using provider: {provider.get_provider_name()}")
                query_embedding = await provider.embed_single_text(query)
                logger.debug(f"Generated query embedding of dimension {len(query_embedding) if query_embedding else 0}")
                # Debug: an all-zero embedding usually indicates an embedding generation problem
                if query_embedding and all(x == 0.0 for x in query_embedding):
                    logger.warning("Generated embedding is all zeros! This indicates an embedding generation problem.")
                    logger.debug(f"Query was: '{query}'")
                    logger.debug(f"Provider: {provider.get_provider_name()}, Model: {provider.get_model_name()}")
except Exception as e:
logger.error(f"Error generating query embedding: {e}")
logger.error(f"Provider: {provider.get_provider_name() if provider else 'None'}")
logger.error(f"Query: '{query}'")
import traceback
logger.error(f"Embedding error traceback: {traceback.format_exc()}")
raise
            # Validate query embedding before searching
            if not query_embedding or not isinstance(query_embedding, list):
                logger.error(f"Invalid query embedding: {query_embedding}")
                raise ValueError("Query embedding is invalid or empty")
            logger.debug("About to perform vector search...")
# Perform vector search
try:
logger.debug(f"Calling vector search with embedding length: {len(query_embedding)}")
search_results = self.vector_client.search_similar(
query_vector=query_embedding,
limit=limit,
filters=filters,
similarity_threshold=self.config.get("semantic_search", {}).get("similarity_threshold", 0.0)
)
logger.debug(f"Vector search returned {len(search_results) if search_results else 0} results")
logger.debug(f"Search results type: {type(search_results)}")
except IndexError as ie:
logger.error(f"IndexError in vector search: {ie}")
import traceback
logger.error(f"IndexError traceback: {traceback.format_exc()}")
raise
except Exception as e:
logger.error(f"Error in vector search: {e}")
import traceback
logger.error(f"Vector search traceback: {traceback.format_exc()}")
raise
# Check if search_results is valid
if not isinstance(search_results, list):
logger.error(f"Invalid search results type: {type(search_results)}")
search_results = []
# Validate each search result before enriching
logger.debug("Validating search results before enrichment...")
valid_results = []
for i, result in enumerate(search_results):
try:
if result is None:
logger.warning(f"Search result {i} is None, skipping")
continue
# Check if result has expected attributes
if not hasattr(result, 'item_key'):
logger.error(f"Search result {i} missing item_key attribute: {type(result)}")
continue
if not result.item_key:
logger.error(f"Search result {i} has empty item_key")
continue
valid_results.append(result)
except Exception as e:
logger.error(f"Error validating search result {i}: {e}")
continue
logger.debug(f"Validated {len(valid_results)} out of {len(search_results)} search results")
# Enrich results with full Zotero item data
try:
logger.debug("About to enrich search results...")
enriched_results = self._enrich_search_results(valid_results, query)
logger.debug(f"Enriched {len(enriched_results)} search results")
except IndexError as ie:
logger.error(f"IndexError in enriching search results: {ie}")
import traceback
logger.error(f"IndexError traceback: {traceback.format_exc()}")
raise
except Exception as e:
logger.error(f"Error enriching search results: {e}")
import traceback
logger.error(f"Enrichment traceback: {traceback.format_exc()}")
raise
return {
"query": query,
"limit": limit,
"filters": filters,
"results": enriched_results,
"total_found": len(enriched_results)
}
except Exception as e:
import traceback
logger.error(f"Error performing semantic search: {e}")
logger.error(f"Full traceback: {traceback.format_exc()}")
return {
"query": query,
"limit": limit,
"filters": filters,
"results": [],
"total_found": 0,
"error": str(e)
}
def search(self,
query: str,
limit: int = 10,
filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Perform semantic search over the Zotero library (sync version).
Args:
query: Search query text
limit: Maximum number of results to return
filters: Optional metadata filters
Returns:
Search results with Zotero item details
"""
try:
logger.debug(f"Starting sync search for query: '{query}', limit: {limit}")
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                # An event loop is already running in this thread, so we cannot
                # block on it with run_until_complete; run the async search in a
                # worker thread with its own isolated event loop instead.
                import threading
result = None
exception = None
def run_async():
nonlocal result, exception
try:
# Create a completely isolated event loop for this thread
new_loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(new_loop)
result = new_loop.run_until_complete(self.search_async(query, limit, filters))
finally:
# Properly close the loop and clear it from thread
try:
new_loop.close()
finally:
asyncio.set_event_loop(None)
except Exception as e:
exception = e
import traceback
logger.error(f"Exception in async thread: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
thread = threading.Thread(target=run_async, daemon=True)
thread.start()
thread.join(timeout=60) # Add timeout to prevent hanging
if thread.is_alive():
logger.error("Search thread timed out")
return {
"query": query,
"limit": limit,
"filters": filters,
"results": [],
"total_found": 0,
"error": "Search operation timed out"
}
if exception:
logger.error(f"Thread execution failed: {exception}")
raise exception
return result
            else:
                # No loop is running in this thread, so asyncio.run can drive the search
                return asyncio.run(self.search_async(query, limit, filters))
except IndexError as ie:
logger.error(f"IndexError (list index out of range) in search: {ie}")
import traceback
logger.error(f"IndexError traceback: {traceback.format_exc()}")
return {
"query": query,
"limit": limit,
"filters": filters,
"results": [],
"total_found": 0,
"error": f"List index error: {str(ie)}"
}
        except RuntimeError:
            logger.debug("No event loop available, falling back to asyncio.run")
try:
return asyncio.run(self.search_async(query, limit, filters))
except Exception as e:
logger.error(f"Error in asyncio.run: {e}")
import traceback
logger.error(f"asyncio.run traceback: {traceback.format_exc()}")
return {
"query": query,
"limit": limit,
"filters": filters,
"results": [],
"total_found": 0,
"error": str(e)
}
except Exception as e:
logger.error(f"Unexpected error in search: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return {
"query": query,
"limit": limit,
"filters": filters,
"results": [],
"total_found": 0,
"error": str(e)
}
def _enrich_search_results(self, search_results: List, query: str) -> List[Dict[str, Any]]:
"""Enrich search results with full Zotero item data."""
enriched = []
# Check if search_results is empty or None
if not search_results:
logger.info("No search results to enrich")
return enriched
# Validate search_results is a proper list
if not isinstance(search_results, list):
logger.error(f"search_results is not a list: {type(search_results)}")
return enriched
for i, result in enumerate(search_results):
try:
# Handle different result formats more robustly
if result is None:
logger.warning(f"Result {i} is None, skipping")
continue
# Validate result structure
if not hasattr(result, 'item_key'):
logger.error(f"Result {i} missing item_key attribute: {result}")
continue
if not result.item_key:
logger.error(f"Result {i} has empty item_key")
continue
# Get full item data from Zotero
try:
zotero_item = self.zotero_client.item(result.item_key)
except Exception as zotero_error:
logger.warning(f"Could not fetch Zotero item {result.item_key}: {zotero_error}")
zotero_item = None
enriched_result = {
"item_key": result.item_key,
"similarity_score": getattr(result, 'similarity_score', 0.0),
"matched_text": getattr(result, 'content', ''),
"metadata": getattr(result, 'metadata', {}),
"zotero_item": zotero_item,
"embedding_model": getattr(result, 'embedding_model', 'unknown'),
"embedding_provider": getattr(result, 'embedding_provider', 'unknown'),
"query": query
}
enriched.append(enriched_result)
except AttributeError as e:
logger.error(f"Error accessing result attributes for item {i}: {e}, result type: {type(result)}")
# Try to handle different result formats
try:
if isinstance(result, dict):
enriched_result = {
"item_key": result.get("item_key", "unknown"),
"similarity_score": result.get("similarity_score", 0.0),
"matched_text": result.get("content", ""),
"metadata": result.get("metadata", {}),
"zotero_item": None,
"embedding_model": result.get("embedding_model", "unknown"),
"embedding_provider": result.get("embedding_provider", "unknown"),
"query": query,
"error": f"Result format issue: {e}"
}
enriched.append(enriched_result)
except Exception as inner_e:
logger.error(f"Failed to handle result format: {inner_e}")
except Exception as e:
logger.error(f"Error enriching result for item {getattr(result, 'item_key', 'unknown')}: {e}")
# Include basic result even if enrichment fails
try:
enriched.append({
"item_key": getattr(result, 'item_key', 'unknown'),
"similarity_score": getattr(result, 'similarity_score', 0.0),
"matched_text": getattr(result, 'content', ''),
"metadata": getattr(result, 'metadata', {}),
"embedding_model": getattr(result, 'embedding_model', 'unknown'),
"embedding_provider": getattr(result, 'embedding_provider', 'unknown'),
"query": query,
"error": f"Could not fetch full item data: {e}"
})
except Exception as inner_e:
logger.error(f"Failed to create fallback result: {inner_e}")
return enriched
def get_database_status(self) -> Dict[str, Any]:
"""Get status information about the semantic search database."""
try:
# Get vector client status
vector_status = self.vector_client.get_database_status()
# Get database manager status
db_status = self.db_manager.get_database_status()
# Extract collection information for the expected format
collection_info = {
"name": "Zotero Semantic Search",
"count": vector_status.get("total_items", 0),
"embedding_model": None,
"persist_directory": f"{self.config.get('database', {}).get('host', 'Unknown')}:{self.config.get('database', {}).get('port', 'Unknown')}"
}
# Get embedding model info
if self.embedding_provider:
try:
provider_name = self.embedding_provider.get_provider_name()
model_name = self.embedding_provider.get_model_name()
collection_info["embedding_model"] = f"{provider_name}/{model_name}"
except Exception as e:
logger.warning(f"Error getting embedding provider info: {e}")
collection_info["embedding_model"] = "Unknown"
else:
# Try to initialize embedding provider if it's None
logger.info("Embedding provider not initialized, attempting to initialize...")
self._initialize_embedding_provider()
if self.embedding_provider:
try:
provider_name = self.embedding_provider.get_provider_name()
model_name = self.embedding_provider.get_model_name()
collection_info["embedding_model"] = f"{provider_name}/{model_name}"
except Exception as e:
logger.warning(f"Error getting embedding provider info after initialization: {e}")
collection_info["embedding_model"] = "Unknown"
else:
collection_info["embedding_model"] = "Unknown"
# Combine status information in expected format
status = {
"collection_info": collection_info,
"vector_database": vector_status,
"database_info": db_status,
"update_config": self.update_config,
"should_update": self.should_update_database(),
"last_update": self.update_config.get("last_update"),
"embedding_provider": {
"name": self.embedding_provider.get_provider_name() if self.embedding_provider else None,
"model": self.embedding_provider.get_model_name() if self.embedding_provider else None,
"dimension": self.embedding_provider.get_embedding_dimension() if self.embedding_provider else None
},
"chunking_config": self.chunking_config
}
return status
except Exception as e:
logger.error(f"Error getting database status: {e}")
return {
"error": str(e),
"collection_info": {
"name": "Unknown",
"count": 0,
"embedding_model": "Unknown",
"persist_directory": "Unknown"
},
"update_config": self.update_config,
"should_update": False
}
def delete_item(self, item_key: str) -> bool:
"""Delete an item from the semantic search database."""
try:
deleted_count = self.vector_client.delete_items([item_key])
return deleted_count > 0
except Exception as e:
logger.error(f"Error deleting item {item_key}: {e}")
return False
def get_item_count(self) -> int:
"""Get total number of indexed items."""
try:
return self.vector_client.get_item_count()
except Exception as e:
logger.error(f"Error getting item count: {e}")
return 0
def _trigger_fulltext_indexing(self) -> Dict[str, Any]:
"""
Trigger full-text indexing for attachments.
This method starts the full-text indexing process for attachments
that haven't been processed yet.
Returns:
Dictionary with indexing trigger statistics
"""
try:
# Check if full-text indexing is enabled
fulltext_config = self.config.get("semantic_search", {}).get("fulltext_indexing", {})
if not fulltext_config.get("enabled", True):
return {"status": "disabled", "message": "Full-text indexing is disabled"}
logger.info("Triggering full-text indexing for attachments...")
# Import and create full-text indexing manager
from .fulltext_indexing import create_fulltext_indexing_manager
indexing_manager = create_fulltext_indexing_manager(self.config_path)
# Get processing statistics before queuing
stats_before = indexing_manager.get_processing_stats()
# Note: In a real implementation, this would need to run in a background task
# For now, we'll just return information about what would be triggered
# Get unprocessed attachments count
unprocessed = self.db_manager.get_unprocessed_attachments(
supported_content_types=fulltext_config.get("supported_content_types", [
"application/pdf",
"text/html",
"text/plain"
]),
limit=100
)
return {
"status": "triggered",
"message": f"Full-text indexing triggered for {len(unprocessed)} unprocessed attachments",
"unprocessed_count": len(unprocessed),
"indexing_enabled": True,
"background_workers": fulltext_config.get("background_workers", 2),
"supported_types": fulltext_config.get("supported_content_types", [])
}
except Exception as e:
logger.error(f"Error triggering full-text indexing: {e}")
return {
"status": "error",
"message": f"Failed to trigger full-text indexing: {str(e)}"
}
def get_fulltext_indexing_status(self) -> Dict[str, Any]:
"""
Get comprehensive status of full-text indexing operations.
Returns:
Dictionary with current indexing status matching MCP tool expectations
"""
try:
# Get attachment processing statistics from database
attachment_stats = self.db_manager.get_attachment_statistics()
# Get full-text chunk statistics from vector database
try:
# Query for full-text chunks
fulltext_chunk_stats = self._get_fulltext_chunk_statistics()
except Exception as e:
logger.warning(f"Could not get full-text chunk statistics: {e}")
fulltext_chunk_stats = {}
# Get full-text indexing configuration
fulltext_config = self.config.get("semantic_search", {}).get("fulltext_indexing", {})
# Calculate derived statistics
total_attachments = attachment_stats.get("total_attachments", 0)
extracted_count = attachment_stats.get("extracted", 0)
failed_count = attachment_stats.get("failed", 0)
pending_count = attachment_stats.get("pending", 0)
processing_count = attachment_stats.get("processing", 0)
total_chunks = attachment_stats.get("total_chunks", 0)
# Calculate success rate
total_attempted = extracted_count + failed_count
success_rate = (extracted_count / total_attempted) if total_attempted > 0 else 0.0
# Calculate average chunks per document
avg_chunks_per_doc = (total_chunks / extracted_count) if extracted_count > 0 else 0.0
# Extract extraction methods from content type breakdown
extraction_methods = {}
content_types = attachment_stats.get("content_type_breakdown", [])
for ct in content_types:
content_type = ct.get("content_type", "unknown")
method_name = self._get_extraction_method_name(content_type)
extraction_methods[method_name] = extraction_methods.get(method_name, 0) + ct.get("extracted_count", 0)
# Get recent activity (already provided by get_attachment_statistics)
recent_activity = attachment_stats.get("recent_activity", [])
# Calculate recent activity summary
recent_summary = {}
if recent_activity:
total_processed_24h = sum(activity.get("completed_count", 0) for activity in recent_activity)
recent_summary = {
"processed_24h": total_processed_24h,
"failed_24h": 0, # Would need additional query to get failed items in last 24h
"total_time_24h": "N/A" # Would need timing data
}
return {
# Basic counts expected by MCP tool
"total_attachments": total_attachments,
"processed_count": extracted_count,
"failed_count": failed_count,
"pending_count": pending_count,
"processing_count": processing_count,
# Content statistics
"fulltext_chunks": fulltext_chunk_stats.get("total_fulltext_chunks", total_chunks),
"avg_chunks_per_doc": avg_chunks_per_doc,
"total_text_size": fulltext_chunk_stats.get("total_text_length", 0),
# Performance metrics
"avg_processing_time": "N/A", # Would need timing tracking
"success_rate": success_rate,
# Extraction methods used
"extraction_methods": extraction_methods,
# Recent activity
"recent_activity": recent_summary,
# Configuration and enabled status
"enabled": fulltext_config.get("enabled", True),
"configuration": {
"background_workers": fulltext_config.get("background_workers", 2),
"batch_size": fulltext_config.get("batch_size", 10),
"retry_attempts": fulltext_config.get("retry_attempts", 3),
"supported_content_types": fulltext_config.get("supported_content_types", []),
"extraction_strategies": fulltext_config.get("extraction_strategies", [])
},
# Original detailed statistics for backwards compatibility
"attachment_statistics": attachment_stats
}
except Exception as e:
logger.error(f"Error getting fulltext indexing status: {e}")
return {"error": str(e)}
def _get_fulltext_chunk_statistics(self) -> Dict[str, Any]:
"""Get statistics about full-text chunks from the vector database."""
try:
# Query the vector database for full-text chunk statistics
query = """
SELECT
COUNT(*) as total_fulltext_chunks,
SUM(LENGTH(content)) as total_text_length,
COUNT(DISTINCT parent_item_key) as items_with_fulltext,
AVG(LENGTH(content)) as avg_chunk_length
FROM zotero_embeddings
WHERE content_type = 'fulltext'
"""
conn = self.db_manager.connect()
cursor = conn.cursor()
cursor.execute(query)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
return {
"total_fulltext_chunks": result[0] or 0,
"total_text_length": result[1] or 0,
"items_with_fulltext": result[2] or 0,
"avg_chunk_length": int(result[3] or 0)
}
return {}
except Exception as e:
logger.debug(f"Could not get full-text chunk statistics: {e}")
return {}
def _get_extraction_method_name(self, content_type: str) -> str:
"""Map content type to extraction method name."""
if content_type == "application/pdf":
return "pdf_extraction"
elif content_type in ["text/html", "text/plain"]:
return "text_extraction"
elif content_type in ["application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"]:
return "document_extraction"
else:
return "unknown_extraction"
def create_semantic_search(config_path: Optional[str] = None) -> ZoteroSemanticSearch:
"""
Create a ZoteroSemanticSearch instance.
Args:
config_path: Path to configuration file
Returns:
Configured ZoteroSemanticSearch instance
"""
# Ensure logging is configured before creating the instance
ensure_logging_configured(config_path)
return ZoteroSemanticSearch(config_path=config_path)