xadd
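Adds an entry to a Redis stream with caller-supplied fields and an optional expiration time, returning a message that contains the new entry ID on success. When an expiration is given, it is applied to the stream key as a whole (via r.expire), not to the individual entry.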
Instructions
Add an entry to a Redis stream with an optional expiration time.
Args:
- key (str): The stream key.
- fields (dict): The fields and values for the stream entry.
- expiration (int, optional): Expiration time in seconds.
Returns: str: A success message containing the ID of the added entry, or an error message.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| expiration | No | Expiration time in seconds. | None |
| fields | Yes | The fields and values for the stream entry. | |
| key | Yes | The stream key. | |
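To make the schema concrete, here is a minimal sketch of an argument payload and a check against the required/optional split in the table. The helper `validate_xadd_args`, the stream key `sensor:readings`, and the field names are hypothetical and chosen only for illustration; they are not part of the server.

```python
from typing import Any, Dict, List

# Parameter names come from the input schema; the required/optional split
# mirrors the "Required" column of the table.
REQUIRED = ("key", "fields")
OPTIONAL = ("expiration",)


def validate_xadd_args(args: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the problems in an xadd payload (empty if OK)."""
    problems = []
    for name in REQUIRED:
        if name not in args:
            problems.append(f"missing required argument: {name}")
    for name in args:
        if name not in REQUIRED + OPTIONAL:
            problems.append(f"unknown argument: {name}")
    if "key" in args and not isinstance(args["key"], str):
        problems.append("key must be a string")
    if "fields" in args and not isinstance(args["fields"], dict):
        problems.append("fields must be a dict")
    exp = args.get("expiration")
    # bool is a subclass of int in Python, so it is rejected explicitly.
    if exp is not None and (isinstance(exp, bool) or not isinstance(exp, int)):
        problems.append("expiration must be an integer number of seconds")
    return problems


# Illustrative payload: made-up key and field names.
example_args = {
    "key": "sensor:readings",
    "fields": {"temperature": "21.5", "unit": "C"},
    "expiration": 3600,
}
```

Calling `validate_xadd_args(example_args)` returns an empty list, while a payload that omits `fields` yields a single "missing required argument" entry.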
Implementation Reference
- src/tools/stream.py:9-32 (handler): The xadd tool handler, an async function registered with the @mcp.tool() decorator. It adds an entry to a Redis stream with r.xadd(key, fields), optionally sets an expiration on the stream key with r.expire, and returns a success message or an error message.

      @mcp.tool()
      async def xadd(
          key: str, fields: Dict[str, Any], expiration: Optional[int] = None
      ) -> str:
          """Add an entry to a Redis stream with an optional expiration time.

          Args:
              key (str): The stream key.
              fields (dict): The fields and values for the stream entry.
              expiration (int, optional): Expiration time in seconds.

          Returns:
              str: The ID of the added entry or an error message.
          """
          try:
              r = RedisConnectionManager.get_connection()
              entry_id = r.xadd(key, fields)
              if expiration:
                  r.expire(key, expiration)
              return f"Successfully added entry {entry_id} to {key}" + (
                  f" with expiration {expiration} seconds" if expiration else ""
              )
          except RedisError as e:
              return f"Error adding to stream {key}: {str(e)}"
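The handler's control flow can be tried without a Redis server. The sketch below reproduces the same add-then-expire steps against `StubRedis`, a hypothetical in-memory stand-in written for this example (it is not redis-py, and its `1-0`-style IDs are made up; real stream IDs are generated by the server). `xadd_sketch` is likewise an illustrative name, and it omits the error handling and connection manager of the real handler.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class StubRedis:
    """Hypothetical in-memory stand-in for a Redis client (assumption, not redis-py)."""

    def __init__(self) -> None:
        self.streams: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
        self.ttl: Dict[str, int] = {}

    def xadd(self, key: str, fields: Dict[str, Any]) -> str:
        entries = self.streams.setdefault(key, [])
        entry_id = f"{len(entries) + 1}-0"  # made-up ID format for the stub
        entries.append((entry_id, dict(fields)))
        return entry_id

    def expire(self, key: str, seconds: int) -> bool:
        self.ttl[key] = seconds  # recorded per key, not per entry
        return True


async def xadd_sketch(
    r: StubRedis, key: str, fields: Dict[str, Any], expiration: Optional[int] = None
) -> str:
    # Same steps as the handler: append the entry, then optionally set a TTL.
    entry_id = r.xadd(key, fields)
    if expiration:
        r.expire(key, expiration)
    return f"Successfully added entry {entry_id} to {key}" + (
        f" with expiration {expiration} seconds" if expiration else ""
    )


r = StubRedis()
first = asyncio.run(xadd_sketch(r, "events", {"type": "login"}))
second = asyncio.run(xadd_sketch(r, "events", {"type": "logout"}, expiration=60))
```

After the second call, the stub holds two entries under `events` and a single TTL for the key, which shows that the expiration covers the whole stream, including the entry added earlier without one. Because the handler tests `if expiration:`, a value of 0 is treated the same as no expiration.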