"""Calibre Database Operations
Handles all SQLite database queries for metadata retrieval.
Performance optimized for large libraries (107K+ books).
"""
import logging
import sqlite3
import json
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
from fuzzy_matcher import SimpleFuzzyMatcher
logger = logging.getLogger(__name__)
class CalibreDatabase:
"""Handles all database operations for Calibre library."""
def __init__(self, db_path: Path, config_path: Optional[Path] = None):
"""Initialize database handler with performance configuration."""
self.db_path = db_path
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found: {db_path}")
# Load configuration with performance settings
self.config = self._load_config(config_path)
self.query_count = 0 # For N+1 detection
logger.info(f"Database initialized with batch_queries={self.config.get('enable_batch_queries', True)}")
def _load_config(self, config_path: Optional[Path]) -> Dict[str, Any]:
"""Load configuration with performance defaults."""
defaults = {
'max_batch_books': 100,
'max_search_results': 1000,
'max_random_books': 50,
'slow_query_threshold_ms': 1000,
'optimize_group_concat': True,
'enable_batch_queries': True,
'default_search_limit': 20
}
if not config_path:
return defaults
# Convert to Path if it's a string
if isinstance(config_path, str):
config_path = Path(config_path)
if not config_path.exists():
return defaults
        try:
            with open(config_path, 'r') as f:
                config = json.load(f)
            # Loaded values override the defaults; missing keys fall back
            return {**defaults, **config}
        except Exception as e:
            logger.warning(f"Failed to load config ({e}); using defaults")
            return defaults
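
    # A config file is optional; when present it overrides any subset of the
    # defaults above. A minimal sketch (file name and values are hypothetical):
    #
    #   perf_config.json:
    #       {"max_batch_books": 200, "slow_query_threshold_ms": 500}
    #
    #   db = CalibreDatabase(Path("metadata.db"), Path("perf_config.json"))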
def _execute_with_timing(self, query: str, params: List[Any]) -> List[tuple]:
"""Execute query with timing and N+1 detection."""
start_time = time.time()
self.query_count += 1
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
cursor.execute(query, params)
results = cursor.fetchall()
elapsed_ms = (time.time() - start_time) * 1000
if elapsed_ms > self.config.get('slow_query_threshold_ms', 1000):
logger.warning(f"Slow query ({elapsed_ms:.1f}ms): {query[:100]}...")
return results
def get_total_books(self) -> Dict[str, Any]:
"""Get total number of books using simple SQL query."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM books")
total = cursor.fetchone()[0]
return {
"status": "success",
"total_books": total,
"library_path": str(self.db_path.parent)
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get book count: {str(e)}"
}
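
    # Minimal usage sketch (the library path is hypothetical; any standard
    # Calibre metadata.db works):
    #
    #   db = CalibreDatabase(Path("/path/to/library/metadata.db"))
    #   db.get_total_books()
    #   # -> {"status": "success", "total_books": <int>, "library_path": "..."}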
def search_by_author(self, author_name: str, offset: int = 0, limit: int = 50, fuzzy: bool = False) -> Dict[str, Any]:
"""Search for books by author name with pagination and optional fuzzy matching."""
try:
search_terms = [author_name]
fuzzy_matches = []
if fuzzy:
# Generate fuzzy search variations
fuzzy_terms = SimpleFuzzyMatcher.expand_search_terms(author_name)
search_terms.extend(fuzzy_terms)
                # Expanded variants carry a placeholder similarity score of 1.0
                fuzzy_matches = [(term, 1.0) for term in fuzzy_terms if term != author_name]
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
# Build OR conditions for all search terms
or_conditions = []
params = []
for term in search_terms:
or_conditions.append("a.name LIKE ? COLLATE NOCASE")
params.append(f"%{term}%")
where_clause = " OR ".join(or_conditions)
# First get total count
count_query = f"""
SELECT COUNT(DISTINCT b.id)
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors a ON bal.author = a.id
WHERE ({where_clause})
"""
cursor.execute(count_query, params)
total_count = cursor.fetchone()[0]
# Get results with pagination
query = f"""
SELECT b.id, b.title, GROUP_CONCAT(a.name, ', ') as authors,
b.series_index, s.name as series_name, b.pubdate
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors a ON bal.author = a.id
LEFT JOIN books_series_link bsl ON b.id = bsl.book
LEFT JOIN series s ON bsl.series = s.id
WHERE ({where_clause})
GROUP BY b.id, b.title, b.series_index, s.name, b.pubdate
                    ORDER BY MIN(a.name), b.title
LIMIT ? OFFSET ?
"""
params.extend([limit, offset])
cursor.execute(query, params)
rows = cursor.fetchall()
results = []
for row in rows:
book_id, title, authors, series_index, series_name, pubdate = row
results.append({
"book_id": book_id,
"title": title,
"authors": authors.split(", ") if authors else [],
"series": {
"name": series_name,
"index": float(series_index) if series_index else None
} if series_name else None,
"published_date": pubdate
})
response = {
"status": "success",
"results": results,
"pagination": {
"offset": offset,
"limit": limit,
"total_results": total_count,
"has_more": (offset + limit) < total_count
},
"search_term": author_name
}
if fuzzy and fuzzy_matches:
response["fuzzy_matches"] = fuzzy_matches[:3] # Limit suggestions
return response
except sqlite3.DatabaseError as e:
logger.error(f"Database error in author search: {e}")
return {
"status": "error",
"message": f"Database error during author search: {str(e)}",
"error_type": "database_error"
}
except Exception as e:
logger.error(f"Unexpected error in author search: {e}")
return {
"status": "error",
"message": f"Failed to search by author: {str(e)}",
"error_type": "search_error"
}
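
    # Fuzzy sketch: fuzzy=True widens the LIKE terms via
    # SimpleFuzzyMatcher.expand_search_terms, so near-miss spellings still
    # match; the response then carries up to three (term, score) suggestions.
    # The misspelled name below is hypothetical:
    #
    #   db.search_by_author("Ursule Le Guin", fuzzy=True, limit=10)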
def search_by_title(self, title: str, offset: int = 0, limit: int = 50, fuzzy: bool = False) -> Dict[str, Any]:
"""Search for books by title with pagination and optional fuzzy matching."""
        try:
            search_terms = [title]
            fuzzy_matches = []
            if fuzzy:
                # Generate fuzzy search variations (mirrors search_by_author)
                fuzzy_terms = SimpleFuzzyMatcher.expand_search_terms(title)
                search_terms.extend(fuzzy_terms)
                fuzzy_matches = [(term, 1.0) for term in fuzzy_terms if term != title]
            with sqlite3.connect(str(self.db_path)) as conn:
                cursor = conn.cursor()
                # Build OR conditions for all search terms
                where_clause = " OR ".join(["b.title LIKE ? COLLATE NOCASE"] * len(search_terms))
                params = [f"%{term}%" for term in search_terms]
                # First get total count
                count_query = f"""
                    SELECT COUNT(DISTINCT b.id)
                    FROM books b
                    WHERE ({where_clause})
                """
                cursor.execute(count_query, params)
                total_count = cursor.fetchone()[0]
                # Get results with pagination
                query = f"""
                    SELECT b.id, b.title, GROUP_CONCAT(a.name, ', ') as authors,
                           b.series_index, s.name as series_name, b.pubdate
                    FROM books b
                    LEFT JOIN books_authors_link bal ON b.id = bal.book
                    LEFT JOIN authors a ON bal.author = a.id
                    LEFT JOIN books_series_link bsl ON b.id = bsl.book
                    LEFT JOIN series s ON bsl.series = s.id
                    WHERE ({where_clause})
                    GROUP BY b.id, b.title, b.series_index, s.name, b.pubdate
                    ORDER BY b.title
                    LIMIT ? OFFSET ?
                """
                cursor.execute(query, params + [limit, offset])
rows = cursor.fetchall()
results = []
for row in rows:
book_id, book_title, authors, series_index, series_name, pubdate = row
results.append({
"book_id": book_id,
"title": book_title,
"authors": authors.split(", ") if authors else [],
"series": {
"name": series_name,
"index": float(series_index) if series_index else None
} if series_name else None,
"published_date": pubdate
})
                response = {
                    "status": "success",
                    "results": results,
                    "pagination": {
                        "offset": offset,
                        "limit": limit,
                        "total_results": total_count,
                        "has_more": (offset + limit) < total_count
                    },
                    "search_term": title
                }
                if fuzzy and fuzzy_matches:
                    response["fuzzy_matches"] = fuzzy_matches[:3]  # Limit suggestions
                return response
except sqlite3.DatabaseError as e:
logger.error(f"Database error in title search: {e}")
return {
"status": "error",
"message": f"Database error during title search: {str(e)}",
"error_type": "database_error"
}
except Exception as e:
logger.error(f"Unexpected error in title search: {e}")
return {
"status": "error",
"message": f"Failed to search by title: {str(e)}",
"error_type": "search_error"
}
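
    # Pagination sketch: walk every page by following has_more (page size and
    # search term are illustrative; handle_page is a hypothetical callback):
    #
    #   offset = 0
    #   while True:
    #       page = db.search_by_title("dune", offset=offset, limit=50)
    #       if page["status"] != "success":
    #           break
    #       handle_page(page["results"])
    #       if not page["pagination"]["has_more"]:
    #           break
    #       offset += 50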
def get_book_details(self, book_id: int) -> Dict[str, Any]:
"""Get detailed metadata for a specific book by book_id."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
query = """
SELECT b.id, b.title, b.timestamp, b.pubdate, b.isbn,
GROUP_CONCAT(a.name, ', ') as authors,
b.series_index, s.name as series_name,
c.text as comments, b.path
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors a ON bal.author = a.id
LEFT JOIN books_series_link bsl ON b.id = bsl.book
LEFT JOIN series s ON bsl.series = s.id
LEFT JOIN comments c ON b.id = c.book
WHERE b.id = ?
GROUP BY b.id
"""
cursor.execute(query, (book_id,))
row = cursor.fetchone()
if not row:
return {
"status": "error",
"message": f"Book with ID {book_id} not found"
}
book_id, title, timestamp, pubdate, isbn, authors, series_index, series_name, comments, path = row
result = {
"book_id": book_id,
"title": title,
"authors": authors.split(", ") if authors else [],
"series": series_name,
"series_index": float(series_index) if series_index else None,
"isbn": isbn,
"pubdate": pubdate,
"timestamp": timestamp,
"comments": comments,
"path": path
}
return {
"status": "success",
"result": result
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get book details: {str(e)}"
}
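
    # Response shape sketch (book ID and field values illustrative):
    #
    #   db.get_book_details(42)
    #   # -> {"status": "success",
    #   #     "result": {"book_id": 42, "title": "...", "authors": [...],
    #   #                "series": "...", "series_index": 1.0, "isbn": "...",
    #   #                "pubdate": "...", "timestamp": "...", "comments": "...",
    #   #                "path": "Author Name/Title (42)"}}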
def get_book_formats(self, book_id: int, library_path: Path) -> Dict[str, Any]:
"""Get all available formats and their file paths for a book."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
# Get book path and available formats
query = """
SELECT b.path, d.format, d.name
FROM books b
LEFT JOIN data d ON b.id = d.book
WHERE b.id = ?
"""
cursor.execute(query, (book_id,))
rows = cursor.fetchall()
if not rows or not rows[0][0]:
return {"status": "error", "message": "Book not found"}
book_path = rows[0][0]
formats = {}
for row in rows:
if row[1] and row[2]: # format and filename exist
format_name = row[1].upper()
file_name = row[2]
# Add file extension if not present
if not file_name.lower().endswith(f'.{format_name.lower()}'):
file_name = f"{file_name}.{format_name.lower()}"
full_path = library_path / book_path / file_name
if full_path.exists():
formats[format_name] = str(full_path)
return {
"status": "success",
"book_id": book_id,
"book_path": book_path,
"formats": formats
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get book formats: {str(e)}"
}
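
    # Usage sketch: library_path is the directory that contains metadata.db,
    # so it can be derived from the database path (paths hypothetical):
    #
    #   lib = Path("/path/to/library")
    #   db.get_book_formats(42, lib)
    #   # -> {"status": "success", "formats": {"EPUB": "/path/to/library/.../x.epub"}}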
def get_recent_books(self, limit: int = 20) -> Dict[str, Any]:
"""Get recently added books."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
query = """
                    SELECT b.id, b.title, GROUP_CONCAT(ba.name, ', ') as author, b.timestamp, b.series_index,
s.name as series_name
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors ba ON bal.author = ba.id
LEFT JOIN books_series_link bsl ON b.id = bsl.book
LEFT JOIN series s ON bsl.series = s.id
                    GROUP BY b.id  -- one row per book, even with several authors
                    ORDER BY b.timestamp DESC
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
books = []
for row in rows:
book_id, title, author, timestamp, series_index, series_name = row
books.append({
"book_id": book_id,
"title": title,
"author": author or "Unknown",
"timestamp": timestamp,
"series": series_name,
"series_index": series_index
})
return {
"status": "success",
"results": books,
"count": len(books)
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get recent books: {str(e)}"
}
def get_random_books(self, count: int = 5) -> Dict[str, Any]:
"""Get random books."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
query = """
                    SELECT b.id, b.title, GROUP_CONCAT(ba.name, ', ') as author, s.name as series_name,
b.series_index
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors ba ON bal.author = ba.id
LEFT JOIN books_series_link bsl ON b.id = bsl.book
LEFT JOIN series s ON bsl.series = s.id
                    GROUP BY b.id  -- one row per book, even with several authors
                    ORDER BY RANDOM()
LIMIT ?
"""
cursor.execute(query, (count,))
rows = cursor.fetchall()
books = []
for row in rows:
book_id, title, author, series_name, series_index = row
books.append({
"book_id": book_id,
"title": title,
"author": author or "Unknown",
"series": series_name,
"series_index": series_index
})
return {
"status": "success",
"results": books,
"count": len(books)
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get random books: {str(e)}"
}
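
    # Note: ORDER BY RANDOM() has to visit every candidate row before picking,
    # so at 107K+ books each call costs a full scan; fine for small counts on
    # an occasional "surprise me" shelf, worth revisiting for hot paths:
    #
    #   db.get_random_books(count=3)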
def get_all_series(self, limit: int = 100) -> Dict[str, Any]:
"""Get all series with book counts."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
query = """
SELECT s.name, COUNT(bsl.book) as book_count
FROM series s
LEFT JOIN books_series_link bsl ON s.id = bsl.series
GROUP BY s.id, s.name
HAVING book_count > 0
ORDER BY s.name
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
series = []
for row in rows:
name, book_count = row
series.append({
"series_name": name,
"book_count": book_count
})
return {
"status": "success",
"results": series,
"count": len(series)
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get series: {str(e)}"
}
def get_all_authors(self, limit: int = 100) -> Dict[str, Any]:
"""Get all authors with book counts."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
query = """
SELECT a.name, COUNT(bal.book) as book_count
FROM authors a
LEFT JOIN books_authors_link bal ON a.id = bal.author
GROUP BY a.id, a.name
HAVING book_count > 0
ORDER BY a.name
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
authors = []
for row in rows:
name, book_count = row
authors.append({
"author_name": name,
"book_count": book_count
})
return {
"status": "success",
"results": authors,
"count": len(authors)
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to get authors: {str(e)}"
}
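
    # Browsing sketch: both listing methods share one result shape, so an
    # index page can be built from either (limit value illustrative):
    #
    #   for row in db.get_all_authors(limit=500).get("results", []):
    #       print(f'{row["author_name"]}: {row["book_count"]} books')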
def get_books_batch(self, book_ids: List[int], include_full_metadata: bool = True) -> Dict[str, Any]:
"""Get detailed metadata for multiple books in a single optimized query."""
if not book_ids:
return {
"status": "error",
"message": "book_ids cannot be empty"
}
# Use configurable batch limit
max_batch = self.config.get('max_batch_books', 100)
if len(book_ids) > max_batch:
logger.warning(f"Batch size {len(book_ids)} exceeds limit {max_batch}, truncating")
book_ids = book_ids[:max_batch]
try:
self.query_count = 0 # Reset for N+1 detection
placeholders = ",".join(["?"] * len(book_ids))
# Optimize query based on metadata requirements
if include_full_metadata and self.config.get('optimize_group_concat', True):
                # Full metadata; correlated subqueries avoid the cross-product of
                # stacked one-to-many joins, which would repeat every author once
                # per tag and format inside the GROUP_CONCAT results
                query = f"""
                    SELECT b.id, b.title, b.timestamp, b.pubdate, b.isbn,
                           (SELECT GROUP_CONCAT(a.name, ', ')
                            FROM books_authors_link bal
                            JOIN authors a ON bal.author = a.id
                            WHERE bal.book = b.id) as authors,
                           b.series_index, s.name as series_name,
                           c.text as comments, b.path,
                           (SELECT GROUP_CONCAT(t.name, ', ')
                            FROM books_tags_link btl
                            JOIN tags t ON btl.tag = t.id
                            WHERE btl.book = b.id) as tags,
                           (SELECT GROUP_CONCAT(d.format, ', ')
                            FROM data d
                            WHERE d.book = b.id) as formats
                    FROM books b
                    LEFT JOIN books_series_link bsl ON b.id = bsl.book
                    LEFT JOIN series s ON bsl.series = s.id
                    LEFT JOIN comments c ON b.id = c.book
                    WHERE b.id IN ({placeholders})
                    ORDER BY b.title
                """
else:
# Minimal metadata for better performance
query = f"""
SELECT b.id, b.title, b.timestamp, b.pubdate, b.isbn,
GROUP_CONCAT(a.name, ', ') as authors,
b.series_index, s.name as series_name,
NULL as comments, b.path,
NULL as tags, NULL as formats
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors a ON bal.author = a.id
LEFT JOIN books_series_link bsl ON b.id = bsl.book
LEFT JOIN series s ON bsl.series = s.id
WHERE b.id IN ({placeholders})
GROUP BY b.id
ORDER BY b.title
"""
rows = self._execute_with_timing(query, book_ids)
results = []
found_ids = set()
for row in rows:
book_id, title, timestamp, pubdate, isbn, authors, series_index, series_name, comments, path, tags, formats = row
found_ids.add(book_id)
results.append({
"book_id": book_id,
"title": title,
"authors": authors.split(", ") if authors else [],
"series": {
"name": series_name,
"index": float(series_index) if series_index else None
} if series_name else None,
"isbn": isbn,
"published_date": pubdate,
"timestamp": timestamp,
"description": comments,
"path": path,
"tags": tags.split(", ") if tags and include_full_metadata else [],
"formats": formats.split(", ") if formats and include_full_metadata else []
})
# N+1 query detection
if self.query_count > 1:
logger.warning(f"Potential N+1 query pattern: {self.query_count} queries for {len(book_ids)} books")
# Track not found books
not_found = [bid for bid in book_ids if bid not in found_ids]
errors = [{"book_id": bid, "error": "Book not found"} for bid in not_found]
return {
"status": "success",
"results": results,
"errors": errors,
"summary": {
"requested": len(book_ids),
"found": len(results),
"errors": len(errors)
},
"performance": {
"queries_executed": self.query_count,
"full_metadata": include_full_metadata
}
}
except sqlite3.DatabaseError as e:
logger.error(f"Database error in get_books_batch: {e}")
return {
"status": "error",
"message": f"Database error during batch book retrieval: {str(e)}",
"error_type": "database_error",
"sql_error_details": str(e)
}
except Exception as e:
logger.error(f"get_books_batch failed: {e}")
return {
"status": "error",
"message": f"Failed to get books batch: {str(e)}",
"error_type": "batch_error"
}
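
    # Batch sketch: one SQL round-trip serves up to max_batch_books IDs, and
    # IDs that do not exist come back under "errors" instead of raising (IDs
    # below are hypothetical):
    #
    #   out = db.get_books_batch([1, 2, 999999], include_full_metadata=False)
    #   out["summary"]   # {"requested": 3, "found": 2, "errors": 1}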
    def unified_search(self, query: str = "", author: str = "", title: str = "", series: str = "",
                       formats: Optional[List[str]] = None, date_range: Optional[Dict[str, str]] = None,
                       offset: int = 0, limit: int = 50, sort_by: str = "title") -> Dict[str, Any]:
"""Unified search with multiple filters and pagination."""
try:
with sqlite3.connect(str(self.db_path)) as conn:
cursor = conn.cursor()
conditions = []
params = []
# Count query first
count_base = """
SELECT COUNT(DISTINCT b.id)
FROM books b
LEFT JOIN books_authors_link bal ON b.id = bal.book
LEFT JOIN authors ba ON bal.author = ba.id
LEFT JOIN books_series_link bsl ON b.id = bsl.book
LEFT JOIN series s ON bsl.series = s.id
LEFT JOIN data d ON b.id = d.book
"""
                # Main query; authors and formats come from correlated subqueries
                # so the filter joins below cannot inflate GROUP_CONCAT values
                base_query = """
                    SELECT b.id, b.title,
                           (SELECT GROUP_CONCAT(a2.name, ', ')
                            FROM books_authors_link bal2
                            JOIN authors a2 ON bal2.author = a2.id
                            WHERE bal2.book = b.id) as authors,
                           s.name as series_name, b.series_index, b.pubdate,
                           (SELECT GROUP_CONCAT(d2.format, ', ')
                            FROM data d2
                            WHERE d2.book = b.id) as formats
                    FROM books b
                    LEFT JOIN books_authors_link bal ON b.id = bal.book
                    LEFT JOIN authors ba ON bal.author = ba.id
                    LEFT JOIN books_series_link bsl ON b.id = bsl.book
                    LEFT JOIN series s ON bsl.series = s.id
                    LEFT JOIN data d ON b.id = d.book
                """
# Build conditions
if query:
conditions.append("(b.title LIKE ? OR ba.name LIKE ?)")
params.extend([f"%{query}%", f"%{query}%"])
if author:
conditions.append("ba.name LIKE ?")
params.append(f"%{author}%")
if title:
conditions.append("b.title LIKE ?")
params.append(f"%{title}%")
if series:
conditions.append("s.name LIKE ?")
params.append(f"%{series}%")
if formats:
format_conditions = []
for fmt in formats:
format_conditions.append("d.format = ?")
params.append(fmt.upper())
if format_conditions:
conditions.append(f"({' OR '.join(format_conditions)})")
if date_range:
if date_range.get('start'):
conditions.append("b.pubdate >= ?")
params.append(date_range['start'])
if date_range.get('end'):
conditions.append("b.pubdate <= ?")
params.append(date_range['end'])
where_clause = ""
if conditions:
where_clause = " WHERE " + " AND ".join(conditions)
# Get total count
cursor.execute(count_base + where_clause, params)
total_count = cursor.fetchone()[0]
# Sort options
sort_mapping = {
"title": "b.title",
"author": "ba.name, b.title",
"date": "b.pubdate DESC, b.title",
"series": "s.name, b.series_index"
}
sort_clause = sort_mapping.get(sort_by, "b.title")
# Execute main query with pagination
full_query = base_query + where_clause + f" GROUP BY b.id ORDER BY {sort_clause} LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(full_query, params)
rows = cursor.fetchall()
results = []
for row in rows:
book_id, title, authors, series_name, series_index, pubdate, book_formats = row
results.append({
"book_id": book_id,
"title": title,
"authors": authors.split(", ") if authors else [],
"series": {
"name": series_name,
"index": float(series_index) if series_index else None
} if series_name else None,
"published_date": pubdate,
"formats": book_formats.split(", ") if book_formats else []
})
return {
"status": "success",
"results": results,
"pagination": {
"offset": offset,
"limit": limit,
"total_results": total_count,
"has_more": (offset + limit) < total_count
},
"applied_filters": {
"query": query,
"author": author,
"title": title,
"series": series,
"formats": formats,
"date_range": date_range,
"sort_by": sort_by
}
}
except sqlite3.DatabaseError as e:
logger.error(f"Database error in unified search: {e}")
return {
"status": "error",
"message": f"Database error during unified search: {str(e)}",
"error_type": "database_error",
"sql_error_details": str(e)
}
except Exception as e:
logger.error(f"Unexpected error in unified search: {e}")
return {
"status": "error",
"message": f"Failed to perform unified search: {str(e)}",
"error_type": "search_error"
}
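

if __name__ == "__main__":
    # Hedged smoke test, not part of the library API: the library path below
    # is hypothetical and must point at a real Calibre metadata.db.
    logging.basicConfig(level=logging.INFO)
    db = CalibreDatabase(Path.home() / "Calibre Library" / "metadata.db")
    print(db.get_total_books())
    hits = db.unified_search(author="Le Guin", formats=["EPUB"], limit=5, sort_by="date")
    for book in hits.get("results", []):
        print(book["book_id"], book["title"], book["formats"])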