Neolibrarian MCP

by pshap
calibre_database.py (32.6 kB)
"""Calibre Database Operations Handles all SQLite database queries for metadata retrieval. Performance optimized for large libraries (107K+ books). """ import logging import sqlite3 import json import time from pathlib import Path from typing import Dict, Any, List, Optional from fuzzy_matcher import SimpleFuzzyMatcher logger = logging.getLogger(__name__) class CalibreDatabase: """Handles all database operations for Calibre library.""" def __init__(self, db_path: Path, config_path: Optional[Path] = None): """Initialize database handler with performance configuration.""" self.db_path = db_path if not self.db_path.exists(): raise FileNotFoundError(f"Database not found: {db_path}") # Load configuration with performance settings self.config = self._load_config(config_path) self.query_count = 0 # For N+1 detection logger.info(f"Database initialized with batch_queries={self.config.get('enable_batch_queries', True)}") def _load_config(self, config_path: Optional[Path]) -> Dict[str, Any]: """Load configuration with performance defaults.""" defaults = { 'max_batch_books': 100, 'max_search_results': 1000, 'max_random_books': 50, 'slow_query_threshold_ms': 1000, 'optimize_group_concat': True, 'enable_batch_queries': True, 'default_search_limit': 20 } if not config_path: return defaults # Convert to Path if it's a string if isinstance(config_path, str): config_path = Path(config_path) if not config_path.exists(): return defaults try: with open(config_path, 'r') as f: config = json.load(f) # Merge with defaults for key, value in defaults.items(): if key not in config: config[key] = value return config except Exception as e: logger.warning(f"Failed to load config: {e}, using defaults") return defaults def _execute_with_timing(self, query: str, params: List[Any]) -> List[tuple]: """Execute query with timing and N+1 detection.""" start_time = time.time() self.query_count += 1 with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() cursor.execute(query, params) results = cursor.fetchall() elapsed_ms = (time.time() - start_time) * 1000 if elapsed_ms > self.config.get('slow_query_threshold_ms', 1000): logger.warning(f"Slow query ({elapsed_ms:.1f}ms): {query[:100]}...") return results def get_total_books(self) -> Dict[str, Any]: """Get total number of books using simple SQL query.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM books") total = cursor.fetchone()[0] return { "status": "success", "total_books": total, "library_path": str(self.db_path.parent) } except Exception as e: return { "status": "error", "message": f"Failed to get book count: {str(e)}" } def search_by_author(self, author_name: str, offset: int = 0, limit: int = 50, fuzzy: bool = False) -> Dict[str, Any]: """Search for books by author name with pagination and optional fuzzy matching.""" try: search_terms = [author_name] fuzzy_matches = [] if fuzzy: # Generate fuzzy search variations fuzzy_terms = SimpleFuzzyMatcher.expand_search_terms(author_name) search_terms.extend(fuzzy_terms) fuzzy_matches = [(term, 1.0) for term in fuzzy_terms if term != author_name] with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() # Build OR conditions for all search terms or_conditions = [] params = [] for term in search_terms: or_conditions.append("a.name LIKE ? 
COLLATE NOCASE") params.append(f"%{term}%") where_clause = " OR ".join(or_conditions) # First get total count count_query = f""" SELECT COUNT(DISTINCT b.id) FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors a ON bal.author = a.id WHERE ({where_clause}) """ cursor.execute(count_query, params) total_count = cursor.fetchone()[0] # Get results with pagination query = f""" SELECT b.id, b.title, GROUP_CONCAT(a.name, ', ') as authors, b.series_index, s.name as series_name, b.pubdate FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors a ON bal.author = a.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id WHERE ({where_clause}) GROUP BY b.id, b.title, b.series_index, s.name, b.pubdate ORDER BY a.name, b.title LIMIT ? OFFSET ? """ params.extend([limit, offset]) cursor.execute(query, params) rows = cursor.fetchall() results = [] for row in rows: book_id, title, authors, series_index, series_name, pubdate = row results.append({ "book_id": book_id, "title": title, "authors": authors.split(", ") if authors else [], "series": { "name": series_name, "index": float(series_index) if series_index else None } if series_name else None, "published_date": pubdate }) response = { "status": "success", "results": results, "pagination": { "offset": offset, "limit": limit, "total_results": total_count, "has_more": (offset + limit) < total_count }, "search_term": author_name } if fuzzy and fuzzy_matches: response["fuzzy_matches"] = fuzzy_matches[:3] # Limit suggestions return response except sqlite3.DatabaseError as e: logger.error(f"Database error in author search: {e}") return { "status": "error", "message": f"Database error during author search: {str(e)}", "error_type": "database_error" } except Exception as e: logger.error(f"Unexpected error in author search: {e}") return { "status": "error", "message": f"Failed to search by author: {str(e)}", "error_type": "search_error" } def search_by_title(self, title: str, offset: int = 0, limit: int = 50, fuzzy: bool = False) -> Dict[str, Any]: """Search for books by title with pagination and optional fuzzy matching.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() # First get total count count_query = """ SELECT COUNT(DISTINCT b.id) FROM books b WHERE b.title LIKE ? COLLATE NOCASE """ cursor.execute(count_query, (f"%{title}%",)) total_count = cursor.fetchone()[0] # Get results with pagination query = """ SELECT b.id, b.title, GROUP_CONCAT(a.name, ', ') as authors, b.series_index, s.name as series_name, b.pubdate FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors a ON bal.author = a.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id WHERE b.title LIKE ? COLLATE NOCASE GROUP BY b.id, b.title, b.series_index, s.name, b.pubdate ORDER BY b.title LIMIT ? OFFSET ? 
""" cursor.execute(query, (f"%{title}%", limit, offset)) rows = cursor.fetchall() results = [] for row in rows: book_id, book_title, authors, series_index, series_name, pubdate = row results.append({ "book_id": book_id, "title": book_title, "authors": authors.split(", ") if authors else [], "series": { "name": series_name, "index": float(series_index) if series_index else None } if series_name else None, "published_date": pubdate }) return { "status": "success", "results": results, "pagination": { "offset": offset, "limit": limit, "total_results": total_count, "has_more": (offset + limit) < total_count }, "search_term": title } except sqlite3.DatabaseError as e: logger.error(f"Database error in title search: {e}") return { "status": "error", "message": f"Database error during title search: {str(e)}", "error_type": "database_error" } except Exception as e: logger.error(f"Unexpected error in title search: {e}") return { "status": "error", "message": f"Failed to search by title: {str(e)}", "error_type": "search_error" } def get_book_details(self, book_id: int) -> Dict[str, Any]: """Get detailed metadata for a specific book by book_id.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() query = """ SELECT b.id, b.title, b.timestamp, b.pubdate, b.isbn, GROUP_CONCAT(a.name, ', ') as authors, b.series_index, s.name as series_name, c.text as comments, b.path FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors a ON bal.author = a.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id LEFT JOIN comments c ON b.id = c.book WHERE b.id = ? GROUP BY b.id """ cursor.execute(query, (book_id,)) row = cursor.fetchone() if not row: return { "status": "error", "message": f"Book with ID {book_id} not found" } book_id, title, timestamp, pubdate, isbn, authors, series_index, series_name, comments, path = row result = { "book_id": book_id, "title": title, "authors": authors.split(", ") if authors else [], "series": series_name, "series_index": float(series_index) if series_index else None, "isbn": isbn, "pubdate": pubdate, "timestamp": timestamp, "comments": comments, "path": path } return { "status": "success", "result": result } except Exception as e: return { "status": "error", "message": f"Failed to get book details: {str(e)}" } def get_book_formats(self, book_id: int, library_path: Path) -> Dict[str, Any]: """Get all available formats and their file paths for a book.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() # Get book path and available formats query = """ SELECT b.path, d.format, d.name FROM books b LEFT JOIN data d ON b.id = d.book WHERE b.id = ? 
""" cursor.execute(query, (book_id,)) rows = cursor.fetchall() if not rows or not rows[0][0]: return {"status": "error", "message": "Book not found"} book_path = rows[0][0] formats = {} for row in rows: if row[1] and row[2]: # format and filename exist format_name = row[1].upper() file_name = row[2] # Add file extension if not present if not file_name.lower().endswith(f'.{format_name.lower()}'): file_name = f"{file_name}.{format_name.lower()}" full_path = library_path / book_path / file_name if full_path.exists(): formats[format_name] = str(full_path) return { "status": "success", "book_id": book_id, "book_path": book_path, "formats": formats } except Exception as e: return { "status": "error", "message": f"Failed to get book formats: {str(e)}" } def get_recent_books(self, limit: int = 20) -> Dict[str, Any]: """Get recently added books.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() query = """ SELECT b.id, b.title, ba.name as author, b.timestamp, b.series_index, s.name as series_name FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors ba ON bal.author = ba.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id ORDER BY b.timestamp DESC LIMIT ? """ cursor.execute(query, (limit,)) rows = cursor.fetchall() books = [] for row in rows: book_id, title, author, timestamp, series_index, series_name = row books.append({ "book_id": book_id, "title": title, "author": author or "Unknown", "timestamp": timestamp, "series": series_name, "series_index": series_index }) return { "status": "success", "results": books, "count": len(books) } except Exception as e: return { "status": "error", "message": f"Failed to get recent books: {str(e)}" } def get_random_books(self, count: int = 5) -> Dict[str, Any]: """Get random books.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() query = """ SELECT b.id, b.title, ba.name as author, s.name as series_name, b.series_index FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors ba ON bal.author = ba.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id ORDER BY RANDOM() LIMIT ? """ cursor.execute(query, (count,)) rows = cursor.fetchall() books = [] for row in rows: book_id, title, author, series_name, series_index = row books.append({ "book_id": book_id, "title": title, "author": author or "Unknown", "series": series_name, "series_index": series_index }) return { "status": "success", "results": books, "count": len(books) } except Exception as e: return { "status": "error", "message": f"Failed to get random books: {str(e)}" } def get_all_series(self, limit: int = 100) -> Dict[str, Any]: """Get all series with book counts.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() query = """ SELECT s.name, COUNT(bsl.book) as book_count FROM series s LEFT JOIN books_series_link bsl ON s.id = bsl.series GROUP BY s.id, s.name HAVING book_count > 0 ORDER BY s.name LIMIT ? 
""" cursor.execute(query, (limit,)) rows = cursor.fetchall() series = [] for row in rows: name, book_count = row series.append({ "series_name": name, "book_count": book_count }) return { "status": "success", "results": series, "count": len(series) } except Exception as e: return { "status": "error", "message": f"Failed to get series: {str(e)}" } def get_all_authors(self, limit: int = 100) -> Dict[str, Any]: """Get all authors with book counts.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() query = """ SELECT a.name, COUNT(bal.book) as book_count FROM authors a LEFT JOIN books_authors_link bal ON a.id = bal.author GROUP BY a.id, a.name HAVING book_count > 0 ORDER BY a.name LIMIT ? """ cursor.execute(query, (limit,)) rows = cursor.fetchall() authors = [] for row in rows: name, book_count = row authors.append({ "author_name": name, "book_count": book_count }) return { "status": "success", "results": authors, "count": len(authors) } except Exception as e: return { "status": "error", "message": f"Failed to get authors: {str(e)}" } def get_books_batch(self, book_ids: List[int], include_full_metadata: bool = True) -> Dict[str, Any]: """Get detailed metadata for multiple books in a single optimized query.""" if not book_ids: return { "status": "error", "message": "book_ids cannot be empty" } # Use configurable batch limit max_batch = self.config.get('max_batch_books', 100) if len(book_ids) > max_batch: logger.warning(f"Batch size {len(book_ids)} exceeds limit {max_batch}, truncating") book_ids = book_ids[:max_batch] try: self.query_count = 0 # Reset for N+1 detection placeholders = ",".join(["?"] * len(book_ids)) # Optimize query based on metadata requirements if include_full_metadata and self.config.get('optimize_group_concat', True): # Full metadata with all GROUP_CONCAT operations query = f""" SELECT b.id, b.title, b.timestamp, b.pubdate, b.isbn, GROUP_CONCAT(a.name, ', ') as authors, b.series_index, s.name as series_name, c.text as comments, b.path, GROUP_CONCAT(t.name, ', ') as tags, GROUP_CONCAT(d.format, ', ') as formats FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors a ON bal.author = a.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id LEFT JOIN comments c ON b.id = c.book LEFT JOIN books_tags_link btl ON b.id = btl.book LEFT JOIN tags t ON btl.tag = t.id LEFT JOIN data d ON b.id = d.book WHERE b.id IN ({placeholders}) GROUP BY b.id ORDER BY b.title """ else: # Minimal metadata for better performance query = f""" SELECT b.id, b.title, b.timestamp, b.pubdate, b.isbn, GROUP_CONCAT(a.name, ', ') as authors, b.series_index, s.name as series_name, NULL as comments, b.path, NULL as tags, NULL as formats FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors a ON bal.author = a.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id WHERE b.id IN ({placeholders}) GROUP BY b.id ORDER BY b.title """ rows = self._execute_with_timing(query, book_ids) results = [] found_ids = set() for row in rows: book_id, title, timestamp, pubdate, isbn, authors, series_index, series_name, comments, path, tags, formats = row found_ids.add(book_id) results.append({ "book_id": book_id, "title": title, "authors": authors.split(", ") if authors else [], "series": { "name": series_name, "index": float(series_index) if series_index else None } if series_name else None, "isbn": isbn, "published_date": pubdate, "timestamp": timestamp, "description": 
comments, "path": path, "tags": tags.split(", ") if tags and include_full_metadata else [], "formats": formats.split(", ") if formats and include_full_metadata else [] }) # N+1 query detection if self.query_count > 1: logger.warning(f"Potential N+1 query pattern: {self.query_count} queries for {len(book_ids)} books") # Track not found books not_found = [bid for bid in book_ids if bid not in found_ids] errors = [{"book_id": bid, "error": "Book not found"} for bid in not_found] return { "status": "success", "results": results, "errors": errors, "summary": { "requested": len(book_ids), "found": len(results), "errors": len(errors) }, "performance": { "queries_executed": self.query_count, "full_metadata": include_full_metadata } } except sqlite3.DatabaseError as e: logger.error(f"Database error in get_books_batch: {e}") return { "status": "error", "message": f"Database error during batch book retrieval: {str(e)}", "error_type": "database_error", "sql_error_details": str(e) } except Exception as e: logger.error(f"get_books_batch failed: {e}") return { "status": "error", "message": f"Failed to get books batch: {str(e)}", "error_type": "batch_error" } def unified_search(self, query: str = "", author: str = "", title: str = "", series: str = "", formats: List[str] = None, date_range: Dict[str, str] = None, offset: int = 0, limit: int = 50, sort_by: str = "title") -> Dict[str, Any]: """Unified search with multiple filters and pagination.""" try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.cursor() conditions = [] params = [] # Count query first count_base = """ SELECT COUNT(DISTINCT b.id) FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors ba ON bal.author = ba.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id LEFT JOIN data d ON b.id = d.book """ # Main query base_query = """ SELECT b.id, b.title, GROUP_CONCAT(ba.name, ', ') as authors, s.name as series_name, b.series_index, b.pubdate, GROUP_CONCAT(d.format, ', ') as formats FROM books b LEFT JOIN books_authors_link bal ON b.id = bal.book LEFT JOIN authors ba ON bal.author = ba.id LEFT JOIN books_series_link bsl ON b.id = bsl.book LEFT JOIN series s ON bsl.series = s.id LEFT JOIN data d ON b.id = d.book """ # Build conditions if query: conditions.append("(b.title LIKE ? 
OR ba.name LIKE ?)") params.extend([f"%{query}%", f"%{query}%"]) if author: conditions.append("ba.name LIKE ?") params.append(f"%{author}%") if title: conditions.append("b.title LIKE ?") params.append(f"%{title}%") if series: conditions.append("s.name LIKE ?") params.append(f"%{series}%") if formats: format_conditions = [] for fmt in formats: format_conditions.append("d.format = ?") params.append(fmt.upper()) if format_conditions: conditions.append(f"({' OR '.join(format_conditions)})") if date_range: if date_range.get('start'): conditions.append("b.pubdate >= ?") params.append(date_range['start']) if date_range.get('end'): conditions.append("b.pubdate <= ?") params.append(date_range['end']) where_clause = "" if conditions: where_clause = " WHERE " + " AND ".join(conditions) # Get total count cursor.execute(count_base + where_clause, params) total_count = cursor.fetchone()[0] # Sort options sort_mapping = { "title": "b.title", "author": "ba.name, b.title", "date": "b.pubdate DESC, b.title", "series": "s.name, b.series_index" } sort_clause = sort_mapping.get(sort_by, "b.title") # Execute main query with pagination full_query = base_query + where_clause + f" GROUP BY b.id ORDER BY {sort_clause} LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(full_query, params) rows = cursor.fetchall() results = [] for row in rows: book_id, title, authors, series_name, series_index, pubdate, book_formats = row results.append({ "book_id": book_id, "title": title, "authors": authors.split(", ") if authors else [], "series": { "name": series_name, "index": float(series_index) if series_index else None } if series_name else None, "published_date": pubdate, "formats": book_formats.split(", ") if book_formats else [] }) return { "status": "success", "results": results, "pagination": { "offset": offset, "limit": limit, "total_results": total_count, "has_more": (offset + limit) < total_count }, "applied_filters": { "query": query, "author": author, "title": title, "series": series, "formats": formats, "date_range": date_range, "sort_by": sort_by } } except sqlite3.DatabaseError as e: logger.error(f"Database error in unified search: {e}") return { "status": "error", "message": f"Database error during unified search: {str(e)}", "error_type": "database_error", "sql_error_details": str(e) } except Exception as e: logger.error(f"Unexpected error in unified search: {e}") return { "status": "error", "message": f"Failed to perform unified search: {str(e)}", "error_type": "search_error" }
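
For orientation, here is a minimal usage sketch of the class above (not part of the module itself). It assumes the standard Calibre layout, where metadata.db sits inside the library folder; the library path, search terms, and filter values below are illustrative placeholders.

# Usage sketch -- adjust the path to your own Calibre library.
from pathlib import Path

from calibre_database import CalibreDatabase

library = Path.home() / "Calibre Library"   # hypothetical library location
db = CalibreDatabase(library / "metadata.db")

# Simple count; every method returns a dict with a "status" key
print(db.get_total_books())

# Paginated author search with fuzzy matching enabled
page = db.search_by_author("Le Guin", offset=0, limit=20, fuzzy=True)
for book in page.get("results", []):
    print(book["book_id"], book["title"])

# Combined filters through unified_search
hits = db.unified_search(author="Banks", formats=["EPUB"], sort_by="date", limit=10)
print(hits.get("pagination"))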
