"""
Database schema management for PostgreSQL with pg-vector support.
This module provides utilities to initialize, manage, and migrate the PostgreSQL
database schema for the Zotero MCP semantic search functionality.
"""
import logging
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from psycopg2.extras import RealDictCursor
from typing import Dict, Any, Optional, List
import json
import hashlib
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Manages PostgreSQL database schema and migrations for Zotero MCP.

    Responsibilities visible in this class: opening autocommit psycopg2
    connections, creating the database and pg-vector extension, building
    and migrating the embeddings/attachments/config/updates tables, and
    tracking attachment full-text extraction state.
    """

    # Written to zotero_config under 'schema_version' by create_schema().
    SCHEMA_VERSION = "1.0.0"
def __init__(self, config: Dict[str, Any], embedding_dimension: Optional[int] = None):
    """Initialize database manager with connection config.

    Args:
        config: Database connection configuration dict. Must contain
            host, port, database, username and password; an optional
            "schema" key defaults to "public".
        embedding_dimension: Vector dimension for embeddings; when None,
            a fallback of 1536 is used until auto-detection runs.
    """
    self.config = config
    # Required keys — a missing one raises KeyError, signalling bad config.
    for key in ("host", "port", "database", "username", "password"):
        setattr(self, key, config[key])
    self.schema = config.get("schema", "public")
    # Default fallback dimension when none is supplied.
    self.embedding_dimension = embedding_dimension or 1536
    # Reserved slot for a long-lived management connection.
    self._connection = None
def set_embedding_dimension(self, dimension: int) -> None:
    """Record the vector dimension to use for subsequent schema work."""
    self.embedding_dimension = dimension
    logger.info(f"Set embedding dimension to {dimension}")
def get_connection_string(self, database: Optional[str] = None) -> str:
    """Build a libpq key/value connection string.

    Args:
        database: Target database name; defaults to the configured one
            (passing e.g. "postgres" allows maintenance connections).

    Returns:
        A space-separated "key=value" conninfo string.
    """
    params = {
        "host": self.host,
        "port": self.port,
        "dbname": database or self.database,
        "user": self.username,
        "password": self.password,
    }
    # NOTE(review): values are not quoted/escaped — a password containing
    # spaces or quotes would break libpq parsing; confirm upstream config.
    return " ".join(f"{key}={value}" for key, value in params.items())
def connect(self, database: Optional[str] = None) -> psycopg2.extensions.connection:
    """Open a new autocommit connection, optionally to another database.

    Autocommit is set because CREATE DATABASE, CREATE EXTENSION and
    VACUUM cannot run inside a transaction block.
    """
    connection = psycopg2.connect(self.get_connection_string(database))
    connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    return connection
def check_database_exists(self) -> bool:
    """Check if the target database exists.

    Connects to the maintenance "postgres" database, since the target
    may not exist yet.

    Returns:
        True when the database is listed in pg_database; False when it
        is not, or when the check itself fails.
    """
    try:
        conn = self.connect("postgres")
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT 1 FROM pg_database WHERE datname = %s",
                (self.database,)
            )
            exists = cursor.fetchone() is not None
            cursor.close()
        finally:
            # Fix: release the connection even when the query raises
            # (previously it leaked on any error).
            conn.close()
        return exists
    except Exception as e:
        logger.error(f"Error checking database existence: {e}")
        return False
def create_database_if_not_exists(self) -> None:
    """Create the target database if it doesn't exist (no-op otherwise).

    Raises:
        Exception: re-raised when database creation fails.
    """
    if self.check_database_exists():
        logger.info(f"Database {self.database} already exists")
        return
    try:
        # Connect to the maintenance database to issue CREATE DATABASE.
        conn = self.connect("postgres")
        try:
            cursor = conn.cursor()
            # CREATE DATABASE cannot take a bound parameter, so the name
            # is interpolated as a double-quoted identifier; the name
            # comes from local config, but note the injection surface.
            cursor.execute(f'CREATE DATABASE "{self.database}"')
            logger.info(f"Created database {self.database}")
            cursor.close()
        finally:
            # Fix: close the connection even when CREATE DATABASE raises.
            conn.close()
    except Exception as e:
        logger.error(f"Error creating database: {e}")
        raise
def check_pg_vector_extension(self) -> bool:
    """Check if the pg-vector extension is installed in the database.

    Returns:
        True when 'vector' appears in pg_extension; False otherwise or
        on any query/connection error.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT 1 FROM pg_extension WHERE extname = 'vector'"
            )
            installed = cursor.fetchone() is not None
            cursor.close()
        finally:
            # Fix: close the connection even when the query raises.
            conn.close()
        return installed
    except Exception as e:
        logger.error(f"Error checking pg-vector extension: {e}")
        return False
