"""
Conversation history storage with Redis backend and in-memory fallback.
This provides persistent conversation history that survives server restarts
and works across multiple server instances (horizontal scaling).
Design:
- Primary: Redis for distributed state
- Fallback: In-memory dict if Redis unavailable
- Automatic TTL for old conversations
- Type-safe with proper error handling
"""
from __future__ import annotations
import json
import time
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
from src.config import settings
from src.observability import get_logger
logger = get_logger(__name__)
class ConversationStore(ABC):
"""Abstract base for conversation storage."""
@abstractmethod
async def get_history(self, user_id: int, org_id: int) -> List[Dict[str, Any]]:
"""Get conversation history for a user."""
pass
@abstractmethod
async def set_history(self, user_id: int, org_id: int, history: List[Dict[str, Any]]) -> None:
"""Save conversation history for a user."""
pass
@abstractmethod
async def clear_history(self, user_id: int, org_id: int) -> None:
"""Clear conversation history for a user."""
pass
class RedisConversationStore(ConversationStore):
"""
Redis-backed conversation storage.
Provides distributed state management for horizontal scaling.
"""
def __init__(self):
"""Initialize Redis connection."""
import redis.asyncio as redis
self.redis = redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True,
)
self.ttl = settings.redis_ttl_conversations
logger.info("redis_conversation_store_initialized", url=settings.redis_url)
def _make_key(self, user_id: int, org_id: int) -> str:
"""Generate Redis key for conversation."""
return f"conv:{user_id}:{org_id}"
async def get_history(self, user_id: int, org_id: int) -> List[Dict[str, Any]]:
"""Get conversation history from Redis."""
try:
key = self._make_key(user_id, org_id)
data = await self.redis.get(key)
if data:
history = json.loads(data)
logger.debug(
"conversation_retrieved",
user_id=user_id,
org_id=org_id,
message_count=len(history)
)
return history
return []
except Exception as e:
logger.error(
"redis_get_failed",
user_id=user_id,
org_id=org_id,
error=str(e)
)
return []
async def set_history(self, user_id: int, org_id: int, history: List[Dict[str, Any]]) -> None:
"""Save conversation history to Redis with TTL."""
try:
key = self._make_key(user_id, org_id)
data = json.dumps(history)
await self.redis.setex(key, self.ttl, data)
logger.debug(
"conversation_saved",
user_id=user_id,
org_id=org_id,
message_count=len(history),
ttl=self.ttl
)
except Exception as e:
logger.error(
"redis_set_failed",
user_id=user_id,
org_id=org_id,
error=str(e)
)
async def clear_history(self, user_id: int, org_id: int) -> None:
"""Clear conversation history from Redis."""
try:
key = self._make_key(user_id, org_id)
await self.redis.delete(key)
logger.info(
"conversation_cleared",
user_id=user_id,
org_id=org_id
)
except Exception as e:
logger.error(
"redis_clear_failed",
user_id=user_id,
org_id=org_id,
error=str(e)
)
class InMemoryConversationStore(ConversationStore):
"""
In-memory conversation storage (fallback).
Used when Redis is unavailable. Data is lost on restart.
"""
def __init__(self):
"""Initialize in-memory storage."""
self._store: Dict[tuple[int, int], tuple[List[Dict[str, Any]], float]] = {}
self.ttl = settings.redis_ttl_conversations
logger.warning("in_memory_conversation_store_initialized", note="data_not_persistent")
def _make_key(self, user_id: int, org_id: int) -> tuple[int, int]:
"""Generate key for conversation."""
return (user_id, org_id)
def _cleanup_expired(self) -> None:
"""Remove expired conversations."""
now = time.time()
expired = [
key for key, (_, ts) in self._store.items()
if now - ts > self.ttl
]
for key in expired:
del self._store[key]
async def get_history(self, user_id: int, org_id: int) -> List[Dict[str, Any]]:
"""Get conversation history from memory."""
self._cleanup_expired()
key = self._make_key(user_id, org_id)
item = self._store.get(key)
if item:
history, ts = item
now = time.time()
if now - ts < self.ttl:
logger.debug(
"conversation_retrieved",
user_id=user_id,
org_id=org_id,
message_count=len(history),
source="memory"
)
return history
return []
async def set_history(self, user_id: int, org_id: int, history: List[Dict[str, Any]]) -> None:
"""Save conversation history to memory."""
key = self._make_key(user_id, org_id)
self._store[key] = (history, time.time())
logger.debug(
"conversation_saved",
user_id=user_id,
org_id=org_id,
message_count=len(history),
source="memory"
)
async def clear_history(self, user_id: int, org_id: int) -> None:
"""Clear conversation history from memory."""
key = self._make_key(user_id, org_id)
self._store.pop(key, None)
logger.info(
"conversation_cleared",
user_id=user_id,
org_id=org_id,
source="memory"
)
# Global store instance
_store: Optional[ConversationStore] = None
def get_conversation_store() -> ConversationStore:
"""
Get the conversation store instance.
This creates a singleton store that uses Redis if available,
otherwise falls back to in-memory storage.
Returns:
ConversationStore instance
"""
global _store
if _store is None:
if settings.redis_enabled:
try:
_store = RedisConversationStore()
logger.info("conversation_store_initialized", backend="redis")
except Exception as e:
logger.warning(
"redis_unavailable_fallback_to_memory",
error=str(e)
)
_store = InMemoryConversationStore()
else:
_store = InMemoryConversationStore()
logger.info("conversation_store_initialized", backend="memory")
return _store