"""
Full-text indexing manager for semantic search.
This module provides asynchronous full-text extraction and indexing capabilities
that run in the background without blocking MCP operations.
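Example usage (a minimal sketch; the config path is illustrative and an asyncio
entry point such as asyncio.run(main()) is assumed):

    async def main():
        manager = create_fulltext_indexing_manager("config.json")
        await manager.start_background_workers()
        await manager.queue_unprocessed_items(initial_discovery_only=False)
        print(manager.get_processing_stats())
        await manager.shutdown()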
"""
import asyncio
import hashlib
import logging
import tempfile
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
import time
from .chunking_utils import create_streaming_fulltext_chunks, validate_chunking_config
from .client import convert_to_markdown, get_zotero_client
from .vector_client import EmbeddingItem
from .db_schema import create_database_manager
logger = logging.getLogger(__name__)
@dataclass
class FullTextExtractionTask:
"""Task for full-text extraction processing."""
parent_item_key: str
attachment_key: str
content_type: str
filename: str
file_size: int
file_hash: str
priority: int = 1 # Lower number = higher priority
retry_count: int = 0
    created_at: Optional[datetime] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
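# Example construction (illustrative values; priority defaults to 1 and lower
# numbers run first):
#
#     task = FullTextExtractionTask(
#         parent_item_key="ITEMKEY1",
#         attachment_key="ATTACHKEY1",
#         content_type="application/pdf",
#         filename="paper.pdf",
#         file_size=1_048_576,
#         file_hash="0" * 64,
#     )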
@dataclass
class ProcessingStats:
"""Statistics for full-text processing."""
total_queued: int = 0
total_processed: int = 0
total_completed: int = 0
total_failed: int = 0
total_skipped: int = 0
current_processing: int = 0
    start_time: Optional[datetime] = None
def __post_init__(self):
if self.start_time is None:
self.start_time = datetime.now()
class FullTextIndexingManager:
"""Manages asynchronous full-text extraction and indexing."""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize full-text indexing manager.
Args:
config_path: Path to configuration file
"""
self.config_path = config_path
self.config = self._load_config()
# Initialize components
self.db_manager = create_database_manager(
self.config.get("database", {}),
embedding_dimension=None # Will be set later
)
self.zotero_client = get_zotero_client()
# Processing queue and workers
self.task_queue = asyncio.Queue()
self.worker_tasks = []
self.is_running = False
# Configuration
fulltext_config = self.config.get("semantic_search", {}).get("fulltext_indexing", {})
self.enabled = fulltext_config.get("enabled", True)
self.num_workers = fulltext_config.get("background_workers", 2)
self.batch_size = fulltext_config.get("batch_size", 10)
self.retry_attempts = fulltext_config.get("retry_attempts", 3)
self.retry_delay = fulltext_config.get("retry_delay", 300) # seconds
self.supported_types = fulltext_config.get("supported_content_types", [
"application/pdf",
"text/html",
"text/plain",
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
])
# Statistics tracking
self.stats = ProcessingStats()
self.processing_items = {} # attachment_key -> task
# Thread pool for blocking operations
self.thread_pool = ThreadPoolExecutor(max_workers=4)
logger.info(f"Full-text indexing manager initialized: "
f"enabled={self.enabled}, workers={self.num_workers}")
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from file with defaults."""
default_config = {
"database": {
"host": "192.168.1.173",
"port": 5432,
"database": "zotero_mcp_test",
"username": "zotero_mcp_test",
"password": "jt24jtiowjeiofjoi",
"schema": "public",
"pool_size": 5
},
"embedding": {
"provider": "ollama",
"openai": {
"api_key": "",
"model": "text-embedding-3-small",
"batch_size": 100
},
"ollama": {
"host": "192.168.1.189:8182",
"model": "nomic-embed-text",
"timeout": 60
}
},
"semantic_search": {
"fulltext_indexing": {
"enabled": True,
"auto_process_on_startup": True,
"background_workers": 2,
"batch_size": 10,
"retry_attempts": 3,
"retry_delay": 300,
"supported_content_types": [
"application/pdf",
"text/html",
"text/plain",
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
],
"extraction_strategies": [
"zotero_api",
"markitdown"
],
"progress_reporting_interval": 30
}
},
"chunking": {
"chunk_size": 1000,
"overlap": 100,
"min_chunk_size": 100,
"fulltext_chunk_size": 2000,
"fulltext_overlap": 200,
"chunking_strategy": "sentences"
}
}
# Load from file if exists
if self.config_path and os.path.exists(self.config_path):
try:
import json
with open(self.config_path, 'r') as f:
file_config = json.load(f)
# Deep merge configurations
self._deep_merge_config(default_config, file_config)
except Exception as e:
logger.warning(f"Error loading config from {self.config_path}: {e}")
return default_config
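    # Illustrative override file for _load_config (only the keys being changed
    # need to appear; everything else falls back to the defaults above):
    #
    #     {
    #         "semantic_search": {"fulltext_indexing": {"background_workers": 4}},
    #         "chunking": {"fulltext_chunk_size": 1500}
    #     }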
def _deep_merge_config(self, default: Dict[str, Any], override: Dict[str, Any]) -> None:
"""Deep merge configuration dictionaries."""
for key, value in override.items():
if key in default and isinstance(default[key], dict) and isinstance(value, dict):
self._deep_merge_config(default[key], value)
else:
default[key] = value
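    # Sketch of the deep-merge semantics above (values are illustrative):
    #
    #     defaults = {"database": {"host": "localhost", "port": 5432}}
    #     override = {"database": {"host": "db.example.org"}}
    #     self._deep_merge_config(defaults, override)
    #     # defaults == {"database": {"host": "db.example.org", "port": 5432}}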
async def start_background_workers(self) -> None:
"""Start background worker tasks for processing."""
if not self.enabled:
logger.info("Full-text indexing is disabled")
return
if self.is_running:
logger.warning("Background workers already running")
return
self.is_running = True
logger.info(f"Starting {self.num_workers} background workers for full-text processing")
# Start worker tasks
for i in range(self.num_workers):
worker_task = asyncio.create_task(self._worker_loop(f"worker-{i}"))
self.worker_tasks.append(worker_task)
logger.info("Full-text processing workers started")
async def shutdown(self) -> None:
"""Shutdown background workers gracefully."""
if not self.is_running:
return
logger.info("Shutting down full-text indexing manager...")
self.is_running = False
# Cancel all worker tasks
for task in self.worker_tasks:
task.cancel()
# Wait for workers to finish
if self.worker_tasks:
await asyncio.gather(*self.worker_tasks, return_exceptions=True)
self.worker_tasks.clear()
# Shutdown thread pool
self.thread_pool.shutdown(wait=True)
logger.info("Full-text indexing manager shut down")
async def queue_unprocessed_items(self, force_discovery: bool = False,
initial_discovery_only: bool = True,
reset_failed: bool = True) -> int:
"""
Queue unprocessed attachments for full-text extraction.
Args:
force_discovery: Force full attachment discovery even if recently done
initial_discovery_only: Only do minimal discovery on first call (for startup)
reset_failed: Reset previously failed documents to give them another chance
"""
if not self.enabled:
return 0
logger.info("Queuing unprocessed attachments for full-text extraction...")
try:
# Reset failed documents to pending if requested (useful after code fixes)
if reset_failed:
reset_count = await self._reset_failed_attachments()
if reset_count > 0:
logger.info(f"Reset {reset_count} failed attachments to pending status for retry")
# For startup, do minimal discovery to avoid blocking
if initial_discovery_only:
await self._discover_attachments_minimal()
else:
# Full discovery for manual/background operations
await self._discover_attachments()
# Get unprocessed attachments from database
unprocessed = self.db_manager.get_unprocessed_attachments(
self.supported_types,
limit=1000 # Process in batches
)
queued_count = 0
for attachment in unprocessed:
task = FullTextExtractionTask(
parent_item_key=attachment["parent_item_key"],
attachment_key=attachment["attachment_key"],
content_type=attachment["content_type"],
filename=attachment["filename"],
file_size=attachment["file_size"],
file_hash=attachment["file_hash"],
priority=1,
retry_count=attachment["retry_count"]
)
await self.task_queue.put(task)
queued_count += 1
self.stats.total_queued += queued_count
logger.info(f"Queued {queued_count} attachments for full-text processing")
return queued_count
except Exception as e:
logger.error(f"Error queuing unprocessed items: {e}")
return 0
async def _discover_attachments_minimal(self) -> None:
"""
Minimal attachment discovery for startup - only process existing database entries.
This avoids the expensive full Zotero library scan during startup.
"""
logger.info("Performing minimal attachment discovery (startup mode)...")
try:
# Just validate we have database connectivity and some basic setup
# The full discovery will happen in background after startup
existing_count = self.db_manager.count_tracked_attachments()
logger.info(f"Found {existing_count} already tracked attachments in database")
# Schedule full discovery for later if we have very few or no attachments
if existing_count < 10:
logger.info("Few attachments in database - will perform full discovery in background")
# Schedule background discovery task
asyncio.create_task(self._background_full_discovery())
except Exception as e:
logger.warning(f"Error in minimal attachment discovery: {e}")
async def _background_full_discovery(self) -> None:
"""Background task for full attachment discovery."""
# Wait a bit to let server fully start up
await asyncio.sleep(30)
logger.info("Starting background full attachment discovery...")
try:
await self._discover_attachments()
except Exception as e:
logger.error(f"Error in background attachment discovery: {e}")
async def _discover_attachments(self) -> None:
"""Discover and track attachments from Zotero (full discovery)."""
logger.info("Discovering attachments from Zotero...")
try:
# Use thread pool to avoid blocking async event loop
attachment_count = await asyncio.get_event_loop().run_in_executor(
self.thread_pool,
self._discover_attachments_sync
)
logger.info(f"Discovered and tracked {attachment_count} new/changed attachments")
except Exception as e:
logger.error(f"Error discovering attachments: {e}")
def _discover_attachments_sync(self) -> int:
"""Synchronous attachment discovery - runs in thread pool."""
attachment_count = 0
try:
# Get all items from Zotero in batches
start = 0
batch_size = 100
while True:
# Get batch of items
batch_params = {"start": start, "limit": batch_size}
items = self.zotero_client.items(**batch_params)
if not items:
break
# Process each item to find attachments
for item in items:
item_key = item.get("key", "")
if not item_key:
continue
# Get child attachments
try:
children = self.zotero_client.children(item_key)
for child in children:
child_data = child.get("data", {})
if child_data.get("itemType") == "attachment":
content_type = child_data.get("contentType", "")
# Only track supported content types
if content_type in self.supported_types:
attachment_key = child.get("key", "")
filename = child_data.get("filename", "")
                                    # Change-detection hash derived from the attachment key and Zotero's reported MD5 (not a hash of file contents)
file_hash = hashlib.sha256(
f"{attachment_key}:{child_data.get('md5', '')}".encode()
).hexdigest()
                                    # Rough size placeholder (the real file size is not fetched here; the MD5 string length is used as a proxy)
file_size = len(child_data.get("md5", "")) * 1000
# Track attachment in database
file_changed = self.db_manager.upsert_attachment(
parent_item_key=item_key,
attachment_key=attachment_key,
content_type=content_type,
filename=filename,
file_size=file_size,
file_hash=file_hash
)
if file_changed:
attachment_count += 1
except Exception as e:
logger.debug(f"Error processing attachments for item {item_key}: {e}")
continue
start += batch_size
                # Stop when we received fewer than batch_size items (last batch)
if len(items) < batch_size:
break
return attachment_count
except Exception as e:
logger.error(f"Error in sync attachment discovery: {e}")
return 0
async def _worker_loop(self, worker_id: str) -> None:
"""Main worker loop for processing extraction tasks."""
logger.info(f"Full-text worker {worker_id} started")
while self.is_running:
try:
# Get task from queue with timeout
task = await asyncio.wait_for(self.task_queue.get(), timeout=5.0)
# Track processing
self.processing_items[task.attachment_key] = task
self.stats.current_processing += 1
logger.debug(f"Worker {worker_id} processing attachment {task.attachment_key}")
# Update status to processing
self.db_manager.update_attachment_status(
task.attachment_key,
"processing"
)
# Process the attachment
success = await self._process_attachment_task(task, worker_id)
if success:
self.stats.total_completed += 1
logger.debug(f"Worker {worker_id} completed {task.attachment_key}")
else:
self.stats.total_failed += 1
logger.warning(f"Worker {worker_id} failed to process {task.attachment_key}")
# Update tracking
self.processing_items.pop(task.attachment_key, None)
self.stats.current_processing -= 1
self.stats.total_processed += 1
except asyncio.TimeoutError:
# No tasks in queue, continue
continue
except asyncio.CancelledError:
logger.info(f"Worker {worker_id} cancelled")
break
except Exception as e:
logger.error(f"Worker {worker_id} error: {e}")
# Continue processing other tasks
continue
logger.info(f"Full-text worker {worker_id} stopped")
async def _process_attachment_task(self, task: FullTextExtractionTask, worker_id: str) -> bool:
"""Process a single attachment extraction task."""
try:
# Extract full-text content
full_text = await self._extract_fulltext_content(task)
if not full_text or len(full_text.strip()) < 50:
# Mark as skipped if no meaningful content
self.db_manager.update_attachment_status(
task.attachment_key,
"skipped",
error_message="No extractable text content"
)
self.stats.total_skipped += 1
                return True  # Not a failure, just no content (the worker still counts this as completed)
# Process full-text into chunks and embeddings
chunk_count = await self._process_fulltext_chunks(task, full_text, worker_id)
if chunk_count > 0:
# Mark as completed
self.db_manager.update_attachment_status(
task.attachment_key,
"completed",
chunk_count=chunk_count
)
logger.info(f"Processed {task.attachment_key}: {chunk_count} chunks")
return True
else:
# Failed to create chunks
self.db_manager.update_attachment_status(
task.attachment_key,
"failed",
error_message="Failed to create text chunks"
)
return False
except Exception as e:
error_msg = f"Error processing {task.attachment_key}: {str(e)}"
logger.error(error_msg)
# Mark as failed and handle retries
self.db_manager.update_attachment_status(
task.attachment_key,
"failed",
error_message=error_msg
)
# Queue for retry if under retry limit
if task.retry_count < self.retry_attempts:
retry_task = FullTextExtractionTask(
parent_item_key=task.parent_item_key,
attachment_key=task.attachment_key,
content_type=task.content_type,
filename=task.filename,
file_size=task.file_size,
file_hash=task.file_hash,
priority=task.priority + 1, # Lower priority for retries
retry_count=task.retry_count + 1
)
                # Delay before re-queuing; note this keeps the worker occupied for the full delay (capped at 600 seconds)
await asyncio.sleep(min(self.retry_delay, 600))
await self.task_queue.put(retry_task)
logger.info(f"Queued retry {task.retry_count + 1} for {task.attachment_key}")
return False
async def _extract_fulltext_content(self, task: FullTextExtractionTask) -> str:
"""Extract full-text content from attachment using multiple strategies."""
strategies = self.config.get("semantic_search", {}).get("fulltext_indexing", {}).get(
"extraction_strategies", ["zotero_api", "markitdown"]
)
logger.info(f"Attempting extraction for {task.attachment_key} using strategies: {strategies}")
for strategy in strategies:
try:
logger.info(f"Trying strategy '{strategy}' for {task.attachment_key}")
if strategy == "zotero_api":
content = await self._extract_via_zotero_api(task.attachment_key)
elif strategy == "markitdown":
content = await self._extract_via_markitdown(task)
else:
logger.warning(f"Unknown extraction strategy: {strategy}")
continue
if content and len(content.strip()) > 50:
logger.info(f"SUCCESS: Extracted {len(content)} characters from {task.attachment_key} using {strategy}")
return content
else:
logger.info(f"Strategy '{strategy}' returned insufficient content ({len(content) if content else 0} chars) for {task.attachment_key}")
            except Exception as e:
                logger.error(f"Strategy {strategy} failed for {task.attachment_key}: {e}", exc_info=True)
                continue
logger.warning(f"All extraction strategies failed for {task.attachment_key}")
return ""
async def _extract_via_zotero_api(self, attachment_key: str) -> str:
"""Extract full-text using Zotero's API."""
        ZOTERO_API_TIMEOUT = 30  # seconds allowed for the Zotero full-text API call
try:
start_time = time.time()
logger.info(f"[TIMEOUT_BG] Zotero API: Starting fulltext extraction for {attachment_key} with {ZOTERO_API_TIMEOUT}s timeout")
# Run in thread pool with timeout to avoid blocking
full_text_data = await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(
self.thread_pool,
self.zotero_client.fulltext_item,
attachment_key
),
timeout=ZOTERO_API_TIMEOUT
)
api_time = time.time() - start_time
if full_text_data and "content" in full_text_data:
content = full_text_data["content"]
if content and len(content.strip()) > 50:
logger.info(f"[TIMEOUT_BG] Zotero API extracted {len(content):,} characters in {api_time:.2f}s from {attachment_key}")
return content
logger.info(f"[TIMEOUT_BG] Zotero API returned empty content in {api_time:.2f}s for {attachment_key}")
return ""
except asyncio.TimeoutError:
api_time = time.time() - start_time
logger.warning(f"[TIMEOUT_BG] Zotero API extraction TIMED OUT after {api_time:.2f}s (limit: {ZOTERO_API_TIMEOUT}s) for {attachment_key}")
return ""
except Exception as e:
api_time = time.time() - start_time
# This is expected when fulltext hasn't been indexed yet
logger.debug(f"[TIMEOUT_BG] Zotero API extraction failed for {attachment_key} in {api_time:.2f}s: {e}")
return ""
async def _extract_via_markitdown(self, task: FullTextExtractionTask) -> str:
"""Extract full-text using MarkItDown library."""
        # Timeout budgets (timing uses the module-level time import)
        DOWNLOAD_TIMEOUT = 120  # 2 minutes for the file download
        CONVERSION_TIMEOUT = 180  # 3 minutes for the MarkItDown conversion
start_time = time.time()
try:
logger.info(f"[TIMEOUT_BG] MarkItDown: Starting extraction for {task.attachment_key}")
# Download file to temporary location
with tempfile.TemporaryDirectory() as tmpdir:
file_path = os.path.join(tmpdir, task.filename or f"{task.attachment_key}.pdf")
logger.info(f"[TIMEOUT_BG] MarkItDown: Downloading to {file_path} with {DOWNLOAD_TIMEOUT}s timeout")
# Download file with timeout protection
download_start = time.time()
try:
def download_file():
return self.zotero_client.dump(
task.attachment_key,
filename=os.path.basename(file_path),
path=tmpdir
)
await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(
self.thread_pool,
download_file
),
timeout=DOWNLOAD_TIMEOUT
)
download_time = time.time() - download_start
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
logger.info(f"[TIMEOUT_BG] MarkItDown: Downloaded {file_size:,} bytes in {download_time:.2f}s for {task.attachment_key}")
# Convert to markdown with timeout protection
convert_start = time.time()
logger.info(f"[TIMEOUT_BG] MarkItDown: Converting to markdown for {task.attachment_key} with {CONVERSION_TIMEOUT}s timeout")
try:
content = await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(
self.thread_pool,
convert_to_markdown,
file_path
),
timeout=CONVERSION_TIMEOUT
)
convert_time = time.time() - convert_start
total_time = time.time() - start_time
if content:
logger.info(f"[TIMEOUT_BG] MarkItDown: Successfully converted {len(content):,} characters in {convert_time:.2f}s (total: {total_time:.2f}s) for {task.attachment_key}")
return content
else:
logger.warning(f"[TIMEOUT_BG] MarkItDown: Conversion returned empty content in {convert_time:.2f}s for {task.attachment_key}")
return ""
except asyncio.TimeoutError:
convert_time = time.time() - convert_start
total_time = time.time() - start_time
logger.error(f"[TIMEOUT_BG] MarkItDown: Conversion TIMED OUT after {convert_time:.2f}s (limit: {CONVERSION_TIMEOUT}s, total: {total_time:.2f}s) for {task.attachment_key}")
raise TimeoutError(f"PDF conversion timed out after {CONVERSION_TIMEOUT} seconds")
else:
download_time = time.time() - download_start
logger.warning(f"[TIMEOUT_BG] MarkItDown: File not downloaded after {download_time:.2f}s for {task.attachment_key}")
logger.info(f"[TIMEOUT_BG] MarkItDown: Directory contents: {os.listdir(tmpdir)}")
return ""
except asyncio.TimeoutError:
download_time = time.time() - download_start
total_time = time.time() - start_time
logger.error(f"[TIMEOUT_BG] MarkItDown: Download TIMED OUT after {download_time:.2f}s (limit: {DOWNLOAD_TIMEOUT}s, total: {total_time:.2f}s) for {task.attachment_key}")
raise TimeoutError(f"File download timed out after {DOWNLOAD_TIMEOUT} seconds")
except TimeoutError as timeout_error:
total_time = time.time() - start_time
logger.error(f"[TIMEOUT_BG] MarkItDown extraction timed out for {task.attachment_key}: {timeout_error} (total time: {total_time:.2f}s)")
return ""
        except Exception as e:
            total_time = time.time() - start_time
            logger.error(f"[TIMEOUT_BG] MarkItDown extraction failed for {task.attachment_key} after {total_time:.2f}s: {e}", exc_info=True)
            return ""
async def _process_fulltext_chunks(self, task: FullTextExtractionTask,
full_text: str, worker_id: str) -> int:
"""Process full-text into chunks and create embeddings."""
try:
# Import components needed for embedding
from .embedding_service import create_embedding_provider
from .vector_client import create_vector_client
# Create embedding provider directly to avoid sync/async issues
embedding_config = self.config.get("embedding", {})
embedding_provider = await create_embedding_provider(embedding_config)
# Create vector client for storing embeddings
vector_client = create_vector_client(self.config.get("database", {}))
chunk_count = 0
embedding_items = []
# Create callback to process chunks as they are created
async def chunk_callback(chunk_text: str, chunk_index: int):
nonlocal chunk_count, embedding_items
try:
# Generate embedding for this chunk
embedding = await embedding_provider.embed_single_text(chunk_text)
if not embedding:
logger.warning(f"No embedding generated for chunk {chunk_index} of {task.attachment_key}")
return
# Create embedding item
embedding_item = EmbeddingItem(
item_key=f"{task.attachment_key}_chunk_{chunk_index}",
item_type="attachment",
title=task.filename or f"Attachment {task.attachment_key}",
content=chunk_text,
content_hash=hashlib.sha256(chunk_text.encode()).hexdigest(),
embedding=embedding,
embedding_model=embedding_provider.get_model_name(),
embedding_provider=embedding_provider.get_provider_name(),
metadata={
"attachment_content_type": task.content_type,
"attachment_filename": task.filename,
"attachment_file_size": task.file_size
},
content_type="fulltext",
parent_item_key=task.parent_item_key,
parent_attachment_key=task.attachment_key,
chunk_index=chunk_index,
                        chunk_total=0  # Total chunk count is unknown during streaming and is not backfilled here
)
embedding_items.append(embedding_item)
chunk_count += 1
# Batch store embeddings every 10 chunks
if len(embedding_items) >= 10:
vector_client.upsert_embeddings(embedding_items)
logger.debug(f"Stored batch of {len(embedding_items)} embeddings for {task.attachment_key}")
embedding_items.clear()
except Exception as e:
logger.error(f"Error processing chunk {chunk_index} for {task.attachment_key}: {e}")
# Process full-text using streaming approach
chunking_config = validate_chunking_config(self.config.get("chunking", {}))
total_chunks = await create_streaming_fulltext_chunks(
full_text=full_text,
config=chunking_config,
chunk_callback=chunk_callback,
item_key=task.attachment_key
)
# Store any remaining embeddings
if embedding_items:
vector_client.upsert_embeddings(embedding_items)
logger.debug(f"Stored final batch of {len(embedding_items)} embeddings for {task.attachment_key}")
logger.info(f"Created {total_chunks} embeddings for {task.attachment_key} ({len(full_text):,} characters)")
return total_chunks
except Exception as e:
logger.error(f"Error processing fulltext chunks for {task.attachment_key}: {e}")
return 0
def get_processing_stats(self) -> Dict[str, Any]:
"""Get current processing statistics."""
return {
"enabled": self.enabled,
"is_running": self.is_running,
"total_queued": self.stats.total_queued,
"total_processed": self.stats.total_processed,
"total_completed": self.stats.total_completed,
"total_failed": self.stats.total_failed,
"total_skipped": self.stats.total_skipped,
"current_processing": self.stats.current_processing,
"queue_size": self.task_queue.qsize(),
"active_workers": len(self.worker_tasks),
"processing_items": list(self.processing_items.keys()),
"uptime": str(datetime.now() - self.stats.start_time) if self.stats.start_time else None
}
def get_status(self) -> Dict[str, Any]:
"""Get detailed status information for monitoring tools."""
return {
"enabled": self.enabled,
"workers_running": len([t for t in self.worker_tasks if not t.done()]),
"total_workers": self.num_workers,
"processing_active": self.is_running,
"queue_info": {
"size": self.task_queue.qsize(),
"processing_count": self.stats.current_processing,
"empty": self.task_queue.empty()
},
"processing_stats": {
"total_processed": self.stats.total_processed,
"successful": self.stats.total_completed,
"failed": self.stats.total_failed,
"skipped": self.stats.total_skipped,
"avg_processing_time": self._calculate_avg_processing_time()
},
"configuration": {
"max_workers": self.num_workers,
"batch_size": self.batch_size,
"max_retries": self.retry_attempts,
"streaming_threshold_mb": 1, # Static for now
"extraction_methods": self.config.get("semantic_search", {}).get("fulltext_indexing", {}).get(
"extraction_strategies", ["zotero_api", "markitdown"]
)
}
}
def _calculate_avg_processing_time(self) -> str:
"""Calculate average processing time."""
if self.stats.total_processed > 0 and self.stats.start_time:
total_time = datetime.now() - self.stats.start_time
avg_seconds = total_time.total_seconds() / self.stats.total_processed
return f"{avg_seconds:.1f}s"
return "N/A"
def queue_item(self, item_key: str, force_reprocess: bool = False) -> bool:
"""Queue a specific item for full-text processing."""
try:
if not self.enabled:
return False
# Get item attachments from Zotero
children = self.zotero_client.children(item_key)
queued_any = False
for child in children:
child_data = child.get("data", {})
if child_data.get("itemType") == "attachment":
content_type = child_data.get("contentType", "")
if content_type in self.supported_types:
attachment_key = child.get("key", "")
filename = child_data.get("filename", "")
# Check if already processed (unless forced)
if not force_reprocess:
attachment = self.db_manager.get_attachment_by_key(attachment_key)
if attachment and attachment.get("status") in ["completed", "processing"]:
continue
# Calculate file hash
file_hash = hashlib.sha256(
f"{attachment_key}:{child_data.get('md5', '')}".encode()
).hexdigest()
# Create and queue task
task = FullTextExtractionTask(
parent_item_key=item_key,
attachment_key=attachment_key,
content_type=content_type,
filename=filename,
file_size=len(child_data.get("md5", "")) * 1000, # Estimate
file_hash=file_hash,
priority=1
)
# Update database tracking
self.db_manager.upsert_attachment(
parent_item_key=item_key,
attachment_key=attachment_key,
content_type=content_type,
filename=filename,
file_size=task.file_size,
file_hash=file_hash
)
                        # Queue for processing (asyncio.create_task requires a running event loop in the calling thread, since this method is synchronous)
asyncio.create_task(self.task_queue.put(task))
self.stats.total_queued += 1
queued_any = True
return queued_any
except Exception as e:
logger.error(f"Error queuing item {item_key}: {e}")
return False
def pause_processing(self) -> bool:
"""Pause background processing."""
if self.is_running:
self.is_running = False
logger.info("Full-text processing paused")
return True
return False
def resume_processing(self) -> bool:
"""Resume background processing."""
if not self.is_running and self.enabled:
# Restart workers
asyncio.create_task(self.start_background_workers())
logger.info("Full-text processing resumed")
return True
return False
def clear_queue(self) -> int:
"""Clear the processing queue and return count of cleared items."""
cleared_count = 0
try:
# Count items in queue
while not self.task_queue.empty():
try:
self.task_queue.get_nowait()
cleared_count += 1
except asyncio.QueueEmpty:
break
logger.info(f"Cleared {cleared_count} items from processing queue")
return cleared_count
except Exception as e:
logger.error(f"Error clearing queue: {e}")
return cleared_count
async def _reset_failed_attachments(self) -> int:
"""
Reset failed attachments to pending status to retry them after code fixes.
This gives previously failed documents another chance when bugs have been fixed.
Only resets attachments that have hit the retry limit.
Returns:
Number of attachments reset to pending
"""
try:
# Use thread pool to avoid blocking
reset_count = await asyncio.get_event_loop().run_in_executor(
self.thread_pool,
self._reset_failed_attachments_sync
)
return reset_count
except Exception as e:
logger.error(f"Error resetting failed attachments: {e}")
return 0
def _reset_failed_attachments_sync(self) -> int:
"""Synchronous version of failed attachment reset."""
try:
conn = self.db_manager.connect()
cursor = conn.cursor()
# Reset failed attachments that have hit the retry limit
cursor.execute("""
UPDATE zotero_attachments
SET extraction_status = 'pending',
retry_count = 0,
extraction_error = NULL,
extraction_started_at = NULL,
extraction_completed_at = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE extraction_status = 'failed'
AND retry_count >= %s
AND content_type = ANY(%s)
""", (self.retry_attempts, self.supported_types))
            reset_count = cursor.rowcount
            conn.commit()  # persist the UPDATE (a no-op if the connection autocommits)
            cursor.close()
            conn.close()
return reset_count
except Exception as e:
logger.error(f"Error in sync failed attachment reset: {e}")
return 0
def reset_failed_attachments_manual(self) -> int:
"""
Manually reset failed attachments to pending status.
This can be called from management tools or CLI to give failed documents
another chance after code fixes are deployed.
Returns:
Number of attachments reset
"""
try:
conn = self.db_manager.connect()
cursor = conn.cursor()
# Get count before reset
cursor.execute("""
SELECT COUNT(*) FROM zotero_attachments
WHERE extraction_status = 'failed'
AND content_type = ANY(%s)
""", (self.supported_types,))
            row = cursor.fetchone()
            total_failed = row[0] if row else 0
# Reset all failed attachments (not just those at retry limit)
cursor.execute("""
UPDATE zotero_attachments
SET extraction_status = 'pending',
retry_count = 0,
extraction_error = NULL,
extraction_started_at = NULL,
extraction_completed_at = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE extraction_status = 'failed'
AND content_type = ANY(%s)
""", (self.supported_types,))
            reset_count = cursor.rowcount
            conn.commit()  # persist the UPDATE (a no-op if the connection autocommits)
            cursor.close()
            conn.close()
logger.info(f"Manually reset {reset_count} of {total_failed} failed attachments to pending")
return reset_count
except Exception as e:
logger.error(f"Error manually resetting failed attachments: {e}")
return 0
def create_fulltext_indexing_manager(config_path: Optional[str] = None) -> FullTextIndexingManager:
"""Create a FullTextIndexingManager instance."""
return FullTextIndexingManager(config_path)
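if __name__ == "__main__":  # pragma: no cover
    # Minimal manual smoke test (a sketch; assumes a reachable Zotero client and
    # database per the configuration above, and an optional config path argument).
    import sys

    async def _demo() -> None:
        manager = create_fulltext_indexing_manager(sys.argv[1] if len(sys.argv) > 1 else None)
        await manager.start_background_workers()
        queued = await manager.queue_unprocessed_items()
        print(f"Queued {queued} attachments")
        print(manager.get_processing_stats())
        await manager.shutdown()

    asyncio.run(_demo())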