Skip to main content
Glama

Flutter MCP

by adamsmaka
cache.pyβ€’8.63 kB
"""SQLite-based cache implementation for Flutter MCP Server.""" import json import sqlite3 import time from pathlib import Path from typing import Optional, Dict, Any import logging try: from platformdirs import user_cache_dir except ImportError: # Fallback to simple home directory approach def user_cache_dir(app_name: str, app_author: str) -> str: """Simple fallback for cache directory.""" home = Path.home() if hasattr(home, 'absolute'): return str(home / '.cache' / app_name) return str(Path('.') / '.cache' / app_name) logger = logging.getLogger(__name__) class CacheManager: """SQLite-based cache manager for Flutter documentation.""" def __init__(self, app_name: str = "FlutterMCP", ttl_hours: int = 24): """Initialize cache manager. Args: app_name: Application name for cache directory ttl_hours: Time-to-live for cache entries in hours """ self.app_name = app_name self.ttl_seconds = ttl_hours * 3600 self.db_path = self._get_db_path() self._init_db() def _get_db_path(self) -> Path: """Get platform-specific cache database path.""" cache_dir = user_cache_dir(self.app_name, self.app_name) cache_path = Path(cache_dir) cache_path.mkdir(parents=True, exist_ok=True) return cache_path / "cache.db" def _init_db(self) -> None: """Initialize the cache database.""" with sqlite3.connect(str(self.db_path)) as conn: # Check if we need to migrate the schema cursor = conn.execute(""" SELECT sql FROM sqlite_master WHERE type='table' AND name='doc_cache' """) existing_schema = cursor.fetchone() if existing_schema and 'token_count' not in existing_schema[0]: # Migrate existing table to add token_count column logger.info("Migrating cache schema to add token_count") conn.execute(""" ALTER TABLE doc_cache ADD COLUMN token_count INTEGER DEFAULT NULL """) else: # Create new table with token_count conn.execute(""" CREATE TABLE IF NOT EXISTS doc_cache ( key TEXT PRIMARY KEY NOT NULL, value TEXT NOT NULL, created_at INTEGER NOT NULL, expires_at INTEGER NOT NULL, token_count INTEGER DEFAULT NULL 
) """) # Create index for expiration queries conn.execute(""" CREATE INDEX IF NOT EXISTS idx_expires_at ON doc_cache(expires_at) """) conn.commit() def get(self, key: str) -> Optional[Dict[str, Any]]: """Get value from cache with lazy expiration. Args: key: Cache key Returns: Cached value as dict or None if not found/expired """ current_time = int(time.time()) with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute( "SELECT value, expires_at, token_count FROM doc_cache WHERE key = ?", (key,) ) row = cursor.fetchone() if not row: return None value, expires_at, token_count = row # Check if expired if expires_at < current_time: # Lazy deletion conn.execute("DELETE FROM doc_cache WHERE key = ?", (key,)) conn.commit() logger.debug(f"Cache expired for key: {key}") return None try: result = json.loads(value) # Add token_count to the result if it exists if token_count is not None: result['_cached_token_count'] = token_count return result except json.JSONDecodeError: logger.error(f"Failed to decode cache value for key: {key}") return None def set(self, key: str, value: Dict[str, Any], ttl_override: Optional[int] = None, token_count: Optional[int] = None) -> None: """Set value in cache. 
Args: key: Cache key value: Value to cache (must be JSON-serializable) ttl_override: Optional TTL override in seconds token_count: Optional token count to store with the cached data """ current_time = int(time.time()) ttl = ttl_override or self.ttl_seconds expires_at = current_time + ttl # Extract token count from value if present and not provided explicitly if token_count is None and '_cached_token_count' in value: token_count = value.pop('_cached_token_count', None) try: value_json = json.dumps(value) except (TypeError, ValueError) as e: logger.error(f"Failed to serialize value for key {key}: {e}") return with sqlite3.connect(str(self.db_path)) as conn: conn.execute( """INSERT OR REPLACE INTO doc_cache (key, value, created_at, expires_at, token_count) VALUES (?, ?, ?, ?, ?)""", (key, value_json, current_time, expires_at, token_count) ) conn.commit() logger.debug(f"Cached key: {key} (expires in {ttl}s, tokens: {token_count})") def delete(self, key: str) -> None: """Delete a key from cache. Args: key: Cache key to delete """ with sqlite3.connect(str(self.db_path)) as conn: conn.execute("DELETE FROM doc_cache WHERE key = ?", (key,)) conn.commit() def clear_expired(self) -> int: """Clear all expired entries from cache. Returns: Number of entries cleared """ current_time = int(time.time()) with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute( "DELETE FROM doc_cache WHERE expires_at < ?", (current_time,) ) conn.commit() return cursor.rowcount def clear_all(self) -> None: """Clear all entries from cache.""" with sqlite3.connect(str(self.db_path)) as conn: conn.execute("DELETE FROM doc_cache") conn.commit() def get_stats(self) -> Dict[str, Any]: """Get cache statistics. 
Returns: Dictionary with cache statistics """ current_time = int(time.time()) with sqlite3.connect(str(self.db_path)) as conn: # Total entries total = conn.execute("SELECT COUNT(*) FROM doc_cache").fetchone()[0] # Expired entries expired = conn.execute( "SELECT COUNT(*) FROM doc_cache WHERE expires_at < ?", (current_time,) ).fetchone()[0] # Entries with token counts with_tokens = conn.execute( "SELECT COUNT(*) FROM doc_cache WHERE token_count IS NOT NULL" ).fetchone()[0] # Total tokens cached total_tokens = conn.execute( "SELECT SUM(token_count) FROM doc_cache WHERE token_count IS NOT NULL AND expires_at >= ?", (current_time,) ).fetchone()[0] or 0 # Database size db_size = self.db_path.stat().st_size if self.db_path.exists() else 0 return { "total_entries": total, "expired_entries": expired, "active_entries": total - expired, "entries_with_token_counts": with_tokens, "total_cached_tokens": total_tokens, "database_size_bytes": db_size, "database_path": str(self.db_path) } # Global cache instance _cache_instance: Optional[CacheManager] = None def get_cache() -> CacheManager: """Get or create the global cache instance.""" global _cache_instance if _cache_instance is None: _cache_instance = CacheManager() return _cache_instance

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/adamsmaka/flutter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.