storage.py
""" Handles file storage operations for the bm-graph-memory system. Implements safety checks with _bm_gm markers and provides JSONL file operations for the knowledge graph data. Storage location is determined by the MEMORY_FOLDER environment variable. """ import json import logging import os from pathlib import Path from typing import List, Dict, Any, Optional # Configure logging logger = logging.getLogger(__name__) # Safety marker for bm_gm system BM_GM_SAFETY_MARKER = {"type": "_bm_gm", "source": "brikerman-graph-memory-mcp"} class MemoryStorage: """ Handles file storage operations for the bm-graph-memory system. Implements safety checks with _bm_gm markers and provides JSONL file operations for the knowledge graph data. Storage location is determined by the MEMORY_FOLDER environment variable. """ def __init__(self): # Use MEMORY_FOLDER environment variable or default to ~/.bm memory_folder = os.environ.get("MEMORY_FOLDER") self.storage_path = Path(memory_folder) if memory_folder else Path.home() / ".bm" def get_storage_path(self) -> Path: """ Get the storage path based on MEMORY_FOLDER environment variable. Returns: Path to the storage directory """ return self.storage_path def initialize_storage(self): """ Initialize storage directory and create main database if needed. """ storage_path = self.get_storage_path() storage_path.mkdir(exist_ok=True) main_db_path = storage_path / "memory.jsonl" if not main_db_path.exists(): self._create_empty_database(main_db_path) logger.info(f"Created main database at {main_db_path}") def _create_empty_database(self, db_path: Path): """Create an empty database file with safety marker.""" with open(db_path, 'w') as f: f.write(json.dumps(BM_GM_SAFETY_MARKER) + '\n') def _validate_database_file(self, db_path: Path) -> bool: """ Validate that a database file has the proper _bm_gm safety marker. Args: db_path: Path to the database file Returns: True if file is valid, False otherwise """ if not db_path.exists(): return False try: with open(db_path, 'r') as f: first_line = f.readline().strip() if not first_line: return False marker = json.loads(first_line) return (marker.get("type") == "_bm_gm" and marker.get("source") == "brikerman-graph-memory-mcp") except (json.JSONDecodeError, IOError): return False def get_database_path(self, context: str = "main") -> Path: """ Get the file path for a specific context database. Args: context: Database context name (defaults to 'main') Returns: Path to the database file """ storage_path = self.get_storage_path() if context == "main": return storage_path / "memory.jsonl" else: return storage_path / f"memory-{context}.jsonl" def load_database(self, context: str = "main") -> List[Dict[str, Any]]: """ Load all records from a database file. Args: context: Database context name Returns: List of database records (excluding safety marker) """ db_path = self.get_database_path(context) if not self._validate_database_file(db_path): raise ValueError(f"Invalid or missing database file: {db_path}") records = [] with open(db_path, 'r') as f: lines = f.readlines() # Skip the first line (safety marker) for line in lines[1:]: line = line.strip() if line: try: records.append(json.loads(line)) except json.JSONDecodeError: logger.warning(f"Skipping invalid JSON line in {db_path}: {line}") return records def save_database(self, records: List[Dict[str, Any]], context: str = "main"): """ Save records to a database file. 

    def save_database(self, records: List[Dict[str, Any]], context: str = "main"):
        """
        Save records to a database file.

        Args:
            records: List of records to save
            context: Database context name
        """
        db_path = self.get_database_path(context)

        # Ensure directory exists (including parents for nested paths)
        db_path.parent.mkdir(parents=True, exist_ok=True)

        with open(db_path, 'w') as f:
            # Write safety marker first
            f.write(json.dumps(BM_GM_SAFETY_MARKER) + '\n')
            # Write all records
            for record in records:
                f.write(json.dumps(record) + '\n')

    def list_contexts(self) -> List[str]:
        """
        List all available database contexts.

        Returns:
            List of context names
        """
        storage_path = self.get_storage_path()
        if not storage_path.exists():
            return []

        contexts = []

        # Check for main database
        if (storage_path / "memory.jsonl").exists():
            contexts.append("main")

        # Check for context-specific databases
        for file_path in storage_path.glob("memory-*.jsonl"):
            context_name = file_path.stem.replace("memory-", "")
            if self._validate_database_file(file_path):
                contexts.append(context_name)

        return contexts

    def memory_create_entities(self, entities: List[Dict[str, Any]], context: str = "main") -> List[str]:
        """
        Add new entities (people, places, concepts) to the knowledge graph.

        Args:
            entities: List of entity objects to add (must include name and entity_type)
            context: Database context name (defaults to 'main')

        Returns:
            List of entity names that were created
        """
        # Initialize storage if needed
        self.initialize_storage()

        # Load existing database
        try:
            records = self.load_database(context)
        except ValueError:
            # If database doesn't exist, create it
            db_path = self.get_database_path(context)
            self._create_empty_database(db_path)
            records = []

        # Extract existing entity names in this context to avoid duplicates
        existing_entities = {
            record.get("name") for record in records
            if record.get("type") == "entity" and "name" in record
        }

        # Track created entity names
        created_names = []

        # Process each entity
        for entity in entities:
            # Ensure entity has required fields
            if "name" not in entity or "entity_type" not in entity:
                logger.warning("Skipping entity without required name or entity_type")
                continue

            entity_name = entity["name"]

            # Skip if entity already exists in this context
            if entity_name in existing_entities:
                logger.warning(f"Entity with name '{entity_name}' already exists in context '{context}', skipping")
                continue

            # Add type marker if not present
            if "type" not in entity:
                entity["type"] = "entity"

            # Initialize empty observations list if not present
            if "observations" not in entity:
                entity["observations"] = []

            # Rename entity_type to entityType to match our schema
            if "entity_type" in entity and "entityType" not in entity:
                entity["entityType"] = entity["entity_type"]
                del entity["entity_type"]

            # Add initial observations if provided
            if "initial_observations" in entity and entity["initial_observations"]:
                entity["observations"] = entity["initial_observations"]
                del entity["initial_observations"]

            # Add to records and track name
            records.append(entity)
            created_names.append(entity_name)
            existing_entities.add(entity_name)

        # Save updated database
        self.save_database(records, context)

        return created_names
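
    # Usage sketch for entity creation (hypothetical data; the input uses the
    # entity_type/initial_observations keys, which are normalized on write):
    #
    #   storage = MemoryStorage()
    #   created = storage.memory_create_entities([
    #       {"name": "Alice", "entity_type": "person",
    #        "initial_observations": ["met at the conference"]},
    #   ])
    #   # created == ["Alice"]; re-running logs a warning and returns [].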

    def memory_create_relations(self, relations: List[Dict[str, Any]], context: str = "main") -> int:
        """
        Create labeled links between existing entities in the knowledge graph.

        Args:
            relations: List of relation objects to add (must include source, target, and relation_type)
            context: Database context name (defaults to 'main')

        Returns:
            Number of relations that were created
        """
        # Initialize storage if needed
        self.initialize_storage()

        # Load existing database
        try:
            records = self.load_database(context)
        except ValueError:
            # If database doesn't exist, create it
            db_path = self.get_database_path(context)
            self._create_empty_database(db_path)
            records = []

        # Extract existing entity names for validation
        entity_names = {
            record.get("name") for record in records
            if record.get("type") == "entity" and "name" in record
        }

        # Track created relations count
        created_count = 0

        # Process each relation
        for relation in relations:
            # Ensure relation has required fields
            required_fields = ["source", "target", "relation_type"]
            if not all(field in relation for field in required_fields):
                missing = [f for f in required_fields if f not in relation]
                logger.warning(f"Skipping relation missing required fields: {missing}")
                continue

            source_name = relation["source"]
            target_name = relation["target"]

            # Validate that source and target entities exist
            if source_name not in entity_names:
                logger.warning(f"Source entity '{source_name}' does not exist in context '{context}', skipping relation")
                continue
            if target_name not in entity_names:
                logger.warning(f"Target entity '{target_name}' does not exist in context '{context}', skipping relation")
                continue

            # Add type marker if not present
            if "type" not in relation:
                relation["type"] = "relation"

            # Rename relation_type to relationType to match our schema
            if "relation_type" in relation and "relationType" not in relation:
                relation["relationType"] = relation["relation_type"]
                del relation["relation_type"]

            # Add to records and increment count
            records.append(relation)
            created_count += 1

        # Save updated database
        self.save_database(records, context)

        return created_count

    def memory_add_observations(self, entity_name: str, observations: List[str], context: str = "main") -> int:
        """
        Add text information to an existing entity in the knowledge graph.

        Args:
            entity_name: Name of the entity to add observations to
            observations: List of observation text contents to add
            context: Database context name (defaults to 'main')

        Returns:
            Number of observations that were added
        """
        # Initialize storage if needed
        self.initialize_storage()

        # Load existing database
        try:
            records = self.load_database(context)
        except ValueError:
            # If database doesn't exist, create it
            db_path = self.get_database_path(context)
            self._create_empty_database(db_path)
            records = []

        # Find the entity to update
        entity_found = False
        observations_added = 0

        for record in records:
            if record.get("type") == "entity" and record.get("name") == entity_name:
                entity_found = True

                # Ensure the entity has an observations list
                if "observations" not in record:
                    record["observations"] = []

                # Add new observations
                for observation in observations:
                    record["observations"].append(observation)
                    observations_added += 1
                break

        if not entity_found:
            logger.warning(f"Entity '{entity_name}' not found in context '{context}', no observations added")
            return 0

        # Save updated database
        self.save_database(records, context)

        return observations_added
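
    # Usage sketch (assumes the "Alice" and "Bob" entities already exist in
    # 'main'; relations whose endpoints are unknown are skipped, not created):
    #
    #   storage.memory_create_relations([
    #       {"source": "Alice", "target": "Bob", "relation_type": "knows"},
    #   ])  # -> 1
    #   storage.memory_add_observations("Alice", ["prefers JSONL storage"])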

    def memory_search_nodes(self, keywords: List[str], contexts: Optional[List[str]] = None) -> Dict[str, List[Dict[str, Any]]]:
        """
        Search for information in the knowledge graph using keywords.

        Args:
            keywords: List of keywords to search for
            contexts: List of context names to search in (if None, search in all contexts)

        Returns:
            Dictionary mapping context names to lists of matching records

        The results will always include the full contents of the 'main' database.
        """
        # Initialize storage if needed
        self.initialize_storage()

        # If no contexts specified, search all available contexts
        if contexts is None:
            contexts = self.list_contexts()

        # Ensure 'main' is always included (copy so the caller's list is not mutated)
        if "main" not in contexts:
            contexts = [*contexts, "main"]

        results = {}

        # Process each context
        for context in contexts:
            try:
                records = self.load_database(context)
            except ValueError:
                # Skip invalid databases
                logger.warning(f"Skipping invalid database context: {context}")
                continue

            # For 'main' context, include all records regardless of keywords
            if context == "main":
                results[context] = records
                continue

            # For other contexts, filter by keywords
            matching_records = []
            for record in records:
                # Check each field in the record for keyword matches
                record_str = json.dumps(record).lower()
                if any(keyword.lower() in record_str for keyword in keywords):
                    matching_records.append(record)

            if matching_records:
                results[context] = matching_records

        return results

    def memory_read_graph(self, contexts: Optional[List[str]] = None) -> Dict[str, List[Dict[str, Any]]]:
        """
        Dump the entire content of one or more databases.

        Args:
            contexts: List of context names to read (if None, read all contexts)

        Returns:
            Dictionary mapping context names to lists of records

        The results will always include the full contents of the 'main' database.
        """
        # Initialize storage if needed
        self.initialize_storage()

        # If no contexts specified, read all available contexts
        if contexts is None:
            contexts = self.list_contexts()

        # Ensure 'main' is always included (copy so the caller's list is not mutated)
        if "main" not in contexts:
            contexts = [*contexts, "main"]

        results = {}

        # Process each context
        for context in contexts:
            try:
                records = self.load_database(context)
                results[context] = records
            except ValueError:
                # Skip invalid databases
                logger.warning(f"Skipping invalid database context: {context}")
                continue

        return results

    def memory_list_databases(self) -> List[str]:
        """
        Show all available memory databases (contexts).

        Returns:
            List of available context names
        """
        # This is essentially the same as list_contexts
        return self.list_contexts()
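
    # Usage sketch for reads (hypothetical 'work' context): matching is a
    # case-insensitive substring test against each record's JSON text, and the
    # 'main' database is always returned in full regardless of the keywords.
    #
    #   hits = storage.memory_search_nodes(["alice"], contexts=["work"])
    #   # hits == {"main": [<all main records>], "work": [<matching records>]}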

    def memory_delete_entities(self, entity_names: List[str], context: str = "main") -> List[str]:
        """
        Remove entities from the graph.

        Args:
            entity_names: List of entity names to remove
            context: Database context name (defaults to 'main')

        Returns:
            List of entity names that were successfully removed
        """
        # Initialize storage if needed
        self.initialize_storage()

        try:
            records = self.load_database(context)
        except ValueError:
            logger.warning(f"Invalid or missing database context: {context}")
            return []

        deleted_names = []
        updated_records = []

        # Create a set for faster lookups
        entity_names_set = set(entity_names)

        # Drop matching entities, and also remove relations that reference them
        for record in records:
            record_type = record.get("type")
            if record_type == "entity" and record.get("name") in entity_names_set:
                # Skip this entity (effectively removing it)
                deleted_names.append(record.get("name"))
                continue
            elif record_type == "relation" and (record.get("source") in entity_names_set or
                                                record.get("target") in entity_names_set):
                # Skip relations that involve deleted entities
                continue

            # Keep all other records
            updated_records.append(record)

        # Save updated database
        self.save_database(updated_records, context)

        return deleted_names

    def memory_delete_relations(self, source: str, target: str, relation_type: Optional[str] = None,
                                context: str = "main") -> int:
        """
        Remove specific relationships between entities.

        Args:
            source: Name of the source entity
            target: Name of the target entity
            relation_type: Type of relation to delete (if None, deletes all relations between entities)
            context: Database context name (defaults to 'main')

        Returns:
            Number of relations that were successfully removed
        """
        # Initialize storage if needed
        self.initialize_storage()

        try:
            records = self.load_database(context)
        except ValueError:
            logger.warning(f"Invalid or missing database context: {context}")
            return 0

        deleted_count = 0
        updated_records = []

        # Remove matching relations; everything else is kept as-is
        for record in records:
            if (record.get("type") == "relation" and record.get("source") == source and
                    record.get("target") == target):
                # If relation_type is specified, only delete relations of that type
                if relation_type is None or record.get("relationType") == relation_type:
                    # Skip this relation (effectively removing it)
                    deleted_count += 1
                    continue

            # Keep all other records
            updated_records.append(record)

        # Save updated database
        self.save_database(updated_records, context)

        return deleted_count
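
    # Usage sketch for deletion (hypothetical data): removing an entity also
    # drops every relation that references it, so no dangling edges remain.
    #
    #   storage.memory_delete_entities(["Bob"])          # -> ["Bob"]
    #   storage.memory_delete_relations("Alice", "Bob")  # -> 0, already gone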

    def memory_delete_observations(self, entity_name: str, observation_indices: List[int],
                                   context: str = "main") -> int:
        """
        Remove specific observations from an entity.

        Args:
            entity_name: Name of the entity to remove observations from
            observation_indices: List of observation indices (0-based) to remove
            context: Database context name (defaults to 'main')

        Returns:
            Number of observations that were successfully removed
        """
        # Initialize storage if needed
        self.initialize_storage()

        try:
            records = self.load_database(context)
        except ValueError:
            logger.warning(f"Invalid or missing database context: {context}")
            return 0

        deleted_count = 0
        entity_found = False

        # De-duplicate and sort indices in descending order to avoid shifting issues
        observation_indices = sorted(set(observation_indices), reverse=True)

        # Find the entity and remove observations
        for record in records:
            if record.get("type") == "entity" and record.get("name") == entity_name:
                entity_found = True

                if "observations" not in record or not record["observations"]:
                    logger.warning(f"Entity '{entity_name}' has no observations to delete")
                    break

                # Delete observations at the specified indices
                for index in observation_indices:
                    if 0 <= index < len(record["observations"]):
                        del record["observations"][index]
                        deleted_count += 1
                    else:
                        logger.warning(f"Invalid observation index {index} for entity '{entity_name}'")
                break

        if not entity_found:
            logger.warning(f"Entity '{entity_name}' not found in context '{context}', no observations deleted")
            return 0

        # Save updated database
        self.save_database(records, context)

        return deleted_count

    def memory_read_entity(self, entity_name: str, context: str = "main") -> Optional[Dict[str, Any]]:
        """
        Read a specific entity from the knowledge graph.

        Args:
            entity_name: Name of the entity to read
            context: Database context name (defaults to 'main')

        Returns:
            The entity data if found, None otherwise
        """
        # Initialize storage if needed
        self.initialize_storage()

        try:
            records = self.load_database(context)
        except ValueError:
            logger.warning(f"Invalid or missing database context: {context}")
            return None

        # Find the entity with the given name
        for record in records:
            if record.get("type") == "entity" and record.get("name") == entity_name:
                return record

        # Entity not found
        return None
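

if __name__ == "__main__":
    # Minimal end-to-end sketch against a throwaway folder (an illustrative
    # smoke test, not part of the storage API). MEMORY_FOLDER is read at
    # construction time, so it must be set before MemoryStorage() is created.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        os.environ["MEMORY_FOLDER"] = tmp
        storage = MemoryStorage()
        storage.initialize_storage()
        storage.memory_create_entities([{"name": "Alice", "entity_type": "person"}])
        storage.memory_add_observations("Alice", ["hello"])
        print(storage.memory_read_entity("Alice"))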
