"""
Local Zotero database reader for semantic search.
Provides direct SQLite access to Zotero's local database for faster semantic search
when running in local mode.
"""
import os
import sqlite3
import platform
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from .utils import is_local_mode
@dataclass
class ZoteroItem:
    """A single Zotero library item plus the text used for semantic search."""

    item_id: int
    key: str
    item_type_id: int
    item_type: Optional[str] = None
    doi: Optional[str] = None
    title: Optional[str] = None
    abstract: Optional[str] = None
    creators: Optional[str] = None
    fulltext: Optional[str] = None
    fulltext_source: Optional[str] = None  # 'pdf' or 'html'
    notes: Optional[str] = None
    extra: Optional[str] = None
    date_added: Optional[str] = None
    date_modified: Optional[str] = None

    def get_searchable_text(self) -> str:
        """
        Build the text blob that gets indexed for this item.

        Every non-empty field contributes one labelled section; sections are
        separated by a blank line. Fulltext longer than 5000 characters is
        cut to its first 5000 characters followed by "...".

        Returns:
            The combined text, or an empty string when no field has content.
        """
        clipped = self.fulltext
        if clipped and len(clipped) > 5000:
            # Cap the fulltext so a single document cannot dominate the index.
            clipped = clipped[:5000] + "..."

        # One entry per field, in output order; None marks an absent field.
        sections = [
            f"Title: {self.title}" if self.title else None,
            f"Authors: {self.creators}" if self.creators else None,
            f"Abstract: {self.abstract}" if self.abstract else None,
            f"Extra: {self.extra}" if self.extra else None,
            f"Notes: {self.notes}" if self.notes else None,
            f"Content: {clipped}" if clipped else None,
        ]
        return "\n\n".join(section for section in sections if section is not None)
