# sqlite_store.py (9.37 kB)
"""SQLite storage for full-text documents and metadata."""
import aiosqlite
import json
from datetime import datetime
from typing import List, Optional
from pathlib import Path
from .schema import Document, DocumentMetadata
class SQLiteStore:
    """Async SQLite storage for documents with rich metadata.

    Every public coroutine opens its own short-lived ``aiosqlite`` connection
    to ``db_path``; no connection is kept open between calls.
    """

    def __init__(self, db_path: str):
        """Remember the database path and make sure its parent directory exists.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = db_path
        self._ensure_db_dir()

    def _ensure_db_dir(self) -> None:
        """Create the database file's parent directory (and ancestors) if missing."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _serialize_metadata(metadata: DocumentMetadata) -> str:
        """Encode metadata as the JSON text stored in ``documents.metadata``."""
        # model_dump() leaves datetimes as Python objects; default=str turns
        # them (and any other non-JSON value) into strings.
        return json.dumps(metadata.model_dump(), default=str)

    @staticmethod
    def _deserialize_metadata(raw: str) -> DocumentMetadata:
        """Rebuild a ``DocumentMetadata`` from its stored JSON text."""
        return DocumentMetadata(**json.loads(raw))

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape ``\\``, ``%`` and ``_`` so *value* matches literally in LIKE ... ESCAPE '\\'."""
        return (
            value.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    async def _fetch_document(
        self, db: aiosqlite.Connection, document_id: str
    ) -> Optional[Document]:
        """Load one document (with its ordered chunk ids) over an open connection.

        The caller must have set ``db.row_factory = aiosqlite.Row``.

        Returns:
            The document, or ``None`` if no row has this id.
        """
        async with db.execute(
            "SELECT * FROM documents WHERE id = ?",
            (document_id,)
        ) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return None

        async with db.execute(
            "SELECT id FROM chunks WHERE document_id = ? ORDER BY chunk_index",
            (document_id,)
        ) as chunk_cursor:
            chunk_rows = await chunk_cursor.fetchall()

        return Document(
            id=row["id"],
            full_text=row["full_text"],
            metadata=self._deserialize_metadata(row["metadata"]),
            chunks=[chunk_row["id"] for chunk_row in chunk_rows]
        )

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    async def initialize(self) -> None:
        """Create all tables and indexes if they do not already exist.

        Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.
        """
        async with aiosqlite.connect(self.db_path) as db:
            # Documents table
            await db.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    namespace TEXT NOT NULL,
                    content_type TEXT NOT NULL,
                    title TEXT,
                    full_text TEXT NOT NULL,
                    metadata JSON NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_namespace ON documents(namespace)"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_created_at ON documents(created_at)"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_content_type ON documents(content_type)"
            )

            # Chunks table
            await db.execute("""
                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    document_id TEXT NOT NULL,
                    chunk_index INTEGER NOT NULL,
                    chunk_text TEXT NOT NULL,
                    qdrant_point_id TEXT NOT NULL,
                    FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE
                )
            """)
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_document_id ON chunks(document_id)"
            )

            # Conversations table
            await db.execute("""
                CREATE TABLE IF NOT EXISTS conversations (
                    id TEXT PRIMARY KEY,
                    title TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Messages table
            await db.execute("""
                CREATE TABLE IF NOT EXISTS messages (
                    id TEXT PRIMARY KEY,
                    conversation_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    metadata JSON,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE CASCADE
                )
            """)
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_conversation_id ON messages(conversation_id)"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_message_created_at ON messages(created_at)"
            )
            await db.commit()

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    async def store_document(
        self,
        document_id: str,
        full_text: str,
        metadata: DocumentMetadata
    ) -> str:
        """Insert a new document row.

        ``namespace``, ``content_type`` and ``title`` are copied out of
        *metadata* into their own columns so they can be filtered on. The full
        metadata is also stored as JSON.

        Args:
            document_id: Primary key for the new row.
            full_text: Complete document text.
            metadata: Metadata whose fields populate the indexed columns.

        Returns:
            ``document_id``.

        Raises:
            sqlite3.IntegrityError: if a document with this id already exists
                (plain INSERT on a PRIMARY KEY column).
        """
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                INSERT INTO documents (id, namespace, content_type, title, full_text, metadata, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    document_id,
                    metadata.namespace,
                    metadata.content_type,
                    metadata.title,
                    full_text,
                    self._serialize_metadata(metadata),
                    metadata.created_at.isoformat(),
                    metadata.updated_at.isoformat()
                )
            )
            await db.commit()
        return document_id

    async def store_chunk(
        self,
        chunk_id: str,
        document_id: str,
        chunk_index: int,
        chunk_text: str,
        qdrant_point_id: str
    ) -> None:
        """Insert one chunk of a document.

        Args:
            chunk_id: Primary key for the chunk row.
            document_id: Id of the owning document.
            chunk_index: Position of the chunk; used to order chunk ids on read.
            chunk_text: Text of this chunk.
            qdrant_point_id: Id of the corresponding point in the vector store.
        """
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                INSERT INTO chunks (id, document_id, chunk_index, chunk_text, qdrant_point_id)
                VALUES (?, ?, ?, ?, ?)
                """,
                (chunk_id, document_id, chunk_index, chunk_text, qdrant_point_id)
            )
            await db.commit()

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    async def get_document(self, document_id: str) -> Optional[Document]:
        """Return the document with *document_id* and its ordered chunk ids, or ``None``."""
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            return await self._fetch_document(db, document_id)

    async def get_documents_by_ids(self, document_ids: List[str]) -> List[Document]:
        """Return the documents for *document_ids*, in the requested order.

        Ids with no matching row are skipped. Duplicate ids produce duplicate
        entries. All lookups share one connection.
        """
        documents: List[Document] = []
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            for doc_id in document_ids:
                doc = await self._fetch_document(db, doc_id)
                if doc is not None:
                    documents.append(doc)
        return documents

    async def get_chunk_text(self, chunk_id: str) -> Optional[str]:
        """Return the text of chunk *chunk_id*, or ``None`` if it does not exist."""
        async with aiosqlite.connect(self.db_path) as db:
            async with db.execute(
                "SELECT chunk_text FROM chunks WHERE id = ?",
                (chunk_id,)
            ) as cursor:
                row = await cursor.fetchone()
        return row[0] if row else None

    async def search_documents(
        self,
        namespace: Optional[str] = None,
        content_type: Optional[str] = None,
        limit: int = 10
    ) -> List[Document]:
        """List the newest documents that match the optional filters.

        Args:
            namespace: If given (non-empty), keep documents whose namespace
                starts with this prefix. ``%`` and ``_`` are matched literally.
                Matching uses SQLite ``LIKE``, which is case-insensitive for
                ASCII by default.
            content_type: If given (non-empty), exact content type to match.
            limit: Maximum number of documents to return.

        Returns:
            Documents ordered by ``created_at`` descending. ``chunks`` is not
            populated here; use ``get_document`` for chunk ids.
        """
        query = "SELECT * FROM documents WHERE 1=1"
        params: list = []
        if namespace:
            # Escape LIKE wildcards so a namespace such as "user_1" does not
            # also match "userX1...".
            query += " AND namespace LIKE ? ESCAPE '\\'"
            params.append(f"{self._escape_like(namespace)}%")
        if content_type:
            query += " AND content_type = ?"
            params.append(content_type)
        query += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(query, params) as cursor:
                rows = await cursor.fetchall()

        return [
            Document(
                id=row["id"],
                full_text=row["full_text"],
                metadata=self._deserialize_metadata(row["metadata"])
            )
            for row in rows
        ]

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------

    async def delete_document(self, document_id: str) -> None:
        """Delete a document and its chunks in a single transaction."""
        async with aiosqlite.connect(self.db_path) as db:
            # SQLite enforces foreign keys (and so ON DELETE CASCADE) only when
            # PRAGMA foreign_keys=ON is set on the connection. This class never
            # sets it, so remove the chunks explicitly.
            await db.execute(
                "DELETE FROM chunks WHERE document_id = ?", (document_id,)
            )
            await db.execute("DELETE FROM documents WHERE id = ?", (document_id,))
            await db.commit()

    async def update_document(
        self,
        document_id: str,
        full_text: Optional[str] = None,
        metadata: Optional[DocumentMetadata] = None
    ) -> None:
        """Update a document's text and/or metadata.

        Any update refreshes ``updated_at`` (UTC, naive, via
        ``datetime.utcnow()``) in both the column and the stored metadata JSON.
        The ``namespace``, ``content_type`` and ``title`` columns are re-synced
        from the metadata so ``search_documents`` filters stay correct. The
        caller's *metadata* object is not modified.

        Args:
            document_id: Id of the document to update. A missing id is a
                silent no-op.
            full_text: New full text, or ``None`` to keep the current text.
            metadata: Replacement metadata, or ``None`` to keep the stored
                metadata (only its ``updated_at`` is refreshed).
        """
        if full_text is None and metadata is None:
            return

        async with aiosqlite.connect(self.db_path) as db:
            if metadata is None:
                # Text-only update: reload the stored metadata so its
                # updated_at can be refreshed consistently with the column.
                async with db.execute(
                    "SELECT metadata FROM documents WHERE id = ?",
                    (document_id,)
                ) as cursor:
                    row = await cursor.fetchone()
                if row is None:
                    return
                metadata = self._deserialize_metadata(row[0])

            # Work on a copy so the caller's object is not mutated.
            metadata = metadata.model_copy(update={"updated_at": datetime.utcnow()})

            assignments = [
                "namespace = ?",
                "content_type = ?",
                "title = ?",
                "metadata = ?",
                "updated_at = ?",
            ]
            params: list = [
                metadata.namespace,
                metadata.content_type,
                metadata.title,
                self._serialize_metadata(metadata),
                metadata.updated_at.isoformat(),
            ]
            if full_text is not None:
                assignments.append("full_text = ?")
                params.append(full_text)
            params.append(document_id)

            await db.execute(
                f"UPDATE documents SET {', '.join(assignments)} WHERE id = ?",
                params
            )
            await db.commit()