web_document_service.py
"""Web document service that wraps DocumentProcessor functionality.""" import logging import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional from ...background_queue import BackgroundProcessingQueue, Job, JobStatus, JobType, Priority from ...document_processor import DocumentProcessor from ...models import Document from ...vector_store import VectorStore from ..models.web_models import ( ChunkResponse, DocumentDetailResponse, DocumentListResponse, DocumentPreviewResponse, DocumentSummary, DocumentUploadResponse, JobCancelResponse, JobStatusResponse, PaginationParams, ProcessingStatus, ) logger = logging.getLogger(__name__) class WebDocumentService: """Service for document management operations via web interface.""" def __init__( self, document_processor: DocumentProcessor, vector_store: VectorStore, document_cache: Dict[str, Document], save_cache_callback: Optional[callable] = None, background_queue: Optional[BackgroundProcessingQueue] = None, websocket_manager: Optional[Any] = None, ): """Initialize the web document service. Args: document_processor: PDF processing service vector_store: Vector storage service document_cache: Document metadata cache save_cache_callback: Optional callback to save document cache background_queue: Optional background processing queue websocket_manager: Optional WebSocket manager for real-time updates """ self.document_processor = document_processor self.vector_store = vector_store self.document_cache = document_cache self.save_cache_callback = save_cache_callback self.background_queue = background_queue self.websocket_manager = websocket_manager # Store job metadata for status tracking self.job_metadata: Dict[str, Dict[str, Any]] = {} # Track in-progress documents (documents currently being processed) self.in_progress_documents: Dict[str, DocumentSummary] = {} # Clean existing document metadata in cache self._clean_existing_document_metadata() def _get_uploads_directory(self) -> Path: """Get the uploads directory path, creating it if necessary. Returns: Path to the uploads directory inside the knowledgebase. """ uploads_dir = self.document_processor.config.knowledgebase_path / "uploads" uploads_dir.mkdir(parents=True, exist_ok=True) return uploads_dir def _clean_metadata(self, value: Any) -> Any: """Recursively convert metadata to JSON-serializable primitives. - Converts datetime to ISO 8601 UTC strings - Converts Path to str - Converts Enum to its value - Converts sets/tuples to lists - Converts modules/classes/callables to 'module.name' strings - Recursively processes dicts/lists - Falls back to str(...) 
for unknown types """ # Localized imports to avoid changing module-level imports import enum as _enum import inspect as _inspect import types as _types from datetime import datetime as _dt from pathlib import Path as _Path def _clean(v): if v is None or isinstance(v, (str, int, float, bool)): return v if isinstance(v, _dt): # Ensure timezone-aware ISO 8601 if getattr(v, "tzinfo", None) is None: try: from datetime import timezone as _timezone v = v.replace(tzinfo=_timezone.utc) except Exception: pass else: try: from datetime import timezone as _timezone v = v.astimezone(_timezone.utc) except Exception: pass return v.isoformat() if isinstance(v, _Path): return str(v) if isinstance(v, _enum.Enum): return v.value if isinstance(v, dict): # Clean dictionary more carefully cleaned_dict = {} for k, val in v.items(): try: # Ensure keys are strings and properly cleaned clean_key = str(_clean(k)) if k is not None else "null" clean_val = _clean(val) cleaned_dict[clean_key] = clean_val except Exception as e: logger.debug(f"Failed to clean dict item {k}: {e}") # Fallback: convert problematic items to safe string representations clean_key = str(k) if k is not None else "null" cleaned_dict[clean_key] = str(val) if val is not None else None return cleaned_dict if isinstance(v, (list, tuple, set)): cleaned_list = [] for item in v: try: cleaned_list.append(_clean(item)) except Exception as e: logger.debug(f"Failed to clean list item {item}: {e}") # Fallback: convert problematic items to string cleaned_list.append(str(item) if item is not None else None) return cleaned_list if ( isinstance(v, _types.ModuleType) or _inspect.isclass(v) or _inspect.isfunction(v) or _inspect.ismethod(v) or _inspect.isbuiltin(v) ): name = getattr(v, "__name__", str(v)) module = getattr(v, "__module__", "") return f"{module}.{name}" if module else name # Handle any object with a __dict__ attribute (custom classes) if hasattr(v, "__dict__") and not isinstance(v, (_dt, _Path)): try: # Convert object to dict and clean recursively obj_dict = {} for attr_name, attr_value in v.__dict__.items(): if not attr_name.startswith("_"): # Skip private attributes obj_dict[attr_name] = _clean(attr_value) return obj_dict except Exception as e: logger.debug(f"Failed to convert object {type(v)} to dict: {e}") return ( f"{type(v).__module__}.{type(v).__name__}" if hasattr(type(v), "__module__") else str(type(v)) ) try: # Try JSON serialization test import json json.dumps(v) return v # If it passes JSON serialization, return as-is except (TypeError, ValueError): pass try: return str(v) except Exception: return repr(v) return _clean(value) def _clean_existing_document_metadata(self) -> None: """Clean metadata for all existing documents in the cache.""" try: cleaned_count = 0 for document_id, document in self.document_cache.items(): if document.metadata: original_metadata = document.metadata.copy() cleaned_metadata = self._clean_metadata(document.metadata) # Only update if there were changes if cleaned_metadata != original_metadata: document.metadata = cleaned_metadata cleaned_count += 1 if cleaned_count > 0: logger.info(f"Cleaned metadata for {cleaned_count} existing documents") # Save the cleaned cache if self.save_cache_callback: try: # Try to run the save callback import asyncio if asyncio.iscoroutinefunction(self.save_cache_callback): # If it's async, we can't run it from __init__, log a warning logger.warning( "Cannot save cleaned metadata synchronously - " "cache will be saved on next document operation" ) else: self.save_cache_callback() except 
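    # Illustrative sketch (not part of the original module): how _clean_metadata
    # is expected to flatten mixed values. The sample payload is invented.
    #
    #     service._clean_metadata(
    #         {"added": datetime(2024, 1, 1), "source": Path("/tmp/a.pdf"), "tags": {"x"}}
    #     )
    #     # -> {"added": "2024-01-01T00:00:00+00:00", "source": "/tmp/a.pdf", "tags": ["x"]}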
    def _clean_existing_document_metadata(self) -> None:
        """Clean metadata for all existing documents in the cache."""
        try:
            cleaned_count = 0
            for document_id, document in self.document_cache.items():
                if document.metadata:
                    original_metadata = document.metadata.copy()
                    cleaned_metadata = self._clean_metadata(document.metadata)
                    # Only update if there were changes
                    if cleaned_metadata != original_metadata:
                        document.metadata = cleaned_metadata
                        cleaned_count += 1
            if cleaned_count > 0:
                logger.info(f"Cleaned metadata for {cleaned_count} existing documents")
                # Save the cleaned cache
                if self.save_cache_callback:
                    try:
                        # Try to run the save callback
                        import asyncio

                        if asyncio.iscoroutinefunction(self.save_cache_callback):
                            # If it's async, we can't run it from __init__, log a warning
                            logger.warning(
                                "Cannot save cleaned metadata synchronously - "
                                "cache will be saved on next document operation"
                            )
                        else:
                            self.save_cache_callback()
                    except Exception as e:
                        logger.warning(f"Failed to save cleaned document cache: {e}")
        except Exception as e:
            logger.error(f"Error cleaning existing document metadata: {e}")

    async def list_documents(
        self,
        pagination: PaginationParams,
        metadata_filter: Optional[Dict[str, Any]] = None,
    ) -> DocumentListResponse:
        """List documents with pagination and optional filtering.

        Args:
            pagination: Pagination parameters
            metadata_filter: Optional metadata filters

        Returns:
            DocumentListResponse with paginated document list
        """
        try:
            # Get all documents from cache
            all_documents = list(self.document_cache.values())

            # Apply metadata filter if provided
            if metadata_filter:
                filtered_docs = []
                for doc in all_documents:
                    matches = True
                    for key, value in metadata_filter.items():
                        if key not in doc.metadata or doc.metadata[key] != value:
                            matches = False
                            break
                    if matches:
                        filtered_docs.append(doc)
                all_documents = filtered_docs

            # Convert completed documents to DocumentSummary objects, ensuring metadata is clean
            document_summaries = []
            for doc in all_documents:
                # Clean document metadata before converting to summary
                if doc.metadata:
                    doc.metadata = self._clean_metadata(doc.metadata)
                document_summaries.append(self._document_to_summary(doc))

            # Add in-progress documents to the list
            for in_progress_doc in self.in_progress_documents.values():
                # Apply metadata filter to in-progress documents too
                if metadata_filter:
                    # For in-progress docs, we don't have full metadata, so just skip filtering
                    pass
                document_summaries.append(in_progress_doc)

            # Sort by added_at descending (newest first)
            document_summaries.sort(key=lambda d: d.added_at or d.updated_at, reverse=True)

            # Calculate pagination
            total_count = len(document_summaries)
            total_pages = (total_count + pagination.page_size - 1) // pagination.page_size
            start_idx = pagination.offset
            end_idx = min(start_idx + pagination.page_size, total_count)

            # Get documents for current page
            page_documents = document_summaries[start_idx:end_idx]

            # Backfill chunk_count for completed documents that were populated without chunks
            for doc_summary in page_documents:
                # Only backfill for completed documents (they exist in document_cache)
                if (
                    doc_summary.processing_status == ProcessingStatus.COMPLETED
                    and doc_summary.id in self.document_cache
                ):
                    try:
                        doc = self.document_cache[doc_summary.id]
                        if getattr(doc, "chunk_count", 0) == 0:
                            chunks = await self.vector_store.get_document_chunks(doc.id)
                            doc.chunk_count = len(chunks)
                            # Update the summary too
                            doc_summary.chunk_count = len(chunks)
                    except Exception as e:
                        logger.debug(f"Could not backfill chunk_count for document {doc_summary.id}: {e}")

            return DocumentListResponse(
                documents=page_documents,
                total_count=total_count,
                page=pagination.page,
                page_size=pagination.page_size,
                total_pages=total_pages,
                has_next=pagination.page < total_pages,
                has_previous=pagination.page > 1,
            )
        except Exception as e:
            logger.error(f"Error listing documents: {e}")
            raise
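    # Worked example (not part of the original module): the pagination above is
    # a ceiling division. With 23 documents and page_size=10,
    # (23 + 10 - 1) // 10 == 3 pages, and page 3 serves documents[20:23].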
    async def get_document_detail(self, document_id: str, include_chunks: bool = False) -> DocumentDetailResponse:
        """Get detailed information about a document.

        Args:
            document_id: Document ID
            include_chunks: Whether to include document chunks

        Returns:
            DocumentDetailResponse with document details
        """
        try:
            if document_id not in self.document_cache:
                raise ValueError(f"Document not found: {document_id}")

            document = self.document_cache[document_id]

            # Ensure metadata is clean before proceeding
            if document.metadata:
                document.metadata = self._clean_metadata(document.metadata)

            document_summary = self._document_to_summary(document)

            chunks = None
            if include_chunks:
                # Prefer in-memory chunks; fall back to vector store if not present
                chunk_source = document.chunks
                if not chunk_source:
                    try:
                        chunk_source = await self.vector_store.get_document_chunks(document_id)
                        if chunk_source:
                            # Backfill count for accurate summaries/status
                            document.chunk_count = len(chunk_source)
                    except Exception as e:
                        logger.debug(f"Failed to fetch chunks for {document_id} from vector store: {e}")
                        chunk_source = []
                if chunk_source:
                    chunks = [
                        ChunkResponse(
                            id=chunk.id,
                            text=getattr(chunk, "text", ""),
                            page_number=getattr(chunk, "page_number", None),
                            chunk_index=getattr(chunk, "chunk_index", 0),
                            metadata=self._clean_metadata(getattr(chunk, "metadata", {})),
                        )
                        for chunk in chunk_source
                    ]

            # Clean and log metadata for debugging
            original_metadata = document.metadata.copy() if document.metadata else {}
            cleaned_metadata = self._clean_metadata(document.metadata) if document.metadata else {}
            if document.metadata and cleaned_metadata != original_metadata:
                logger.info(
                    f"Document {document_id} metadata cleaned - "
                    f"element_types: {original_metadata.get('element_types')} -> "
                    f"{cleaned_metadata.get('element_types')}"
                )

            return DocumentDetailResponse(
                document=document_summary,
                chunks=chunks,
                metadata=cleaned_metadata,
            )
        except Exception as e:
            logger.error(f"Error getting document detail: {e}")
            raise

    async def get_document_chunks(self, document_id: str) -> List[ChunkResponse]:
        """Get all chunks for a document.

        Args:
            document_id: Document ID

        Returns:
            List of ChunkResponse objects
        """
        try:
            if document_id not in self.document_cache:
                raise ValueError(f"Document not found: {document_id}")

            document = self.document_cache[document_id]

            # Prefer in-memory chunks; fall back to vector store if not present
            chunk_source = document.chunks
            if not chunk_source:
                try:
                    chunk_source = await self.vector_store.get_document_chunks(document_id)
                except Exception as e:
                    logger.error(f"Failed to fetch chunks for document {document_id}: {e}")
                    raise

            return [
                ChunkResponse(
                    id=chunk.id,
                    text=getattr(chunk, "text", ""),
                    page_number=getattr(chunk, "page_number", None),
                    chunk_index=getattr(chunk, "chunk_index", 0),
                    metadata=self._clean_metadata(getattr(chunk, "metadata", {})),
                )
                for chunk in chunk_source
            ]
        except Exception as e:
            logger.error(f"Error getting document chunks: {e}")
            raise
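    # Illustrative sketch (not part of the original module): typical async call
    # paths for the two read methods above; "doc-123" is a made-up ID.
    #
    #     detail = await service.get_document_detail("doc-123", include_chunks=True)
    #     chunks = await service.get_document_chunks("doc-123")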
    async def get_document_preview(self, document_id: str) -> DocumentPreviewResponse:
        """Get document preview/content.

        Args:
            document_id: Document ID

        Returns:
            DocumentPreviewResponse with document content
        """
        try:
            if document_id not in self.document_cache:
                raise ValueError(f"Document not found: {document_id}")

            document = self.document_cache[document_id]

            # Combine all chunk texts to create full content
            content_parts = []
            chunk_source = document.chunks
            if not chunk_source:
                try:
                    chunk_source = await self.vector_store.get_document_chunks(document_id)
                except Exception as e:
                    logger.debug(f"Failed to fetch chunks for preview {document_id}: {e}")
                    chunk_source = []

            for chunk in sorted(chunk_source, key=lambda c: getattr(c, "chunk_index", 0)):
                content_parts.append(getattr(chunk, "text", ""))

            content = "\n\n".join(content_parts)

            return DocumentPreviewResponse(
                document_id=document_id,
                title=document.title,
                content=content,
                page_count=document.page_count,
                content_type="text/plain",
            )
        except Exception as e:
            logger.error(f"Error getting document preview: {e}")
            raise

    def _create_in_progress_document(
        self, job_id: str, filename: str, file_size: int = 0, temp_path: Optional[str] = None
    ) -> DocumentSummary:
        """Create a placeholder document for in-progress processing.

        Args:
            job_id: Background job ID
            filename: Original filename
            file_size: File size in bytes
            temp_path: Temporary file path (for size calculation)

        Returns:
            DocumentSummary for the in-progress document
        """
        # Try to get file size if not provided
        if file_size == 0 and temp_path:
            try:
                file_size = Path(temp_path).stat().st_size
            except Exception:
                file_size = 0

        # Generate a temporary document ID based on job_id
        document_id = f"processing_{job_id}"

        return DocumentSummary(
            id=document_id,
            title=None,
            filename=filename,
            path=f"processing/{filename}",  # Placeholder path
            file_size=file_size,
            page_count=0,  # Unknown until processed
            chunk_count=0,  # Unknown until processed
            added_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            has_embeddings=False,  # Will be True once processed
            checksum="",  # Unknown until processed
            processing_status=ProcessingStatus.PROCESSING,
            job_id=job_id,
            processing_error=None,
        )

    def _remove_in_progress_document(self, job_id: str) -> None:
        """Remove an in-progress document from tracking.

        Args:
            job_id: Job ID to remove
        """
        # Find and remove the in-progress document for this job
        to_remove = []
        for doc_id, doc_summary in self.in_progress_documents.items():
            if doc_summary.job_id == job_id:
                to_remove.append(doc_id)

        for doc_id in to_remove:
            del self.in_progress_documents[doc_id]
            logger.debug(f"Removed in-progress document {doc_id} for job {job_id}")

    def _update_in_progress_document_status(
        self, job_id: str, status: ProcessingStatus, error_msg: Optional[str] = None
    ) -> None:
        """Update the status of an in-progress document.

        Args:
            job_id: Job ID to update
            status: New processing status
            error_msg: Error message if status is FAILED
        """
        # Find and update the in-progress document for this job
        for doc_id, doc_summary in self.in_progress_documents.items():
            if doc_summary.job_id == job_id:
                doc_summary.processing_status = status
                if error_msg:
                    doc_summary.processing_error = error_msg
                doc_summary.updated_at = datetime.now(timezone.utc)
                logger.debug(f"Updated in-progress document {doc_id} status to {status}")
                break
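    # Illustrative sketch (not part of the original module): the lifecycle of an
    # in-progress placeholder, with an invented job ID. Callers register the
    # summary themselves, as upload_document does below.
    #
    #     summary = service._create_in_progress_document("job-1", "report.pdf")
    #     service.in_progress_documents[summary.id] = summary   # id == "processing_job-1"
    #     service._update_in_progress_document_status("job-1", ProcessingStatus.FAILED, "parse error")
    #     service._remove_in_progress_document("job-1")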
    async def upload_document(
        self, file_content: bytes, filename: str, metadata: Optional[Dict[str, Any]] = None
    ) -> DocumentUploadResponse:
        """Upload and process a document from file content.

        Args:
            file_content: Raw file content
            filename: Original filename
            metadata: Optional metadata

        Returns:
            DocumentUploadResponse with processing results
        """
        # If background queue is available, queue the job
        if self.background_queue:
            try:
                start_time = time.time()

                # Create temporary file in uploads directory with unique name
                uploads_dir = self._get_uploads_directory()

                # Add timestamp and UUID to make filename unique
                file_stem = Path(filename).stem
                file_suffix = Path(filename).suffix
                unique_id = str(uuid.uuid4())[:8]
                timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
                unique_filename = f"{file_stem}_{timestamp}_{unique_id}{file_suffix}"
                temp_path = uploads_dir / unique_filename
                temp_path.write_bytes(file_content)

                # Create job metadata
                job_metadata = {
                    "operation": "upload_document",
                    "filename": filename,
                    "temp_path": str(temp_path),
                    "original_metadata": metadata or {},
                    "created_at": datetime.now(timezone.utc).isoformat(),
                }

                # Add job to queue
                job_id = await self.background_queue.add_job(
                    job_type=JobType.WEB_UPLOAD,
                    metadata=job_metadata,
                    priority=Priority.NORMAL,
                    processor=self._process_upload_job,
                )

                # Create in-progress document placeholder
                in_progress_doc = self._create_in_progress_document(
                    job_id=job_id, filename=filename, file_size=len(file_content), temp_path=str(temp_path)
                )
                self.in_progress_documents[in_progress_doc.id] = in_progress_doc

                # Store job info for status tracking
                self.job_metadata[job_id] = {
                    "filename": filename,
                    "operation": "upload_document",
                    "created_at": datetime.now(timezone.utc),
                    "temp_path": str(temp_path),
                    "in_progress_doc_id": in_progress_doc.id,
                }

                processing_time = time.time() - start_time
                logger.info(f"Created in-progress document {in_progress_doc.id} for upload job {job_id}")

                return DocumentUploadResponse(
                    success=True,
                    job_id=job_id,
                    filename=filename,
                    processing_time=processing_time,
                    message=f"Document upload queued for processing. Job ID: {job_id}",
                )
            except Exception as e:
                logger.error(f"Error queuing upload job: {e}")
                return DocumentUploadResponse(
                    success=False,
                    filename=filename,
                    processing_time=0.0,
                    error=str(e),
                )

        # Fallback to synchronous processing if no queue
        return await self._process_upload_synchronously(file_content, filename, metadata)
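    # Illustrative sketch (not part of the original module): queuing an upload
    # and polling its job; pdf_bytes and the filename are placeholders.
    #
    #     resp = await service.upload_document(pdf_bytes, "paper.pdf", {"source": "web"})
    #     if resp.success and resp.job_id:
    #         status = await service.get_job_status(resp.job_id)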
    async def add_document_by_path(
        self, file_path: str, metadata: Optional[Dict[str, Any]] = None
    ) -> DocumentUploadResponse:
        """Add a document by file path.

        Args:
            file_path: Path to the PDF file
            metadata: Optional metadata

        Returns:
            DocumentUploadResponse with processing results
        """
        # If background queue is available, queue the job
        if self.background_queue:
            try:
                start_time = time.time()

                path = Path(file_path)
                if not path.exists():
                    return DocumentUploadResponse(
                        success=False,
                        filename=path.name,
                        processing_time=0.0,
                        error=f"File not found: {file_path}",
                    )

                # Create job metadata
                job_metadata = {
                    "operation": "add_document_by_path",
                    "filename": path.name,
                    "file_path": file_path,
                    "original_metadata": metadata or {},
                    "created_at": datetime.now(timezone.utc).isoformat(),
                }

                # Add job to queue
                job_id = await self.background_queue.add_job(
                    job_type=JobType.WEB_UPLOAD,
                    metadata=job_metadata,
                    priority=Priority.NORMAL,
                    processor=self._process_path_job,
                )

                # Create in-progress document placeholder
                try:
                    file_size = path.stat().st_size
                except Exception:
                    file_size = 0

                in_progress_doc = self._create_in_progress_document(
                    job_id=job_id, filename=path.name, file_size=file_size, temp_path=file_path
                )
                in_progress_doc.path = file_path  # Use actual path instead of placeholder
                self.in_progress_documents[in_progress_doc.id] = in_progress_doc

                # Store job info for status tracking
                self.job_metadata[job_id] = {
                    "filename": path.name,
                    "operation": "add_document_by_path",
                    "created_at": datetime.now(timezone.utc),
                    "file_path": file_path,
                    "in_progress_doc_id": in_progress_doc.id,
                }

                processing_time = time.time() - start_time
                logger.info(f"Created in-progress document {in_progress_doc.id} for path job {job_id}")

                return DocumentUploadResponse(
                    success=True,
                    job_id=job_id,
                    filename=path.name,
                    processing_time=processing_time,
                    message=f"Document processing queued. Job ID: {job_id}",
                )
            except Exception as e:
                logger.error(f"Error queuing path job: {e}")
                return DocumentUploadResponse(
                    success=False,
                    filename=Path(file_path).name,
                    processing_time=0.0,
                    error=str(e),
                )

        # Fallback to synchronous processing if no queue
        return await self._process_path_synchronously(file_path, metadata)

    def _is_file_watcher_managed_document(self, document) -> bool:
        """Check if a document is managed by the file watcher (exists in knowledgebase directory).

        Args:
            document: Document to check

        Returns:
            True if the document is managed by file watcher and should not be removed via API
        """
        try:
            from pathlib import Path

            if not hasattr(document, "path") or not document.path:
                return False

            doc_path = Path(document.path).resolve()
            kb_path = self.document_processor.config.knowledgebase_path.resolve()
            uploads_path = (kb_path / "uploads").resolve()

            # Check if document is within knowledgebase directory
            try:
                doc_path.relative_to(kb_path)
            except ValueError:
                # Document path is not within knowledgebase directory
                return False

            # Check if document is NOT in uploads directory (uploads are user-managed)
            try:
                doc_path.relative_to(uploads_path)
                # Document is in uploads directory, so it's user-managed
                return False
            except ValueError:
                # Document is not in uploads, so it could be file-watcher-managed
                pass

            # Check if the original file still exists
            if doc_path.exists():
                return True

            return False
        except Exception as e:
            logger.error(f"Error checking if document is file-watcher-managed: {e}")
            return False
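    # Stdlib behavior the containment checks above rely on (POSIX paths shown):
    # Path.relative_to() raises ValueError when one path is not inside the other.
    #
    #     Path("/kb/uploads/a.pdf").relative_to(Path("/kb"))   # PosixPath('uploads/a.pdf')
    #     Path("/elsewhere/a.pdf").relative_to(Path("/kb"))    # raises ValueError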
    async def remove_document(self, document_id: str) -> Dict[str, Any]:
        """Remove a document from the knowledgebase.

        Args:
            document_id: Document ID to remove

        Returns:
            Removal result dictionary
        """
        try:
            if document_id not in self.document_cache:
                raise ValueError(f"Document not found: {document_id}")

            document = self.document_cache[document_id]

            # Check if document is managed by file watcher
            if self._is_file_watcher_managed_document(document):
                error_msg = (
                    f"Cannot remove document '{getattr(document, 'filename', None) or document_id}' as it exists "
                    f"in the knowledgebase directory ({document.path}). To remove this document, delete the "
                    f"file from the filesystem directly."
                )
                logger.warning(f"Attempted to remove file-watcher-managed document via web interface: {document_id}")
                return {
                    "success": False,
                    "error": error_msg,
                    "error_type": "file_watcher_managed",
                    "document_path": document.path,
                }

            start_time = time.time()

            # Remove from vector store
            await self.vector_store.delete_document(document_id)

            # For user-uploaded documents, also remove the physical file
            document_path = document.path
            try:
                file_path = Path(document_path)
                uploads_dir = self._get_uploads_directory()

                # Check if this is an uploaded file (in uploads directory)
                try:
                    file_path.relative_to(uploads_dir)
                    # It's in uploads directory, safe to delete
                    if file_path.exists():
                        file_path.unlink()
                        logger.info(f"Deleted uploaded file: {file_path}")
                    else:
                        logger.warning(f"Uploaded file not found for deletion: {file_path}")
                except ValueError:
                    # File is not in uploads directory, don't delete it
                    logger.debug(f"Document file not in uploads directory, preserving: {file_path}")
            except Exception as e:
                logger.warning(f"Failed to delete uploaded file {document_path}: {e}")

            # Remove from document cache
            del self.document_cache[document_id]
            if self.save_cache_callback:
                await self.save_cache_callback()

            processing_time = time.time() - start_time

            return {
                "success": True,
                "document_id": document_id,
                "document_path": document.path,
                "processing_time": processing_time,
                "message": f"Document {document_id} removed successfully",
            }
        except Exception as e:
            logger.error(f"Error removing document: {e}")
            return {
                "success": False,
                "error": str(e),
            }

    async def get_job_status(self, job_id: str) -> JobStatusResponse:
        """Get the status of a background job.

        Args:
            job_id: Job identifier

        Returns:
            JobStatusResponse with job status information
        """
        if not self.background_queue:
            return JobStatusResponse(
                success=False,
                job_id=job_id,
                status=JobStatus.FAILED,
                error="Background queue not available",
            )

        try:
            status = await self.background_queue.get_status(job_id)
            if status is None:
                return JobStatusResponse(
                    success=False,
                    job_id=job_id,
                    status=JobStatus.FAILED,
                    error="Job not found",
                )

            # Get job metadata if available
            job_info = self.job_metadata.get(job_id, {})

            return JobStatusResponse(
                success=True,
                job_id=job_id,
                status=status,
                created_at=job_info.get("created_at"),
                metadata=self._clean_metadata(job_info),
            )
        except Exception as e:
            logger.error(f"Error getting job status: {e}")
            return JobStatusResponse(
                success=False,
                job_id=job_id,
                status=JobStatus.FAILED,
                error=str(e),
            )
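    # Illustrative sketch (not part of the original module): removal returns a
    # plain dict, so callers branch on "success" and "error_type".
    #
    #     result = await service.remove_document("doc-123")
    #     if not result["success"] and result.get("error_type") == "file_watcher_managed":
    #         ...  # tell the user to delete the file on disk instead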
    async def cancel_job(self, job_id: str) -> JobCancelResponse:
        """Cancel a background job.

        Args:
            job_id: Job identifier

        Returns:
            JobCancelResponse with cancellation result
        """
        if not self.background_queue:
            return JobCancelResponse(
                success=False,
                job_id=job_id,
                status=JobStatus.FAILED,
                message="Background queue not available",
            )

        try:
            success = await self.background_queue.cancel_job(job_id)
            if success:
                status = await self.background_queue.get_status(job_id)
                return JobCancelResponse(
                    success=True,
                    job_id=job_id,
                    status=status or JobStatus.CANCELED,
                    message="Job cancelled successfully",
                )
            else:
                status = await self.background_queue.get_status(job_id)
                if status is None:
                    message = "Job not found"
                elif status in {JobStatus.COMPLETED, JobStatus.FAILED}:
                    message = f"Job already {status.name.lower()}"
                else:
                    message = "Job could not be cancelled"
                return JobCancelResponse(
                    success=False,
                    job_id=job_id,
                    status=status or JobStatus.FAILED,
                    message=message,
                )
        except Exception as e:
            logger.error(f"Error cancelling job: {e}")
            return JobCancelResponse(
                success=False,
                job_id=job_id,
                status=JobStatus.FAILED,
                message=f"Error cancelling job: {e}",
            )

    async def _process_upload_job(self, job: Job) -> None:
        """Process a document upload job in the background.

        Args:
            job: Job instance to process
        """
        try:
            metadata = job.metadata
            filename = metadata["filename"]
            temp_path = Path(metadata["temp_path"])
            original_metadata = metadata["original_metadata"]

            logger.info(f"Processing upload job {job.job_id} for file: {filename}")

            # Send WebSocket notification that processing started
            if self.websocket_manager:
                await self.websocket_manager.broadcast_processing_started(filename, job.job_id)

            # Process the document
            result = await self.document_processor.process_document(temp_path, original_metadata)

            if result.success and result.document:
                # Clean document metadata before storing
                if result.document.metadata:
                    result.document.metadata = self._clean_metadata(result.document.metadata)

                # Add to vector store
                await self.vector_store.add_document(result.document)

                # Update document cache
                self.document_cache[result.document.id] = result.document
                if self.save_cache_callback:
                    await self.save_cache_callback()

                # Update job metadata with result
                if job.job_id in self.job_metadata:
                    self.job_metadata[job.job_id].update(
                        {
                            "document_id": result.document.id,
                            "chunks_created": result.chunks_created,
                            "embeddings_generated": result.embeddings_generated,
                            "completed_at": datetime.now(timezone.utc),
                        }
                    )

                # Remove in-progress document since processing completed successfully
                self._remove_in_progress_document(job.job_id)

                # Send WebSocket notifications
                if self.websocket_manager:
                    # Processing completed (includes document added information)
                    await self.websocket_manager.broadcast_processing_completed(
                        {
                            "job_id": job.job_id,
                            "document_id": result.document.id,
                            "filename": filename,
                            "title": result.document.title,
                            "path": result.document.path,
                            "chunks_created": result.chunks_created,
                        }
                    )

                logger.info(f"Upload job {job.job_id} completed successfully")
            else:
                # Job failed
                error_msg = result.error or "Unknown processing error"
                if job.job_id in self.job_metadata:
                    self.job_metadata[job.job_id].update(
                        {
                            "error": error_msg,
                            "failed_at": datetime.now(timezone.utc),
                        }
                    )

                if self.websocket_manager:
                    await self.websocket_manager.broadcast_processing_failed(filename, error_msg)

                logger.error(f"Upload job {job.job_id} failed: {error_msg}")
                raise RuntimeError(error_msg)
        finally:
            # Note: We DO NOT clean up uploaded files - they should remain in the uploads directory
            # as they are user-managed documents that can be removed via the API
            temp_path = Path(job.metadata.get("temp_path", ""))
            logger.debug(f"Upload job completed, file preserved at: {temp_path}")

    async def _process_path_job(self, job: Job) -> None:
        """Process a document path job in the background.

        Args:
            job: Job instance to process
        """
        try:
            metadata = job.metadata
            filename = metadata["filename"]
            file_path = metadata["file_path"]
            original_metadata = metadata["original_metadata"]

            logger.info(f"Processing path job {job.job_id} for file: {file_path}")

            # Send WebSocket notification that processing started
            if self.websocket_manager:
                await self.websocket_manager.broadcast_processing_started(filename, job.job_id)

            path = Path(file_path)
            if not path.exists():
                raise FileNotFoundError(f"File not found: {file_path}")

            # Process the document
            result = await self.document_processor.process_document(path, original_metadata)

            if result.success and result.document:
                # Clean document metadata before storing
                if result.document.metadata:
                    result.document.metadata = self._clean_metadata(result.document.metadata)

                # Add to vector store
                await self.vector_store.add_document(result.document)

                # Update document cache
                self.document_cache[result.document.id] = result.document
                if self.save_cache_callback:
                    await self.save_cache_callback()

                # Update job metadata with result
                if job.job_id in self.job_metadata:
                    self.job_metadata[job.job_id].update(
                        {
                            "document_id": result.document.id,
                            "chunks_created": result.chunks_created,
                            "embeddings_generated": result.embeddings_generated,
                            "completed_at": datetime.now(timezone.utc),
                        }
                    )

                # Send WebSocket notifications
                if self.websocket_manager:
                    # Processing completed (includes document added information)
                    await self.websocket_manager.broadcast_processing_completed(
                        {
                            "job_id": job.job_id,
                            "document_id": result.document.id,
                            "filename": filename,
                            "title": result.document.title,
                            "path": result.document.path,
                            "chunks_created": result.chunks_created,
                        }
                    )

                logger.info(f"Path job {job.job_id} completed successfully")
            else:
                # Job failed
                error_msg = result.error or "Unknown processing error"
                if job.job_id in self.job_metadata:
                    self.job_metadata[job.job_id].update(
                        {
                            "error": error_msg,
                            "failed_at": datetime.now(timezone.utc),
                        }
                    )

                if self.websocket_manager:
                    await self.websocket_manager.broadcast_processing_failed(filename, error_msg)

                logger.error(f"Path job {job.job_id} failed: {error_msg}")
                raise RuntimeError(error_msg)
        except Exception as e:
            error_msg = str(e)
            if job.job_id in self.job_metadata:
                self.job_metadata[job.job_id].update(
                    {
                        "error": error_msg,
                        "failed_at": datetime.now(timezone.utc),
                    }
                )

            if self.websocket_manager:
                filename = job.metadata.get("filename", "unknown")
                await self.websocket_manager.broadcast_processing_failed(filename, error_msg)

            logger.error(f"Path job {job.job_id} failed: {error_msg}")
            raise
    async def _process_upload_synchronously(
        self, file_content: bytes, filename: str, metadata: Optional[Dict[str, Any]] = None
    ) -> DocumentUploadResponse:
        """Fallback method to process upload synchronously when no queue is available."""
        try:
            start_time = time.time()

            # Create temporary file in uploads directory with unique name
            uploads_dir = self._get_uploads_directory()

            # Add timestamp and UUID to make filename unique
            file_stem = Path(filename).stem
            file_suffix = Path(filename).suffix
            unique_id = str(uuid.uuid4())[:8]
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            unique_filename = f"{file_stem}_{timestamp}_{unique_id}{file_suffix}"
            temp_path = uploads_dir / unique_filename
            temp_path.write_bytes(file_content)

            try:
                # Process the document
                result = await self.document_processor.process_document(temp_path, metadata)
                processing_time = time.time() - start_time

                if not result.success:
                    return DocumentUploadResponse(
                        success=False,
                        filename=filename,
                        processing_time=processing_time,
                        error=result.error,
                    )

                # Add to vector store if successful
                if result.document:
                    # Clean document metadata before storing
                    if result.document.metadata:
                        result.document.metadata = self._clean_metadata(result.document.metadata)

                    await self.vector_store.add_document(result.document)

                    # Update document cache
                    self.document_cache[result.document.id] = result.document
                    if self.save_cache_callback:
                        await self.save_cache_callback()

                return DocumentUploadResponse(
                    success=True,
                    document_id=result.document.id if result.document else None,
                    filename=filename,
                    processing_time=processing_time,
                    chunks_created=result.chunks_created,
                    embeddings_generated=result.embeddings_generated,
                )
            finally:
                # Note: We DO NOT clean up uploaded files - they should remain in the uploads directory
                # as they are user-managed documents that can be removed via the API
                logger.debug(f"Upload completed synchronously, file preserved at: {temp_path}")
        except Exception as e:
            logger.error(f"Error uploading document synchronously: {e}")
            return DocumentUploadResponse(
                success=False,
                filename=filename,
                processing_time=time.time() - start_time if "start_time" in locals() else 0.0,
                error=str(e),
            )

    async def _process_path_synchronously(
        self, file_path: str, metadata: Optional[Dict[str, Any]] = None
    ) -> DocumentUploadResponse:
        """Fallback method to process path synchronously when no queue is available."""
        try:
            start_time = time.time()

            path = Path(file_path)
            if not path.exists():
                return DocumentUploadResponse(
                    success=False,
                    filename=path.name,
                    processing_time=0.0,
                    error=f"File not found: {file_path}",
                )

            # Process the document
            result = await self.document_processor.process_document(path, metadata)
            processing_time = time.time() - start_time

            if not result.success:
                return DocumentUploadResponse(
                    success=False,
                    filename=path.name,
                    processing_time=processing_time,
                    error=result.error,
                )

            # Add to vector store if successful
            if result.document:
                # Clean document metadata before storing
                if result.document.metadata:
                    result.document.metadata = self._clean_metadata(result.document.metadata)

                await self.vector_store.add_document(result.document)

                # Update document cache
                self.document_cache[result.document.id] = result.document
                if self.save_cache_callback:
                    await self.save_cache_callback()

            return DocumentUploadResponse(
                success=True,
                document_id=result.document.id if result.document else None,
                filename=path.name,
                processing_time=processing_time,
                chunks_created=result.chunks_created,
                embeddings_generated=result.embeddings_generated,
            )
        except Exception as e:
            logger.error(f"Error adding document by path synchronously: {e}")
            return DocumentUploadResponse(
                success=False,
                filename=Path(file_path).name,
                processing_time=time.time() - start_time if "start_time" in locals() else 0.0,
                error=str(e),
            )
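    # Illustrative note (not part of the original module): the unique upload
    # filename scheme above combines stem, UTC timestamp, and a short UUID
    # prefix, e.g. "report.pdf" -> "report_20250101_093000_1a2b3c4d.pdf".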
    def _document_to_summary(self, document: Document) -> DocumentSummary:
        """Convert Document to DocumentSummary.

        Args:
            document: Document object

        Returns:
            DocumentSummary object
        """
        # Ensure document metadata is clean before creating summary
        if document.metadata:
            try:
                original_metadata = document.metadata
                cleaned_metadata = self._clean_metadata(document.metadata)
                if cleaned_metadata != original_metadata:
                    logger.debug(f"Cleaned metadata for document {document.id}")
                    document.metadata = cleaned_metadata
            except Exception as e:
                logger.error(f"Error cleaning metadata for document {document.id}: {e}")

        # If chunk_count > 0, the document must have embeddings (vector store only stores embedded chunks)
        inferred_has_embeddings = bool(getattr(document, "chunk_count", 0) > 0) or bool(
            getattr(document, "chunks", None) and all(getattr(c, "embedding", None) for c in document.chunks)
        )

        return DocumentSummary(
            id=document.id,
            title=document.title,
            filename=document.filename,
            path=document.path,
            file_size=document.file_size,
            page_count=document.page_count,
            chunk_count=document.chunk_count,
            added_at=document.added_at,
            updated_at=document.updated_at,
            has_embeddings=inferred_has_embeddings,
            checksum=document.checksum,
            processing_status=ProcessingStatus.COMPLETED,  # Existing documents are always completed
            job_id=None,  # No job ID for completed documents
            processing_error=None,  # No error for completed documents
        )
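
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). Only the WebDocumentService
# API used here comes from the class above; the service instance, the upload
# bytes, and the filename are assumptions supplied by the caller.
async def _example_upload(service: WebDocumentService) -> None:
    """Hypothetical driver showing the queued-upload flow end to end."""
    resp = await service.upload_document(b"%PDF-1.4 ...", "example.pdf", {"source": "web"})
    if resp.success and resp.job_id:
        # With a background queue configured, the upload is asynchronous and
        # progress is tracked per job.
        status = await service.get_job_status(resp.job_id)
        print(status.status)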
