"""Ingest and index Shopify Liquid documentation."""
import sqlite3
from pathlib import Path
from typing import List, Dict
import logging
from .config import DB_PATH, DOCS_PATH, FTS_TABLE, DOCS_TABLE
# Configure logging at import time so progress is visible when the indexer
# is run as a script.
# NOTE(review): calling basicConfig in a library module configures the root
# logger for the whole process; consider moving this under the __main__ guard.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def read_markdown_file(file_path: Path) -> Dict[str, str]:
    """Load a markdown documentation file and derive its index metadata.

    Args:
        file_path: Path to the ``.md`` file to read.

    Returns:
        Dict with ``name`` (file stem), ``title`` (first ``#`` heading, or
        the stem as a fallback), ``category`` (parent directory name, with
        the docs root mapped to ``"overview"``), ``content`` (full file
        text), and ``path`` (relative to the parent of ``DOCS_PATH``).
    """
    content = file_path.read_text(encoding="utf-8")

    # The first level-1 heading becomes the document title.
    title = ""
    for raw_line in content.split("\n"):
        if raw_line.startswith("# "):
            title = raw_line[2:].strip()
            break

    # The containing directory names the category; files sitting directly
    # in the docs root belong to the "overview" category.
    category = file_path.parent.name
    if category == "shopify-liquid-docs":
        category = "overview"

    stem = file_path.stem
    return {
        "name": stem,
        "title": title if title else stem,
        "category": category,
        "content": content,
        "path": str(file_path.relative_to(DOCS_PATH.parent)),
    }
def create_database():
    """Open the SQLite database, creating the schema on first use.

    Creates the main docs table, an external-content FTS5 virtual table,
    and the insert/delete/update triggers that keep the FTS index in sync
    with the docs table.

    Returns:
        An open ``sqlite3.Connection`` to ``DB_PATH``.
    """
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    schema_statements = (
        # Canonical document store; (category, name) is the natural key.
        f"""
        CREATE TABLE IF NOT EXISTS {DOCS_TABLE} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            title TEXT NOT NULL,
            category TEXT NOT NULL,
            content TEXT NOT NULL,
            path TEXT NOT NULL,
            UNIQUE(category, name)
        )
        """,
        # External-content FTS5 index over the docs table.
        f"""
        CREATE VIRTUAL TABLE IF NOT EXISTS {FTS_TABLE} USING fts5(
            name,
            title,
            category,
            content,
            content={DOCS_TABLE},
            content_rowid=id
        )
        """,
        # AFTER INSERT: add the new row to the FTS index.
        f"""
        CREATE TRIGGER IF NOT EXISTS {DOCS_TABLE}_ai AFTER INSERT ON {DOCS_TABLE} BEGIN
            INSERT INTO {FTS_TABLE}(rowid, name, title, category, content)
            VALUES (new.id, new.name, new.title, new.category, new.content);
        END
        """,
        # AFTER DELETE: external-content tables require the 'delete' command
        # with the old column values to remove an entry.
        f"""
        CREATE TRIGGER IF NOT EXISTS {DOCS_TABLE}_ad AFTER DELETE ON {DOCS_TABLE} BEGIN
            INSERT INTO {FTS_TABLE}({FTS_TABLE}, rowid, name, title, category, content)
            VALUES('delete', old.id, old.name, old.title, old.category, old.content);
        END
        """,
        # AFTER UPDATE: delete the old entry, then insert the new one.
        f"""
        CREATE TRIGGER IF NOT EXISTS {DOCS_TABLE}_au AFTER UPDATE ON {DOCS_TABLE} BEGIN
            INSERT INTO {FTS_TABLE}({FTS_TABLE}, rowid, name, title, category, content)
            VALUES('delete', old.id, old.name, old.title, old.category, old.content);
            INSERT INTO {FTS_TABLE}(rowid, name, title, category, content)
            VALUES (new.id, new.name, new.title, new.category, new.content);
        END
        """,
    )
    for statement in schema_statements:
        cur.execute(statement)

    conn.commit()
    return conn
