chatmemorysystem.py•12.2 kB
"""
Chat Memory System
Manages conversation history for chat sessions. Provides SQLite-backed
persistent storage, retrieval, keyword search, deletion, and dict-based
export/import of chat memories, with per-message metadata, grouping by
conversation, and an auto-generated summary for each conversation.
"""
import asyncio
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import structlog
from pydantic import BaseModel, Field
# Module-level structlog logger named after this module; ChatMemorySystem
# emits its "Memory stored" event through it.
logger = structlog.get_logger(__name__)
class MemoryEntry(BaseModel):
    """One chat message belonging to a conversation.

    ``id`` stays None for entries built in memory (e.g. before insertion);
    ``timestamp`` defaults to ``datetime.utcnow()`` (a naive UTC datetime)
    evaluated when the entry is constructed.
    """

    id: Optional[str] = Field(default=None)
    conversation_id: str
    content: str
    # Free-form extras; stored with json.dumps, so values must be JSON-serialisable.
    metadata: Dict[str, Any] = Field(default_factory=dict)
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    role: str = Field(default="user")
    # Filtered by get_memory(min_importance=...) and used to rank search results.
    importance: float = Field(default=1.0)
class ConversationSummary(BaseModel):
    """Aggregate information about one conversation (one ``conversation_summaries`` row)."""

    conversation_id: str = Field(...)
    summary: str = Field(...)
    last_updated: datetime = Field(...)
    message_count: int = Field(...)
    # Participant labels recorded for the conversation; empty when none are known.
    participants: List[str] = Field(default_factory=list)