class LocalZoteroReader:
    """
    Direct SQLite reader for Zotero's local database.

    Provides fast access to item metadata and fulltext for semantic search
    without going through the Zotero API. The database is opened read-only
    and lazily (on first query). Instances can be used as context managers
    so the connection is closed on exit.
    """

    # Page cap used when neither the constructor nor the environment sets one.
    _DEFAULT_PDF_MAX_PAGES = 10
    # Extracted fulltext is cut to this many characters to keep embeddings reasonable.
    _FULLTEXT_CHAR_LIMIT = 10000
    _log = logging.getLogger(__name__)

    # Base query for library items. Field values are looked up by field *name*
    # (as the DOI lookup always did) instead of hard-coded numeric field IDs,
    # on the assumption that the IDs may differ between databases. Notes and
    # creators are aggregated in separate correlated subqueries: joining both
    # into one grouped query would multiply the rows and repeat every note once
    # per creator (and vice versa) in the concatenated output.
    _ITEM_SELECT = """
        SELECT
            i.itemID,
            i.key,
            i.itemTypeID,
            it.typeName AS item_type,
            i.dateAdded,
            i.dateModified,
            (SELECT v.value
               FROM itemData d
               JOIN fields f ON f.fieldID = d.fieldID
               JOIN itemDataValues v ON v.valueID = d.valueID
              WHERE d.itemID = i.itemID AND f.fieldName = 'title') AS title,
            (SELECT v.value
               FROM itemData d
               JOIN fields f ON f.fieldID = d.fieldID
               JOIN itemDataValues v ON v.valueID = d.valueID
              WHERE d.itemID = i.itemID AND f.fieldName = 'abstractNote') AS abstract,
            (SELECT v.value
               FROM itemData d
               JOIN fields f ON f.fieldID = d.fieldID
               JOIN itemDataValues v ON v.valueID = d.valueID
              WHERE d.itemID = i.itemID AND f.fieldName = 'extra') AS extra,
            (SELECT v.value
               FROM itemData d
               JOIN fields f ON f.fieldID = d.fieldID
               JOIN itemDataValues v ON v.valueID = d.valueID
              WHERE d.itemID = i.itemID AND f.fieldName = 'DOI') AS doi,
            (SELECT GROUP_CONCAT(n.note, ' ')
               FROM itemNotes n
              WHERE n.parentItemID = i.itemID OR n.itemID = i.itemID) AS notes,
            (SELECT GROUP_CONCAT(
                        CASE
                            WHEN c.lastName IS NULL THEN NULL
                            WHEN c.firstName IS NOT NULL AND c.firstName <> ''
                                THEN c.lastName || ', ' || c.firstName
                            ELSE c.lastName
                        END, '; ')
               FROM itemCreators ic
               JOIN creators c ON c.creatorID = ic.creatorID
              WHERE ic.itemID = i.itemID) AS creators
        FROM items i
        JOIN itemTypes it ON it.itemTypeID = i.itemTypeID
        WHERE it.typeName NOT IN ('attachment', 'note', 'annotation')
    """

    def __init__(self, db_path: Optional[str] = None, pdf_max_pages: Optional[int] = None):
        """
        Initialize the local database reader.

        Args:
            db_path: Optional path to zotero.sqlite. If None, auto-detect.
            pdf_max_pages: Optional cap on the number of PDF pages to extract.
                When unset or not a positive int, the ZOTERO_PDF_MAXPAGES
                environment variable is consulted, then a default of 10.

        Raises:
            FileNotFoundError: If db_path is not given and no database is found.
        """
        self.db_path = db_path or self._find_zotero_db()
        self._connection: Optional[sqlite3.Connection] = None
        self.pdf_max_pages: Optional[int] = pdf_max_pages
        # Reduce noise from pdfminer warnings
        logging.getLogger("pdfminer").setLevel(logging.ERROR)

    def _find_zotero_db(self) -> str:
        """
        Auto-detect the Zotero database location based on OS.

        Looks for ~/Zotero/zotero.sqlite on every platform. On Windows a
        legacy "Documents and Settings" location on the home drive is tried
        as well.

        Returns:
            Path to zotero.sqlite file.

        Raises:
            FileNotFoundError: If database cannot be located.
        """
        home = Path.home()
        candidates = [home / "Zotero" / "zotero.sqlite"]
        if platform.system() == "Windows":
            username = os.getenv("USERNAME", "")
            if username:
                # Legacy (XP/2000) profile layout lives at the drive root,
                # not underneath the current home directory.
                candidates.append(
                    Path(home.anchor) / "Documents and Settings" / username / "Zotero" / "zotero.sqlite"
                )

        for candidate in candidates:
            if candidate.exists():
                return str(candidate)

        tried = ", ".join(str(candidate) for candidate in candidates)
        raise FileNotFoundError(
            f"Zotero database not found at {tried}. "
            "Please ensure Zotero is installed and has been run at least once."
        )

    def _get_connection(self) -> sqlite3.Connection:
        """Return the shared read-only connection, opening it on first use."""
        if self._connection is None:
            # Path.as_uri() percent-encodes characters such as '#', '?' and
            # spaces that would otherwise corrupt a hand-built file: URI.
            uri = Path(self.db_path).resolve().as_uri() + "?mode=ro"
            connection = sqlite3.connect(uri, uri=True)
            connection.row_factory = sqlite3.Row
            self._connection = connection
        return self._connection

    def _get_storage_dir(self) -> Path:
        """
        Return the Zotero storage directory path.

        The "storage" folder next to the database file is preferred so that a
        custom db_path is honoured; if it does not exist, the default
        ~/Zotero/storage location is returned.
        """
        beside_db = Path(self.db_path).parent / "storage"
        if beside_db.is_dir():
            return beside_db
        return Path.home() / "Zotero" / "storage"

    def _iter_parent_attachments(self, parent_item_id: int):
        """
        Yield tuples (attachment_key, path, content_type) for a parent item.

        Rows are yielded in attachment item ID order so the choice of the
        "first" attachment of a kind is deterministic.
        """
        query = """
            SELECT att.key AS attachmentKey,
                   ia.path AS path,
                   ia.contentType AS contentType
            FROM itemAttachments ia
            JOIN items att ON att.itemID = ia.itemID
            WHERE ia.parentItemID = ?
            ORDER BY ia.itemID
        """
        rows = self._get_connection().execute(query, (parent_item_id,)).fetchall()
        for row in rows:
            yield row["attachmentKey"], row["path"], row["contentType"]

    def _resolve_attachment_path(self, attachment_key: str, zotero_path: str) -> Optional[Path]:
        """
        Resolve a Zotero attachment path like 'storage:filename.pdf' to a filesystem path.

        Returns:
            <storage dir>/<attachment_key>/<relative path>, or None when the
            path is empty, names no file, or does not use the 'storage:'
            scheme (external links are not supported).
        """
        prefix = "storage:"
        if not zotero_path or not zotero_path.startswith(prefix):
            return None
        # Handle nested paths if present; empty segments are dropped.
        parts = [segment for segment in zotero_path[len(prefix):].split("/") if segment]
        if not parts:
            # A bare "storage:" would otherwise resolve to the attachment directory.
            return None
        return self._get_storage_dir() / attachment_key / Path(*parts)

    def _pdf_page_cap(self) -> int:
        """Return the page cap for PDF extraction: config value > env > default (10)."""
        if isinstance(self.pdf_max_pages, int) and self.pdf_max_pages > 0:
            return self.pdf_max_pages
        from_env = os.getenv("ZOTERO_PDF_MAXPAGES")
        if not from_env:
            return self._DEFAULT_PDF_MAX_PAGES
        try:
            return int(from_env)
        except ValueError:
            return self._DEFAULT_PDF_MAX_PAGES

    def _extract_text_from_pdf(self, file_path: Path) -> str:
        """
        Extract text from a PDF using pdfminer with a page cap to avoid stalls.

        Best-effort: returns an empty string when pdfminer is unavailable or
        the extraction fails.
        """
        try:
            from pdfminer.high_level import extract_text  # type: ignore

            return extract_text(str(file_path), maxpages=self._pdf_page_cap()) or ""
        except Exception:
            self._log.debug("PDF text extraction failed for %s", file_path, exc_info=True)
            return ""

    @staticmethod
    def _strip_html_tags(markup: str) -> str:
        """Return the visible text of an HTML string using only the standard library."""
        from html.parser import HTMLParser

        class _TextCollector(HTMLParser):
            """Collect text nodes, ignoring the contents of script/style elements."""

            def __init__(self):
                super().__init__(convert_charrefs=True)
                self.chunks = []
                self._skip_depth = 0

            def handle_starttag(self, tag, attrs):
                if tag in ("script", "style"):
                    self._skip_depth += 1

            def handle_endtag(self, tag):
                if tag in ("script", "style") and self._skip_depth:
                    self._skip_depth -= 1

            def handle_data(self, data):
                if not self._skip_depth:
                    self.chunks.append(data)

        collector = _TextCollector()
        collector.feed(markup)
        collector.close()
        return " ".join(chunk.strip() for chunk in collector.chunks if chunk.strip())

    def _extract_text_from_html(self, file_path: Path) -> str:
        """
        Extract text from HTML using markitdown if available; fallback to stripping tags.

        Tag stripping uses BeautifulSoup when installed and the standard
        library parser otherwise. Best-effort: returns an empty string when
        the file cannot be read or parsed.
        """
        # Try markitdown first
        try:
            from markitdown import MarkItDown

            converted = MarkItDown().convert(str(file_path))
            return converted.text_content or ""
        except Exception:
            self._log.debug("markitdown unavailable or failed for %s", file_path, exc_info=True)

        try:
            markup = file_path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            self._log.debug("Could not read %s", file_path, exc_info=True)
            return ""

        # Fallback using a simple parser
        try:
            from bs4 import BeautifulSoup  # type: ignore

            return BeautifulSoup(markup, "html.parser").get_text(" ")
        except Exception:
            self._log.debug("BeautifulSoup unavailable or failed for %s", file_path, exc_info=True)

        # Last resort: standard-library tag stripping.
        try:
            return self._strip_html_tags(markup)
        except Exception:
            self._log.debug("HTML tag stripping failed for %s", file_path, exc_info=True)
            return ""

    def _extract_text_from_file(self, file_path: Path) -> str:
        """
        Extract text content from a file based on extension, with fallbacks.

        '.pdf' and '.html'/'.htm' go to the dedicated extractors; any other
        file is read as UTF-8 text with undecodable bytes ignored. Returns an
        empty string when the file cannot be read.
        """
        suffix = file_path.suffix.lower()
        if suffix == ".pdf":
            return self._extract_text_from_pdf(file_path)
        if suffix in {".html", ".htm"}:
            return self._extract_text_from_html(file_path)
        # Generic best-effort
        try:
            return file_path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            self._log.debug("Could not read %s", file_path, exc_info=True)
            return ""

    def _extract_fulltext_for_item(self, item_id: int) -> Optional[Tuple[str, str]]:
        """
        Attempt to extract fulltext and source from the item's best attachment.

        Preference: use the first existing PDF attachment; fall back to the
        first existing HTML attachment when there is no PDF or the PDF yields
        no text. The extractor is chosen from the attachment's content type.

        Returns:
            (text, source) where source is 'pdf' or 'html' and text is cut to
            10000 characters, or None when no text could be extracted.
        """
        pdf_path: Optional[Path] = None
        html_path: Optional[Path] = None
        for key, path, ctype in self._iter_parent_attachments(item_id):
            resolved = self._resolve_attachment_path(key, path or "")
            if resolved is None or not resolved.is_file():
                continue
            if ctype == "application/pdf":
                if pdf_path is None:
                    pdf_path = resolved
            elif (ctype or "").startswith("text/html") and html_path is None:
                html_path = resolved

        attempts = (
            ("pdf", pdf_path, self._extract_text_from_pdf),
            ("html", html_path, self._extract_text_from_html),
        )
        for source, target, extract in attempts:
            if target is None:
                continue
            text = extract(target)
            if text:
                # Truncate to keep embeddings reasonable
                return (text[: self._FULLTEXT_CHAR_LIMIT], source)
        return None

    def close(self):
        """Close database connection."""
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get_item_count(self) -> int:
        """
        Get total count of items that are not attachments, notes or annotations.

        Returns:
            Number of items in the library.
        """
        cursor = self._get_connection().execute(
            """
            SELECT COUNT(*)
            FROM items i
            JOIN itemTypes it ON i.itemTypeID = it.itemTypeID
            WHERE it.typeName NOT IN ('attachment', 'note', 'annotation')
            """
        )
        return cursor.fetchone()[0]

    def _select_item_rows(self, key: Optional[str] = None, limit: Optional[int] = None) -> List[sqlite3.Row]:
        """
        Run the item query, optionally restricted to one key and/or a row limit.

        Rows are ordered by dateModified, newest first. All user-supplied
        values are passed as bound parameters.
        """
        sql = self._ITEM_SELECT
        params: List[Any] = []
        if key is not None:
            sql += " AND i.key = ?"
            params.append(key)
        sql += " ORDER BY i.dateModified DESC"
        if limit:
            sql += " LIMIT ?"
            params.append(int(limit))
        return self._get_connection().execute(sql, params).fetchall()

    def _row_to_item(self, row: sqlite3.Row, include_fulltext: bool) -> "ZoteroItem":
        """Convert one result row into a ZoteroItem, extracting fulltext on request."""
        fulltext: Optional[str] = None
        fulltext_source: Optional[str] = None
        if include_fulltext:
            extracted = self._extract_fulltext_for_item(row["itemID"])
            if extracted is not None:
                fulltext, fulltext_source = extracted
        return ZoteroItem(
            item_id=row["itemID"],
            key=row["key"],
            item_type_id=row["itemTypeID"],
            item_type=row["item_type"],
            doi=row["doi"],
            title=row["title"],
            abstract=row["abstract"],
            creators=row["creators"],
            fulltext=fulltext,
            fulltext_source=fulltext_source,
            notes=row["notes"],
            extra=row["extra"],
            date_added=row["dateAdded"],
            date_modified=row["dateModified"],
        )

    def get_items_with_text(self, limit: Optional[int] = None, include_fulltext: bool = False) -> List["ZoteroItem"]:
        """
        Get all items with their text content for semantic search.

        Args:
            limit: Optional limit on number of items to return. None or 0
                means no limit.
            include_fulltext: When True, also extract attachment fulltext for
                every item (slow: reads and parses attachment files).

        Returns:
            List of ZoteroItem objects with text content, most recently
            modified first.
        """
        return [
            self._row_to_item(row, include_fulltext)
            for row in self._select_item_rows(limit=limit)
        ]

    # Public helper to extract fulltext on demand for a specific item
    def extract_fulltext_for_item(self, item_id: int) -> Optional[Tuple[str, str]]:
        """Return (text, source) for the item's best attachment, or None."""
        return self._extract_fulltext_for_item(item_id)

    def get_item_by_key(self, key: str) -> Optional["ZoteroItem"]:
        """
        Get a specific item by its Zotero key.

        The lookup is done in SQL rather than by loading the whole library.
        Fulltext is not extracted.

        Args:
            key: The Zotero item key.

        Returns:
            ZoteroItem if found, None otherwise.
        """
        rows = self._select_item_rows(key=key, limit=1)
        if not rows:
            return None
        return self._row_to_item(rows[0], include_fulltext=False)

    def search_items_by_text(self, query: str, limit: int = 50) -> List["ZoteroItem"]:
        """
        Simple case-insensitive substring search through item content.

        Args:
            query: Search query string.
            limit: Maximum number of results; values <= 0 yield no results.

        Returns:
            List of matching ZoteroItem objects, most recently modified first.
        """
        matches: List["ZoteroItem"] = []
        if limit <= 0:
            return matches
        needle = query.lower()
        for item in self.get_items_with_text():
            if needle in item.get_searchable_text().lower():
                matches.append(item)
                if len(matches) >= limit:
                    break
        return matches
def get_local_zotero_reader() -> Optional[LocalZoteroReader]:
    """
    Build a LocalZoteroReader when running in local mode.

    Returns:
        A LocalZoteroReader when local mode is active and the database file
        was located; None when not in local mode or when constructing the
        reader raised FileNotFoundError.
    """
    reader = None
    if is_local_mode():
        try:
            reader = LocalZoteroReader()
        except FileNotFoundError:
            # No database could be located: report "no reader" to the caller.
            reader = None
    return reader
def is_local_db_available() -> bool:
    """
    Report whether a local Zotero reader can be obtained.

    Returns:
        True if get_local_zotero_reader() produced a reader (which is closed
        again before returning), False otherwise.
    """
    reader = get_local_zotero_reader()
    if reader is None:
        return False
    # The reader was only needed as an availability probe; release it.
    reader.close()
    return True