# Memory MCP Server
# by evangstav
# - memory_mcp_server
"""Knowledge graph manager that delegates to a configured backend."""
import asyncio
from pathlib import Path
from typing import Dict, List, Optional, Union
from .backends.base import Backend
from .backends.jsonl import JsonlBackend
from .interfaces import Entity, KnowledgeGraph, Relation, SearchOptions
from .validation import KnowledgeGraphValidator, ValidationError
class KnowledgeGraphManager:
    """Manages knowledge graph operations through a configured backend.

    Every mutating operation runs inside ``_write_lock``. Operations that
    validate against the current graph take the lock *before* reading the
    graph, so the snapshot they validate against cannot be changed by another
    writer of this manager before their own write is issued.
    """

    backend: Backend
    _write_lock: asyncio.Lock

    def __init__(
        self,
        backend: Union[Backend, Path],
        cache_ttl: int = 60,
    ):
        """Initialize the KnowledgeGraphManager.

        Args:
            backend: Either a Backend instance or Path to use default JSONL backend
            cache_ttl: Cache TTL in seconds; passed to ``JsonlBackend`` when
                ``backend`` is a ``Path`` and ignored otherwise
        """
        if isinstance(backend, Path):
            self.backend = JsonlBackend(backend, cache_ttl)
        else:
            self.backend = backend
        # NOTE: asyncio.Lock is not reentrant - methods of this class must not
        # call another lock-taking method while already holding the lock.
        self._write_lock = asyncio.Lock()

    async def initialize(self) -> None:
        """Initialize the backend connection."""
        await self.backend.initialize()

    async def close(self) -> None:
        """Close the backend connection."""
        await self.backend.close()

    async def create_entities(self, entities: List[Entity]) -> List[Entity]:
        """Create multiple new entities.

        Args:
            entities: List of entities to create

        Returns:
            Whatever the backend's ``create_entities`` returns (the created
            entities)

        Raises:
            ValidationError: If any entity fails validation
        """
        # Read, validate and write under one lock: reading the graph awaits,
        # so validating outside the lock would let a concurrent writer add a
        # conflicting entity between the check and the write.
        async with self._write_lock:
            graph = await self.read_graph()
            existing_names = {entity.name for entity in graph.entities}
            KnowledgeGraphValidator.validate_batch_entities(entities, existing_names)
            return await self.backend.create_entities(entities)

    async def delete_entities(self, entity_names: List[str]) -> List[str]:
        """Delete multiple existing entities by name.

        Args:
            entity_names: List of entity names to delete

        Returns:
            Whatever the backend's ``delete_entities`` returns (the deleted
            entity names)

        Raises:
            ValueError: If entity_names list is empty

        Any other error (e.g. entity not found, file access problems) comes
        from the backend and is backend specific.
        """
        if not entity_names:
            raise ValueError("Entity names list cannot be empty")

        async with self._write_lock:
            return await self.backend.delete_entities(entity_names)

    async def delete_relations(self, from_: str, to: str) -> None:
        """Delete relations between two entities.

        Args:
            from_: Source entity name
            to: Target entity name

        No validation happens here; errors such as a missing entity are
        raised by the backend, if at all.
        """
        async with self._write_lock:
            # The backend's result is passed through unchanged.
            return await self.backend.delete_relations(from_, to)

    async def create_relations(self, relations: List[Relation]) -> List[Relation]:
        """Create multiple new relations.

        Args:
            relations: List of relations to create

        Returns:
            Whatever the backend's ``create_relations`` returns (the created
            relations)

        Raises:
            ValidationError: If any relation fails validation against the
                existing relations and entity names
        """
        # Same critical section as create_entities: the graph snapshot used
        # for validation must still be current when the write is issued.
        async with self._write_lock:
            graph = await self.read_graph()
            existing_names = {entity.name for entity in graph.entities}
            KnowledgeGraphValidator.validate_batch_relations(
                relations, graph.relations, existing_names
            )
            return await self.backend.create_relations(relations)

    async def read_graph(self) -> KnowledgeGraph:
        """Read the entire knowledge graph.

        Does not take the write lock, so it is safe to call from methods that
        already hold it.

        Returns:
            Current state of the knowledge graph as reported by the backend
        """
        return await self.backend.read_graph()

    async def search_nodes(
        self, query: str, options: Optional[SearchOptions] = None
    ) -> KnowledgeGraph:
        """Search for entities and relations matching query.

        Args:
            query: Search query string
            options: Optional SearchOptions, forwarded unchanged to the
                backend; the behaviour for ``None`` is defined by the backend

        Returns:
            KnowledgeGraph containing matches

        Query/option validation is left to the backend.
        """
        return await self.backend.search_nodes(query, options)

    async def flush(self) -> None:
        """Ensure any pending changes are persisted."""
        await self.backend.flush()

    async def add_observations(self, entity_name: str, observations: List[str]) -> None:
        """Add observations to an existing entity.

        Args:
            entity_name: Name of the entity to add observations to
            observations: List of observations to add

        Raises:
            ValueError: If observations list is empty
            ValidationError: If the observations are invalid, the entity does
                not exist, or any observation is already present on the entity
        """
        if not observations:
            raise ValueError("Observations list cannot be empty")

        # Pure input validation - independent of graph state, so no lock needed.
        KnowledgeGraphValidator.validate_observations(observations)

        # Existence and duplicate checks depend on graph state, so they share
        # the critical section with the write.
        async with self._write_lock:
            graph = await self.read_graph()
            entity = next((e for e in graph.entities if e.name == entity_name), None)
            if entity is None:
                raise ValidationError(f"Entity not found: {entity_name}")

            existing_observations = set(entity.observations)
            duplicates = [obs for obs in observations if obs in existing_observations]
            if duplicates:
                raise ValidationError(f"Duplicate observations: {', '.join(duplicates)}")

            await self.backend.add_observations(entity_name, observations)

    async def add_batch_observations(
        self, observations_map: Dict[str, List[str]]
    ) -> None:
        """Add observations to multiple entities in a single operation.

        Args:
            observations_map: Dictionary mapping entity names to lists of observations

        Raises:
            ValidationError: If ``validate_batch_observations`` rejects the map

        NOTE(review): empty-map and unknown-entity handling is delegated to
        the validator/backend; confirm which exception types they raise.
        """
        # Validate against the current graph and write without releasing the
        # lock in between, so the validated snapshot cannot go stale.
        async with self._write_lock:
            graph = await self.read_graph()
            entities_map = {entity.name: entity for entity in graph.entities}
            KnowledgeGraphValidator.validate_batch_observations(
                observations_map, entities_map
            )
            await self.backend.add_batch_observations(observations_map)