Skip to main content
Glama
redis

Redis MCP Server

Official
by redis

xadd

Add entries to Redis streams with optional expiration times to manage time-series data or event logs in Redis databases.

Instructions

Add an entry to a Redis stream with an optional expiration time.

Args: key (str): The stream key. fields (dict): The fields and values for the stream entry. expiration (int, optional): Expiration time in seconds.

Returns: str: The ID of the added entry or an error message.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
keyYes
fieldsYes
expirationNo

Implementation Reference

  • The xadd tool handler function implementing the Redis XADD operation with optional expiration handling and error management.
    @mcp.tool() async def xadd( key: str, fields: Dict[str, Any], expiration: Optional[int] = None ) -> str: """Add an entry to a Redis stream with an optional expiration time. Args: key (str): The stream key. fields (dict): The fields and values for the stream entry. expiration (int, optional): Expiration time in seconds. Returns: str: The ID of the added entry or an error message. """ try: r = RedisConnectionManager.get_connection() entry_id = r.xadd(key, fields) if expiration: r.expire(key, expiration) return f"Successfully added entry {entry_id} to {key}" + ( f" with expiration {expiration} seconds" if expiration else "" ) except RedisError as e: return f"Error adding to stream {key}: {str(e)}"
  • MCP server initialization and tool loading mechanism that imports all src/tools modules, registering the xadd tool via its decorator.
    def load_tools(): import src.tools as tools_pkg for _, module_name, _ in pkgutil.iter_modules(tools_pkg.__path__): importlib.import_module(f"src.tools.{module_name}") # Initialize FastMCP server mcp = FastMCP("Redis MCP Server", dependencies=["redis", "dotenv", "numpy", "aiohttp"]) # Load tools load_tools()
  • Type hints defining the input schema (key: str, fields: dict[str, Any], expiration: Optional[int]) and output (str) for the xadd tool.
    async def xadd( key: str, fields: Dict[str, Any], expiration: Optional[int] = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/redis/mcp-redis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server