Skip to main content
Glama
FileData.py18.2 kB
# archive_agent/data/FileData.py # Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev> # This file is part of Archive Agent. See LICENSE for details. from archive_agent import __version__ import uuid from typing import List, Optional, Dict, Callable, Any from archive_agent.core.ProgressManager import ProgressManager, ProgressInfo from PIL import Image from qdrant_client.models import PointStruct from archive_agent.data.processor.EmbedProcessor import EmbedProcessor from archive_agent.db.QdrantSchema import QdrantPayload from archive_agent.ai.AiManager import AiManager from archive_agent.ai.AiManagerFactory import AiManagerFactory from archive_agent.ai.chunk.AiChunk import ChunkSchema from archive_agent.ai.vision.AiVisionEntity import AiVisionEntity from archive_agent.ai.vision.AiVisionOCR import AiVisionOCR from archive_agent.ai.vision.AiVisionSchema import VisionSchema from archive_agent.config.DecoderSettings import DecoderSettings from archive_agent.data.DocumentContent import DocumentContent from archive_agent.util.format import get_point_page_line_info, format_filename_short from archive_agent.data.loader.pdf import is_pdf_document, load_pdf_document from archive_agent.data.loader.image import is_image, load_image from archive_agent.data.loader.text import is_plaintext, load_plaintext from archive_agent.data.loader.text import is_ascii_document, load_ascii_document from archive_agent.data.loader.text import is_binary_document, load_binary_document from archive_agent.util.image_util import image_resize_safe, image_to_base64 from archive_agent.data.chunk import get_chunks_with_reference_ranges, get_sentences_with_reference_ranges DecoderCallable = Callable[[ProgressInfo], Optional[DocumentContent]] class FileData: def __init__( self, ai_factory: AiManagerFactory, decoder_settings: DecoderSettings, file_path: str, file_meta: Dict[str, Any], max_workers_vision: int, max_workers_embed: int, ): """ Initialize file data. :param ai_factory: AI manager factory for creating instances. :param decoder_settings: Decoder settings. :param file_path: Path to the file. :param file_meta: File metadata. :param max_workers_vision: Max. workers for vision. :param max_workers_embed: Max. workers for embedding. """ # Core dependencies and configuration self.ai_factory = ai_factory self.ai = ai_factory.get_ai() # Primary AI instance for vision, chunking, config self.decoder_settings = decoder_settings self.chunk_lines_block = self.ai.chunk_lines_block self.max_workers_vision = max_workers_vision self.max_workers_embed = max_workers_embed # File metadata and logging self.file_path = file_path self.file_meta = file_meta self.logger = self.ai.cli.get_prefixed_logger(prefix=format_filename_short(self.file_path)) # Processing components self.chunk_processor = EmbedProcessor(ai_factory, self.logger, file_path, self.max_workers_embed) self.points: List[PointStruct] = [] # Vision callback configuration based on AI provider capabilities self.image_to_text_callback_combined = self.image_to_text_combined if self.ai.ai_provider.supports_vision else None self.image_to_text_callback_entity = self.image_to_text_entity if self.ai.ai_provider.supports_vision else None self.image_to_text_callback_ocr = self.image_to_text_ocr if self.ai.ai_provider.supports_vision else None self.image_to_text_callback_page = self.image_to_text_callback_ocr # Select appropriate vision callback based on decoder settings if self.decoder_settings.image_ocr and self.decoder_settings.image_entity_extract: self.image_to_text_callback_image = self.image_to_text_callback_combined elif self.decoder_settings.image_ocr: self.image_to_text_callback_image = self.image_to_text_ocr elif self.decoder_settings.image_entity_extract: self.image_to_text_callback_image = self.image_to_text_callback_entity else: self.image_to_text_callback_image = None # Determine decoder function based on file type self.decoder_func: Optional[DecoderCallable] = self.get_decoder_func() def get_decoder_func(self) -> Optional[DecoderCallable]: """ Determine the appropriate decoder function based on file type. :return: Decoder function or None if unsupported. """ if is_image(self.file_path): return lambda progress_info: load_image( ai_factory=self.ai_factory, logger=self.logger, file_path=self.file_path, image_to_text_callback=self.image_to_text_callback_image, progress_info=progress_info, ) elif is_plaintext(self.file_path): return lambda progress_info: load_plaintext( logger=self.logger, file_path=self.file_path, ) elif is_ascii_document(self.file_path): return lambda progress_info: load_ascii_document( logger=self.logger, file_path=self.file_path, ) elif is_binary_document(self.file_path): return lambda progress_info: load_binary_document( ai_factory=self.ai_factory, logger=self.logger, verbose=self.ai.cli.VERBOSE_LOADER, file_path=self.file_path, max_workers_vision=self.max_workers_vision, image_to_text_callback=self.image_to_text_callback_image, progress_info=progress_info, ) elif is_pdf_document(self.file_path): return lambda progress_info: load_pdf_document( ai_factory=self.ai_factory, logger=self.logger, verbose=self.ai.cli.VERBOSE_LOADER, file_path=self.file_path, max_workers_vision=self.max_workers_vision, image_to_text_callback_page=self.image_to_text_callback_page, image_to_text_callback_image=self.image_to_text_callback_image, decoder_settings=self.decoder_settings, progress_info=progress_info, ) return None def is_processable(self) -> bool: """ Check if the file is processable based on decoder availability. :return: True if processable, False otherwise. """ return self.decoder_func is not None # IMAGE PROCESSING AND VISION CALLBACKS def image_to_text(self, ai: AiManager, image: Image.Image) -> Optional[VisionSchema]: """ Convert image to RGB if needed, resize, and process with AI vision. :param ai: AI manager. :param image: PIL Image object. :return: VisionSchema result or None if failed. """ if image.mode != "RGB": self.logger.debug(f"Converted image from '{image.mode}' to 'RGB'") image = image.convert("RGB") image_possibly_resized = image_resize_safe(image=image, logger=self.logger, verbose=self.ai.cli.VERBOSE_VISION) if image_possibly_resized is None: self.logger.warning(f"Failed to resize image") return None image_base64 = image_to_base64(image_possibly_resized) vision_result = ai.vision(image_base64) if vision_result.is_rejected: self.logger.error(f"⚠️ Image rejected: \"{vision_result.rejection_reason}\"") return None return vision_result def image_to_text_ocr(self, ai: AiManager, image: Image.Image, progress_info: ProgressInfo) -> Optional[str]: """ Request vision with OCR on the image and format the result. :param ai: AI manager. :param image: PIL Image object. :param progress_info: Progress tracking information. :return: OCR text or None if failed. """ if self.ai.cli.VERBOSE_VISION: self.logger.info("Requesting vision feature: OCR") ai.request_ocr() vision_result = self.image_to_text(ai=ai, image=image) progress_info.progress_manager.update_task(progress_info.parent_key, advance=1) if vision_result is not None: return AiVisionOCR.format_vision_answer(vision_result=vision_result) else: return None def image_to_text_entity(self, ai: AiManager, image: Image.Image, progress_info: ProgressInfo) -> Optional[str]: """ Request vision with entity extraction on the image and format the result. :param ai: AI manager. :param image: PIL Image object. :param progress_info: Progress tracking information. :return: Entity text or None if failed. """ if self.ai.cli.VERBOSE_VISION: self.logger.info("Requesting vision feature: Entity Extraction") ai.request_entity() vision_result = self.image_to_text(ai=ai, image=image) progress_info.progress_manager.update_task(progress_info.parent_key, advance=1) if vision_result is not None: return AiVisionEntity.format_vision_answer(vision_result=vision_result) else: return None def image_to_text_combined(self, ai: AiManager, image: Image.Image, progress_info: ProgressInfo) -> Optional[str]: """ Request vision with OCR and entity extraction on the image, format and join the results. :param ai: AI manager. :param image: PIL Image object. :param progress_info: Progress tracking information. :return: Combined text or None if any part failed. """ if self.ai.cli.VERBOSE_VISION: self.logger.info("Requesting vision features: OCR, Entity Extraction") ai.request_ocr() vision_result_ocr = self.image_to_text(ai=ai, image=image) progress_info.progress_manager.update_task(progress_info.parent_key, advance=1) if vision_result_ocr is None: return None text_ocr = AiVisionOCR.format_vision_answer(vision_result=vision_result_ocr) ai.request_entity() vision_result_entity = self.image_to_text(ai=ai, image=image) progress_info.progress_manager.update_task(progress_info.parent_key, advance=1) if vision_result_entity is None: return None text_entity = AiVisionEntity.format_vision_answer(vision_result=vision_result_entity) # Join with a single space return text_ocr + " " + text_entity def decode(self, progress_info: ProgressInfo) -> Optional[DocumentContent]: """ Decode the file using the determined decoder function. :param progress_info: Progress tracking information :return: DocumentContent or None if failed or unsupported """ if self.decoder_func is not None: try: return self.decoder_func(progress_info) except Exception as e: self.logger.error(f"Failed to process file: {e}") return None self.logger.warning(f"Cannot process file") return None # AI CHUNKING CALLBACK # noinspection PyMethodMayBeStatic def chunk_callback(self, ai: AiManager, block_of_sentences: List[str]) -> ChunkSchema: """ Callback for chunking a block of sentences using AI. :param ai: AI manager. :param block_of_sentences: List of sentences to chunk. :return: ChunkSchema result. """ chunk_result = ai.chunk(block_of_sentences) return chunk_result def process(self, progress_manager: ProgressManager, file_progress_key: str) -> bool: """ Process the file through the complete RAG pipeline: Phase 1: Document decoding and vision processing (PDF/Binary only) Phase 2: Sentence extraction and AI chunking Phase 3: Reference range analysis and setup Phase 4: Parallel embedding and vector point creation :param progress_manager: Progress manager for progress reporting. :param file_progress_key: File progress key for progress tracking. :return: True if successful, False otherwise. """ # PHASE 1: Document Decoding and Image Processing vision_progress_key = None if is_pdf_document(self.file_path) or is_binary_document(self.file_path): # Use generic interface - create child task under file vision_progress_key = progress_manager.start_task("Image Processing", parent=file_progress_key, weight=0.33) progress_manager.activate_task(vision_progress_key) # Call the loader function assigned to this file data. # NOTE: DocumentContent is an array of text lines, mapped to page or line numbers. # Create ProgressInfo for clean parameter passing if vision_progress_key: vision_progress_info = progress_manager.create_progress_info(vision_progress_key) else: # For text-only files, decode() reports against file-level progress vision_progress_info = progress_manager.create_progress_info(file_progress_key) doc_content: Optional[DocumentContent] = self.decode(vision_progress_info) # Complete image processing phase if it was created if vision_progress_key is not None: progress_manager.complete_task(vision_progress_key) # Decoder may fail, e.g. on I/O error, exhausted AI attempts, … if doc_content is None: self.logger.warning(f"Failed to decode document") return False # PHASE 2: Sentence Extraction and AI Chunking doc_content.strip_lines() # Use preprocessing and NLP (spaCy) to split text into sentences, keeping track of references. if self.ai.cli.VERBOSE_CHUNK: self.logger.info(f"Extracting sentences across ({len(doc_content.lines)}) lines") sentences_with_reference_ranges = get_sentences_with_reference_ranges(doc_content) # Create chunking phase - use generic interface has_vision = is_pdf_document(self.file_path) or is_binary_document(self.file_path) chunking_weight = 0.34 if has_vision else 0.50 chunking_progress_key = progress_manager.start_task("Chunking", parent=file_progress_key, weight=chunking_weight) progress_manager.activate_task(chunking_progress_key) # Group sentences into chunks, keeping track of references. if self.ai.cli.VERBOSE_CHUNK: self.logger.info(f"Extracting chunks across ({len(sentences_with_reference_ranges)}) sentences") chunks = get_chunks_with_reference_ranges( ai_factory=self.ai_factory, sentences_with_references=sentences_with_reference_ranges, chunk_callback=self.chunk_callback, chunk_lines_block=self.chunk_lines_block, file_path=self.file_path, progress_info=progress_manager.create_progress_info(chunking_progress_key), logger=self.logger, verbose=self.ai.cli.VERBOSE_CHUNK, ) # Complete chunking phase progress_manager.complete_task(chunking_progress_key) # PHASE 3: Reference Range Analysis and Point Creation Setup is_page_based = doc_content.pages_per_line is not None if is_page_based: max_page = max(doc_content.pages_per_line) if doc_content.pages_per_line else 0 reference_total_info = f"{max_page}" else: max_line = max(doc_content.lines_per_line) if doc_content.lines_per_line else 0 reference_total_info = f"{max_line}" # Create embedding phase - use generic interface embedding_weight = 0.33 if has_vision else 0.50 embedding_progress_key = progress_manager.start_task( "Embedding", parent=file_progress_key, weight=embedding_weight, total=len(chunks) ) progress_manager.activate_task(embedding_progress_key) # PHASE 4: Parallel Embedding and Vector Point Creation # Process chunks in parallel for embedding embedding_results = self.chunk_processor.process_chunks_parallel( chunks=chunks, verbose=self.ai.cli.VERBOSE_CHUNK, progress_info=progress_manager.create_progress_info(embedding_progress_key) ) # Process results and create points for chunk_index, (chunk, vector) in enumerate(embedding_results): if vector is None: self.logger.warning(f"Failed to embed chunk ({chunk_index + 1}) / ({len(chunks)})") continue payload_model = QdrantPayload( file_path=self.file_path, file_mtime=self.file_meta['mtime'], chunk_index=chunk_index, chunks_total=len(chunks), chunk_text=chunk.text, version=f"v{__version__}", page_range=None, line_range=None, ) min_r, max_r = chunk.reference_range range_list = [min_r, max_r] if min_r != max_r else [min_r] if is_page_based: payload_model.page_range = range_list else: payload_model.line_range = range_list payload = payload_model.model_dump() point = PointStruct( id=str(uuid.uuid4()), vector=vector, payload=payload, ) if self.ai.cli.VERBOSE_CHUNK: self.logger.info( f"Reference for chunk ({chunk_index + 1}) / ({len(chunks)}): " f"{get_point_page_line_info(point)} " f"of {reference_total_info}" ) self.points.append(point) # Complete embedding phase progress_manager.complete_task(embedding_progress_key) return True

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shredEngineer/Archive-Agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server