def index_documentation(force: bool = False) -> int:
    """Index all markdown documentation files.

    Args:
        force: If True, clear existing docs and reindex everything.

    Returns:
        Number of documents indexed on this run, or the pre-existing
        document count when the database is already populated and
        ``force`` is False. Returns 0 when ``DOCS_PATH`` is missing.
    """
    if not DOCS_PATH.exists():
        logger.error(f"Documentation path does not exist: {DOCS_PATH}")
        return 0

    conn = create_database()
    cursor = conn.cursor()

    # Skip work if the database is already populated and we aren't forcing.
    cursor.execute(f"SELECT COUNT(*) FROM {DOCS_TABLE}")
    existing_count = cursor.fetchone()[0]
    if existing_count > 0 and not force:
        logger.info(
            f"Database already contains {existing_count} documents. Use force=True to reindex."
        )
        conn.close()
        return existing_count

    # Clear existing data if forcing a reindex; the AFTER DELETE trigger
    # removes the corresponding FTS entries.
    if force and existing_count > 0:
        logger.info("Clearing existing documentation...")
        cursor.execute(f"DELETE FROM {DOCS_TABLE}")
        conn.commit()

    # Gather markdown files from the category folders plus the docs root.
    md_files: List[Path] = []
    for pattern in ["tags/*.md", "filters/*.md", "objects/*.md", "*.md"]:
        md_files.extend(DOCS_PATH.glob(pattern))

    # Filter out index/report files that are not real documentation.
    skip_stems = {
        "INDEX",
        "README",
        "MASTER_INDEX",
        "DOWNLOAD_REPORT",
        "DOWNLOAD_STATUS",
        "REMAINING_URLS",
        "COMPLETION_REPORT",
        "00-overview",
        "01-basics",
    }
    md_files = [f for f in md_files if f.stem not in skip_stems]

    logger.info(f"Found {len(md_files)} documentation files to index...")

    # Index each file. A failure on one file is logged and skipped so a
    # single bad document does not abort the whole run.
    indexed = 0
    for md_file in md_files:
        try:
            doc = read_markdown_file(md_file)
            # BUGFIX: use a true UPSERT instead of INSERT OR REPLACE.
            # REPLACE resolves the UNIQUE(category, name) conflict by
            # deleting the old row WITHOUT firing the AFTER DELETE trigger
            # (unless PRAGMA recursive_triggers is ON), which would leave a
            # stale entry in the external-content FTS index. The UPSERT
            # fires the AFTER UPDATE trigger and keeps the index in sync.
            cursor.execute(
                f"""
                INSERT INTO {DOCS_TABLE} (name, title, category, content, path)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(category, name) DO UPDATE SET
                    title = excluded.title,
                    content = excluded.content,
                    path = excluded.path
                """,
                (
                    doc["name"],
                    doc["title"],
                    doc["category"],
                    doc["content"],
                    doc["path"],
                ),
            )
            indexed += 1
            if indexed % 20 == 0:
                logger.info(f"Indexed {indexed}/{len(md_files)} documents...")
        except Exception as e:
            logger.error(f"Error indexing {md_file}: {e}")

    conn.commit()
    conn.close()
    logger.info(f"Successfully indexed {indexed} documents into {DB_PATH}")
    return indexed
def search_documentation(queries: List[str], limit: int = 10) -> List[Dict[str, str]]:
    """Search documentation using FTS5.

    Args:
        queries: List of search terms; only the first three are used.
        limit: Maximum number of results to return.

    Returns:
        List of matching documents with metadata and a highlighted snippet,
        best matches first. Empty list when no usable terms are given.
    """
    # Limit to 3 queries like Gemini example; drop empty/blank terms so we
    # never send an empty MATCH expression, which FTS5 rejects with a
    # syntax error.
    terms = [q for q in queries[:3] if q and q.strip()]
    if not terms:
        return []

    # Quote each term as an FTS5 string (embedded quotes doubled) so that
    # punctuation or FTS5-reserved tokens in user input cannot break the
    # MATCH query syntax. Note: a quoted multi-word term matches as a
    # phrase.
    search_query = " OR ".join('"' + t.replace('"', '""') + '"' for t in terms)

    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute(
        f"""
        SELECT d.name, d.title, d.category, d.content, d.path,
               snippet({FTS_TABLE}, 3, '<mark>', '</mark>', '...', 64) as snippet
        FROM {FTS_TABLE} fts
        JOIN {DOCS_TABLE} d ON fts.rowid = d.id
        WHERE {FTS_TABLE} MATCH ?
        ORDER BY rank
        LIMIT ?
        """,
        (search_query, limit),
    )

    results = []
    for row in cursor.fetchall():
        results.append(
            {
                "name": row[0],
                "title": row[1],
                "category": row[2],
                "content": row[3],
                "path": row[4],
                "snippet": row[5],
            }
        )
    conn.close()
    return results
def get_by_category(category: str) -> List[Dict[str, str]]:
    """Return every document stored under one category.

    Args:
        category: Category name (tags, filters, or objects).

    Returns:
        List of document dicts for that category, ordered by name.
    """
    columns = ("name", "title", "category", "content", "path")

    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute(
        f"""
        SELECT name, title, category, content, path
        FROM {DOCS_TABLE}
        WHERE category = ?
        ORDER BY name
        """,
        (category,),
    )
    rows = cur.fetchall()
    conn.close()

    # Pair each row tuple with the column names to build the result dicts.
    return [dict(zip(columns, row)) for row in rows]
def get_document(category: str, name: str) -> Dict[str, str] | None:
    """Look up one document by its (category, name) key.

    Args:
        category: Category name (tags, filters, or objects).
        name: Document name.

    Returns:
        The document dict, or None when no such document exists.
    """
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute(
        f"""
        SELECT name, title, category, content, path
        FROM {DOCS_TABLE}
        WHERE category = ? AND name = ?
        """,
        (category, name),
    )
    row = cur.fetchone()
    conn.close()

    if row is None:
        return None
    return dict(zip(("name", "title", "category", "content", "path"), row))
if __name__ == "__main__":
    # Index documentation when run directly; force=True rebuilds the index
    # from scratch even if the database is already populated.
    count = index_documentation(force=True)
    print(f"Indexed {count} documents")