
MCP Server with LLM Integration

by MelaLitho
chatmemorysystem.py (12.2 kB)
""" Chat Memory System Manages conversation history and context for chat sessions. Provides persistent storage, retrieval, and management of chat memories with metadata support and conversation threading. """ import asyncio import json import sqlite3 from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import structlog from pydantic import BaseModel, Field logger = structlog.get_logger(__name__) class MemoryEntry(BaseModel): id: Optional[str] = None conversation_id: str content: str metadata: Dict[str, Any] = Field(default_factory=dict) timestamp: datetime = Field(default_factory=datetime.utcnow) role: str = "user" importance: float = 1.0 class ConversationSummary(BaseModel): conversation_id: str summary: str last_updated: datetime message_count: int participants: List[str] = Field(default_factory=list) class ChatMemorySystem: def __init__(self, db_path: Optional[str] = None): self.db_path = db_path or "chat_memory.db" self.db_lock = asyncio.Lock() self._init_database() def _init_database(self): conn = sqlite3.connect(self.db_path) try: conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, conversation_id TEXT NOT NULL, content TEXT NOT NULL, metadata TEXT DEFAULT '{}', timestamp TEXT NOT NULL, role TEXT DEFAULT 'user', importance REAL DEFAULT 1.0 ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS conversation_summaries ( conversation_id TEXT PRIMARY KEY, summary TEXT NOT NULL, last_updated TEXT NOT NULL, message_count INTEGER DEFAULT 0, participants TEXT DEFAULT '[]' ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_conversation_id ON memories(conversation_id) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_timestamp ON memories(timestamp) """) conn.commit() finally: conn.close() async def store_memory( self, conversation_id: str, content: str, metadata: Optional[Dict[str, Any]] = None, role: str = "user", importance: float = 1.0, ) -> str: async with self.db_lock: memory = MemoryEntry( conversation_id=conversation_id, content=content, metadata=metadata or {}, role=role, importance=importance, ) conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(""" INSERT INTO memories (conversation_id, content, metadata, timestamp, role, importance) VALUES (?, ?, ?, ?, ?, ?) """, ( memory.conversation_id, memory.content, json.dumps(memory.metadata), memory.timestamp.isoformat(), memory.role, memory.importance, )) memory_id = str(cursor.lastrowid) conn.commit() await self._update_conversation_summary(conversation_id) logger.info( "Memory stored", conversation_id=conversation_id, memory_id=memory_id, role=role, ) return memory_id finally: conn.close() async def get_memory( self, conversation_id: str, limit: Optional[int] = None, min_importance: float = 0.0, ) -> List[MemoryEntry]: async with self.db_lock: conn = sqlite3.connect(self.db_path) try: query = """ SELECT id, conversation_id, content, metadata, timestamp, role, importance FROM memories WHERE conversation_id = ? AND importance >= ? ORDER BY timestamp DESC """ params = [conversation_id, min_importance] if limit: query += " LIMIT ?" 
params.append(limit) cursor = conn.execute(query, params) rows = cursor.fetchall() memories = [] for row in rows: memory = MemoryEntry( id=str(row[0]), conversation_id=row[1], content=row[2], metadata=json.loads(row[3]), timestamp=datetime.fromisoformat(row[4]), role=row[5], importance=row[6], ) memories.append(memory) return memories finally: conn.close() async def search_memory( self, query: str, conversation_id: Optional[str] = None, limit: int = 10, ) -> List[MemoryEntry]: async with self.db_lock: conn = sqlite3.connect(self.db_path) try: sql_query = """ SELECT id, conversation_id, content, metadata, timestamp, role, importance FROM memories WHERE content LIKE ? """ params = [f"%{query}%"] if conversation_id: sql_query += " AND conversation_id = ?" params.append(conversation_id) sql_query += " ORDER BY importance DESC, timestamp DESC LIMIT ?" params.append(limit) cursor = conn.execute(sql_query, params) rows = cursor.fetchall() memories = [] for row in rows: memory = MemoryEntry( id=str(row[0]), conversation_id=row[1], content=row[2], metadata=json.loads(row[3]), timestamp=datetime.fromisoformat(row[4]), role=row[5], importance=row[6], ) memories.append(memory) return memories finally: conn.close() async def get_conversation_summary(self, conversation_id: str) -> Optional[ConversationSummary]: async with self.db_lock: conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(""" SELECT conversation_id, summary, last_updated, message_count, participants FROM conversation_summaries WHERE conversation_id = ? """, (conversation_id,)) row = cursor.fetchone() if not row: return None return ConversationSummary( conversation_id=row[0], summary=row[1], last_updated=datetime.fromisoformat(row[2]), message_count=row[3], participants=json.loads(row[4]), ) finally: conn.close() async def _update_conversation_summary(self, conversation_id: str): conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(""" SELECT COUNT(*) FROM memories WHERE conversation_id = ? """, (conversation_id,)) message_count = cursor.fetchone()[0] cursor = conn.execute(""" SELECT content FROM memories WHERE conversation_id = ? ORDER BY timestamp DESC LIMIT 10 """, (conversation_id,)) recent_messages = [row[0] for row in cursor.fetchall()] summary = f"Conversation with {message_count} messages. Recent topics: {', '.join(recent_messages[:3])}" conn.execute(""" INSERT OR REPLACE INTO conversation_summaries (conversation_id, summary, last_updated, message_count, participants) VALUES (?, ?, ?, ?, ?) """, ( conversation_id, summary, datetime.utcnow().isoformat(), message_count, json.dumps(["user", "assistant"]), )) conn.commit() finally: conn.close() async def delete_memory(self, memory_id: str) -> bool: async with self.db_lock: conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(""" DELETE FROM memories WHERE id = ? """, (memory_id,)) deleted = cursor.rowcount > 0 conn.commit() return deleted finally: conn.close() async def delete_conversation(self, conversation_id: str) -> int: async with self.db_lock: conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(""" DELETE FROM memories WHERE conversation_id = ? """, (conversation_id,)) deleted_count = cursor.rowcount conn.execute(""" DELETE FROM conversation_summaries WHERE conversation_id = ? 
""", (conversation_id,)) conn.commit() return deleted_count finally: conn.close() async def get_conversation_list(self, limit: int = 50) -> List[ConversationSummary]: async with self.db_lock: conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(""" SELECT conversation_id, summary, last_updated, message_count, participants FROM conversation_summaries ORDER BY last_updated DESC LIMIT ? """, (limit,)) conversations = [] for row in cursor.fetchall(): conversation = ConversationSummary( conversation_id=row[0], summary=row[1], last_updated=datetime.fromisoformat(row[2]), message_count=row[3], participants=json.loads(row[4]), ) conversations.append(conversation) return conversations finally: conn.close() async def export_conversation(self, conversation_id: str) -> Dict[str, Any]: memories = await self.get_memory(conversation_id) summary = await self.get_conversation_summary(conversation_id) return { "conversation_id": conversation_id, "summary": summary.dict() if summary else None, "memories": [memory.dict() for memory in memories], "exported_at": datetime.utcnow().isoformat(), } async def import_conversation(self, conversation_data: Dict[str, Any]) -> str: conversation_id = conversation_data["conversation_id"] for memory_data in conversation_data.get("memories", []): await self.store_memory( conversation_id=memory_data["conversation_id"], content=memory_data["content"], metadata=memory_data.get("metadata", {}), role=memory_data.get("role", "user"), importance=memory_data.get("importance", 1.0), ) return conversation_id

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MelaLitho/MCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.