Skip to main content
Glama

Redis MCP Server

Official
by redis
misc.py9.05 kB
from typing import Any, Dict, Union, List import aiohttp from redis.exceptions import RedisError from src.common.connection import RedisConnectionManager from src.common.server import mcp from src.common.config import MCP_DOCS_SEARCH_URL from src.version import __version__ @mcp.tool() async def delete(key: str) -> str: """Delete a Redis key. Args: key (str): The key to delete. Returns: str: Confirmation message or an error message. """ try: r = RedisConnectionManager.get_connection() result = r.delete(key) return f"Successfully deleted {key}" if result else f"Key {key} not found" except RedisError as e: return f"Error deleting key {key}: {str(e)}" @mcp.tool() async def type(key: str) -> Dict[str, Any]: """Returns the string representation of the type of the value stored at key Args: key (str): The key to check. Returns: str: The type of key, or none when key doesn't exist """ try: r = RedisConnectionManager.get_connection() key_type = r.type(key) info = {"key": key, "type": key_type, "ttl": r.ttl(key)} return info except RedisError as e: return {"error": str(e)} @mcp.tool() async def expire(name: str, expire_seconds: int) -> str: """Set an expiration time for a Redis key. Args: name: The Redis key. expire_seconds: Time in seconds after which the key should expire. Returns: A success message or an error message. """ try: r = RedisConnectionManager.get_connection() success = r.expire(name, expire_seconds) return ( f"Expiration set to {expire_seconds} seconds for '{name}'." if success else f"Key '{name}' does not exist." ) except RedisError as e: return f"Error setting expiration for key '{name}': {str(e)}" @mcp.tool() async def rename(old_key: str, new_key: str) -> Dict[str, Any]: """ Renames a Redis key from old_key to new_key. Args: old_key (str): The current name of the Redis key to rename. new_key (str): The new name to assign to the key. Returns: Dict[str, Any]: A dictionary containing the result of the operation. On success: {"status": "success", "message": "..."} On error: {"error": "..."} """ try: r = RedisConnectionManager.get_connection() # Check if the old key exists if not r.exists(old_key): return {"error": f"Key '{old_key}' does not exist."} # Rename the key r.rename(old_key, new_key) return { "status": "success", "message": f"Renamed key '{old_key}' to '{new_key}'", } except RedisError as e: return {"error": str(e)} @mcp.tool() async def scan_keys( pattern: str = "*", count: int = 100, cursor: int = 0 ) -> Union[str, Dict[str, Any]]: """ Scan keys in the Redis database using the SCAN command (non-blocking, production-safe). ⚠️ IMPORTANT: This returns PARTIAL results from one iteration. Use scan_all_keys() to get ALL matching keys, or call this function multiple times with the returned cursor until cursor becomes 0. The SCAN command iterates through the keyspace in small chunks, making it safe to use on large databases without blocking other operations. Args: pattern: Pattern to match keys against (default is "*" for all keys). Common patterns: "user:*", "cache:*", "*:123", etc. count: Hint for the number of keys to return per iteration (default 100). Redis may return more or fewer keys than this hint. cursor: The cursor position to start scanning from (0 to start from beginning). To continue scanning, use the cursor value returned from previous call. Returns: A dictionary containing: - 'cursor': Next cursor position (0 means scan is complete) - 'keys': List of keys found in this iteration (PARTIAL RESULTS) - 'total_scanned': Number of keys returned in this batch - 'scan_complete': Boolean indicating if scan is finished Or an error message if something goes wrong. Example usage: First call: scan_keys("user:*") -> returns cursor=1234, keys=[...], scan_complete=False Next call: scan_keys("user:*", cursor=1234) -> continues from where it left off Final call: returns cursor=0, scan_complete=True when done """ try: r = RedisConnectionManager.get_connection() cursor, keys = r.scan(cursor=cursor, match=pattern, count=count) # Convert bytes to strings if needed decoded_keys = [ key.decode("utf-8") if isinstance(key, bytes) else key for key in keys ] return { "cursor": cursor, "keys": decoded_keys, "total_scanned": len(decoded_keys), "scan_complete": cursor == 0, } except RedisError as e: return f"Error scanning keys with pattern '{pattern}': {str(e)}" @mcp.tool() async def scan_all_keys( pattern: str = "*", batch_size: int = 100 ) -> Union[str, List[str]]: """ Scan and return ALL keys matching a pattern using multiple SCAN iterations. This function automatically handles the SCAN cursor iteration to collect all matching keys. It's safer than KEYS * for large databases but will still collect all results in memory. ⚠️ WARNING: With very large datasets (millions of keys), this may consume significant memory. For large-scale operations, consider using scan_keys() with manual iteration instead. Args: pattern: Pattern to match keys against (default is "*" for all keys). batch_size: Number of keys to scan per iteration (default 100). Returns: A list of all keys matching the pattern or an error message. """ try: r = RedisConnectionManager.get_connection() all_keys = [] cursor = 0 while True: cursor, keys = r.scan(cursor=cursor, match=pattern, count=batch_size) # Convert bytes to strings if needed and add to results decoded_keys = [ key.decode("utf-8") if isinstance(key, bytes) else key for key in keys ] all_keys.extend(decoded_keys) # Break when scan is complete (cursor returns to 0) if cursor == 0: break return all_keys except RedisError as e: return f"Error scanning all keys with pattern '{pattern}': {str(e)}" @mcp.tool() async def search_redis_documents( question: str, ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: """Search Redis documentation and knowledge base to learn about Redis concepts and use cases. This tool exposes updated and curated documentation, and must be invoked every time the user wants to learn more in areas including: **Common Use Cases:** - Session Management: User session storage and management - Caching: Application-level and database query caching - Rate Limiting: API throttling and request limiting - Leaderboards: Gaming and ranking systems - Semantic Search: AI-powered similarity search - Agentic Workflows: AI agent state and memory management - RAG (Retrieval-Augmented Generation): Vector storage for AI applications - Real-time Analytics: Counters, metrics, and time-series data - Message Queues: Task queues and job processing - Geospatial: Location-based queries and proximity search Args: question: The question about Redis concepts, data structures, features, or use cases Returns: Union[List[Dict[str, Any]], Dict[str, Any]]: A list of documentation results from the API, or a dict with an error message. """ if not MCP_DOCS_SEARCH_URL: return {"error": "MCP_DOCS_SEARCH_URL environment variable is not configured"} if not question.strip(): return {"error": "Question parameter cannot be empty"} try: headers = { "Accept": "application/json", "User-Agent": f"Redis-MCP-Server/{__version__}", } async with aiohttp.ClientSession() as session: async with session.get( url=MCP_DOCS_SEARCH_URL, params={"q": question}, headers=headers ) as response: # Try to parse JSON response try: result = await response.json() return result except aiohttp.ContentTypeError: # If not JSON, return text content text_content = await response.text() return {"error": f"Non-JSON response: {text_content}"} except aiohttp.ClientError as e: return {"error": f"HTTP client error: {str(e)}"} except Exception as e: return {"error": f"Unexpected error calling ConvAI API: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/redis/mcp-redis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server