class ChatMemorySystem:
    """Persist chat messages ("memories") and per-conversation summaries in SQLite.

    Coroutines that touch the database open a short-lived ``sqlite3``
    connection and hold ``self.db_lock`` while using it, so database work
    issued through one instance is serialised. The ``sqlite3`` calls
    themselves are synchronous and run on the calling thread.
    """

    # Column order shared by every SELECT so the row converters can index rows.
    _MEMORY_COLUMNS = (
        "id, conversation_id, content, metadata, timestamp, role, importance"
    )
    _SUMMARY_COLUMNS = (
        "conversation_id, summary, last_updated, message_count, participants"
    )
    # How many of the newest messages are quoted in a generated summary.
    _SUMMARY_RECENT_MESSAGES = 3

    def __init__(self, db_path: Optional[str] = None):
        """Open (creating if needed) the memory database.

        Args:
            db_path: SQLite database file path; ``"chat_memory.db"`` when
                omitted or empty.
        """
        self.db_path = db_path or "chat_memory.db"
        self.db_lock = asyncio.Lock()
        self._init_database()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to ``self.db_path``; the caller must close it."""
        return sqlite3.connect(self.db_path)

    def _init_database(self):
        """Create the tables and indexes if they do not exist yet."""
        conn = self._connect()
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    conversation_id TEXT NOT NULL,
                    content TEXT NOT NULL,
                    metadata TEXT DEFAULT '{}',
                    timestamp TEXT NOT NULL,
                    role TEXT DEFAULT 'user',
                    importance REAL DEFAULT 1.0
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS conversation_summaries (
                    conversation_id TEXT PRIMARY KEY,
                    summary TEXT NOT NULL,
                    last_updated TEXT NOT NULL,
                    message_count INTEGER DEFAULT 0,
                    participants TEXT DEFAULT '[]'
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_conversation_id
                ON memories(conversation_id)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp
                ON memories(timestamp)
            """)
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _row_to_memory(row: Tuple[Any, ...]) -> MemoryEntry:
        """Build a MemoryEntry from a row selected with ``_MEMORY_COLUMNS``."""
        return MemoryEntry(
            id=str(row[0]),
            conversation_id=row[1],
            content=row[2],
            metadata=json.loads(row[3] or "{}"),
            timestamp=datetime.fromisoformat(row[4]),
            role=row[5],
            importance=row[6],
        )

    @staticmethod
    def _row_to_summary(row: Tuple[Any, ...]) -> ConversationSummary:
        """Build a ConversationSummary from a row selected with ``_SUMMARY_COLUMNS``."""
        return ConversationSummary(
            conversation_id=row[0],
            summary=row[1],
            last_updated=datetime.fromisoformat(row[2]),
            message_count=row[3],
            participants=json.loads(row[4] or "[]"),
        )

    def _refresh_summary(self, conn: sqlite3.Connection, conversation_id: str) -> None:
        """Recompute and upsert the summary row for ``conversation_id`` on ``conn``.

        Does not commit: the caller commits together with the change that
        triggered the refresh, so memories and summary stay consistent.
        """
        message_count = conn.execute(
            "SELECT COUNT(*) FROM memories WHERE conversation_id = ?",
            (conversation_id,),
        ).fetchone()[0]
        recent_messages = [
            row[0]
            for row in conn.execute(
                """
                SELECT content FROM memories
                WHERE conversation_id = ?
                ORDER BY timestamp DESC, id DESC
                LIMIT ?
                """,
                (conversation_id, self._SUMMARY_RECENT_MESSAGES),
            ).fetchall()
        ]
        # Roles actually present in the conversation, in order of first insertion
        # (previously hard-coded to ["user", "assistant"]).
        participants = [
            row[0]
            for row in conn.execute(
                """
                SELECT role FROM memories
                WHERE conversation_id = ?
                GROUP BY role
                ORDER BY MIN(id)
                """,
                (conversation_id,),
            ).fetchall()
        ]
        summary = (
            f"Conversation with {message_count} messages. "
            f"Recent topics: {', '.join(recent_messages)}"
        )
        conn.execute(
            """
            INSERT OR REPLACE INTO conversation_summaries
            (conversation_id, summary, last_updated, message_count, participants)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                conversation_id,
                summary,
                datetime.utcnow().isoformat(),
                message_count,
                json.dumps(participants),
            ),
        )

    async def store_memory(
        self,
        conversation_id: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
        role: str = "user",
        importance: float = 1.0,
        timestamp: Optional[datetime] = None,
    ) -> str:
        """Insert one memory and refresh its conversation summary atomically.

        Args:
            conversation_id: Conversation the message belongs to.
            content: Message text.
            metadata: Optional JSON-serialisable extras (``{}`` when omitted).
            role: Speaker role label.
            importance: Weight used by ``get_memory`` filtering and search ranking.
            timestamp: Optional original time of the message (e.g. when
                importing). ISO-8601 strings are also accepted and parsed by
                MemoryEntry's pydantic validation. Defaults to the current UTC time.

        Returns:
            The new row id as a string.
        """
        async with self.db_lock:
            entry_fields: Dict[str, Any] = {
                "conversation_id": conversation_id,
                "content": content,
                "metadata": metadata or {},
                "role": role,
                "importance": importance,
            }
            # Only pass a timestamp when given, so MemoryEntry's default applies
            # (evaluated under the lock, keeping timestamps in insertion order).
            if timestamp is not None:
                entry_fields["timestamp"] = timestamp
            memory = MemoryEntry(**entry_fields)
            conn = self._connect()
            try:
                cursor = conn.execute(
                    """
                    INSERT INTO memories
                    (conversation_id, content, metadata, timestamp, role, importance)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        memory.conversation_id,
                        memory.content,
                        json.dumps(memory.metadata),
                        memory.timestamp.isoformat(),
                        memory.role,
                        memory.importance,
                    ),
                )
                memory_id = str(cursor.lastrowid)
                # Same transaction as the insert: both land or neither does.
                self._refresh_summary(conn, conversation_id)
                conn.commit()
                logger.info(
                    "Memory stored",
                    conversation_id=conversation_id,
                    memory_id=memory_id,
                    role=role,
                )
                return memory_id
            finally:
                conn.close()

    async def get_memory(
        self,
        conversation_id: str,
        limit: Optional[int] = None,
        min_importance: float = 0.0,
    ) -> List[MemoryEntry]:
        """Return a conversation's memories, newest first.

        Args:
            conversation_id: Conversation to read.
            limit: Maximum number of rows; ``None`` means no limit
                (``0`` now returns an empty list rather than everything).
            min_importance: Only rows with ``importance >= min_importance``.
        """
        async with self.db_lock:
            conn = self._connect()
            try:
                query = f"""
                    SELECT {self._MEMORY_COLUMNS}
                    FROM memories
                    WHERE conversation_id = ? AND importance >= ?
                    ORDER BY timestamp DESC, id DESC
                """
                params: List[Any] = [conversation_id, min_importance]
                if limit is not None:
                    query += " LIMIT ?"
                    params.append(limit)
                rows = conn.execute(query, params).fetchall()
                return [self._row_to_memory(row) for row in rows]
            finally:
                conn.close()

    async def search_memory(
        self,
        query: str,
        conversation_id: Optional[str] = None,
        limit: int = 10,
    ) -> List[MemoryEntry]:
        """Substring-search memory content, most important and newest first.

        ``query`` is matched literally: ``%``, ``_`` and ``\\`` are escaped so
        they are not treated as LIKE wildcards. Matching uses SQLite ``LIKE``
        (ASCII case-insensitive by default).

        Args:
            query: Text to look for inside ``content``.
            conversation_id: Restrict to one conversation when not None.
            limit: Maximum number of results.
        """
        async with self.db_lock:
            conn = self._connect()
            try:
                escaped = (
                    query.replace("\\", "\\\\")
                    .replace("%", "\\%")
                    .replace("_", "\\_")
                )
                sql_query = f"""
                    SELECT {self._MEMORY_COLUMNS}
                    FROM memories
                    WHERE content LIKE ? ESCAPE '\\'
                """
                params: List[Any] = [f"%{escaped}%"]
                if conversation_id is not None:
                    sql_query += " AND conversation_id = ?"
                    params.append(conversation_id)
                sql_query += " ORDER BY importance DESC, timestamp DESC, id DESC LIMIT ?"
                params.append(limit)
                rows = conn.execute(sql_query, params).fetchall()
                return [self._row_to_memory(row) for row in rows]
            finally:
                conn.close()

    async def get_conversation_summary(self, conversation_id: str) -> Optional[ConversationSummary]:
        """Return the stored summary for ``conversation_id``, or None if there is none."""
        async with self.db_lock:
            conn = self._connect()
            try:
                row = conn.execute(
                    f"""
                    SELECT {self._SUMMARY_COLUMNS}
                    FROM conversation_summaries
                    WHERE conversation_id = ?
                    """,
                    (conversation_id,),
                ).fetchone()
                return self._row_to_summary(row) if row else None
            finally:
                conn.close()

    async def _update_conversation_summary(self, conversation_id: str):
        """Recompute and commit the summary for ``conversation_id`` on a fresh connection.

        Kept for compatibility; internal callers use ``_refresh_summary`` inside
        their own transaction. Does not take ``db_lock`` — callers should hold it.
        """
        conn = self._connect()
        try:
            self._refresh_summary(conn, conversation_id)
            conn.commit()
        finally:
            conn.close()

    async def delete_memory(self, memory_id: str) -> bool:
        """Delete one memory and keep its conversation summary in sync.

        After the delete, the conversation's summary is recomputed, or removed
        if no memories remain (as ``delete_conversation`` does).

        Returns:
            True if a memory with ``memory_id`` existed and was deleted.
        """
        async with self.db_lock:
            conn = self._connect()
            try:
                row = conn.execute(
                    "SELECT conversation_id FROM memories WHERE id = ?",
                    (memory_id,),
                ).fetchone()
                if row is None:
                    return False
                conversation_id = row[0]
                conn.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
                remaining = conn.execute(
                    "SELECT COUNT(*) FROM memories WHERE conversation_id = ?",
                    (conversation_id,),
                ).fetchone()[0]
                if remaining:
                    self._refresh_summary(conn, conversation_id)
                else:
                    conn.execute(
                        "DELETE FROM conversation_summaries WHERE conversation_id = ?",
                        (conversation_id,),
                    )
                conn.commit()
                return True
            finally:
                conn.close()

    async def delete_conversation(self, conversation_id: str) -> int:
        """Delete all memories and the summary of a conversation.

        Returns:
            Number of memory rows deleted.
        """
        async with self.db_lock:
            conn = self._connect()
            try:
                deleted_count = conn.execute(
                    "DELETE FROM memories WHERE conversation_id = ?",
                    (conversation_id,),
                ).rowcount
                conn.execute(
                    "DELETE FROM conversation_summaries WHERE conversation_id = ?",
                    (conversation_id,),
                )
                conn.commit()
                return deleted_count
            finally:
                conn.close()

    async def get_conversation_list(self, limit: int = 50) -> List[ConversationSummary]:
        """Return up to ``limit`` conversation summaries, most recently updated first."""
        async with self.db_lock:
            conn = self._connect()
            try:
                rows = conn.execute(
                    f"""
                    SELECT {self._SUMMARY_COLUMNS}
                    FROM conversation_summaries
                    ORDER BY last_updated DESC
                    LIMIT ?
                    """,
                    (limit,),
                ).fetchall()
                return [self._row_to_summary(row) for row in rows]
            finally:
                conn.close()

    async def export_conversation(self, conversation_id: str) -> Dict[str, Any]:
        """Return a conversation as a plain dict.

        ``memories`` is ordered newest first (as ``get_memory`` returns it).
        Entries are model ``.dict()`` output, so their timestamps remain
        ``datetime`` objects; ``exported_at`` is an ISO-8601 string.
        """
        memories = await self.get_memory(conversation_id)
        summary = await self.get_conversation_summary(conversation_id)
        return {
            "conversation_id": conversation_id,
            "summary": summary.dict() if summary else None,
            "memories": [memory.dict() for memory in memories],
            "exported_at": datetime.utcnow().isoformat(),
        }

    async def import_conversation(self, conversation_data: Dict[str, Any]) -> str:
        """Store the memories of an exported conversation.

        All memories are stored under the payload's top-level
        ``conversation_id``. Each memory keeps its original ``timestamp`` when
        present, so message order survives an export/import round trip.

        Returns:
            The conversation id the memories were stored under.
        """
        conversation_id = conversation_data["conversation_id"]
        for memory_data in conversation_data.get("memories") or []:
            await self.store_memory(
                conversation_id=conversation_id,
                content=memory_data["content"],
                metadata=memory_data.get("metadata", {}),
                role=memory_data.get("role", "user"),
                importance=memory_data.get("importance", 1.0),
                timestamp=memory_data.get("timestamp"),
            )
        return conversation_id