Skip to main content
Glama
sqlite_store.py (9.37 kB)
"""SQLite storage for full-text documents and metadata.""" import aiosqlite import json from datetime import datetime from typing import List, Optional from pathlib import Path from .schema import Document, DocumentMetadata class SQLiteStore: """Async SQLite storage for documents with rich metadata.""" def __init__(self, db_path: str): self.db_path = db_path self._ensure_db_dir() def _ensure_db_dir(self): """Ensure the database directory exists.""" Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) async def initialize(self): """Initialize database schema.""" async with aiosqlite.connect(self.db_path) as db: # Documents table await db.execute(""" CREATE TABLE IF NOT EXISTS documents ( id TEXT PRIMARY KEY, namespace TEXT NOT NULL, content_type TEXT NOT NULL, title TEXT, full_text TEXT NOT NULL, metadata JSON NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_namespace ON documents(namespace)" ) await db.execute( "CREATE INDEX IF NOT EXISTS idx_created_at ON documents(created_at)" ) await db.execute( "CREATE INDEX IF NOT EXISTS idx_content_type ON documents(content_type)" ) # Chunks table await db.execute(""" CREATE TABLE IF NOT EXISTS chunks ( id TEXT PRIMARY KEY, document_id TEXT NOT NULL, chunk_index INTEGER NOT NULL, chunk_text TEXT NOT NULL, qdrant_point_id TEXT NOT NULL, FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_document_id ON chunks(document_id)" ) # Conversations table await db.execute(""" CREATE TABLE IF NOT EXISTS conversations ( id TEXT PRIMARY KEY, title TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Messages table await db.execute(""" CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, conversation_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, metadata JSON, created_at 
TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE CASCADE ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_conversation_id ON messages(conversation_id)" ) await db.execute( "CREATE INDEX IF NOT EXISTS idx_message_created_at ON messages(created_at)" ) await db.commit() async def store_document( self, document_id: str, full_text: str, metadata: DocumentMetadata ) -> str: """Store a document with metadata.""" async with aiosqlite.connect(self.db_path) as db: await db.execute( """ INSERT INTO documents (id, namespace, content_type, title, full_text, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( document_id, metadata.namespace, metadata.content_type, metadata.title, full_text, json.dumps(metadata.model_dump(), default=str), metadata.created_at.isoformat(), metadata.updated_at.isoformat() ) ) await db.commit() return document_id async def store_chunk( self, chunk_id: str, document_id: str, chunk_index: int, chunk_text: str, qdrant_point_id: str ): """Store a document chunk.""" async with aiosqlite.connect(self.db_path) as db: await db.execute( """ INSERT INTO chunks (id, document_id, chunk_index, chunk_text, qdrant_point_id) VALUES (?, ?, ?, ?, ?) """, (chunk_id, document_id, chunk_index, chunk_text, qdrant_point_id) ) await db.commit() async def get_document(self, document_id: str) -> Optional[Document]: """Retrieve a document by ID.""" async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row async with db.execute( "SELECT * FROM documents WHERE id = ?", (document_id,) ) as cursor: row = await cursor.fetchone() if not row: return None metadata = DocumentMetadata(**json.loads(row["metadata"])) # Get associated chunks async with db.execute( "SELECT id FROM chunks WHERE document_id = ? 
ORDER BY chunk_index", (document_id,) ) as chunk_cursor: chunks = [chunk_row["id"] for chunk_row in await chunk_cursor.fetchall()] return Document( id=row["id"], full_text=row["full_text"], metadata=metadata, chunks=chunks ) async def get_documents_by_ids(self, document_ids: List[str]) -> List[Document]: """Retrieve multiple documents by IDs.""" documents = [] for doc_id in document_ids: doc = await self.get_document(doc_id) if doc: documents.append(doc) return documents async def get_chunk_text(self, chunk_id: str) -> Optional[str]: """Retrieve chunk text by ID.""" async with aiosqlite.connect(self.db_path) as db: async with db.execute( "SELECT chunk_text FROM chunks WHERE id = ?", (chunk_id,) ) as cursor: row = await cursor.fetchone() return row[0] if row else None async def search_documents( self, namespace: Optional[str] = None, content_type: Optional[str] = None, limit: int = 10 ) -> List[Document]: """Search documents by filters.""" query = "SELECT * FROM documents WHERE 1=1" params = [] if namespace: query += " AND namespace LIKE ?" params.append(f"{namespace}%") if content_type: query += " AND content_type = ?" params.append(content_type) query += " ORDER BY created_at DESC LIMIT ?" 
params.append(limit) async with aiosqlite.connect(self.db_path) as db: db.row_factory = aiosqlite.Row async with db.execute(query, params) as cursor: rows = await cursor.fetchall() documents = [] for row in rows: metadata = DocumentMetadata(**json.loads(row["metadata"])) documents.append( Document( id=row["id"], full_text=row["full_text"], metadata=metadata ) ) return documents async def delete_document(self, document_id: str): """Delete a document and its chunks.""" async with aiosqlite.connect(self.db_path) as db: await db.execute("DELETE FROM documents WHERE id = ?", (document_id,)) await db.commit() async def update_document( self, document_id: str, full_text: Optional[str] = None, metadata: Optional[DocumentMetadata] = None ): """Update a document.""" async with aiosqlite.connect(self.db_path) as db: if metadata: metadata.updated_at = datetime.utcnow() updates = [] params = [] if full_text is not None: updates.append("full_text = ?") params.append(full_text) if metadata: updates.append("metadata = ?") params.append(json.dumps(metadata.model_dump(), default=str)) updates.append("updated_at = ?") params.append(metadata.updated_at.isoformat()) if not updates: return query = f"UPDATE documents SET {', '.join(updates)} WHERE id = ?" params.append(document_id) await db.execute(query, params) await db.commit()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timerickson/personal-rag-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.