Skip to main content
Glama

Graph Memory MCP

by BrikerMan
main.py16.4 kB
import json import logging from pathlib import Path from textwrap import dedent from typing import Optional, List, Dict, Any from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Depends from pydantic import BaseModel, Field from schema import Entity, Relation # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Safety marker for bm_gm system BM_GM_SAFETY_MARKER = {"type": "_bm_gm", "source": "brikerman-graph-memory-mcp"} @asynccontextmanager async def lifespan(app: FastAPI): """FastAPI lifespan handler for startup and shutdown events.""" logger.info("Starting Brikerman Graph Memory MCP Server...") # Initialize storage directories if needed storage = MemoryStorage() storage.initialize_storage() yield logger.info("Shutting down Brikerman Graph Memory MCP Server...") # Initialize FastAPI app with metadata app = FastAPI( title="Brikerman Graph Memory MCP", description=dedent(""" Brikerman Graph Memory MCP Server - FastAPI Implementation A persistent, indexed knowledge graph for AI agents, designed for MCP-compatible platforms. This FastAPI server provides REST endpoints for managing a knowledge graph composed of entities, relationships, and observations. Core Concepts: - The 'main' database acts as the core index and routing table - Contexts provide specialized memories for specific topics - Storage location is determined by the MEMORY_FOLDER environment variable - Safety system using _bm_gm markers and naming conventions """), version="0.1.0", lifespan=lifespan, docs_url="/docs", redoc_url="/redoc", ) class MemoryStorage: """ Handles file storage operations for the bm-graph-memory system. Manages both project-local (.bm/) and global storage locations, implements safety checks with _bm_gm markers, and provides JSONL file operations for the knowledge graph data. """ def __init__(self): self.project_path = Path.cwd() / ".bm" self.global_path = Path.home() / ".bm" def get_storage_path(self, location: Optional[str] = None) -> Path: """ Determine storage path based on location preference and availability. Args: location: Force 'project' or 'global' storage, or None for auto-detect Returns: Path to the storage directory """ if location == "project": return self.project_path elif location == "global": return self.global_path else: # Auto-detect: prefer project-local if .bm directory exists return self.project_path if self.project_path.exists() else self.global_path def initialize_storage(self, location: Optional[str] = None): """ Initialize storage directory and create main database if needed. Args: location: Storage location preference """ storage_path = self.get_storage_path(location) storage_path.mkdir(exist_ok=True) main_db_path = storage_path / "memory.jsonl" if not main_db_path.exists(): self._create_empty_database(main_db_path) logger.info(f"Created main database at {main_db_path}") def _create_empty_database(self, db_path: Path): """Create an empty database file with safety marker.""" with open(db_path, "w") as f: f.write(json.dumps(BM_GM_SAFETY_MARKER) + "\n") def _validate_database_file(self, db_path: Path) -> bool: """ Validate that a database file has the proper _bm_gm safety marker. Args: db_path: Path to the database file Returns: True if file is valid, False otherwise """ if not db_path.exists(): return False try: with open(db_path, "r") as f: first_line = f.readline().strip() if not first_line: return False marker = json.loads(first_line) return ( marker.get("type") == "_bm_gm" and marker.get("source") == "brikerman-graph-memory-mcp" ) except (json.JSONDecodeError, IOError): return False def get_database_path( self, context: str = "main", location: Optional[str] = None ) -> Path: """ Get the file path for a specific context database. Args: context: Database context name (defaults to 'main') location: Storage location preference Returns: Path to the database file """ storage_path = self.get_storage_path(location) if context == "main": return storage_path / "memory.jsonl" else: return storage_path / f"memory-{context}.jsonl" def load_database( self, context: str = "main", location: Optional[str] = None ) -> List[Dict[str, Any]]: """ Load all records from a database file. Args: context: Database context name location: Storage location preference Returns: List of database records (excluding safety marker) """ db_path = self.get_database_path(context, location) if not self._validate_database_file(db_path): raise ValueError(f"Invalid or missing database file: {db_path}") records = [] with open(db_path, "r") as f: lines = f.readlines() # Skip the first line (safety marker) for line in lines[1:]: line = line.strip() if line: try: records.append(json.loads(line)) except json.JSONDecodeError: logger.warning( f"Skipping invalid JSON line in {db_path}: {line}" ) return records def save_database( self, records: List[Dict[str, Any]], context: str = "main", location: Optional[str] = None, ): """ Save records to a database file. Args: records: List of records to save context: Database context name location: Storage location preference """ db_path = self.get_database_path(context, location) # Ensure directory exists db_path.parent.mkdir(exist_ok=True) with open(db_path, "w") as f: # Write safety marker first f.write(json.dumps(BM_GM_SAFETY_MARKER) + "\n") # Write all records for record in records: f.write(json.dumps(record) + "\n") def list_contexts(self, location: Optional[str] = None) -> List[str]: """ List all available database contexts. Args: location: Storage location preference Returns: List of context names """ storage_path = self.get_storage_path(location) if not storage_path.exists(): return [] contexts = [] # Check for main database if (storage_path / "memory.jsonl").exists(): contexts.append("main") # Check for context-specific databases for file_path in storage_path.glob("memory-*.jsonl"): context_name = file_path.stem.replace("memory-", "") if self._validate_database_file(file_path): contexts.append(context_name) return contexts # Dependency to get storage instance def get_storage() -> MemoryStorage: """Dependency function to get MemoryStorage instance.""" return MemoryStorage() # Request/Response Models for API endpoints class DeleteRelationsRequest(BaseModel): """Request model for deleting relations from the knowledge graph.""" source: str = Field(description="Name of the source entity in the relation") target: str = Field(description="Name of the target entity in the relation") relation_type: Optional[str] = Field( default=None, description="Specific type of relation to delete (if None, deletes all relations between entities)", ) context: Optional[str] = Field( default=None, description="Database context to target (defaults to 'main')" ) location: Optional[str] = Field( default=None, description="Force 'project' or 'global' storage location" ) class DeleteRelationsResponse(BaseModel): """Response model for the delete relations operation.""" success: bool = Field(description="Whether the operation was successful") deleted_count: int = Field(description="Number of relations deleted") message: str = Field(description="Human-readable status message") remaining_relations: List[Relation] = Field( default_factory=list, description="Remaining relations between the specified entities", ) class ErrorResponse(BaseModel): """Standard error response model.""" detail: str = Field(description="Error message") error_type: str = Field(description="Type of error") # API Endpoints @app.delete( "/memory/relations", response_model=DeleteRelationsResponse, responses={ 404: {"model": ErrorResponse, "description": "Entity or relation not found"}, 400: {"model": ErrorResponse, "description": "Invalid request parameters"}, 500: {"model": ErrorResponse, "description": "Internal server error"}, }, summary="Delete Relations Between Entities", description=""" Removes specific relationships between entities in the knowledge graph. This endpoint allows you to: - Delete all relations between two entities - Delete only relations of a specific type between two entities - Target specific database contexts (work, personal, etc.) - Use either project-local or global storage **Important Notes:** - The 'main' database contents are always included in search results for context - Both source and target entities must exist in the specified context - If relation_type is not specified, all relations between the entities will be deleted - The operation is atomic - either all specified relations are deleted or none """, ) async def delete_relations( request: DeleteRelationsRequest, storage: MemoryStorage = Depends(get_storage) ): """ Delete relations between entities in the knowledge graph. This endpoint implements the `memory_delete_relations` functionality described in the README. It removes specific relationships between entities while maintaining the integrity of the knowledge graph. Args: request: Delete relations request containing source, target, and optional filters storage: Memory storage instance (injected dependency) Returns: DeleteRelationsResponse with operation results Raises: HTTPException: For various error conditions (404, 400, 500) """ try: # Use 'main' context if none specified context = request.context or "main" # Load the target database try: records = storage.load_database(context, request.location) except ValueError as e: raise HTTPException( status_code=404, detail=f"Database context '{context}' not found or invalid: {str(e)}", ) # Find entities to verify they exist entities = { record["name"]: record for record in records if record.get("type") == "entity" } if request.source not in entities: raise HTTPException( status_code=404, detail=f"Source entity '{request.source}' not found in context '{context}'", ) if request.target not in entities: raise HTTPException( status_code=404, detail=f"Target entity '{request.target}' not found in context '{context}'", ) # Find and filter relations to delete relations_to_keep = [] relations_to_delete = [] deleted_count = 0 for record in records: if record.get("type") == "relation": # Check if this relation matches our deletion criteria matches_entities = ( ( record.get("source") == request.source and record.get("target") == request.target ) or # Also check reverse direction for undirected relations ( not record.get("directed", True) and record.get("source") == request.target and record.get("target") == request.source ) ) if matches_entities: # If relation_type is specified, only delete relations of that type if ( request.relation_type is None or record.get("relationType") == request.relation_type ): relations_to_delete.append(record) deleted_count += 1 else: relations_to_keep.append(record) else: relations_to_keep.append(record) else: # Keep all non-relation records (entities, observations, etc.) relations_to_keep.append(record) # Check if any relations were found to delete if deleted_count == 0: if request.relation_type: detail = f"No relations of type '{request.relation_type}' found between '{request.source}' and '{request.target}'" else: detail = f"No relations found between '{request.source}' and '{request.target}'" raise HTTPException(status_code=404, detail=detail) # Save the updated database storage.save_database(relations_to_keep, context, request.location) # Get remaining relations between these entities for response remaining_relations = [] for record in relations_to_keep: if record.get("type") == "relation": matches_entities = ( record.get("source") == request.source and record.get("target") == request.target ) or ( not record.get("directed", True) and record.get("source") == request.target and record.get("target") == request.source ) if matches_entities: remaining_relations.append( Relation( source=record["source"], target=record["target"], relationType=record["relationType"], directed=record.get("directed", True), ) ) # Prepare success response if request.relation_type: message = f"Deleted {deleted_count} '{request.relation_type}' relation(s) between '{request.source}' and '{request.target}'" else: message = f"Deleted {deleted_count} relation(s) between '{request.source}' and '{request.target}'" if context != "main": message += f" in context '{context}'" return DeleteRelationsResponse( success=True, deleted_count=deleted_count, message=message, remaining_relations=remaining_relations, ) except HTTPException: # Re-raise HTTP exceptions as-is raise except Exception as e: # Log unexpected errors and return 500 logger.error(f"Unexpected error in delete_relations: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") # Health check endpoint @app.get("/health") async def health_check(): """Simple health check endpoint.""" return {"status": "healthy", "service": "bm-graph-memory-mcp"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BrikerMan/graph-memory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server