# calibre_content_reader.py
"""Calibre Content Reader
Handles extraction of text content from various ebook formats.
Supports EPUB, PDF, TXT, and MOBI formats.
"""
import logging
import re
from pathlib import Path
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)
class _ExtractionError(Exception):
    """Raised internally when a book file cannot be turned into text.

    The exception message is the same human-readable string that the
    backward-compatible ``_extract_*`` methods return, so both reporting
    styles stay in sync.
    """


class CalibreContentReader:
    """Handles content extraction from ebook files.

    Public entry points (``get_book_content``, ``get_book_sample`` and
    ``search_content``) take a ``formats`` mapping of format name to file
    path and return plain dictionaries with a ``"status"`` key.

    Internally every format has a ``_read_*`` method that raises
    ``_ExtractionError`` on failure, so an error message can never be
    mistaken for (or searched as) book text. The ``_extract_*`` methods are
    thin wrappers that keep the historical "return the error as a string"
    contract.
    """

    # Order in which formats are tried when the caller has no preference.
    _FORMAT_PRIORITY = ('TXT', 'EPUB', 'PDF', 'MOBI')

    # Result titles for formats that are searched as one single section.
    _SINGLE_SECTION_TITLES = {
        'PDF': "PDF Content",
        'TXT': "Text File Content",
        'MOBI': "MOBI Content",
    }

    _VALID_SAMPLE_TYPES = ("beginning", "end", "middle", "overview")

    def __init__(self, library_path: Path):
        """Initialize content reader.

        Args:
            library_path: Root of the Calibre library. It is only stored on
                the instance; the methods of this class work on the explicit
                file paths they are given.
        """
        self.library_path = library_path

    # ------------------------------------------------------------------
    # Raising readers (internal)
    # ------------------------------------------------------------------

    @staticmethod
    def _first_heading(soup: Any) -> str:
        """Return the text of the first non-empty ``h1`` (else ``h2``), or ``""``."""
        for tag in ('h1', 'h2'):
            node = soup.find(tag)
            if node is not None:
                heading = node.get_text().strip()
                if heading:
                    return heading
        return ""

    def _load_epub_documents(self, file_path: str) -> List[Dict[str, Any]]:
        """Parse an EPUB and return its non-empty documents in book order.

        Each entry has the keys ``"name"`` (item name), ``"heading"`` (first
        non-empty h1/h2 text, possibly ``""``) and ``"text"``.

        Raises:
            _ExtractionError: If the optional dependencies are missing or the
                file cannot be parsed.
        """
        try:
            # Optional third-party dependencies: imported lazily so the
            # module stays importable without EPUB support installed.
            import ebooklib
            from ebooklib import epub
            from bs4 import BeautifulSoup

            book = epub.read_epub(file_path)
            documents = []
            for item in book.get_items():
                if item.get_type() != ebooklib.ITEM_DOCUMENT:
                    continue
                soup = BeautifulSoup(item.get_content(), 'html.parser')
                text = soup.get_text()
                if not text.strip():
                    continue
                documents.append({
                    "name": item.get_name(),
                    "heading": self._first_heading(soup),
                    "text": text,
                })
        except ImportError as exc:
            raise _ExtractionError(
                "Error: ebooklib and beautifulsoup4 required for EPUB support. "
                "Install with: pip install ebooklib beautifulsoup4"
            ) from exc
        except Exception as exc:
            raise _ExtractionError(f"Error reading EPUB: {exc}") from exc
        return documents

    def _read_epub_text(self, file_path: str) -> str:
        """Return the EPUB's text, documents joined by blank lines (raises on failure)."""
        documents = self._load_epub_documents(file_path)
        return '\n\n'.join(doc["text"] for doc in documents)

    def _read_epub_chapters(self, file_path: str) -> List[Dict[str, str]]:
        """Return one ``{"title", "content", "item_name"}`` dict per EPUB document.

        The title is the first non-empty heading, falling back to the item
        name and finally to ``"Chapter N"``.

        Raises:
            _ExtractionError: See ``_load_epub_documents``.
        """
        chapters = []
        for number, doc in enumerate(self._load_epub_documents(file_path), start=1):
            chapters.append({
                "title": doc["heading"] or doc["name"] or f"Chapter {number}",
                "content": doc["text"],
                "item_name": doc["name"],
            })
        return chapters

    def _read_pdf_text(self, file_path: str) -> str:
        """Return the PDF's text, pages joined by blank lines (raises on failure)."""
        try:
            import PyPDF2

            with open(file_path, 'rb') as handle:
                reader = PyPDF2.PdfReader(handle)
                # extract_text() may yield None for pages without a text
                # layer; treat those as empty instead of failing the book.
                pages = [page.extract_text() or "" for page in reader.pages]
        except ImportError as exc:
            raise _ExtractionError(
                "Error: PyPDF2 required for PDF support. Install with: pip install PyPDF2"
            ) from exc
        except Exception as exc:
            raise _ExtractionError(f"Error reading PDF: {exc}") from exc
        return '\n\n'.join(text for text in pages if text.strip())

    @staticmethod
    def _decode_text(raw: bytes) -> str:
        """Decode raw file bytes to text.

        UTF-16 and UTF-8-with-BOM are only attempted when the byte-order mark
        is present: decoding arbitrary single-byte text as BOM-less UTF-16
        usually "succeeds" and silently produces garbage. Otherwise UTF-8 is
        tried, then cp1252, and finally latin-1, which accepts every byte.
        """
        candidates = []
        if raw.startswith(b'\xef\xbb\xbf'):
            candidates.append('utf-8-sig')
        elif raw.startswith((b'\xff\xfe', b'\xfe\xff')):
            candidates.append('utf-16')
        candidates.extend(['utf-8', 'cp1252'])
        for encoding in candidates:
            try:
                return raw.decode(encoding)
            except UnicodeDecodeError:
                continue
        return raw.decode('latin-1')

    def _read_txt_content(self, file_path: str) -> str:
        """Return the decoded content of a plain text file (raises on failure)."""
        try:
            with open(file_path, 'rb') as handle:
                raw = handle.read()
        except Exception as exc:
            raise _ExtractionError(f"Error reading TXT: {exc}") from exc
        text = self._decode_text(raw)
        # Same newline normalisation a text-mode open() would have applied.
        return text.replace('\r\n', '\n').replace('\r', '\n')

    def _read_mobi_text(self, file_path: str) -> str:
        """Return the MOBI's text by converting it with calibre's ``ebook-convert``.

        The conversion writes to a temporary ``.txt`` file that is always
        removed afterwards.

        Raises:
            _ExtractionError: If the tool is missing, times out (60 s), exits
                with a non-zero status, or its output cannot be read.
        """
        import os
        import subprocess
        import tempfile

        try:
            handle_fd, tmp_path = tempfile.mkstemp(suffix='.txt')
            # Close our handle right away so ebook-convert can write the file
            # on every platform.
            os.close(handle_fd)
        except OSError as exc:
            raise _ExtractionError(f"Error reading MOBI: {exc}") from exc

        try:
            result = subprocess.run(
                ['ebook-convert', file_path, tmp_path],
                capture_output=True,
                text=True,
                timeout=60,
            )
            if result.returncode != 0:
                raise _ExtractionError(f"Error converting MOBI: {result.stderr}")
            with open(tmp_path, 'r', encoding='utf-8') as converted:
                return converted.read()
        except _ExtractionError:
            raise
        except Exception as exc:
            raise _ExtractionError(
                f"Error reading MOBI: {exc}. "
                "Make sure calibre's ebook-convert is installed and on PATH."
            ) from exc
        finally:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def _read_text(self, fmt: str, file_path: str) -> str:
        """Dispatch to the raising reader for the upper-case format name ``fmt``."""
        readers = {
            'EPUB': self._read_epub_text,
            'PDF': self._read_pdf_text,
            'TXT': self._read_txt_content,
            'MOBI': self._read_mobi_text,
        }
        return readers[fmt](file_path)

    # ------------------------------------------------------------------
    # Backward-compatible extractors: errors are returned as strings
    # ------------------------------------------------------------------

    def _extract_epub_text(self, file_path: str) -> str:
        """Extract text content from EPUB file (an error message on failure)."""
        try:
            return self._read_epub_text(file_path)
        except _ExtractionError as exc:
            return str(exc)

    def _extract_epub_chapters(self, file_path: str) -> List[Dict[str, str]]:
        """Extract EPUB text with chapter information for search.

        Returns an empty list, after logging the reason, when the file cannot
        be read.
        """
        try:
            return self._read_epub_chapters(file_path)
        except _ExtractionError as exc:
            logger.error("Error extracting EPUB chapters: %s", exc)
            return []

    def _extract_pdf_text(self, file_path: str) -> str:
        """Extract text content from PDF file (an error message on failure)."""
        try:
            return self._read_pdf_text(file_path)
        except _ExtractionError as exc:
            return str(exc)

    def _extract_txt_content(self, file_path: str) -> str:
        """Extract text content from plain text file (an error message on failure)."""
        try:
            return self._read_txt_content(file_path)
        except _ExtractionError as exc:
            return str(exc)

    def _extract_mobi_text(self, file_path: str) -> str:
        """Extract text content from MOBI file (an error message on failure)."""
        try:
            return self._read_mobi_text(file_path)
        except _ExtractionError as exc:
            return str(exc)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def _load_book_text(self, formats: Dict[str, str], preferred_format: Optional[str] = None) -> Dict[str, Any]:
        """Pick a format and return its complete, untruncated text.

        Returns a dict with ``"status": "success"`` plus ``"format_used"``,
        ``"file_path"`` and ``"content"``, or ``"status": "error"`` plus
        ``"message"``.
        """
        if not formats:
            return {
                "status": "error",
                "message": "No formats available for this book"
            }

        # Format names are matched case-insensitively, as in search_content.
        by_name = {name.upper(): path for name, path in formats.items()}
        wanted = preferred_format.upper() if preferred_format else None
        if wanted is not None and wanted in by_name:
            chosen = wanted
        else:
            chosen = next((fmt for fmt in self._FORMAT_PRIORITY if fmt in by_name), None)

        if chosen is None:
            return {
                "status": "error",
                "message": f"No supported formats found. Available: {list(formats.keys())}"
            }
        if chosen not in self._FORMAT_PRIORITY:
            # Only reachable for an explicitly preferred, unsupported format.
            return {
                "status": "error",
                "message": f"Format {chosen} not supported yet"
            }

        file_path = by_name[chosen]
        try:
            text = self._read_text(chosen, file_path)
        except _ExtractionError as exc:
            return {
                "status": "error",
                "message": f"Failed to extract content: {exc}"
            }
        return {
            "status": "success",
            "format_used": chosen,
            "file_path": file_path,
            "content": text,
        }

    def get_book_content(self, formats: Dict[str, str], preferred_format: Optional[str] = None, max_length: int = 50000) -> Dict[str, Any]:
        """Extract text content from a book file.

        Args:
            formats: Mapping of format name (any case) to file path.
            preferred_format: Format to use when available; otherwise the
                first available of TXT, EPUB, PDF, MOBI is used.
            max_length: Maximum number of characters of book text to return.

        Returns:
            On success a dict with ``status``, ``format_used``, ``file_path``,
            ``content`` (with a truncation notice appended when cut),
            ``content_length`` (length of the returned ``content``),
            ``full_content_length`` (length of the whole extracted text) and
            ``truncated``. On failure a dict with ``status`` ``"error"`` and
            ``message``; extraction failures are reported here rather than
            being returned as content.
        """
        loaded = self._load_book_text(formats, preferred_format)
        if loaded["status"] != "success":
            return loaded

        full_text = loaded["content"]
        limit = max(max_length, 0)
        # Decide before the notice is appended, so text of exactly `limit`
        # characters is not reported as truncated.
        truncated = len(full_text) > limit
        content = full_text
        if truncated:
            content = full_text[:limit] + f"\n\n... [Content truncated at {limit} characters]"

        return {
            "status": "success",
            "format_used": loaded["format_used"],
            "file_path": loaded["file_path"],
            "content": content,
            "content_length": len(content),
            "full_content_length": len(full_text),
            "truncated": truncated
        }

    def get_book_sample(self, formats: Dict[str, str], sample_type: str = "beginning", sample_size: int = 5000) -> Dict[str, Any]:
        """Get a sample of book content for LLM analysis.

        Args:
            formats: Mapping of format name (any case) to file path.
            sample_type: ``"beginning"``, ``"end"``, ``"middle"`` or
                ``"overview"`` (beginning + middle + end, each about a third
                of ``sample_size``).
            sample_size: Number of characters to sample; must be positive.

        Returns:
            On success a dict with ``status``, ``format_used``,
            ``sample_type``, ``sample``, ``sample_length`` and
            ``full_content_length``; otherwise ``status`` ``"error"`` and
            ``message``.
        """
        if sample_type not in self._VALID_SAMPLE_TYPES:
            return {
                "status": "error",
                "message": f"Unknown sample type: {sample_type}. Use: beginning, end, middle, overview"
            }
        if sample_size <= 0:
            # A zero size would turn the `[-size:]` slices into the whole text.
            return {
                "status": "error",
                "message": "sample_size must be a positive integer"
            }

        # Sample from the complete text: a truncated copy would make "end"
        # and "middle" point at the wrong places in long books.
        loaded = self._load_book_text(formats)
        if loaded["status"] != "success":
            return loaded
        full_content = loaded["content"]

        def centred(size: int) -> str:
            """Return ``size`` characters centred on the middle of the text."""
            begin = max(0, len(full_content) // 2 - size // 2)
            return full_content[begin:begin + size]

        if sample_type == "beginning":
            sample = full_content[:sample_size]
        elif sample_type == "end":
            sample = full_content[-sample_size:]
        elif sample_type == "middle":
            sample = centred(sample_size)
        else:
            chunk_size = max(1, sample_size // 3)
            beginning = full_content[:chunk_size]
            middle = centred(chunk_size)
            end = full_content[-chunk_size:]
            sample = f"=== BEGINNING ===\n{beginning}\n\n=== MIDDLE ===\n{middle}\n\n=== END ===\n{end}"

        return {
            "status": "success",
            "format_used": loaded["format_used"],
            "sample_type": sample_type,
            "sample": sample,
            "sample_length": len(sample),
            "full_content_length": len(full_content)
        }

    def _read_sections(self, fmt: str, file_path: str) -> List[Any]:
        """Return ``(title, text)`` pairs to search for the upper-case format ``fmt``.

        EPUB yields one pair per chapter, PDF/TXT/MOBI a single pair, and any
        other format an empty list.

        Raises:
            _ExtractionError: If the file cannot be read.
        """
        if fmt == 'EPUB':
            return [(chapter["title"], chapter["content"])
                    for chapter in self._read_epub_chapters(file_path)]
        if fmt in self._SINGLE_SECTION_TITLES:
            return [(self._SINGLE_SECTION_TITLES[fmt], self._read_text(fmt, file_path))]
        return []

    def search_content(self, formats: Dict[str, str], query: str, case_sensitive: bool = False) -> Dict[str, Any]:
        """Search for content in the book files.

        Every supported format in ``formats`` is searched; formats that fail
        to load are logged and skipped. The query is matched literally.

        Args:
            formats: Mapping of format name (any case) to file path.
            query: Non-empty text to look for.
            case_sensitive: Match case exactly when True.

        Returns:
            ``{"status": "success", "results": [...]}`` where each result has
            ``format``, ``file_path``, ``title``, ``snippet`` and
            ``chapter_content`` (both in the book's original casing), or
            ``status`` ``"error"`` with a ``message``.
        """
        if not formats:
            return {
                "status": "error",
                "message": "No formats available for this book"
            }
        if not query:
            # An empty query would "match" every section of every format.
            return {
                "status": "error",
                "message": "Search query must not be empty"
            }

        # Matching on the original text (instead of a lowercased copy) keeps
        # snippets and chapter content in their real casing.
        flags = 0 if case_sensitive else re.IGNORECASE
        pattern = re.compile(re.escape(query), flags)

        all_results = []
        for fmt, file_path in formats.items():
            fmt_name = fmt.upper()
            try:
                sections = self._read_sections(fmt_name, file_path)
            except _ExtractionError as exc:
                logger.error("Error searching content in %s file: %s", fmt, exc)
                continue
            for title, text in sections:
                match = pattern.search(text)
                if match is None:
                    continue
                all_results.append({
                    "format": fmt_name,
                    "file_path": file_path,
                    "title": title,
                    "snippet": self._snippet_around(text, match.start(), match.end()),
                    "chapter_content": text
                })

        return {
            "status": "success",
            "results": all_results
        }

    @staticmethod
    def _snippet_around(text: str, match_start: int, match_end: int, snippet_length: int = 75) -> str:
        """Return the match plus up to ``snippet_length`` characters on each side, stripped."""
        begin = max(0, match_start - snippet_length)
        finish = min(len(text), match_end + snippet_length)
        return text[begin:finish].strip()

    def _get_text_snippet(self, text: str, query: str, snippet_length: int = 75) -> str:
        """Get a snippet of text around the first occurrence of ``query``.

        Returns ``""`` when the query does not occur (or on unexpected input,
        which is logged).
        """
        try:
            position = text.find(query)
            if position == -1:
                return ""
            # The window is measured from the match itself on both sides; the
            # right edge must not be derived from the left-shifted start.
            return self._snippet_around(text, position, position + len(query), snippet_length)
        except Exception as e:
            logger.error(f"Error getting text snippet: {e}")
            return ""