Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
cache_adapter.py (8.36 kB)
"""
Cache manager adapter.

This module provides adapters that make the existing cache system compatible
with the new ICacheManager interface.
"""

import asyncio
import logging
from typing import Any

from maverick_mcp.data.cache import (
    CacheManager as ExistingCacheManager,
)
from maverick_mcp.data.cache import (
    clear_cache,
    get_from_cache,
    save_to_cache,
)
from maverick_mcp.providers.interfaces.cache import CacheConfig, ICacheManager

logger = logging.getLogger(__name__)


class RedisCacheAdapter(ICacheManager):
    """
    Adapter that makes the existing cache system compatible with ICacheManager.

    The adapter wraps the module-level cache functions (``get_from_cache``,
    ``save_to_cache``, ``clear_cache``) and an ``ExistingCacheManager``
    instance. The synchronous module-level functions are dispatched to the
    event loop's default executor so that they do not block the loop; the
    manager's methods are awaited directly.

    Note:
        The compound operations (``get_or_set``, ``increment``,
        ``set_if_not_exists``, ``expire``) are built from separate read and
        write calls and are therefore not atomic.
    """

    def __init__(self, config: CacheConfig | None = None):
        """
        Initialize the cache adapter.

        Args:
            config: Cache configuration (optional, defaults to environment).
                It is stored on the adapter; the wrapped cache manager is
                constructed without arguments.
        """
        self._config = config
        self._cache_manager = ExistingCacheManager()
        logger.debug("RedisCacheAdapter initialized")

    async def get(self, key: str) -> Any:
        """
        Get data from cache (async wrapper).

        Args:
            key: Cache key to retrieve

        Returns:
            Cached data or None if not found or expired
        """
        # get_running_loop() is the supported accessor inside a coroutine;
        # get_event_loop() is a legacy API with deprecated implicit behavior.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, get_from_cache, key)

    async def set(self, key: str, value: Any, ttl: int | None = None) -> bool:
        """
        Store data in cache (async wrapper).

        Args:
            key: Cache key
            value: Data to cache (must be JSON serializable)
            ttl: Time-to-live in seconds (None for default TTL)

        Returns:
            True if successfully cached, False otherwise
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, save_to_cache, key, value, ttl)

    async def delete(self, key: str) -> bool:
        """
        Delete a key from cache.

        Args:
            key: Cache key to delete

        Returns:
            True if key was deleted, False if key didn't exist
        """
        return await self._cache_manager.delete(key)

    async def exists(self, key: str) -> bool:
        """
        Check if a key exists in cache.

        Args:
            key: Cache key to check

        Returns:
            True if key exists and hasn't expired, False otherwise
        """
        return await self._cache_manager.exists(key)

    async def clear(self, pattern: str | None = None) -> int:
        """
        Clear cache entries.

        Args:
            pattern: Pattern to match keys (e.g., "stock:*").
                If None, clears all cache entries

        Returns:
            Number of entries cleared
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, clear_cache, pattern)

    async def get_many(self, keys: list[str]) -> dict[str, Any]:
        """
        Get multiple values at once for better performance.

        Args:
            keys: List of cache keys to retrieve

        Returns:
            Dictionary mapping keys to their cached values
            (missing keys will not be in the result)
        """
        return await self._cache_manager.get_many(keys)

    async def set_many(self, items: list[tuple[str, Any, int | None]]) -> int:
        """
        Set multiple values at once for better performance.

        Args:
            items: List of tuples (key, value, ttl)

        Returns:
            Number of items successfully cached
        """
        return await self._cache_manager.batch_save(items)

    async def delete_many(self, keys: list[str]) -> int:
        """
        Delete multiple keys for better performance.

        Args:
            keys: List of keys to delete

        Returns:
            Number of keys successfully deleted
        """
        return await self._cache_manager.batch_delete(keys)

    async def exists_many(self, keys: list[str]) -> dict[str, bool]:
        """
        Check existence of multiple keys for better performance.

        Args:
            keys: List of keys to check

        Returns:
            Dictionary mapping keys to their existence status
        """
        return await self._cache_manager.batch_exists(keys)

    async def count_keys(self, pattern: str) -> int:
        """
        Count keys matching a pattern.

        Args:
            pattern: Pattern to match (e.g., "stock:*")

        Returns:
            Number of matching keys
        """
        return await self._cache_manager.count_keys(pattern)

    async def get_or_set(
        self, key: str, default_value: Any, ttl: int | None = None
    ) -> Any:
        """
        Get value from cache, setting it if it doesn't exist.

        A cached value of None is indistinguishable from a miss here, so it
        is overwritten with ``default_value``. The read and the write are two
        separate calls, so this is not atomic.

        Args:
            key: Cache key
            default_value: Value to set if key doesn't exist
            ttl: Time-to-live for the default value

        Returns:
            Either the existing cached value or the default value
        """
        existing_value = await self.get(key)
        if existing_value is not None:
            return existing_value

        # The result of set() is deliberately ignored: the caller gets the
        # default value back even if it could not be stored.
        await self.set(key, default_value, ttl)
        return default_value

    async def increment(self, key: str, amount: int = 1) -> int:
        """
        Increment a numeric value in cache.

        The current value is read, converted with ``int()``, incremented and
        written back with the default TTL. This read-modify-write sequence is
        not atomic.

        Args:
            key: Cache key
            amount: Amount to increment by

        Returns:
            New value after increment

        Raises:
            ValueError: If the key exists but doesn't contain a numeric value
        """
        current = await self.get(key)

        if current is None:
            # Key doesn't exist, start from 0
            new_value = amount
        else:
            try:
                new_value = int(current) + amount
            except (ValueError, TypeError) as exc:
                # Chain the original conversion error so the root cause stays
                # visible in tracebacks.
                raise ValueError(
                    f"Key {key} contains non-numeric value: {current}"
                ) from exc

        await self.set(key, new_value)
        return new_value

    async def set_if_not_exists(
        self, key: str, value: Any, ttl: int | None = None
    ) -> bool:
        """
        Set a value only if the key doesn't already exist.

        The existence check and the write are two separate calls, so this is
        not atomic.

        Args:
            key: Cache key
            value: Value to set
            ttl: Time-to-live in seconds

        Returns:
            True if the value was set, False if key already existed
        """
        if await self.exists(key):
            return False

        return await self.set(key, value, ttl)

    async def get_ttl(self, key: str) -> int | None:
        """
        Get the remaining time-to-live for a key.

        TTL introspection is not implemented for the existing cache system,
        so this currently logs a warning and always returns None.

        Args:
            key: Cache key

        Returns:
            Remaining TTL in seconds, None if key doesn't exist or has no TTL
        """
        # This would need to be implemented in the underlying cache manager.
        logger.warning("TTL introspection not implemented for key: %s", key)
        return None

    async def expire(self, key: str, ttl: int) -> bool:
        """
        Set expiration time for an existing key.

        Implemented by reading the current value and storing it again with
        the new TTL; the steps are separate calls and not atomic.

        Args:
            key: Cache key
            ttl: Time-to-live in seconds

        Returns:
            True if expiration was set, False if key doesn't exist
        """
        if not await self.exists(key):
            return False

        # Re-store the current value with the new TTL. A value of None (for
        # example, the key vanished between the two calls) is treated as
        # "nothing to expire".
        current_value = await self.get(key)
        if current_value is None:
            return False
        return await self.set(key, current_value, ttl)

    def get_sync_cache_manager(self) -> ExistingCacheManager:
        """
        Get the underlying synchronous cache manager for backward compatibility.

        Returns:
            The wrapped CacheManager instance
        """
        return self._cache_manager

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.