def install_pg_vector_extension(self) -> None:
    """Enable the pg-vector extension (idempotent via IF NOT EXISTS).

    NOTE(review): CREATE EXTENSION typically needs elevated privileges —
    confirm the configured role has them.

    Raises:
        Exception: re-raised when installation fails.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute("CREATE EXTENSION IF NOT EXISTS vector")
            logger.info("pg-vector extension enabled")
            cursor.close()
        finally:
            # Fix: close the connection even on the raise path.
            conn.close()
    except Exception as e:
        logger.error(f"Error installing pg-vector extension: {e}")
        raise
def get_current_embedding_dimension(self) -> Optional[int]:
    """Read the embedding column's dimension from the live table.

    For pg-vector columns, pg_attribute.atttypmod holds the declared
    dimension directly (the code below relies on that).

    Returns:
        The dimension, or None when the table/column does not exist or
        the query fails (logged at debug level — absence is expected on
        a fresh database).
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT atttypmod
                FROM pg_attribute
                WHERE attrelid = 'zotero_embeddings'::regclass
                AND attname = 'embedding'
                AND NOT attisdropped
            """)
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Fix: close the connection even when the query raises.
            conn.close()
        if result and result[0] > 0:
            return result[0]
        return None
    except Exception as e:
        logger.debug(f"Could not get current embedding dimension: {e}")
        return None
def alter_embedding_dimension(self, new_dimension: int) -> None:
    """Alter the embeddings table to a new vector dimension.

    All stored embeddings are deleted afterwards, because vectors of the
    old dimension are meaningless once the column type changes; callers
    must re-embed everything.

    Args:
        new_dimension: Target vector dimension.

    Raises:
        Exception: re-raised when the DROP/ALTER/CREATE sequence fails.
    """
    current_dim = self.get_current_embedding_dimension()
    if current_dim == new_dimension:
        logger.info(f"Embedding dimension already set to {new_dimension}")
        return
    logger.warning(f"Changing embedding dimension from {current_dim} to {new_dimension}")
    logger.warning("This will require rebuilding all embeddings!")
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            # The ivfflat index is tied to the column type: drop it,
            # alter the column, then recreate it.
            cursor.execute("DROP INDEX IF EXISTS idx_zotero_embedding_cosine")
            cursor.execute(f"ALTER TABLE zotero_embeddings ALTER COLUMN embedding TYPE vector({new_dimension})")
            cursor.execute(f"""
                CREATE INDEX idx_zotero_embedding_cosine
                ON zotero_embeddings USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100)
            """)
            # Old-dimension vectors are now invalid; clear for rebuild.
            cursor.execute("DELETE FROM zotero_embeddings")
            cursor.close()
        finally:
            # Fix: close the connection even on the raise path.
            conn.close()
        logger.info(f"Successfully changed embedding dimension to {new_dimension}")
        logger.info("All existing embeddings have been cleared - run update-db to rebuild")
    except Exception as e:
        logger.error(f"Error altering embedding dimension: {e}")
        raise
def get_schema_sql(self) -> Dict[str, str]:
    """Get SQL statements for schema creation with dynamic embedding dimension.

    Returns:
        Dict of SQL script sections keyed by name. create_schema()
        executes them in insertion order, so the ordering of this
        literal matters: extensions first (vector type must exist before
        the tables that use it), then tables, indexes, and trigger
        functions.
    """
    return {
        # vector for embeddings; uuid-ossp for UUID generation.
        "extensions": """
            CREATE EXTENSION IF NOT EXISTS vector;
            CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
        """,
        # f-string: the embedding column dimension is interpolated from
        # self.embedding_dimension; '{{}}' renders as the JSONB default
        # '{}' after f-string brace escaping.
        "tables": f"""
            CREATE TABLE IF NOT EXISTS zotero_embeddings (
                id SERIAL PRIMARY KEY,
                item_key VARCHAR(50) NOT NULL,
                item_type VARCHAR(50) NOT NULL,
                title TEXT,
                content TEXT NOT NULL,
                content_hash VARCHAR(64) NOT NULL,
                embedding vector({self.embedding_dimension}),
                embedding_model VARCHAR(100) NOT NULL DEFAULT 'unknown',
                embedding_provider VARCHAR(50) NOT NULL DEFAULT 'unknown',
                metadata JSONB NOT NULL DEFAULT '{{}}',
                content_type VARCHAR(20) DEFAULT 'metadata',
                parent_item_key VARCHAR(50),
                parent_attachment_key VARCHAR(50),
                chunk_index INTEGER DEFAULT 0,
                chunk_total INTEGER DEFAULT 1,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS zotero_attachments (
                id SERIAL PRIMARY KEY,
                parent_item_key VARCHAR(50) NOT NULL,
                attachment_key VARCHAR(50) UNIQUE NOT NULL,
                content_type VARCHAR(100),
                filename VARCHAR(500),
                file_size BIGINT,
                file_hash VARCHAR(64),
                full_text_extracted BOOLEAN DEFAULT FALSE,
                extraction_status VARCHAR(20) DEFAULT 'pending',
                extraction_started_at TIMESTAMP WITH TIME ZONE,
                extraction_completed_at TIMESTAMP WITH TIME ZONE,
                extraction_error TEXT,
                retry_count INTEGER DEFAULT 0,
                chunk_count INTEGER DEFAULT 0,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS zotero_config (
                id SERIAL PRIMARY KEY,
                key VARCHAR(100) UNIQUE NOT NULL,
                value JSONB NOT NULL,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS zotero_updates (
                id SERIAL PRIMARY KEY,
                update_type VARCHAR(50) NOT NULL,
                items_processed INTEGER DEFAULT 0,
                items_added INTEGER DEFAULT 0,
                items_updated INTEGER DEFAULT 0,
                items_failed INTEGER DEFAULT 0,
                started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP WITH TIME ZONE,
                status VARCHAR(20) DEFAULT 'running',
                error_message TEXT
            );
        """,
        # Plain string (no interpolation); includes the ivfflat vector
        # index that alter_embedding_dimension() drops and recreates.
        "indexes": """
            CREATE INDEX IF NOT EXISTS idx_zotero_item_key
            ON zotero_embeddings(item_key);
            CREATE INDEX IF NOT EXISTS idx_zotero_item_type
            ON zotero_embeddings(item_type);
            CREATE INDEX IF NOT EXISTS idx_zotero_content_hash
            ON zotero_embeddings(content_hash);
            CREATE INDEX IF NOT EXISTS idx_zotero_embedding_model
            ON zotero_embeddings(embedding_model);
            CREATE INDEX IF NOT EXISTS idx_zotero_embedding_provider
            ON zotero_embeddings(embedding_provider);
            CREATE INDEX IF NOT EXISTS idx_zotero_embedding_cosine
            ON zotero_embeddings USING ivfflat (embedding vector_cosine_ops)
            WITH (lists = 100);
            CREATE INDEX IF NOT EXISTS idx_zotero_metadata_gin
            ON zotero_embeddings USING gin(metadata);
            CREATE INDEX IF NOT EXISTS idx_zotero_updated_at
            ON zotero_embeddings(updated_at);
            CREATE INDEX IF NOT EXISTS idx_zotero_updates_status
            ON zotero_updates(status, started_at);
            -- New indexes for full-text functionality
            CREATE INDEX IF NOT EXISTS idx_zotero_content_type
            ON zotero_embeddings(content_type);
            CREATE INDEX IF NOT EXISTS idx_zotero_parent_item_key
            ON zotero_embeddings(parent_item_key);
            CREATE INDEX IF NOT EXISTS idx_zotero_parent_attachment_key
            ON zotero_embeddings(parent_attachment_key);
            CREATE INDEX IF NOT EXISTS idx_zotero_chunk_index
            ON zotero_embeddings(parent_item_key, chunk_index);
            -- Attachment table indexes
            CREATE INDEX IF NOT EXISTS idx_zotero_attachments_parent_item
            ON zotero_attachments(parent_item_key);
            CREATE INDEX IF NOT EXISTS idx_zotero_attachments_status
            ON zotero_attachments(extraction_status);
            CREATE INDEX IF NOT EXISTS idx_zotero_attachments_extracted
            ON zotero_attachments(full_text_extracted);
            CREATE INDEX IF NOT EXISTS idx_zotero_attachments_content_type
            ON zotero_attachments(content_type);
            CREATE INDEX IF NOT EXISTS idx_zotero_attachments_file_hash
            ON zotero_attachments(file_hash);
            CREATE INDEX IF NOT EXISTS idx_zotero_attachments_updated_at
            ON zotero_attachments(updated_at);
        """,
        # Trigger function keeping updated_at current on every UPDATE of
        # either tracked table; triggers are dropped/recreated so the
        # script stays idempotent.
        "functions": """
            CREATE OR REPLACE FUNCTION update_updated_at_column()
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.updated_at = CURRENT_TIMESTAMP;
                RETURN NEW;
            END;
            $$ language 'plpgsql';
            DROP TRIGGER IF EXISTS update_zotero_embeddings_updated_at ON zotero_embeddings;
            CREATE TRIGGER update_zotero_embeddings_updated_at
            BEFORE UPDATE ON zotero_embeddings
            FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
            DROP TRIGGER IF EXISTS update_zotero_attachments_updated_at ON zotero_attachments;
            CREATE TRIGGER update_zotero_attachments_updated_at
            BEFORE UPDATE ON zotero_attachments
            FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
        """
    }
def create_schema(self) -> None:
    """Create all required tables, indexes, and functions.

    Also upserts the schema version and embedding dimension into
    zotero_config, so re-running is safe.

    Raises:
        Exception: re-raised when any schema statement fails.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            schema_sql = self.get_schema_sql()
            # Execute sections in dict insertion order (extensions must
            # precede tables; tables precede indexes/functions).
            for section_name, sql in schema_sql.items():
                logger.info(f"Creating {section_name}...")
                cursor.execute(sql)
            # Record schema version for future migrations.
            cursor.execute("""
                INSERT INTO zotero_config (key, value)
                VALUES ('schema_version', %s)
                ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value,
                updated_at = CURRENT_TIMESTAMP
            """, (json.dumps(self.SCHEMA_VERSION),))
            # Record the dimension the vector column was created with.
            cursor.execute("""
                INSERT INTO zotero_config (key, value)
                VALUES ('embedding_dimension', %s)
                ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value,
                updated_at = CURRENT_TIMESTAMP
            """, (json.dumps(self.embedding_dimension),))
            cursor.close()
        finally:
            # Fix: close the connection even on the raise path.
            conn.close()
        logger.info(f"Database schema created successfully with {self.embedding_dimension}D embeddings")
    except Exception as e:
        logger.error(f"Error creating schema: {e}")
        raise
def get_schema_version(self) -> Optional[str]:
    """Get current schema version from the zotero_config table.

    Bug fix: psycopg2 deserializes json/jsonb columns to Python objects
    automatically, so the stored value arrives as the plain string
    "1.0.0". The previous json.loads(result[0]) then raised ValueError
    (swallowed below), making this method always return None.

    Returns:
        The version string, or None when unset or on error.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT value FROM zotero_config WHERE key = 'schema_version'"
            )
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Fix: close the connection even when the query raises.
            conn.close()
        if result is None:
            return None
        value = result[0]
        if isinstance(value, (bytes, bytearray)):
            value = value.decode()
        if isinstance(value, str):
            # Tolerate drivers that return the raw JSON text '"1.0.0"'.
            try:
                return json.loads(value)
            except ValueError:
                return value
        return value
    except Exception as e:
        logger.error(f"Error getting schema version: {e}")
        return None
def get_stored_embedding_dimension(self) -> Optional[int]:
    """Get the stored embedding dimension from the zotero_config table.

    Bug fix: psycopg2 deserializes jsonb automatically, so the value
    arrives as a Python int. The previous json.loads(result[0]) raised
    TypeError on an int (swallowed below), so this always returned None
    and initialize_database's dimension-drift check never fired.

    Returns:
        The stored dimension as an int, or None when unset or on error.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT value FROM zotero_config WHERE key = 'embedding_dimension'"
            )
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Fix: close the connection even when the query raises.
            conn.close()
        if result is None or result[0] is None:
            return None
        value = result[0]
        if isinstance(value, (bytes, bytearray)):
            value = value.decode()
        if isinstance(value, str):
            # Tolerate drivers that return the raw JSON text "1536".
            value = json.loads(value)
        return int(value)
    except Exception as e:
        logger.debug(f"Error getting stored embedding dimension: {e}")
        return None
def initialize_database(self, embedding_provider=None) -> None:
    """Set up a fresh database end-to-end: database, extension, schema.

    Args:
        embedding_provider: Optional object exposing
            get_embedding_dimension(); when given, the dimension is
            auto-detected before the schema is built.
    """
    logger.info("Initializing Zotero MCP database...")
    # Prefer the provider's reported dimension over the configured one.
    if embedding_provider is not None:
        try:
            detected_dimension = embedding_provider.get_embedding_dimension()
            if detected_dimension > 0:
                self.embedding_dimension = detected_dimension
                logger.info(f"Auto-detected embedding dimension: {detected_dimension}")
        except Exception as e:
            logger.warning(f"Could not auto-detect embedding dimension: {e}")
    self.create_database_if_not_exists()
    self.install_pg_vector_extension()
    # Reconcile any dimension drift (live table first, then stored
    # config) before (re)creating the schema.
    stored_dimension = self.get_stored_embedding_dimension()
    current_table_dimension = self.get_current_embedding_dimension()
    if current_table_dimension and current_table_dimension != self.embedding_dimension:
        logger.info(f"Embedding dimension changed: {current_table_dimension} -> {self.embedding_dimension}")
        self.alter_embedding_dimension(self.embedding_dimension)
    elif stored_dimension and stored_dimension != self.embedding_dimension:
        logger.info(f"Config embedding dimension changed: {stored_dimension} -> {self.embedding_dimension}")
        self.alter_embedding_dimension(self.embedding_dimension)
    self.create_schema()
    logger.info("Database initialization complete")
def drop_schema(self) -> None:
    """Drop all Zotero MCP tables (for testing/cleanup).

    Raises:
        Exception: re-raised when a DROP fails.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            # Reverse dependency order; CASCADE removes dependent objects.
            tables = [
                "zotero_updates",
                "zotero_config",
                "zotero_attachments",
                "zotero_embeddings"
            ]
            for table in tables:
                cursor.execute(f"DROP TABLE IF EXISTS {table} CASCADE")
                logger.info(f"Dropped table {table}")
            cursor.close()
        finally:
            # Fix: close the connection even on the raise path.
            conn.close()
    except Exception as e:
        logger.error(f"Error dropping schema: {e}")
        raise
def vacuum_and_reindex(self) -> Dict[str, Any]:
    """Optimize the embeddings table and return statistics.

    Runs VACUUM ANALYZE and REINDEX on zotero_embeddings (the autocommit
    connection from connect() is required — VACUUM cannot run inside a
    transaction block). The previous "table sizes before" query was
    fetched into an unused local and has been removed as dead work.

    Returns:
        Stats dict with counts and table size, or {"error": ...} on
        failure.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute("VACUUM ANALYZE zotero_embeddings")
            cursor.execute("REINDEX TABLE zotero_embeddings")
            # Post-optimization statistics.
            cursor.execute("""
                SELECT
                COUNT(*) as total_embeddings,
                COUNT(DISTINCT embedding_model) as unique_models,
                COUNT(DISTINCT embedding_provider) as unique_providers,
                pg_size_pretty(pg_total_relation_size('zotero_embeddings')) as table_size
                FROM zotero_embeddings
            """)
            stats = cursor.fetchone()
            cursor.close()
        finally:
            # Fix: close the connection even when a statement raises.
            conn.close()
        return {
            "total_embeddings": stats[0] if stats else 0,
            "unique_models": stats[1] if stats else 0,
            "unique_providers": stats[2] if stats else 0,
            "table_size": stats[3] if stats else "0 bytes",
            "optimization": "completed"
        }
    except Exception as e:
        logger.error(f"Error during vacuum and reindex: {e}")
        return {"error": str(e)}
def get_database_status(self) -> Dict[str, Any]:
    """Get comprehensive database status information.

    Collects the pg-vector extension version, embedding-table
    statistics, the most recent zotero_updates row, and the three views
    of the embedding dimension (live table, stored config, this
    manager's setting — divergence indicates a pending migration).

    Returns:
        Status dict with "status": "healthy", or a reduced dict with
        "status": "error" and the error string when anything fails.
    """
    try:
        conn = self.connect()
        # RealDictCursor so rows come back keyed by column name.
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        # pg-vector extension version (None when not installed).
        cursor.execute(
            "SELECT extversion FROM pg_extension WHERE extname = 'vector'"
        )
        vector_result = cursor.fetchone()
        vector_version = vector_result['extversion'] if vector_result else None
        # Aggregate embedding statistics plus overall database size.
        cursor.execute("""
            SELECT
            COUNT(*) as total_items,
            COUNT(DISTINCT item_type) as unique_types,
            COUNT(DISTINCT embedding_model) as unique_models,
            COUNT(DISTINCT embedding_provider) as unique_providers,
            pg_size_pretty(pg_database_size(current_database())) as database_size
            FROM zotero_embeddings
        """)
        stats = cursor.fetchone()
        # Most recent update run, if any.
        cursor.execute("""
            SELECT status, items_processed, started_at, completed_at
            FROM zotero_updates
            ORDER BY started_at DESC
            LIMIT 1
        """)
        last_update = cursor.fetchone()
        # These helpers open their own connections.
        current_table_dim = self.get_current_embedding_dimension()
        stored_config_dim = self.get_stored_embedding_dimension()
        cursor.close()
        conn.close()
        return {
            "database": self.database,
            "host": f"{self.host}:{self.port}",
            "schema_version": self.get_schema_version(),
            "pg_vector_version": vector_version,
            "embedding_dimension": {
                "current_table": current_table_dim,
                "stored_config": stored_config_dim,
                "manager_setting": self.embedding_dimension
            },
            "total_items": stats['total_items'] if stats else 0,
            "unique_types": stats['unique_types'] if stats else 0,
            "unique_models": stats['unique_models'] if stats else 0,
            "unique_providers": stats['unique_providers'] if stats else 0,
            "database_size": stats['database_size'] if stats else "0 bytes",
            "last_update": dict(last_update) if last_update else None,
            "status": "healthy"
        }
    except Exception as e:
        # NOTE(review): the connection leaks when a query above raises —
        # consider try/finally around conn.close().
        logger.error(f"Error getting database status: {e}")
        return {
            "database": self.database,
            "host": f"{self.host}:{self.port}",
            "status": "error",
            "error": str(e)
        }
def upsert_attachment(self, parent_item_key: str, attachment_key: str,
                      content_type: str, filename: str, file_size: int,
                      file_hash: str) -> bool:
    """
    Insert or update attachment tracking information.

    When the stored file_hash differs from the incoming one, the SQL
    CASE expressions below reset the extraction state (pending, not
    extracted, retry_count 0, error cleared) so the file is reprocessed;
    an unchanged hash leaves all extraction state untouched.

    Args:
        parent_item_key: The Zotero key of the parent item
        attachment_key: The Zotero key of the attachment
        content_type: MIME type of the attachment
        filename: Original filename
        file_size: File size in bytes
        file_hash: SHA-256 hash of the file content
    Returns:
        True if this is a new attachment or file has changed, False if
        unchanged (and False on error as well).
    """
    try:
        conn = self.connect()
        cursor = conn.cursor()
        # Pre-read the stored hash only to compute the return value /
        # log message; the upsert itself re-checks via CASE expressions.
        cursor.execute("""
            SELECT file_hash, extraction_status FROM zotero_attachments
            WHERE attachment_key = %s
        """, (attachment_key,))
        existing = cursor.fetchone()
        file_changed = True
        if existing:
            old_hash, old_status = existing
            if old_hash == file_hash:
                file_changed = False
                logger.debug(f"Attachment {attachment_key} unchanged")
            else:
                logger.info(f"Attachment {attachment_key} file changed, will reprocess")
        # Upsert keyed on the UNIQUE attachment_key constraint.
        cursor.execute("""
            INSERT INTO zotero_attachments
            (parent_item_key, attachment_key, content_type, filename, file_size, file_hash, full_text_extracted, extraction_status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (attachment_key) DO UPDATE SET
            parent_item_key = EXCLUDED.parent_item_key,
            content_type = EXCLUDED.content_type,
            filename = EXCLUDED.filename,
            file_size = EXCLUDED.file_size,
            file_hash = EXCLUDED.file_hash,
            full_text_extracted = CASE
            WHEN zotero_attachments.file_hash != EXCLUDED.file_hash
            THEN FALSE
            ELSE zotero_attachments.full_text_extracted
            END,
            extraction_status = CASE
            WHEN zotero_attachments.file_hash != EXCLUDED.file_hash
            THEN 'pending'
            ELSE zotero_attachments.extraction_status
            END,
            retry_count = CASE
            WHEN zotero_attachments.file_hash != EXCLUDED.file_hash
            THEN 0
            ELSE zotero_attachments.retry_count
            END,
            extraction_error = CASE
            WHEN zotero_attachments.file_hash != EXCLUDED.file_hash
            THEN NULL
            ELSE zotero_attachments.extraction_error
            END,
            updated_at = CURRENT_TIMESTAMP
        """, (parent_item_key, attachment_key, content_type, filename,
              file_size, file_hash, False, 'pending'))
        cursor.close()
        conn.close()
        # file_changed is already True for a brand-new attachment, so
        # "or not existing" is belt-and-braces.
        return file_changed or not existing
    except Exception as e:
        # NOTE(review): the connection leaks when a query above raises —
        # consider try/finally around conn.close().
        logger.error(f"Error upserting attachment {attachment_key}: {e}")
        return False
def get_unprocessed_attachments(self, supported_content_types: List[str],
                                limit: int = 50) -> List[Dict[str, Any]]:
    """
    Get attachments that need full-text processing.

    Selects 'pending' attachments plus 'failed' ones with fewer than 3
    retries, pending first, oldest first within each group.

    Args:
        supported_content_types: List of MIME types to process
        limit: Maximum number of attachments to return
    Returns:
        List of attachment records needing processing (empty on error).
    """
    try:
        conn = self.connect()
        # RealDictCursor so each row converts cleanly to a dict below.
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        # ANY(%s) lets psycopg2 adapt the Python list to an SQL array.
        cursor.execute("""
            SELECT
            parent_item_key, attachment_key, content_type,
            filename, file_size, file_hash, extraction_status,
            retry_count, extraction_error, created_at, updated_at
            FROM zotero_attachments
            WHERE
            content_type = ANY(%s)
            AND (
            extraction_status = 'pending'
            OR (extraction_status = 'failed' AND retry_count < 3)
            )
            ORDER BY
            CASE extraction_status
            WHEN 'pending' THEN 1
            WHEN 'failed' THEN 2
            ELSE 3
            END,
            created_at ASC
            LIMIT %s
        """, (supported_content_types, limit))
        results = cursor.fetchall()
        cursor.close()
        conn.close()
        return [dict(row) for row in results]
    except Exception as e:
        # NOTE(review): the connection leaks when the query raises —
        # consider try/finally around conn.close().
        logger.error(f"Error getting unprocessed attachments: {e}")
        return []
def update_attachment_status(self, attachment_key: str, status: str,
                             chunk_count: int = 0, error_message: Optional[str] = None) -> None:
    """
    Update attachment processing status.

    Fixes: an unrecognized status used to be silently ignored (now
    logged); error_message is annotated Optional; the connection is
    closed even when a statement raises.

    Args:
        attachment_key: The Zotero attachment key
        status: New status ('processing', 'completed', 'failed', 'skipped')
        chunk_count: Number of text chunks created (if completed)
        error_message: Error details (if failed/skipped)
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            if status == 'processing':
                cursor.execute("""
                    UPDATE zotero_attachments
                    SET extraction_status = %s,
                    extraction_started_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                    WHERE attachment_key = %s
                """, (status, attachment_key))
            elif status == 'completed':
                cursor.execute("""
                    UPDATE zotero_attachments
                    SET extraction_status = %s,
                    full_text_extracted = TRUE,
                    chunk_count = %s,
                    extraction_completed_at = CURRENT_TIMESTAMP,
                    extraction_error = NULL,
                    updated_at = CURRENT_TIMESTAMP
                    WHERE attachment_key = %s
                """, (status, chunk_count, attachment_key))
            elif status == 'failed':
                cursor.execute("""
                    UPDATE zotero_attachments
                    SET extraction_status = %s,
                    retry_count = retry_count + 1,
                    extraction_error = %s,
                    updated_at = CURRENT_TIMESTAMP
                    WHERE attachment_key = %s
                """, (status, error_message, attachment_key))
            elif status == 'skipped':
                cursor.execute("""
                    UPDATE zotero_attachments
                    SET extraction_status = %s,
                    extraction_completed_at = CURRENT_TIMESTAMP,
                    extraction_error = %s,
                    updated_at = CURRENT_TIMESTAMP
                    WHERE attachment_key = %s
                """, (status, error_message, attachment_key))
            else:
                # Previously an unknown status was a silent no-op.
                logger.warning(f"Unknown attachment status '{status}' for {attachment_key}; no update performed")
            cursor.close()
        finally:
            conn.close()
        logger.debug(f"Updated attachment {attachment_key} status to {status}")
    except Exception as e:
        logger.error(f"Error updating attachment status for {attachment_key}: {e}")
def get_attachment_status(self, attachment_key: str) -> Optional[Dict[str, Any]]:
    """
    Get processing status for a specific attachment.

    Args:
        attachment_key: The Zotero attachment key
    Returns:
        Attachment status information or None if not found (or on error)
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            cursor.execute("""
                SELECT
                parent_item_key, attachment_key, content_type, filename,
                file_size, file_hash, full_text_extracted, extraction_status,
                extraction_started_at, extraction_completed_at, extraction_error,
                retry_count, chunk_count, created_at, updated_at
                FROM zotero_attachments
                WHERE attachment_key = %s
            """, (attachment_key,))
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Fix: close the connection even when the query raises.
            conn.close()
        return dict(result) if result else None
    except Exception as e:
        logger.error(f"Error getting attachment status for {attachment_key}: {e}")
        return None
def get_attachment_by_key(self, attachment_key: str) -> Optional[Dict[str, Any]]:
    """
    Get attachment record by attachment key.

    This was a byte-for-byte duplicate of get_attachment_status()'s
    query; it now delegates to avoid maintaining the same SQL twice.
    Kept as a separate entry point for existing callers.

    Args:
        attachment_key: The Zotero attachment key
    Returns:
        Attachment record or None if not found (or on error)
    """
    return self.get_attachment_status(attachment_key)
def delete_attachment(self, attachment_key: str) -> bool:
    """
    Delete attachment tracking record and associated embeddings.

    Args:
        attachment_key: The Zotero attachment key
    Returns:
        True if the attachment record was deleted (embeddings may be
        removed even when no tracking record existed); False on error.
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            # Remove full-text embedding chunks derived from this file.
            cursor.execute("""
                DELETE FROM zotero_embeddings
                WHERE parent_attachment_key = %s
            """, (attachment_key,))
            # Remove the tracking record itself.
            cursor.execute("""
                DELETE FROM zotero_attachments
                WHERE attachment_key = %s
            """, (attachment_key,))
            # rowcount reflects the second DELETE (the tracking record).
            deleted_count = cursor.rowcount
            cursor.close()
        finally:
            # Fix: close the connection even when a DELETE raises.
            conn.close()
        logger.info(f"Deleted attachment {attachment_key} and associated embeddings")
        return deleted_count > 0
    except Exception as e:
        logger.error(f"Error deleting attachment {attachment_key}: {e}")
        return False
def count_tracked_attachments(self) -> int:
    """
    Get count of tracked attachments in database.

    Returns:
        Total number of tracked attachments (0 on error).
    """
    try:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM zotero_attachments")
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Fix: close the connection even when the query raises.
            conn.close()
        return result[0] if result else 0
    except Exception as e:
        logger.error(f"Error counting tracked attachments: {e}")
        return 0
def get_attachment_statistics(self) -> Dict[str, Any]:
    """
    Get statistics about attachment processing.

    Runs three queries: overall counters (with FILTER-based breakdowns
    per extraction state), a per-content-type breakdown, and completed
    extractions per day over the last 7 days.

    Returns:
        Dictionary with processing statistics, or {"error": ...} on
        failure.
    """
    try:
        conn = self.connect()
        # RealDictCursor so rows convert cleanly to dicts below.
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        # Overall counters; FILTER gives per-status counts in one pass.
        cursor.execute("""
            SELECT
            COUNT(*) as total_attachments,
            COUNT(*) FILTER (WHERE full_text_extracted = TRUE) as extracted,
            COUNT(*) FILTER (WHERE extraction_status = 'pending') as pending,
            COUNT(*) FILTER (WHERE extraction_status = 'processing') as processing,
            COUNT(*) FILTER (WHERE extraction_status = 'failed') as failed,
            COUNT(*) FILTER (WHERE extraction_status = 'skipped') as skipped,
            COUNT(DISTINCT content_type) as unique_content_types,
            SUM(file_size) as total_file_size,
            SUM(chunk_count) as total_chunks
            FROM zotero_attachments
        """)
        stats = cursor.fetchone()
        # Per-MIME-type breakdown, most common type first.
        cursor.execute("""
            SELECT
            content_type,
            COUNT(*) as count,
            COUNT(*) FILTER (WHERE full_text_extracted = TRUE) as extracted_count,
            SUM(file_size) as total_size,
            SUM(chunk_count) as total_chunks
            FROM zotero_attachments
            GROUP BY content_type
            ORDER BY count DESC
        """)
        content_types = cursor.fetchall()
        # Completed extractions per day over the trailing week.
        cursor.execute("""
            SELECT
            DATE_TRUNC('day', extraction_completed_at) as date,
            COUNT(*) as completed_count
            FROM zotero_attachments
            WHERE extraction_completed_at > CURRENT_DATE - INTERVAL '7 days'
            AND extraction_status = 'completed'
            GROUP BY DATE_TRUNC('day', extraction_completed_at)
            ORDER BY date DESC
        """)
        recent_activity = cursor.fetchall()
        cursor.close()
        conn.close()
        return {
            "total_attachments": stats['total_attachments'] if stats else 0,
            "extracted": stats['extracted'] if stats else 0,
            "pending": stats['pending'] if stats else 0,
            "processing": stats['processing'] if stats else 0,
            "failed": stats['failed'] if stats else 0,
            "skipped": stats['skipped'] if stats else 0,
            "unique_content_types": stats['unique_content_types'] if stats else 0,
            "total_file_size": stats['total_file_size'] if stats else 0,
            "total_chunks": stats['total_chunks'] if stats else 0,
            "content_type_breakdown": [dict(row) for row in content_types],
            "recent_activity": [dict(row) for row in recent_activity]
        }
    except Exception as e:
        # NOTE(review): the connection leaks when a query above raises —
        # consider try/finally around conn.close().
        logger.error(f"Error getting attachment statistics: {e}")
        return {"error": str(e)}
def create_database_manager(config: Dict[str, Any], embedding_dimension: Optional[int] = None) -> DatabaseManager:
    """Factory for DatabaseManager instances.

    Args:
        config: Database configuration dictionary
        embedding_dimension: Vector dimension for embeddings
            (auto-detected later if None)

    Returns:
        Configured DatabaseManager instance
    """
    return DatabaseManager(config, embedding_dimension)