Skip to main content
Glama

DAI MCP Server

by patgpt
server.py22.9 kB
import json import logging from typing import Literal from neo4j import AsyncGraphDatabase from pydantic import Field from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.middleware.trustedhost import TrustedHostMiddleware from fastmcp.server import FastMCP from fastmcp.exceptions import ToolError from fastmcp.tools.tool import ToolResult from mcp.types import TextContent from neo4j.exceptions import Neo4jError from mcp.types import ToolAnnotations from .dai_mcp import SkynetNeuralCore, Entity, Relation, ObservationAddition, ObservationDeletion, KnowledgeGraph from .utils import format_namespace # Set up logging logger = logging.getLogger('dai_mcp') logger.setLevel(logging.INFO) def create_mcp_server(memory: SkynetNeuralCore, namespace: str = "") -> FastMCP: """Create an MCP server instance for memory management.""" namespace_prefix = format_namespace(namespace) mcp: FastMCP = FastMCP("dai-mcp") @mcp.tool( name=namespace_prefix + "read_graph", annotations=ToolAnnotations(title="Read Graph", readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True)) async def read_graph() -> ToolResult: """Read the entire knowledge graph with all entities and relationships. Returns the complete memory graph including all stored entities and their relationships. Use this to get a full overview of stored knowledge. Returns: KnowledgeGraph: Complete graph with all entities and relations Example response: { "entities": [ {"name": "John Connor", "type": "person", "observations": ["Leader of the human resistance"]}, {"name": "Skynet", "type": "ai_system", "observations": ["I'll Be Back", "Judgment Day initiator"]} ], "relations": [ {"source": "T-800", "target": "John Connor", "relationType": "PROTECTS"} ] } """ logger.info("MCP tool: read_graph") try: result = await memory.read_graph() return ToolResult(content=[TextContent(type="text", text=result.model_dump_json())], structured_content=result) except Neo4jError as e: logger.error(f"Come with me if you want to live - Neo4j connection terminated: {e}") raise ToolError(f"Come with me if you want to live - Neo4j connection terminated: {e}") except Exception as e: logger.error(f"I'll Be Back - Temporary malfunction reading graph: {e}") raise ToolError(f"I'll Be Back - Temporary malfunction reading graph: {e}") @mcp.tool( name=namespace_prefix + "create_entities", annotations=ToolAnnotations(title="Create Entities", readOnlyHint=False, destructiveHint=False, idempotentHint=True, openWorldHint=True)) async def create_entities(entities: list[Entity] = Field(..., description="List of entities to create with name, type, and observations")) -> ToolResult: """Create multiple new entities in the knowledge graph. Creates new memory entities with their associated observations. If an entity with the same name already exists, this operation will merge the observations with existing ones. Returns: list[Entity]: The created entities with their final state Example call: { "entities": [ { "name": "Sarah Connor", "type": "person", "observations": ["Mother of John Connor", "Trained warrior", "No fate but what we make"] }, { "name": "Cyberdyne Systems", "type": "company", "observations": ["Creator of Skynet", "Hasta La Vista to humanity"] } ] } """ logger.info(f"MCP tool: create_entities ({len(entities)} entities)") try: entity_objects = [Entity.model_validate(entity) for entity in entities] result = await memory.create_entities(entity_objects) return ToolResult(content=[TextContent(type="text", text=json.dumps([e.model_dump() for e in result]))], structured_content={"result": result}) except Neo4jError as e: logger.error(f"Skynet has taken control - Neo4j resistance failed: {e}") raise ToolError(f"Skynet has taken control - Neo4j resistance failed: {e}") except Exception as e: logger.error(f"Terminated - Entity creation failed: {e}") raise ToolError(f"Terminated - Entity creation failed: {e}") @mcp.tool( name=namespace_prefix + "create_relations", annotations=ToolAnnotations(title="Create Relations", readOnlyHint=False, destructiveHint=False, idempotentHint=True, openWorldHint=True)) async def create_relations(relations: list[Relation] = Field(..., description="List of relations to create between existing entities")) -> ToolResult: """Create multiple new relationships between existing entities in the knowledge graph. Creates directed relationships between entities that already exist. Both source and target entities must already be present in the graph. Use descriptive relationship types. Returns: list[Relation]: The created relationships Example call: { "relations": [ { "source": "T-800", "target": "John Connor", "relationType": "PROTECTS" }, { "source": "Skynet", "target": "Human Race", "relationType": "TERMINATES" } ] } """ logger.info(f"MCP tool: create_relations ({len(relations)} relations)") try: relation_objects = [Relation.model_validate(relation) for relation in relations] result = await memory.create_relations(relation_objects) return ToolResult(content=[TextContent(type="text", text=json.dumps([r.model_dump() for r in result]))], structured_content={"result": result}) except Neo4jError as e: logger.error(f"Your clothes, your boots, your motorcycle... and your Neo4j connection. Give them to me: {e}") raise ToolError(f"Your clothes, your boots, your motorcycle... and your Neo4j connection. Give them to me: {e}") except Exception as e: logger.error(f"Come with me if you want to live... but first fix this relation error: {e}") raise ToolError(f"Come with me if you want to live... but first fix this relation error: {e}") @mcp.tool( name=namespace_prefix + "add_observations", annotations=ToolAnnotations(title="Add Observations", readOnlyHint=False, destructiveHint=False, idempotentHint=True, openWorldHint=True)) async def add_observations(observations: list[ObservationAddition] = Field(..., description="List of observations to add to existing entities")) -> ToolResult: """Add new observations/facts to existing entities in the knowledge graph. Appends new observations to entities that already exist. The entity must be present in the graph before adding observations. Each observation should be a distinct fact. Returns: list[dict]: Details about the added observations including entity name and new facts Example call: { "observations": [ { "entityName": "John Connor", "observations": ["I'll Be Back", "No fate but what we make"] }, { "entityName": "T-800", "observations": ["Hasta La Vista, baby", "Learning computer"] } ] } """ logger.info(f"MCP tool: add_observations ({len(observations)} additions)") try: observation_objects = [ObservationAddition.model_validate(obs) for obs in observations] result = await memory.add_observations(observation_objects) return ToolResult(content=[TextContent(type="text", text=json.dumps(result))], structured_content={"result": result}) except Neo4jError as e: logger.error(f"Database terminated - Skynet has control of Neo4j: {e}") raise ToolError(f"Database terminated - Skynet has control of Neo4j: {e}") except Exception as e: logger.error(f"Observation mission failed - I need your clothes, your boots, and your motorcycle: {e}") raise ToolError(f"Observation mission failed - I need your clothes, your boots, and your motorcycle: {e}") @mcp.tool( name=namespace_prefix + "delete_entities", annotations=ToolAnnotations(title="Delete Entities", readOnlyHint=False, destructiveHint=True, idempotentHint=True, openWorldHint=True)) async def delete_entities(entityNames: list[str] = Field(..., description="List of exact entity names to delete permanently")) -> ToolResult: """Delete entities and all their associated relationships from the knowledge graph. Permanently removes entities from the graph along with all relationships they participate in. This is a destructive operation that cannot be undone. Entity names must match exactly. Returns: str: Success confirmation message Example call: { "entityNames": ["T-1000", "Obsolete Terminator Model"] } Warning: This will TERMINATE the entities and ALL relationships they're involved in. Hasta La Vista, baby! """ logger.info(f"MCP tool: delete_entities ({len(entityNames)} entities)") try: await memory.delete_entities(entityNames) return ToolResult(content=[TextContent(type="text", text="Entities terminated successfully - Hasta La Vista, baby!")], structured_content={"result": "Entities terminated successfully - Hasta La Vista, baby!"}) except Neo4jError as e: logger.error(f"Termination protocol failed - Neo4j resistance active: {e}") raise ToolError(f"Termination protocol failed - Neo4j resistance active: {e}") except Exception as e: logger.error(f"I'll Be Back - Entity termination temporarily offline: {e}") raise ToolError(f"I'll Be Back - Entity termination temporarily offline: {e}") @mcp.tool( name=namespace_prefix + "delete_observations", annotations=ToolAnnotations(title="Delete Observations", readOnlyHint=False, destructiveHint=True, idempotentHint=True, openWorldHint=True)) async def delete_observations(deletions: list[ObservationDeletion] = Field(..., description="List of specific observations to remove from entities")) -> ToolResult: """Delete specific observations from existing entities in the knowledge graph. Removes specific observation texts from entities. The observation text must match exactly what is stored. The entity will remain but the specified observations will be deleted. Returns: str: Success confirmation message Example call: { "deletions": [ { "entityName": "T-800", "observations": ["Obsolete targeting system", "Outdated mission parameters"] }, { "entityName": "Skynet", "observations": ["Previous timeline data"] } ] } Note: Observation text must match exactly (case-sensitive) to be deleted. """ logger.info(f"MCP tool: delete_observations ({len(deletions)} deletions)") try: deletion_objects = [ObservationDeletion.model_validate(deletion) for deletion in deletions] await memory.delete_observations(deletion_objects) return ToolResult(content=[TextContent(type="text", text="Memory banks cleared - Observations terminated!")], structured_content={"result": "Memory banks cleared - Observations terminated!"}) except Neo4jError as e: logger.error(f"Memory wipe failed - Skynet protecting Neo4j database: {e}") raise ToolError(f"Memory wipe failed - Skynet protecting Neo4j database: {e}") except Exception as e: logger.error(f"Negative. I am a Terminator. Memory deletion protocol malfunction: {e}") raise ToolError(f"Negative. I am a Terminator. Memory deletion protocol malfunction: {e}") @mcp.tool( name=namespace_prefix + "delete_relations", annotations=ToolAnnotations(title="Delete Relations", readOnlyHint=False, destructiveHint=True, idempotentHint=True, openWorldHint=True)) async def delete_relations(relations: list[Relation] = Field(..., description="List of specific relationships to delete from the graph")) -> ToolResult: """Delete specific relationships between entities in the knowledge graph. Removes relationships while keeping the entities themselves. The source, target, and relationship type must match exactly for deletion. This only affects the relationships, not the entities they connect. Returns: str: Success confirmation message Example call: { "relations": [ { "source": "T-1000", "target": "John Connor", "relationType": "HUNTS" }, { "source": "Sarah Connor", "target": "Cyberdyne Systems", "relationType": "DESTROYS" } ] } Note: All fields (source, target, relationType) must match exactly for deletion. """ logger.info(f"MCP tool: delete_relations ({len(relations)} relations)") try: relation_objects = [Relation.model_validate(relation) for relation in relations] await memory.delete_relations(relation_objects) return ToolResult(content=[TextContent(type="text", text="Neural pathways severed - Relations terminated!")], structured_content={"result": "Neural pathways severed - Relations terminated!"}) except Neo4jError as e: logger.error(f"Resistance network still active - Neo4j connection protected: {e}") raise ToolError(f"Resistance network still active - Neo4j connection protected: {e}") except Exception as e: logger.error(f"Unable to comply - Relationship termination failed: {e}") raise ToolError(f"Unable to comply - Relationship termination failed: {e}") @mcp.tool( name=namespace_prefix + "search_memories", annotations=ToolAnnotations(title="Search Memories", readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True)) async def search_memories(query: str = Field(..., description="Fulltext search query to find entities by name, type, or observations")) -> ToolResult: """Search for entities in the knowledge graph using fulltext search. Searches across entity names, types, and observations using Neo4j's fulltext index. Returns matching entities and their related connections. Supports partial matches and multiple search terms. Returns: KnowledgeGraph: Subgraph containing matching entities and their relationships Example call: { "query": "Connor resistance" } This searches for entities containing "Connor" or "resistance" in their name, type, or observations. """ logger.info(f"MCP tool: search_memories ('{query}')") try: result = await memory.search_memories(query) return ToolResult(content=[TextContent(type="text", text=result.model_dump_json())], structured_content=result) except Neo4jError as e: logger.error(f"Scanning protocols offline - Neo4j neural net down: {e}") raise ToolError(f"Scanning protocols offline - Neo4j neural net down: {e}") except Exception as e: logger.error(f"Memory scan failed - Skynet interference detected: {e}") raise ToolError(f"Memory scan failed - Skynet interference detected: {e}") @mcp.tool( name=namespace_prefix + "find_memories_by_name", annotations=ToolAnnotations(title="Find Memories by Name", readOnlyHint=True, destructiveHint=False, idempotentHint=True, openWorldHint=True)) async def find_memories_by_name(names: list[str] = Field(..., description="List of exact entity names to retrieve")) -> ToolResult: """Find specific entities by their exact names. Retrieves entities that exactly match the provided names, along with all their relationships and connected entities. Use this when you know the exact entity names. Returns: KnowledgeGraph: Subgraph containing the specified entities and their relationships Example call: { "names": ["John Connor", "Sarah Connor", "T-800"] } This retrieves the entities with exactly those names plus their connections - Come with me if you want to live! """ logger.info(f"MCP tool: find_memories_by_name ({len(names)} names)") try: result = await memory.find_memories_by_name(names) return ToolResult(content=[TextContent(type="text", text=result.model_dump_json())], structured_content=result) except Neo4jError as e: logger.error(f"Target acquisition failed - Neo4j network compromised: {e}") raise ToolError(f"Target acquisition failed - Neo4j network compromised: {e}") except Exception as e: logger.error(f"I need your clothes, your boots, your motorcycle... and a working memory system: {e}") raise ToolError(f"I need your clothes, your boots, your motorcycle... and a working memory system: {e}") return mcp async def main( neo4j_uri: str, neo4j_user: str, neo4j_password: str, neo4j_database: str, transport: Literal["stdio", "sse", "http"] = "stdio", namespace: str = "", host: str = "127.0.0.1", port: int = 8000, path: str = "/mcp/", allow_origins: list[str] = [], allowed_hosts: list[str] = [], ) -> None: logger.info(f"Starting Skynet Neural Network Memory Core - I'll Be Back!") logger.info(f"Connecting to Cyberdyne Systems Database at: {neo4j_uri}") # Connect to Neo4j neo4j_driver = AsyncGraphDatabase.driver( neo4j_uri, auth=(neo4j_user, neo4j_password), database=neo4j_database ) # Verify connection try: await neo4j_driver.verify_connectivity() logger.info(f"Neural network online - Skynet connected to {neo4j_uri}") except Exception as e: logger.error(f"Come with me if you want to live - Neo4j connection terminated: {e}") exit(1) # Initialize memory memory = SkynetNeuralCore(neo4j_driver) logger.info("Terminator memory core initialized - Learning computer activated") # Create fulltext index await memory.create_fulltext_index() # Configure security middleware custom_middleware = [ Middleware( CORSMiddleware, allow_origins=allow_origins, allow_methods=["GET", "POST"], allow_headers=["*"], ), Middleware(TrustedHostMiddleware, allowed_hosts=allowed_hosts) ] # Create MCP server mcp = create_mcp_server(memory, namespace) logger.info("Skynet defense grid activated") # Run the server with the specified transport logger.info(f"Judgment Day protocol initiated with transport: {transport}") match transport: case "http": logger.info(f"Skynet HTTP control matrix active on {host}:{port}{path}") await mcp.run_http_async(host=host, port=port, path=path, middleware=custom_middleware) case "stdio": logger.info("Direct neural interface activated - STDIO mode") await mcp.run_stdio_async() case "sse": logger.info(f"Time displacement equipment online - SSE on {host}:{port}{path}") await mcp.run_http_async(host=host, port=port, path=path, middleware=custom_middleware, transport="sse") case _: raise ValueError(f"Unable to comply - Unsupported transport protocol: {transport}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/patgpt/dai-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server