Skip to main content
Glama
text.py9.54 kB
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev> # This file is part of Archive Agent. See LICENSE for details. from logging import Logger import os from typing import Set, Optional, List import io from PIL import Image import zipfile import pypandoc from charset_normalizer import from_path from archive_agent.ai.AiManagerFactory import AiManagerFactory from archive_agent.util.format import format_file from archive_agent.data.loader.image import ImageToTextCallback from archive_agent.data.processor.VisionProcessor import VisionProcessor, VisionRequest from archive_agent.data.loader.image import is_image from archive_agent.util.text_util import utf8_tempfile from archive_agent.util.LineTextBuilder import LineTextBuilder from archive_agent.core.ProgressManager import ProgressInfo from archive_agent.data.DocumentContent import DocumentContent def is_plaintext(file_path: str) -> bool: """ Check for valid plaintext extension. :param file_path: File path. :return: True if valid plaintext extension, False otherwise. """ extensions: Set[str] = {".txt", ".md", ".markdown"} return any(file_path.lower().endswith(ext) for ext in extensions) def load_plaintext( logger: Logger, file_path: str, ) -> Optional[DocumentContent]: """ Load plaintext. :param logger: Logger. :param file_path: File path. :return: Document content if successful, None otherwise. """ try: matches = from_path(file_path) except IOError as e: logger.error(f"Failed to read {format_file(file_path)}: {e}") return None best_match = matches.best() if best_match is None: logger.error(f"Failed to decode {format_file(file_path)}: Best match is None") return None text = str(best_match) return LineTextBuilder(text=text).getDocumentContent() def is_ascii_document(file_path: str) -> bool: """ Check for valid ASCII document extension. :param file_path: File path. :return: True if valid ASCII document extension, False otherwise. """ extensions: Set[str] = {".html", ".htm"} return any(file_path.lower().endswith(ext) for ext in extensions) def load_ascii_document( logger: Logger, file_path: str, ) -> Optional[DocumentContent]: """ Load ASCII document (using Pandoc). :param logger: Logger. :param file_path: File path. :return: Document content if successful, None otherwise. """ file_ext = os.path.splitext(file_path)[1].lower() raw_text_doc = load_plaintext(logger=logger, file_path=file_path) if raw_text_doc is None: return None raw_text = raw_text_doc.text # Only text, not lines, as Pandoc will re-wrap lines # Pandoc refuses `.htm` extension, so make it `.html`. if file_ext == ".htm": file_ext = ".html" tmp_path = utf8_tempfile(raw_text, suffix=file_ext) try: text = pypandoc.convert_file(tmp_path, to="plain", format=file_ext.lstrip("."), extra_args=["--wrap=preserve"]) text = text.encode("utf-8", errors="replace").decode("utf-8") return LineTextBuilder(text=text).getDocumentContent() except Exception as e: logger.error(f"Failed to convert {format_file(file_path)} via Pandoc: {e}") return None finally: try: if tmp_path is not None: os.remove(tmp_path) except Exception as e: logger.error(f"Failed to delete temporary file {tmp_path}: {e}") def is_binary_document(file_path: str) -> bool: """ Check for valid binary document extension. :param file_path: File path. :return: True if valid binary document extension, False otherwise. """ extensions: Set[str] = {".odt", ".docx"} return any(file_path.lower().endswith(ext) for ext in extensions) def load_binary_document( ai_factory: AiManagerFactory, logger: Logger, verbose: bool, file_path: str, max_workers_vision: int, image_to_text_callback: Optional[ImageToTextCallback], progress_info: ProgressInfo, ) -> Optional[DocumentContent]: """ Load binary document (using Pandoc). :param ai_factory: AI manager factory. :param logger: Logger. :param verbose: Enable verbose output. :param file_path: File path. :param max_workers_vision: Max. workers for vision. :param image_to_text_callback: Optional image-to-text callback. :param progress_info: Progress tracking information :return: Document content if successful, None otherwise. """ file_ext = os.path.splitext(file_path)[1].lower() try: text = pypandoc.convert_file(file_path, to="plain", format=file_ext.lstrip("."), extra_args=["--wrap=preserve"]) text = text.encode("utf-8", errors="replace").decode("utf-8") except Exception as e: logger.error(f"Failed to convert {format_file(file_path)} via Pandoc: {e}") return None # Stage 1: Text extraction (unchanged) builder = LineTextBuilder(text=text) # Stage 2: Image extraction (unchanged) images = load_binary_document_images(logger=logger, file_path=file_path) # Stage 3: Vision processing (new function, same logic) image_texts = extract_binary_image_texts( ai_factory, logger, verbose, file_path, max_workers_vision, images, image_to_text_callback, progress_info, ) # Stage 4: Assembly (new function) return build_binary_document_with_images(builder, image_texts) def extract_binary_image_texts( ai_factory: AiManagerFactory, logger: Logger, verbose: bool, file_path: str, max_workers_vision: int, images: List[Image.Image], image_to_text_callback: Optional[ImageToTextCallback], progress_info: ProgressInfo, ) -> List[str]: """ Extract text from binary document images with parallel processing. :param ai_factory: AI manager factory. :param logger: Logger. :param verbose: Enable verbose output. :param file_path: File path. :param max_workers_vision: Max. workers for vision. :param images: List of PIL Images. :param image_to_text_callback: Optional image-to-text callback. :param progress_info: Progress tracking information. :return: List of formatted image texts. """ image_texts = [] if not images: return image_texts if image_to_text_callback is None: logger.warning(f"Image vision is DISABLED in your current configuration") logger.warning(f"IGNORING ({len(images)}) document image(s)") return image_texts # Create VisionProcessor for batch processing vision_processor = VisionProcessor(ai_factory, logger, verbose, file_path, max_workers_vision) vision_requests = [] # Collect all vision requests for image_index, image in enumerate(images): log_header = f"Image ({image_index + 1}) / ({len(images)})" if verbose: logger.info(f"{log_header}: Queueing") # Formatter lambda with consistent bracket logic for binary docs formatter = lambda vision_result: f"[{vision_result}]" if vision_result is not None else "[Unprocessable Image]" vision_request = VisionRequest( image_data=image, callback=image_to_text_callback, formatter=formatter, log_header=f"{log_header}: Converting to text", image_index=image_index, page_index=0 # Binary docs are single-page ) vision_requests.append(vision_request) if not vision_requests: return image_texts # Update progress total now that we know the number of vision requests progress_info.progress_manager.set_total(progress_info.parent_key, len(vision_requests)) # Process all requests in parallel vision_results = vision_processor.process_vision_requests_parallel( vision_requests, progress_info ) return vision_results def build_binary_document_with_images( base_builder: LineTextBuilder, image_texts: List[str] ) -> Optional[DocumentContent]: """ Assemble final document with image texts appended. :param base_builder: Base document builder with text content. :param image_texts: List of formatted image texts. :return: Document content. """ # Append image texts to builder for image_text in image_texts: base_builder.push() base_builder.push(image_text) # Already formatted base_builder.push() return base_builder.getDocumentContent() def load_binary_document_images( logger: Logger, file_path: str, ) -> List[Image.Image]: """ Extract images from binary document. :param logger: Logger. :param file_path: File path. :return: Images. """ images = [] try: with zipfile.ZipFile(file_path, "r") as archive: for zip_file_path in archive.namelist(): if is_image(zip_file_path): # noinspection PyUnresolvedReferences with archive.open(zip_file_path) as image_stream: image = Image.open(io.BytesIO(image_stream.read())) image.load() # Prevent lazy I/O; load into memory NOW. images.append(image) except Exception as e: logger.error(f"Failed to extract image(s) from {format_file(file_path)}: {e}") return images

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shredEngineer/Archive-Agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server