LoreKeeper MCP

by frap129
plan.md (63.1 kB)
# Entity-Based Cache Implementation Plan **Goal:** Replace URL-based caching with entity-centric storage enabling parallel queries, offline fallback, and infinite TTL for D&D game data. **Architecture:** Store each entity (spell, monster, item) individually in type-specific SQLite tables with slug as primary key. Query cache in parallel with API calls, merge results, and serve from cache during network failures. No TTL expiration—entities cached indefinitely unless explicitly updated. **Tech Stack:** SQLite with WAL mode, aiosqlite for async operations, asyncio.gather for parallel queries, JSON blob storage with indexed filter fields. --- ## Phase 1: Cache Schema & Core Operations ### Task 1.1: Create entity table schema definitions **Files:** - Create: `src/lorekeeper_mcp/cache/schema.py` - Test: `tests/test_cache/test_schema.py` **Step 1: Write the failing test** ```python # tests/test_cache/test_schema.py """Tests for cache schema definitions.""" import pytest from lorekeeper_mcp.cache.schema import ( ENTITY_TYPES, SCHEMA_VERSION, get_table_name, get_create_table_sql, ) def test_schema_version_exists(): """Schema version constant is defined.""" assert isinstance(SCHEMA_VERSION, int) assert SCHEMA_VERSION > 0 def test_entity_types_defined(): """All entity types are listed.""" assert "spells" in ENTITY_TYPES assert "monsters" in ENTITY_TYPES assert "weapons" in ENTITY_TYPES assert "armor" in ENTITY_TYPES assert len(ENTITY_TYPES) >= 4 # At least these core types def test_get_table_name(): """Table names match entity types.""" assert get_table_name("spells") == "spells" assert get_table_name("monsters") == "monsters" def test_get_create_table_sql_for_spells(): """Spells table has correct schema.""" sql = get_create_table_sql("spells") assert "CREATE TABLE spells" in sql assert "slug TEXT PRIMARY KEY" in sql assert "name TEXT NOT NULL" in sql assert "data TEXT NOT NULL" in sql # JSON blob assert "source_api TEXT NOT NULL" in sql assert "created_at REAL NOT NULL" in 
sql assert "updated_at REAL NOT NULL" in sql # Spell-specific indexed fields assert "level INTEGER" in sql assert "school TEXT" in sql assert "concentration BOOLEAN" in sql assert "ritual BOOLEAN" in sql def test_get_create_table_sql_for_monsters(): """Monsters table has correct schema with monster-specific fields.""" sql = get_create_table_sql("monsters") assert "CREATE TABLE monsters" in sql assert "slug TEXT PRIMARY KEY" in sql # Monster-specific indexed fields assert "challenge_rating TEXT" in sql assert "type TEXT" in sql assert "size TEXT" in sql ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_cache/test_schema.py -v` Expected: FAIL with "ModuleNotFoundError: No module named 'lorekeeper_mcp.cache.schema'" **Step 3: Write minimal implementation** ```python # src/lorekeeper_mcp/cache/schema.py """Cache schema definitions for entity-based tables.""" SCHEMA_VERSION = 1 ENTITY_TYPES = [ "spells", "monsters", "weapons", "armor", "classes", "races", "backgrounds", "feats", "conditions", "rules", "rule_sections", ] # Indexed fields per entity type for filtering INDEXED_FIELDS = { "spells": ["level", "school", "concentration", "ritual"], "monsters": ["challenge_rating", "type", "size"], "weapons": ["category", "damage_type"], "armor": ["category", "armor_class"], "classes": ["hit_die"], "races": ["size"], "backgrounds": [], "feats": [], "conditions": [], "rules": ["parent"], "rule_sections": ["parent"], } def get_table_name(entity_type: str) -> str: """Get table name for entity type.""" return entity_type def get_create_table_sql(entity_type: str) -> str: """Generate CREATE TABLE SQL for entity type.""" if entity_type not in ENTITY_TYPES: raise ValueError(f"Unknown entity type: {entity_type}") indexed_fields = INDEXED_FIELDS.get(entity_type, []) # Build indexed field definitions field_definitions = [] for field in indexed_fields: # Determine field type based on common patterns if field in ["level", "hit_die", "armor_class"]: field_type = "INTEGER" 
elif field in ["concentration", "ritual"]: field_type = "BOOLEAN" else: field_type = "TEXT" field_definitions.append(f" {field} {field_type}") fields_sql = ",\n".join(field_definitions) if fields_sql: fields_sql = ",\n" + fields_sql return f"""CREATE TABLE IF NOT EXISTS {entity_type} ( slug TEXT PRIMARY KEY, name TEXT NOT NULL, data TEXT NOT NULL, source_api TEXT NOT NULL, created_at REAL NOT NULL, updated_at REAL NOT NULL{fields_sql} )""" ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_cache/test_schema.py -v` Expected: PASS (all tests pass) **Step 5: Commit** ```bash git add tests/test_cache/test_schema.py src/lorekeeper_mcp/cache/schema.py git commit -m "feat(cache): add entity table schema definitions" ``` --- ### Task 1.2: Implement schema initialization **Files:** - Modify: `src/lorekeeper_mcp/cache/schema.py` - Test: `tests/test_cache/test_schema.py` **Step 1: Write the failing test** ```python # Add to tests/test_cache/test_schema.py import aiosqlite import pytest from pathlib import Path import tempfile from lorekeeper_mcp.cache.schema import init_entity_cache, get_index_sql @pytest.mark.asyncio async def test_init_entity_cache_creates_tables(): """Initialize creates all entity tables.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" await init_entity_cache(str(db_path)) # Verify database exists assert db_path.exists() # Verify tables created async with aiosqlite.connect(db_path) as db: cursor = await db.execute( "SELECT name FROM sqlite_master WHERE type='table'" ) tables = {row[0] for row in await cursor.fetchall()} assert "spells" in tables assert "monsters" in tables assert "weapons" in tables assert "armor" in tables @pytest.mark.asyncio async def test_init_entity_cache_creates_indexes(): """Initialize creates indexes on filtered fields.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" await init_entity_cache(str(db_path)) async with aiosqlite.connect(db_path) 
as db: cursor = await db.execute( "SELECT name FROM sqlite_master WHERE type='index'" ) indexes = {row[0] for row in await cursor.fetchall()} # Check for spell indexes assert "idx_spells_level" in indexes assert "idx_spells_school" in indexes # Check for monster indexes assert "idx_monsters_challenge_rating" in indexes assert "idx_monsters_type" in indexes @pytest.mark.asyncio async def test_init_entity_cache_enables_wal_mode(): """Initialize enables WAL mode for concurrent access.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" await init_entity_cache(str(db_path)) async with aiosqlite.connect(db_path) as db: cursor = await db.execute("PRAGMA journal_mode") mode = (await cursor.fetchone())[0] assert mode.lower() == "wal" def test_get_index_sql(): """Index SQL generated correctly.""" indexes = get_index_sql("spells") assert len(indexes) > 0 assert any("idx_spells_level" in sql for sql in indexes) assert any("ON spells(level)" in sql for sql in indexes) ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_cache/test_schema.py::test_init_entity_cache_creates_tables -v` Expected: FAIL with "AttributeError: ... has no attribute 'init_entity_cache'" **Step 3: Write minimal implementation** ```python # Add to src/lorekeeper_mcp/cache/schema.py from pathlib import Path import aiosqlite def get_index_sql(entity_type: str) -> list[str]: """Generate CREATE INDEX statements for entity type.""" indexed_fields = INDEXED_FIELDS.get(entity_type, []) indexes = [] for field in indexed_fields: index_name = f"idx_{entity_type}_{field}" sql = f"CREATE INDEX IF NOT EXISTS {index_name} ON {entity_type}({field})" indexes.append(sql) return indexes async def init_entity_cache(db_path: str) -> None: """Initialize entity cache tables and indexes. 
Args: db_path: Path to SQLite database file """ # Ensure parent directory exists path = Path(db_path) path.parent.mkdir(parents=True, exist_ok=True) async with aiosqlite.connect(db_path) as db: # Enable WAL mode for concurrent access await db.execute("PRAGMA journal_mode=WAL") # Create all entity tables for entity_type in ENTITY_TYPES: table_sql = get_create_table_sql(entity_type) await db.execute(table_sql) # Create indexes for this table for index_sql in get_index_sql(entity_type): await db.execute(index_sql) await db.commit() ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_cache/test_schema.py -v` Expected: PASS (all tests pass) **Step 5: Commit** ```bash git add tests/test_cache/test_schema.py src/lorekeeper_mcp/cache/schema.py git commit -m "feat(cache): implement schema initialization with indexes" ``` --- ### Task 1.3: Implement entity storage operations **Files:** - Modify: `src/lorekeeper_mcp/cache/db.py` - Test: `tests/test_cache/test_db.py` **Step 1: Write the failing test** ```python # tests/test_cache/test_db.py (rewrite completely) """Tests for cache database operations.""" import json import tempfile from pathlib import Path import pytest from lorekeeper_mcp.cache.db import ( bulk_cache_entities, get_cached_entity, init_db, ) from lorekeeper_mcp.cache.schema import init_entity_cache @pytest.fixture async def test_db(): """Create a test database.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" await init_entity_cache(str(db_path)) yield str(db_path) @pytest.mark.asyncio async def test_bulk_cache_entities_inserts_new(test_db): """Bulk cache inserts new entities.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, {"slug": "magic-missile", "name": "Magic Missile", "level": 1, "school": "Evocation"}, ] count = await bulk_cache_entities(entities, "spells", test_db, "open5e") assert count == 2 @pytest.mark.asyncio async def 
test_get_cached_entity_retrieves_by_slug(test_db): """Get cached entity retrieves by slug.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") entity = await get_cached_entity("spells", "fireball", test_db) assert entity is not None assert entity["slug"] == "fireball" assert entity["name"] == "Fireball" assert entity["level"] == 3 assert entity["school"] == "Evocation" @pytest.mark.asyncio async def test_get_cached_entity_returns_none_for_missing(test_db): """Get cached entity returns None for non-existent slug.""" entity = await get_cached_entity("spells", "nonexistent", test_db) assert entity is None @pytest.mark.asyncio async def test_bulk_cache_entities_handles_upsert(test_db): """Bulk cache updates existing entities.""" # Insert initial entity entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") # Update with new data updated_entities = [ {"slug": "fireball", "name": "Fireball (Updated)", "level": 3, "school": "Evocation"}, ] count = await bulk_cache_entities(updated_entities, "spells", test_db, "open5e") assert count == 1 # Verify updated entity = await get_cached_entity("spells", "fireball", test_db) assert entity["name"] == "Fireball (Updated)" @pytest.mark.asyncio async def test_bulk_cache_entities_extracts_indexed_fields(test_db): """Bulk cache extracts indexed fields from entity data.""" entities = [ { "slug": "goblin", "name": "Goblin", "type": "humanoid", "size": "Small", "challenge_rating": "1/4", }, ] await bulk_cache_entities(entities, "monsters", test_db, "open5e") entity = await get_cached_entity("monsters", "goblin", test_db) assert entity["type"] == "humanoid" assert entity["size"] == "Small" assert entity["challenge_rating"] == "1/4" ``` **Step 2: Run test to verify it fails** Run: `pytest 
tests/test_cache/test_db.py::test_bulk_cache_entities_inserts_new -v` Expected: FAIL with import errors or function not found **Step 3: Write minimal implementation** ```python # Modify src/lorekeeper_mcp/cache/db.py # Replace the entire file contents: """Database cache layer for entity-based caching using SQLite.""" import json import time from pathlib import Path from typing import Any import aiosqlite from lorekeeper_mcp.cache.schema import INDEXED_FIELDS, get_table_name from lorekeeper_mcp.config import settings async def init_db() -> None: """Initialize the database schema. Imports and delegates to init_entity_cache for new schema. """ from lorekeeper_mcp.cache.schema import init_entity_cache await init_entity_cache(settings.db_path) async def bulk_cache_entities( entities: list[dict[str, Any]], entity_type: str, db_path: str | None = None, source_api: str = "unknown", ) -> int: """Bulk insert or update entities in cache. Args: entities: List of entity dictionaries entity_type: Type of entities (spells, monsters, etc.) 
db_path: Optional database path (defaults to settings.db_path) source_api: Source API identifier Returns: Number of entities processed """ if not entities: return 0 db_path = db_path or settings.db_path table_name = get_table_name(entity_type) indexed_fields = INDEXED_FIELDS.get(entity_type, []) # Build INSERT OR REPLACE query base_columns = ["slug", "name", "data", "source_api", "created_at", "updated_at"] all_columns = base_columns + indexed_fields placeholders = ", ".join(["?"] * len(all_columns)) columns_str = ", ".join(all_columns) sql = f""" INSERT OR REPLACE INTO {table_name} ({columns_str}) VALUES ({placeholders}) """ async with aiosqlite.connect(db_path) as db: now = time.time() rows = [] for entity in entities: slug = entity.get("slug") name = entity.get("name", "") if not slug: continue # Skip entities without slug # Check if entity already exists to preserve created_at cursor = await db.execute( f"SELECT created_at FROM {table_name} WHERE slug = ?", (slug,) ) existing = await cursor.fetchone() created_at = existing[0] if existing else now # Build row with base columns row = [ slug, name, json.dumps(entity), # Full entity as JSON source_api, created_at, now, # updated_at ] # Add indexed field values for field in indexed_fields: row.append(entity.get(field)) rows.append(row) await db.executemany(sql, rows) await db.commit() return len(rows) async def get_cached_entity( entity_type: str, slug: str, db_path: str | None = None, ) -> dict[str, Any] | None: """Retrieve a single cached entity by slug. 
Args: entity_type: Type of entity slug: Entity slug db_path: Optional database path Returns: Entity data dictionary or None if not found """ db_path = db_path or settings.db_path table_name = get_table_name(entity_type) async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( f"SELECT data FROM {table_name} WHERE slug = ?", (slug,) ) row = await cursor.fetchone() if row is None: return None return json.loads(row["data"]) # Keep existing functions for backward compatibility async def get_cached(key: str) -> dict[str, Any] | None: """Retrieve cached data if not expired (legacy URL-based cache).""" # Legacy function - will be removed after migration return None async def set_cached( key: str, data: dict[str, Any], content_type: str, ttl_seconds: int, source_api: str = "unknown", ) -> None: """Store data in cache with TTL (legacy URL-based cache).""" # Legacy function - will be removed after migration pass async def cleanup_expired() -> int: """Remove expired cache entries (legacy URL-based cache).""" # Legacy function - will be removed after migration return 0 ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_cache/test_db.py -v` Expected: PASS (all tests pass) **Step 5: Commit** ```bash git add tests/test_cache/test_db.py src/lorekeeper_mcp/cache/db.py git commit -m "feat(cache): implement entity storage and retrieval operations" ``` --- ### Task 1.4: Implement entity query operations with filters **Files:** - Modify: `src/lorekeeper_mcp/cache/db.py` - Test: `tests/test_cache/test_db.py` **Step 1: Write the failing test** ```python # Add to tests/test_cache/test_db.py from lorekeeper_mcp.cache.db import query_cached_entities @pytest.mark.asyncio async def test_query_cached_entities_returns_all(test_db): """Query without filters returns all entities.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, {"slug": "magic-missile", "name": "Magic Missile", "level": 1, 
"school": "Evocation"}, {"slug": "cure-wounds", "name": "Cure Wounds", "level": 1, "school": "Necromancy"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") results = await query_cached_entities("spells", test_db) assert len(results) == 3 @pytest.mark.asyncio async def test_query_cached_entities_filters_by_single_field(test_db): """Query filters by single field.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, {"slug": "magic-missile", "name": "Magic Missile", "level": 1, "school": "Evocation"}, {"slug": "cure-wounds", "name": "Cure Wounds", "level": 1, "school": "Necromancy"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") results = await query_cached_entities("spells", test_db, level=1) assert len(results) == 2 assert all(e["level"] == 1 for e in results) @pytest.mark.asyncio async def test_query_cached_entities_filters_by_multiple_fields(test_db): """Query filters by multiple fields with AND logic.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, {"slug": "magic-missile", "name": "Magic Missile", "level": 1, "school": "Evocation"}, {"slug": "cure-wounds", "name": "Cure Wounds", "level": 1, "school": "Necromancy"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") results = await query_cached_entities("spells", test_db, level=1, school="Evocation") assert len(results) == 1 assert results[0]["slug"] == "magic-missile" @pytest.mark.asyncio async def test_query_cached_entities_returns_empty_for_no_matches(test_db): """Query with no matches returns empty list.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") results = await query_cached_entities("spells", test_db, level=9) assert results == [] ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_cache/test_db.py::test_query_cached_entities_returns_all 
-v` Expected: FAIL with "ImportError: cannot import name 'query_cached_entities'" **Step 3: Write minimal implementation** ```python # Add to src/lorekeeper_mcp/cache/db.py async def query_cached_entities( entity_type: str, db_path: str | None = None, **filters: Any, ) -> list[dict[str, Any]]: """Query cached entities with optional filters. Args: entity_type: Type of entities to query db_path: Optional database path **filters: Field filters (e.g., level=3, school="Evocation") Returns: List of matching entity dictionaries """ db_path = db_path or settings.db_path table_name = get_table_name(entity_type) # Build WHERE clause where_clauses = [] params = [] for field, value in filters.items(): where_clauses.append(f"{field} = ?") params.append(value) where_sql = "" if where_clauses: where_sql = " WHERE " + " AND ".join(where_clauses) query = f"SELECT data FROM {table_name}{where_sql}" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(query, params) rows = await cursor.fetchall() return [json.loads(row["data"]) for row in rows] ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_cache/test_db.py -v` Expected: PASS (all tests pass) **Step 5: Commit** ```bash git add tests/test_cache/test_db.py src/lorekeeper_mcp/cache/db.py git commit -m "feat(cache): implement filtered entity queries" ``` --- ## Phase 2: Cache Statistics & Observability ### Task 2.1: Implement entity count statistics **Files:** - Modify: `src/lorekeeper_mcp/cache/db.py` - Test: `tests/test_cache/test_db.py` **Step 1: Write the failing test** ```python # Add to tests/test_cache/test_db.py from lorekeeper_mcp.cache.db import get_entity_count @pytest.mark.asyncio async def test_get_entity_count_returns_count(test_db): """Get entity count returns correct count.""" entities = [ {"slug": "fireball", "name": "Fireball"}, {"slug": "magic-missile", "name": "Magic Missile"}, ] await bulk_cache_entities(entities, "spells", test_db, "open5e") 
count = await get_entity_count("spells", test_db) assert count == 2 @pytest.mark.asyncio async def test_get_entity_count_returns_zero_for_empty(test_db): """Get entity count returns 0 for empty table.""" count = await get_entity_count("spells", test_db) assert count == 0 @pytest.mark.asyncio async def test_get_entity_count_handles_nonexistent_table(test_db): """Get entity count returns 0 for non-existent table.""" count = await get_entity_count("invalid_type", test_db) assert count == 0 ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_cache/test_db.py::test_get_entity_count_returns_count -v` Expected: FAIL with "ImportError: cannot import name 'get_entity_count'" **Step 3: Write minimal implementation** ```python # Add to src/lorekeeper_mcp/cache/db.py async def get_entity_count( entity_type: str, db_path: str | None = None, ) -> int: """Get count of cached entities for a type. Args: entity_type: Type of entities to count db_path: Optional database path Returns: Number of cached entities, or 0 if table doesn't exist """ db_path = db_path or settings.db_path table_name = get_table_name(entity_type) try: async with aiosqlite.connect(db_path) as db: cursor = await db.execute(f"SELECT COUNT(*) FROM {table_name}") row = await cursor.fetchone() return row[0] if row else 0 except aiosqlite.OperationalError: # Table doesn't exist return 0 ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_cache/test_db.py::test_get_entity_count_returns_count tests/test_cache/test_db.py::test_get_entity_count_returns_zero_for_empty tests/test_cache/test_db.py::test_get_entity_count_handles_nonexistent_table -v` Expected: PASS (all 3 tests pass) **Step 5: Commit** ```bash git add tests/test_cache/test_db.py src/lorekeeper_mcp/cache/db.py git commit -m "feat(cache): add entity count statistics" ``` --- ### Task 2.2: Implement comprehensive cache statistics **Files:** - Modify: `src/lorekeeper_mcp/cache/db.py` - Test: `tests/test_cache/test_db.py` **Step 1: 
Write the failing test** ```python # Add to tests/test_cache/test_db.py from lorekeeper_mcp.cache.db import get_cache_stats @pytest.mark.asyncio async def test_get_cache_stats_returns_complete_stats(test_db): """Get cache stats returns comprehensive statistics.""" # Add some test data spells = [{"slug": "fireball", "name": "Fireball"}] monsters = [{"slug": "goblin", "name": "Goblin"}, {"slug": "dragon", "name": "Dragon"}] await bulk_cache_entities(spells, "spells", test_db, "open5e") await bulk_cache_entities(monsters, "monsters", test_db, "open5e") stats = await get_cache_stats(test_db) assert "entity_counts" in stats assert stats["entity_counts"]["spells"] == 1 assert stats["entity_counts"]["monsters"] == 2 assert "db_size_bytes" in stats assert stats["db_size_bytes"] > 0 assert "schema_version" in stats assert "table_count" in stats ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_cache/test_db.py::test_get_cache_stats_returns_complete_stats -v` Expected: FAIL with "ImportError: cannot import name 'get_cache_stats'" **Step 3: Write minimal implementation** ```python # Add to src/lorekeeper_mcp/cache/db.py from lorekeeper_mcp.cache.schema import ENTITY_TYPES, SCHEMA_VERSION async def get_cache_stats(db_path: str | None = None) -> dict[str, Any]: """Get comprehensive cache statistics. 
Args: db_path: Optional database path Returns: Dictionary with cache statistics """ db_path = db_path or settings.db_path # Get entity counts per type entity_counts = {} for entity_type in ENTITY_TYPES: count = await get_entity_count(entity_type, db_path) entity_counts[entity_type] = count # Get database file size db_size = 0 path = Path(db_path) if path.exists(): db_size = path.stat().st_size # Get table count async with aiosqlite.connect(db_path) as db: cursor = await db.execute( "SELECT COUNT(*) FROM sqlite_master WHERE type='table'" ) row = await cursor.fetchone() table_count = row[0] if row else 0 return { "entity_counts": entity_counts, "db_size_bytes": db_size, "schema_version": SCHEMA_VERSION, "table_count": table_count, } ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_cache/test_db.py::test_get_cache_stats_returns_complete_stats -v` Expected: PASS **Step 5: Commit** ```bash git add tests/test_cache/test_db.py src/lorekeeper_mcp/cache/db.py git commit -m "feat(cache): add comprehensive cache statistics" ``` --- ## Phase 3: Base Client Integration ### Task 3.1: Add parallel cache query support to BaseHttpClient **Files:** - Modify: `src/lorekeeper_mcp/api_clients/base.py` - Test: `tests/test_api_clients/test_base.py` **Step 1: Write the failing test** ```python # Add to tests/test_api_clients/test_base.py import pytest import asyncio from unittest.mock import AsyncMock, patch from lorekeeper_mcp.api_clients.base import BaseHttpClient @pytest.mark.asyncio async def test_query_cache_parallel_executes_concurrently(): """Cache query runs in parallel with API call.""" client = BaseHttpClient("https://api.example.com") # Mock cache query to take 0.1 seconds async def mock_cache_query(*args, **kwargs): await asyncio.sleep(0.1) return [{"slug": "fireball", "name": "Fireball"}] with patch("lorekeeper_mcp.api_clients.base.query_cached_entities", side_effect=mock_cache_query): start = asyncio.get_event_loop().time() result = await 
client._query_cache_parallel("spells", level=3) elapsed = asyncio.get_event_loop().time() - start # Should complete in roughly 0.1s, not block caller assert elapsed < 0.2 assert len(result) == 1 @pytest.mark.asyncio async def test_query_cache_parallel_handles_cache_error(): """Cache query errors don't crash parallel operation.""" client = BaseHttpClient("https://api.example.com") async def mock_error(*args, **kwargs): raise Exception("Cache error") with patch("lorekeeper_mcp.api_clients.base.query_cached_entities", side_effect=mock_error): result = await client._query_cache_parallel("spells") # Should return empty list on error assert result == [] ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_api_clients/test_base.py::test_query_cache_parallel_executes_concurrently -v` Expected: FAIL with "AttributeError: ... has no attribute '\_query_cache_parallel'" **Step 3: Write minimal implementation** ```python # Add to src/lorekeeper_mcp/api_clients/base.py import logging from lorekeeper_mcp.cache.db import query_cached_entities logger = logging.getLogger(__name__) # Add this method to BaseHttpClient class: async def _query_cache_parallel( self, entity_type: str, **filters: Any, ) -> list[dict[str, Any]]: """Query cache for entities in parallel. 
Args: entity_type: Type of entities to query **filters: Filter parameters Returns: List of cached entities, or empty list on error """ try: return await query_cached_entities(entity_type, **filters) except Exception as e: logger.warning(f"Cache query failed: {e}") return [] ``` **Step 4: Run test to verify it passes** Run: `pytest tests/test_api_clients/test_base.py::test_query_cache_parallel_executes_concurrently tests/test_api_clients/test_base.py::test_query_cache_parallel_handles_cache_error -v` Expected: PASS (both tests pass) **Step 5: Commit** ```bash git add tests/test_api_clients/test_base.py src/lorekeeper_mcp/api_clients/base.py git commit -m "feat(base-client): add parallel cache query support" ``` --- ### Task 3.2: Implement entity caching from API responses **Files:** - Modify: `src/lorekeeper_mcp/api_clients/base.py` - Test: `tests/test_api_clients/test_base.py` **Step 1: Write the failing test** ```python # Add to tests/test_api_clients/test_base.py from lorekeeper_mcp.cache.db import get_cached_entity, init_entity_cache import tempfile from pathlib import Path @pytest.fixture async def client_with_db(): """Create client with test database.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.db" await init_entity_cache(str(db_path)) # Temporarily override settings import lorekeeper_mcp.config original_path = lorekeeper_mcp.config.settings.db_path lorekeeper_mcp.config.settings.db_path = str(db_path) client = BaseHttpClient("https://api.example.com") yield client # Restore lorekeeper_mcp.config.settings.db_path = original_path await client.close() @pytest.mark.asyncio async def test_cache_api_entities_stores_entities(client_with_db): """Cache API entities stores each entity.""" entities = [ {"slug": "fireball", "name": "Fireball", "level": 3}, {"slug": "magic-missile", "name": "Magic Missile", "level": 1}, ] await client_with_db._cache_api_entities(entities, "spells") # Verify entities cached cached = await 
get_cached_entity("spells", "fireball") assert cached is not None assert cached["name"] == "Fireball" @pytest.mark.asyncio async def test_extract_entities_from_paginated_response(client_with_db): """Extract entities handles paginated API response.""" response = { "count": 2, "results": [ {"slug": "fireball", "name": "Fireball"}, {"slug": "magic-missile", "name": "Magic Missile"}, ] } entities = client_with_db._extract_entities(response, "spells") assert len(entities) == 2 assert entities[0]["slug"] == "fireball" @pytest.mark.asyncio async def test_extract_entities_handles_non_paginated(client_with_db): """Extract entities handles direct entity response.""" response = {"slug": "fireball", "name": "Fireball"} entities = client_with_db._extract_entities(response, "spells") assert len(entities) == 1 assert entities[0]["slug"] == "fireball" ``` **Step 2: Run test to verify it fails** Run: `pytest tests/test_api_clients/test_base.py::test_cache_api_entities_stores_entities -v` Expected: FAIL with "AttributeError: ... has no attribute '\_cache_api_entities'" **Step 3: Write minimal implementation** ```python # Add to src/lorekeeper_mcp/api_clients/base.py from lorekeeper_mcp.cache.db import bulk_cache_entities # Add these methods to BaseHttpClient class: def _extract_entities( self, response: dict[str, Any], entity_type: str, ) -> list[dict[str, Any]]: """Extract entities from API response. Handles both paginated responses with 'results' and direct entities. Args: response: API response dictionary entity_type: Type of entities Returns: List of entity dictionaries """ # Check if paginated response if "results" in response and isinstance(response["results"], list): return response["results"] # Check if direct entity (has slug) if "slug" in response: return [response] # Unknown format return [] async def _cache_api_entities( self, entities: list[dict[str, Any]], entity_type: str, ) -> None: """Cache entities from API response. 

    Args:
        entities: List of entity dictionaries
        entity_type: Type of entities
    """
    if not entities:
        return

    try:
        await bulk_cache_entities(
            entities,
            entity_type,
            source_api=self.source_api,
        )
        logger.debug(f"Cached {len(entities)} {entity_type}")
    except Exception as e:
        logger.warning(f"Failed to cache entities: {e}")
        # Non-fatal - continue without caching
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_api_clients/test_base.py -k "cache_api_entities or extract_entities" -v`

Expected: PASS (all 3 new tests pass)

**Step 5: Commit**

```bash
git add tests/test_api_clients/test_base.py src/lorekeeper_mcp/api_clients/base.py
git commit -m "feat(base-client): implement entity caching from API responses"
```

---

### Task 3.3: Implement offline fallback logic

**Files:**
- Modify: `src/lorekeeper_mcp/api_clients/base.py`
- Test: `tests/test_api_clients/test_base.py`

**Step 1: Write the failing test**

```python
# Add to tests/test_api_clients/test_base.py

from lorekeeper_mcp.api_clients.exceptions import NetworkError
from lorekeeper_mcp.cache.db import bulk_cache_entities


@pytest.mark.asyncio
async def test_make_request_offline_fallback_returns_cache(client_with_db):
    """Make request falls back to cache on network error."""
    # Pre-populate cache
    entities = [{"slug": "fireball", "name": "Fireball", "level": 3}]
    await bulk_cache_entities(entities, "spells")

    # Mock network error
    with patch.object(client_with_db, "_make_request", side_effect=NetworkError("Network down")):
        with patch.object(client_with_db, "_query_cache_parallel", return_value=entities):
            result = await client_with_db.make_request(
                "/spells",
                entity_type="spells",
                use_entity_cache=True,
            )

    # Should return cached data instead of raising
    assert result == entities


@pytest.mark.asyncio
async def test_make_request_offline_with_no_cache_returns_empty(client_with_db):
    """Make request with network error and no cache returns empty."""
    # Mock network error with empty cache
    with patch.object(client_with_db, "_make_request", side_effect=NetworkError("Network down")):
        with patch.object(client_with_db, "_query_cache_parallel", return_value=[]):
            result = await client_with_db.make_request(
                "/spells",
                entity_type="spells",
                use_entity_cache=True,
            )

    # Should return empty list
    assert result == []


@pytest.mark.asyncio
async def test_make_request_offline_logs_warning(client_with_db, caplog):
    """Make request logs warning in offline mode."""
    with patch.object(client_with_db, "_make_request", side_effect=NetworkError("Network down")):
        with patch.object(client_with_db, "_query_cache_parallel", return_value=[]):
            await client_with_db.make_request(
                "/spells",
                entity_type="spells",
                use_entity_cache=True,
            )

    # Should log offline warning
    assert "offline" in caplog.text.lower() or "network" in caplog.text.lower()
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_api_clients/test_base.py::test_make_request_offline_fallback_returns_cache -v`

Expected: FAIL (make_request doesn't handle entity_type parameter or offline fallback)

**Step 3: Write minimal implementation**

```python
# Modify src/lorekeeper_mcp/api_clients/base.py

# Replace the make_request method in BaseHttpClient:

async def make_request(
    self,
    endpoint: str,
    method: str = "GET",
    use_cache: bool = True,
    use_entity_cache: bool = False,
    entity_type: str | None = None,
    cache_filters: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Make HTTP request with caching and retry logic.

    Args:
        endpoint: API endpoint path
        method: HTTP method
        use_cache: Whether to use legacy URL-based cache (deprecated)
        use_entity_cache: Whether to use entity-based cache
        entity_type: Type of entities for entity cache
        cache_filters: Filters for cache query
        **kwargs: Additional arguments for httpx request

    Returns:
        Parsed JSON response or list of entities

    Raises:
        NetworkError: For network-related failures (when not using offline fallback)
        ApiError: For API error responses (4xx/5xx)
    """
    url = f"{self.base_url}{endpoint}"
    cache_filters = cache_filters or {}

    # Start parallel cache query if entity cache enabled
    cache_task = None
    if use_entity_cache and entity_type:
        cache_task = asyncio.create_task(
            self._query_cache_parallel(entity_type, **cache_filters)
        )

    # Legacy URL-based cache check
    if use_cache and method == "GET" and not use_entity_cache:
        cached = await self._get_cached_response(url)
        if cached is not None:
            logger.debug(f"Cache hit: {url}")
            return cached

    # Make API request
    try:
        response = await self._make_request(endpoint, method, **kwargs)

        # Extract and cache entities if using entity cache
        if use_entity_cache and entity_type:
            entities = self._extract_entities(response, entity_type)
            await self._cache_api_entities(entities, entity_type)

            # Merge with cache results if available
            if cache_task:
                try:
                    # Await the cache query so the task does not linger;
                    # its result is unused for now.
                    await cache_task
                    # For now, API takes precedence, just return API results
                    # (merging logic can be enhanced later)
                    return entities
                except Exception:
                    return entities

            return entities

        # Legacy URL-based cache storage
        if use_cache and method == "GET":
            await self._cache_response(url, response)

        return response

    except NetworkError as e:
        # Offline fallback: return cached entities
        if use_entity_cache and entity_type and cache_task:
            logger.warning(f"Network error, falling back to cache: {e}")
            try:
                cached_entities = await cache_task
                if cached_entities:
                    logger.info(f"Returning {len(cached_entities)} cached {entity_type}")
                    return cached_entities
                else:
                    logger.warning(f"No cached {entity_type} available for offline mode")
                    return []
            except Exception as cache_error:
                logger.error(f"Cache fallback failed: {cache_error}")
                return []

        # No fallback available, re-raise
        raise
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_api_clients/test_base.py -k offline -v`

Expected: PASS (all offline tests pass)

**Step 5: Commit**

```bash
git add tests/test_api_clients/test_base.py src/lorekeeper_mcp/api_clients/base.py
git commit -m "feat(base-client): implement offline fallback with entity cache"
```

---

### Task 3.4: Implement cache-first mode

**Files:**
- Modify: `src/lorekeeper_mcp/api_clients/base.py`
- Test: `tests/test_api_clients/test_base.py`

**Step 1: Write the failing test**

```python
# Add to tests/test_api_clients/test_base.py

@pytest.mark.asyncio
async def test_make_request_cache_first_returns_immediately(client_with_db):
    """Make request with cache_first returns cache without waiting for API."""
    # Pre-populate cache
    entities = [{"slug": "fireball", "name": "Fireball (cached)"}]
    await bulk_cache_entities(entities, "spells")

    # Mock slow API response
    async def slow_api(*args, **kwargs):
        await asyncio.sleep(0.5)
        return {"results": [{"slug": "fireball", "name": "Fireball (fresh)"}]}

    with patch.object(client_with_db, "_make_request", side_effect=slow_api):
        # get_running_loop() is the preferred call inside a coroutine
        start = asyncio.get_running_loop().time()
        result = await client_with_db.make_request(
            "/spells",
            entity_type="spells",
            use_entity_cache=True,
            cache_first=True,
        )
        elapsed = asyncio.get_running_loop().time() - start

    # Should return cached data quickly (< 0.1s, not wait for 0.5s API)
    assert elapsed < 0.1
    assert result[0]["name"] == "Fireball (cached)"


@pytest.mark.asyncio
async def test_make_request_cache_first_refreshes_background(client_with_db):
    """Make request with cache_first refreshes cache in background."""
    # Pre-populate cache with old data
    old_entities = [{"slug": "fireball", "name": "Fireball (old)"}]
    await bulk_cache_entities(old_entities, "spells")

    # Mock API with new data
    async def mock_api(*args, **kwargs):
        return {"results": [{"slug": "fireball", "name": "Fireball (new)"}]}

    with patch.object(client_with_db, "_make_request", side_effect=mock_api):
        # First call returns cached
        result1 = await client_with_db.make_request(
            "/spells",
            entity_type="spells",
            use_entity_cache=True,
            cache_first=True,
        )

        # Wait for background refresh
        await asyncio.sleep(0.2)

        # Second call should have refreshed data
        result2 = await client_with_db.make_request(
            "/spells",
            entity_type="spells",
            use_entity_cache=True,
            cache_first=True,
        )

    assert result1[0]["name"] == "Fireball (old)"
    # Background task should have updated cache
    # (In practice this requires the background task to complete)
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_api_clients/test_base.py::test_make_request_cache_first_returns_immediately -v`

Expected: FAIL (cache_first parameter not supported)

**Step 3: Write minimal implementation**

```python
# Modify src/lorekeeper_mcp/api_clients/base.py

# Update the make_request method signature and add cache_first logic:

async def make_request(
    self,
    endpoint: str,
    method: str = "GET",
    use_cache: bool = True,
    use_entity_cache: bool = False,
    entity_type: str | None = None,
    cache_filters: dict[str, Any] | None = None,
    cache_first: bool = False,
    **kwargs: Any,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Make HTTP request with caching and retry logic.

    Args:
        endpoint: API endpoint path
        method: HTTP method
        use_cache: Whether to use legacy URL-based cache (deprecated)
        use_entity_cache: Whether to use entity-based cache
        entity_type: Type of entities for entity cache
        cache_filters: Filters for cache query
        cache_first: Return cache immediately, refresh in background
        **kwargs: Additional arguments for httpx request

    Returns:
        Parsed JSON response or list of entities

    Raises:
        NetworkError: For network-related failures (when not using offline fallback)
        ApiError: For API error responses (4xx/5xx)
    """
    url = f"{self.base_url}{endpoint}"
    cache_filters = cache_filters or {}

    # Cache-first mode: return cache immediately, refresh async
    if cache_first and use_entity_cache and entity_type:
        # Query cache immediately
        cached_entities = await self._query_cache_parallel(entity_type, **cache_filters)

        # Start background refresh task
        async def background_refresh():
            try:
                response = await self._make_request(endpoint, method, **kwargs)
                entities = self._extract_entities(response, entity_type)
                await self._cache_api_entities(entities, entity_type)
                logger.debug(f"Background refresh completed for {entity_type}")
            except Exception as e:
                logger.warning(f"Background refresh failed: {e}")

        # Fire and forget. Note: the event loop holds only a weak reference to
        # tasks, so keep a reference (e.g. in a set) if the refresh must not be
        # garbage-collected before it finishes.
        asyncio.create_task(background_refresh())

        return cached_entities

    # ... rest of the existing make_request logic remains the same ...
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_api_clients/test_base.py::test_make_request_cache_first_returns_immediately -v`

Expected: PASS

**Step 5: Commit**

```bash
git add tests/test_api_clients/test_base.py src/lorekeeper_mcp/api_clients/base.py
git commit -m "feat(base-client): implement cache-first mode with background refresh"
```

---

## Phase 4: API Client Updates

### Task 4.1: Update Open5eV1Client for entity caching

**Files:**
- Modify: `src/lorekeeper_mcp/api_clients/open5e_v1.py`
- Test: `tests/test_api_clients/test_open5e_v1.py`

**Step 1: Write the failing integration test**

```python
# Add to tests/test_api_clients/test_open5e_v1.py

import pytest
from unittest.mock import patch, AsyncMock
from lorekeeper_mcp.api_clients.open5e_v1 import Open5eV1Client
from lorekeeper_mcp.cache.db import get_cached_entity, init_entity_cache
import tempfile
from pathlib import Path


@pytest.fixture
async def v1_client_with_cache():
    """Create V1 client with test cache."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"
        await init_entity_cache(str(db_path))

        import lorekeeper_mcp.config
        original = lorekeeper_mcp.config.settings.db_path
        lorekeeper_mcp.config.settings.db_path = str(db_path)

        client = Open5eV1Client()
        yield client

        lorekeeper_mcp.config.settings.db_path = original
        await client.close()


@pytest.mark.asyncio
async def test_get_monsters_uses_entity_cache(v1_client_with_cache):
    """Get monsters caches entities, not URLs."""
    mock_response = {
        "results": [
            {"slug": "goblin", "name": "Goblin", "type": "humanoid", "challenge_rating": "1/4"}
        ]
    }

    with patch.object(v1_client_with_cache, "_make_request", return_value=mock_response):
        result = await v1_client_with_cache.get_monsters()

    # Verify entity cached
    cached = await get_cached_entity("monsters", "goblin")
    assert cached is not None
    assert cached["name"] == "Goblin"
    assert cached["type"] == "humanoid"


@pytest.mark.asyncio
async def test_get_classes_uses_entity_cache(v1_client_with_cache):
    """Get classes caches entities."""
    mock_response = {
        "results": [
            {"slug": "fighter", "name": "Fighter", "hit_die": 10}
        ]
    }

    with patch.object(v1_client_with_cache, "_make_request", return_value=mock_response):
        result = await v1_client_with_cache.get_classes()

    cached = await get_cached_entity("classes", "fighter")
    assert cached is not None
    assert cached["hit_die"] == 10
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_api_clients/test_open5e_v1.py::test_get_monsters_uses_entity_cache -v`

Expected: FAIL (entities not cached)

**Step 3: Update Open5eV1Client implementation**

```python
# Modify src/lorekeeper_mcp/api_clients/open5e_v1.py

# Update get_monsters method and similar methods:

async def get_monsters(
    self,
    challenge_rating: str | None = None,
    **kwargs: Any,
) -> list[dict[str, Any]]:
    """Get monsters from Open5e API v1.

    Args:
        challenge_rating: Filter by CR (e.g., "1/4", "5")
        **kwargs: Additional API parameters

    Returns:
        List of monster dictionaries
    """
    cache_filters = {}
    if challenge_rating:
        cache_filters["challenge_rating"] = challenge_rating

    result = await self.make_request(
        "/monsters/",
        use_entity_cache=True,
        entity_type="monsters",
        cache_filters=cache_filters,
        params=kwargs,
    )

    # Return entities directly (make_request now returns entity list)
    if isinstance(result, list):
        return result

    # Fallback for legacy response format
    return result.get("results", [])

async def get_classes(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get character classes from Open5e API v1.

    Returns:
        List of class dictionaries
    """
    result = await self.make_request(
        "/classes/",
        use_entity_cache=True,
        entity_type="classes",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])

async def get_races(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get character races from Open5e API v1.

    Returns:
        List of race dictionaries
    """
    result = await self.make_request(
        "/races/",
        use_entity_cache=True,
        entity_type="races",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_api_clients/test_open5e_v1.py -k entity_cache -v`

Expected: PASS

**Step 5: Commit**

```bash
git add tests/test_api_clients/test_open5e_v1.py src/lorekeeper_mcp/api_clients/open5e_v1.py
git commit -m "feat(open5e-v1): update client to use entity-based caching"
```

---

### Task 4.2: Update Open5eV2Client for entity caching

**Files:**
- Modify: `src/lorekeeper_mcp/api_clients/open5e_v2.py`
- Test: `tests/test_api_clients/test_open5e_v2.py`

**Step 1: Write the failing integration test**

```python
# Add to tests/test_api_clients/test_open5e_v2.py

import pytest
from unittest.mock import patch
from lorekeeper_mcp.api_clients.open5e_v2 import Open5eV2Client
from lorekeeper_mcp.cache.db import get_cached_entity, init_entity_cache
import tempfile
from pathlib import Path


@pytest.fixture
async def v2_client_with_cache():
    """Create V2 client with test cache."""
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "test.db"
        await init_entity_cache(str(db_path))

        import lorekeeper_mcp.config
        original = lorekeeper_mcp.config.settings.db_path
        lorekeeper_mcp.config.settings.db_path = str(db_path)

        client = Open5eV2Client()
        yield client

        lorekeeper_mcp.config.settings.db_path = original
        await client.close()


@pytest.mark.asyncio
async def test_get_spells_uses_entity_cache(v2_client_with_cache):
    """Get spells caches entities."""
    mock_response = {
        "results": [
            {"slug": "fireball", "name": "Fireball", "level": 3, "school": "Evocation"}
        ]
    }

    with patch.object(v2_client_with_cache, "_make_request", return_value=mock_response):
        result = await v2_client_with_cache.get_spells()

    cached = await get_cached_entity("spells", "fireball")
    assert cached is not None
    assert cached["level"] == 3


@pytest.mark.asyncio
async def test_get_weapons_uses_entity_cache(v2_client_with_cache):
    """Get weapons caches entities."""
    mock_response = {
        "results": [
            {"slug": "longsword", "name": "Longsword", "category": "martial"}
        ]
    }

    with patch.object(v2_client_with_cache, "_make_request", return_value=mock_response):
        result = await v2_client_with_cache.get_weapons()

    cached = await get_cached_entity("weapons", "longsword")
    assert cached is not None


@pytest.mark.asyncio
async def test_get_armor_uses_entity_cache(v2_client_with_cache):
    """Get armor caches entities."""
    mock_response = {
        "results": [
            {"slug": "plate-armor", "name": "Plate Armor", "armor_class": 18}
        ]
    }

    with patch.object(v2_client_with_cache, "_make_request", return_value=mock_response):
        result = await v2_client_with_cache.get_armor()

    cached = await get_cached_entity("armor", "plate-armor")
    assert cached is not None
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_api_clients/test_open5e_v2.py::test_get_spells_uses_entity_cache -v`

Expected: FAIL (entities not cached)

**Step 3: Update Open5eV2Client implementation**

```python
# Modify src/lorekeeper_mcp/api_clients/open5e_v2.py

# Update all entity-fetching methods:

async def get_spells(
    self,
    level: int | None = None,
    school: str | None = None,
    **kwargs: Any,
) -> list[dict[str, Any]]:
    """Get spells from Open5e API v2.

    Args:
        level: Filter by spell level
        school: Filter by spell school
        **kwargs: Additional API parameters

    Returns:
        List of spell dictionaries
    """
    cache_filters = {}
    if level is not None:
        cache_filters["level"] = level
    if school:
        cache_filters["school"] = school

    result = await self.make_request(
        "/spells/",
        use_entity_cache=True,
        entity_type="spells",
        cache_filters=cache_filters,
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])

async def get_weapons(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get weapons from Open5e API v2."""
    result = await self.make_request(
        "/weapons/",
        use_entity_cache=True,
        entity_type="weapons",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])

async def get_armor(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get armor from Open5e API v2."""
    result = await self.make_request(
        "/armor/",
        use_entity_cache=True,
        entity_type="armor",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])

async def get_backgrounds(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get character backgrounds."""
    result = await self.make_request(
        "/backgrounds/",
        use_entity_cache=True,
        entity_type="backgrounds",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])

async def get_feats(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get character feats."""
    result = await self.make_request(
        "/feats/",
        use_entity_cache=True,
        entity_type="feats",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])

async def get_conditions(self, **kwargs: Any) -> list[dict[str, Any]]:
    """Get game conditions."""
    result = await self.make_request(
        "/conditions/",
        use_entity_cache=True,
        entity_type="conditions",
        params=kwargs,
    )

    if isinstance(result, list):
        return result

    return result.get("results", [])
```

**Step 4: Run test to verify it passes**

Run: `pytest tests/test_api_clients/test_open5e_v2.py -k entity_cache -v`

Expected: PASS (all entity cache tests pass)

**Step 5: Commit**

```bash
git add tests/test_api_clients/test_open5e_v2.py src/lorekeeper_mcp/api_clients/open5e_v2.py
git commit -m "feat(open5e-v2): update client to use entity-based caching"
```

---

## Phase 5: Testing & Documentation

### Task 5.1: Run full test suite and fix any issues

**Step 1: Run all tests**

Run: `pytest tests/ -v --cov=src/lorekeeper_mcp/cache --cov=src/lorekeeper_mcp/api_clients`

Expected: Review coverage report, identify any gaps

**Step 2: Fix failing tests**

If any tests fail due to the cache refactor, update them to use entity-based caching patterns. Focus on:
- Tests that expect URL-based cache behavior
- Tests that check for the old `api_cache` table
- Integration tests that need cache initialization

**Step 3: Verify coverage**

Run: `pytest tests/test_cache/ -v --cov=src/lorekeeper_mcp/cache --cov-report=html`

Expected: >90% coverage on cache layer

**Step 4: Commit fixes**

```bash
git add tests/
git commit -m "test: fix tests for entity-based caching"
```

---

### Task 5.2: Update cache documentation

**Files:**
- Modify: `docs/cache.md`

**Step 1: Read current documentation**

Run: `cat docs/cache.md`

**Step 2: Update with entity-based architecture**

Update `docs/cache.md` with:
- Entity-based caching architecture overview
- Cache schema and table structure
- Examples of cache operations
- Migration guide from old cache
- Cache statistics API
- Offline fallback behavior
- Cache-first mode usage

**Step 3: Commit documentation**

```bash
git add docs/cache.md
git commit -m "docs: update cache documentation for entity-based architecture"
```

---

### Task 5.3: Manual validation testing

**Step 1: Initialize cache with real data**

```bash
# Start server
python -m lorekeeper_mcp.server

# In another terminal, make some MCP requests to populate cache
# (Use your MCP client or test scripts)
```

**Step 2: Verify cache tables**

```bash
sqlite3 data/cache.db ".tables"
sqlite3 data/cache.db "SELECT COUNT(*) FROM spells"
sqlite3 data/cache.db "SELECT COUNT(*) FROM monsters"
```

Expected: See all entity tables, counts > 0 for populated types

**Step 3: Test offline mode**

```bash
# Disconnect network or modify hosts file to block API
# Make requests - should serve from cache
# Check logs for "offline" or "falling back to cache" messages
```

**Step 4: Verify cache statistics**

```python
# In Python REPL
from lorekeeper_mcp.cache.db import get_cache_stats
import asyncio

stats = asyncio.run(get_cache_stats())
print(stats)
```

Expected: Complete statistics with entity counts, db size, etc.

**Step 5: Document validation results**

Create a checklist of what was tested and verified:
- [ ] All entity tables created
- [ ] Entities cached after API calls
- [ ] Filters work correctly
- [ ] Offline mode serves cached data
- [ ] Cache statistics accurate
- [ ] No errors in logs

---

## Verification & Completion

### Final Checklist

Run through this checklist before marking implementation complete:

- [ ] All entity tables created successfully (`sqlite3 data/cache.db ".tables"`)
- [ ] Bulk insert handles 100+ entities efficiently (test with real API data)
- [ ] Filters work correctly for all entity types (unit tests pass)
- [ ] Parallel cache queries reduce latency vs sequential (integration tests)
- [ ] Offline mode serves cached data without errors (manual testing)
- [ ] Cache statistics accurate and complete (`get_cache_stats()`)
- [ ] Unit test coverage >90% for cache layer (`pytest --cov`)
- [ ] Integration tests pass for all API clients (`pytest tests/test_api_clients/`)
- [ ] Manual testing confirms end-to-end functionality
- [ ] Documentation updated and accurate (`docs/cache.md`)

### Migration Notes

The old `api_cache` table will coexist with the new entity tables initially. To complete the migration:

1. Verify the new entity cache is working correctly
2. Run cleanup to drop the old table:
   ```python
   # In Python REPL or script
   from lorekeeper_mcp.cache.schema import migrate_cache_schema
   import asyncio

   asyncio.run(migrate_cache_schema("data/cache.db"))
   ```
3. Verify the old table is removed: `sqlite3 data/cache.db ".tables"`

---

## Execution Options

**Plan complete and saved to `openspec/changes/implement-entity-cache/plan.md`.**

**Two execution options:**

**1. Subagent-Driven (this session)** - I dispatch a fresh subagent per task, review between tasks, fast iteration with quality gates

**2. Parallel Session (separate)** - Open a new session with the executing-plans skill for batch execution with checkpoints

**Which approach would you like?**
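
Referring back to step 3 of the Migration Notes: the check that the old table is gone can also be scripted rather than read off the `.tables` output by eye. The following is a minimal sketch under stated assumptions, not part of `lorekeeper_mcp`: it uses only the standard-library `sqlite3` module, takes the legacy table name `api_cache` from the notes above, and `table_exists` is a hypothetical helper name introduced here for illustration.

```python
import sqlite3
from contextlib import closing


def table_exists(db_path: str, table: str) -> bool:
    """Return True if a table named `table` exists in the SQLite file.

    Hypothetical helper for illustration only. Note that sqlite3.connect
    creates an empty database file if `db_path` does not exist yet.
    """
    # closing() ensures the connection is closed; a bare `with conn:` block
    # would only manage the transaction, not the connection itself.
    with closing(sqlite3.connect(db_path)) as conn:
        row = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (table,),
        ).fetchone()
    return row is not None
```

Under these assumptions, `table_exists("data/cache.db", "api_cache")` would be expected to return `False` once the cleanup has run, while the same call with `"spells"` should still return `True` if the entity tables were created.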
