main.py (16 kB)
import logging
from contextlib import asynccontextmanager
from textwrap import dedent
from typing import Any, Dict, List, Optional

from fastapi import Depends, FastAPI, HTTPException

from src.schema import (
    CreateEntitiesRequest,
    # Entity-related schema models
    CreateEntityRequest,
    CreateEntityResponse,
    ReadEntityRequest,
    ReadEntityResponse,
    UpdateEntityRequest,
    UpdateEntityResponse,
    # Observation-related schema models
    CreateObservationRequest,
    CreateObservationsRequest,
    CreateObservationsResponse,
    # Relation-related schema models
    CreateRelationRequest,
    CreateRelationsRequest,
    CreateRelationsResponse,
    DeleteEntitiesRequest,
    DeleteEntitiesResponse,
    DeleteObservationsRequest,
    DeleteObservationsResponse,
    DeleteRelationsBySourceTargetRequest,
    DeleteRelationsBySourceTargetResponse,
    DeleteRelationsRequest,
    DeleteRelationsResponse,
    ErrorResponse,
    ListDatabasesResponse,
    ReadGraphRequest,
    ReadGraphResponse,
    # Existing schema models
    Relation,
    # Search and read schema models
    SearchNodesRequest,
    SearchNodesResponse,
)
from src.storage import MemoryStorage

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler for startup and shutdown events."""
    logger.info("Starting Brikerman Graph Memory MCP Server...")
    # Initialize storage directories if needed
    storage = MemoryStorage()
    storage.initialize_storage()
    yield
    logger.info("Shutting down Brikerman Graph Memory MCP Server...")


# Initialize FastAPI app with metadata
app = FastAPI(
    title="Brikerman Graph Memory MCP",
    description=dedent("""
        Brikerman Graph Memory MCP Server - FastAPI Implementation

        A persistent, indexed knowledge graph for AI agents, designed for
        MCP-compatible platforms. This FastAPI server provides REST endpoints
        for managing a knowledge graph composed of entities, relationships,
        and observations.

        Core Concepts:
        - The 'main' database acts as the core index and routing table
        - Contexts provide specialized memories for specific topics
        - Storage location is determined by the MEMORY_FOLDER environment variable
    """),
    version="0.1.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)


# Dependency to get storage instance
def get_storage() -> MemoryStorage:
    """Dependency function to get MemoryStorage instance."""
    return MemoryStorage()


# API Endpoints

# Health check endpoint
@app.get("/health")
async def health_check():
    """Simple health check endpoint."""
    return {"status": "healthy", "service": "bm-graph-memory-mcp"}


@app.post("/entity", response_model=ReadEntityResponse)
async def read_entity(
    request: ReadEntityRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Read a specific entity from the knowledge graph."""
    try:
        # Extract the entity name
        entity_name = request.name

        # Read the entity
        context = request.context if request.context is not None else "main"
        entity = storage.memory_read_entity(entity_name, context)

        if not entity:
            return ReadEntityResponse(
                success=False,
                entity=None,
                message=f"Entity '{entity_name}' not found in context '{context}'"
            )

        return ReadEntityResponse(
            success=True,
            entity=entity,
            message=f"Entity '{entity_name}' retrieved successfully"
        )
    except Exception as e:
        logger.error(f"Error reading entity: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# Entity endpoints
@app.post("/entities", response_model=CreateEntityResponse)
async def create_entity(
    request: CreateEntityRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Create a new entity in the knowledge graph."""
    try:
        # Prepare entity object
        entity = {
            "name": request.name,
            "entity_type": request.entity_type,
            "type": "entity",
        }

        # Add optional fields if provided
        if request.attributes:
            # Add all attributes to the entity
            for key, value in request.attributes.items():
                entity[key] = value

        # Add initial observations if provided
        if request.initial_observations:
            entity["observations"] = request.initial_observations

        # Create the entity
        context = request.context if request.context is not None else "main"
        created_names = storage.memory_create_entities([entity], context)

        if not created_names:
            return CreateEntityResponse(
                success=False,
                created_names=[],
                message=f"Failed to create entity with name '{request.name}'. It may already exist."
            )

        return CreateEntityResponse(
            success=True,
            created_names=created_names,
            message="Entity created successfully"
        )
    except Exception as e:
        logger.error(f"Error creating entity: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/entities/batch", response_model=CreateEntityResponse)
async def create_entities(
    request: CreateEntitiesRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Create multiple entities at once in the knowledge graph."""
    try:
        # Create the entities
        context = request.context if request.context is not None else "main"
        created_names = storage.memory_create_entities(request.entities, context)

        return CreateEntityResponse(
            success=True,
            created_names=created_names,
            message=f"Created {len(created_names)} entities successfully"
        )
    except Exception as e:
        logger.error(f"Error creating entities: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/entities", response_model=DeleteEntitiesResponse)
async def delete_entities(
    request: DeleteEntitiesRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Delete entities from the knowledge graph."""
    try:
        # Delete the entities
        context = request.context if request.context is not None else "main"
        deleted_names = storage.memory_delete_entities(request.entity_names, context)

        return DeleteEntitiesResponse(
            success=True,
            deleted_names=deleted_names,
            message=f"Deleted {len(deleted_names)} entities successfully"
        )
    except Exception as e:
        logger.error(f"Error deleting entities: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# Relation endpoints
@app.post("/relations", response_model=CreateRelationsResponse)
async def create_relation(
    request: CreateRelationRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Create a new relation in the knowledge graph."""
    try:
        # Prepare relation object
        relation = {
            "source": request.source,
            "target": request.target,
            "relation_type": request.relation_type,
            "type": "relation",
            "directed": request.directed if request.directed is not None else True,
        }

        # Add optional fields if provided
        if request.attributes:
            # Add all attributes to the relation
            for key, value in request.attributes.items():
                relation[key] = value

        # Create the relation
        context = request.context if request.context is not None else "main"
        created_count = storage.memory_create_relations([relation], context)

        if created_count == 0:
            return CreateRelationsResponse(
                success=False,
                created_count=0,
                message=f"Failed to create relation between '{request.source}' and '{request.target}'. The entities may not exist."
            )

        return CreateRelationsResponse(
            success=True,
            created_count=created_count,
            message="Relation created successfully"
        )
    except Exception as e:
        logger.error(f"Error creating relation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/relations/batch", response_model=CreateRelationsResponse)
async def create_relations(
    request: CreateRelationsRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Create multiple relations at once in the knowledge graph."""
    try:
        # Create the relations
        context = request.context if request.context is not None else "main"
        created_count = storage.memory_create_relations(request.relations, context)

        return CreateRelationsResponse(
            success=True,
            created_count=created_count,
            message=f"Created {created_count} relations successfully"
        )
    except Exception as e:
        logger.error(f"Error creating relations: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/relations", response_model=DeleteRelationsBySourceTargetResponse)
async def delete_relations(
    request: DeleteRelationsBySourceTargetRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Delete relations from the knowledge graph by source and target entities."""
    try:
        # Delete the relations
        context = request.context if request.context is not None else "main"
        deleted_count = storage.memory_delete_relations(
            request.source, request.target, request.relation_type, context
        )

        return DeleteRelationsBySourceTargetResponse(
            success=True,
            deleted_count=deleted_count,
            message=f"Deleted {deleted_count} relations successfully"
        )
    except Exception as e:
        logger.error(f"Error deleting relations: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# Observation endpoints
@app.post("/observations", response_model=CreateObservationsResponse)
async def create_observation(
    request: CreateObservationRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Create a new observation for an entity in the knowledge graph."""
    try:
        # Extract the entity name and content
        entity_name = request.entity_name
        content = request.content

        # Create the observation
        context = request.context if request.context is not None else "main"
        observation_count = storage.memory_add_observations(entity_name, [content], context)

        if observation_count == 0:
            return CreateObservationsResponse(
                success=False,
                entity_name=entity_name,
                observation_count=0,
                message=f"Failed to create observation for entity '{entity_name}'. The entity may not exist."
            )

        return CreateObservationsResponse(
            success=True,
            entity_name=entity_name,
            observation_count=observation_count,
            message="Observation added successfully"
        )
    except Exception as e:
        logger.error(f"Error creating observation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/observations/batch", response_model=CreateObservationsResponse)
async def create_observations(
    request: CreateObservationsRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Create multiple observations at once for an entity in the knowledge graph."""
    try:
        # Extract the entity name and observations
        entity_name = request.entity_name
        observations = request.observations

        # Create the observations
        context = request.context if request.context is not None else "main"
        observation_count = storage.memory_add_observations(entity_name, observations, context)

        return CreateObservationsResponse(
            success=True,
            entity_name=entity_name,
            observation_count=observation_count,
            message=f"Added {observation_count} observations successfully to entity '{entity_name}'"
        )
    except Exception as e:
        logger.error(f"Error creating observations: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/observations", response_model=DeleteObservationsResponse)
async def delete_observations(
    request: DeleteObservationsRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Delete observations from an entity in the knowledge graph."""
    try:
        # Extract the entity name and observation indices
        entity_name = request.entity_name
        observation_indices = request.observation_indices

        # Delete the observations
        context = request.context if request.context is not None else "main"
        deleted_count = storage.memory_delete_observations(entity_name, observation_indices, context)

        return DeleteObservationsResponse(
            success=True,
            entity_name=entity_name,
            deleted_count=deleted_count,
            message=f"Deleted {deleted_count} observations from entity '{entity_name}' successfully"
        )
    except Exception as e:
        logger.error(f"Error deleting observations: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# Search and read endpoints
@app.post("/search", response_model=SearchNodesResponse)
async def search_nodes(
    request: SearchNodesRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Search for information in the knowledge graph using keywords."""
    try:
        # Search the graph
        results = storage.memory_search_nodes(request.keywords, request.contexts)

        return SearchNodesResponse(
            success=True,
            results=results,
            message=f"Search completed successfully with {sum(len(records) for records in results.values())} total matches"
        )
    except Exception as e:
        logger.error(f"Error searching nodes: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/graph", response_model=ReadGraphResponse)
async def read_graph(
    request: ReadGraphRequest,
    storage: MemoryStorage = Depends(get_storage)
):
    """Read the entire graph or specific contexts."""
    try:
        # Read the graph
        data = storage.memory_read_graph(request.contexts)

        return ReadGraphResponse(
            success=True,
            data=data,
            message=f"Retrieved data from {len(data)} contexts"
        )
    except Exception as e:
        logger.error(f"Error reading graph: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/databases", response_model=ListDatabasesResponse)
async def list_databases(
    storage: MemoryStorage = Depends(get_storage)
):
    """List all available memory databases (contexts)."""
    try:
        # List the databases
        contexts = storage.memory_list_databases()

        return ListDatabasesResponse(
            success=True,
            contexts=contexts,
            message=f"Found {len(contexts)} database contexts"
        )
    except Exception as e:
        logger.error(f"Error listing databases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
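The sketch below is not part of main.py; it shows one way a client might exercise these endpoints. It assumes the app is served with something like uvicorn main:app on the default http://localhost:8000 (host and port are not configured in this file), that the httpx package is installed, and that the request models expose the fields the handlers above read (name, entity_type, initial_observations, context; source/target/relation_type for relations; entity_name/content for observations; keywords/contexts for search). The exact response schemas live in src/schema.py and may differ in detail.

# Minimal client sketch (assumptions noted above; adjust BASE_URL to your deployment)
import httpx

BASE_URL = "http://localhost:8000"  # assumption: default uvicorn host/port

with httpx.Client(base_url=BASE_URL) as client:
    # Create an entity in the default 'main' context
    r = client.post("/entities", json={
        "name": "Ada Lovelace",
        "entity_type": "person",
        "initial_observations": ["Wrote the first published algorithm."],
    })
    print(r.json())

    # Attach an additional observation to the entity
    r = client.post("/observations", json={
        "entity_name": "Ada Lovelace",
        "content": "Collaborated with Charles Babbage.",
    })
    print(r.json())

    # Relate two entities (both endpoints of the relation should already exist)
    client.post("/entities", json={"name": "Analytical Engine", "entity_type": "machine"})
    r = client.post("/relations", json={
        "source": "Ada Lovelace",
        "target": "Analytical Engine",
        "relation_type": "worked_on",
    })
    print(r.json())

    # Keyword search scoped to the 'main' context
    r = client.post("/search", json={"keywords": ["Lovelace"], "contexts": ["main"]})
    print(r.json())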

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BrikerMan/graph-memory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.