#!/usr/bin/env python3
"""
Graphiti MCP Server
A Model Context Protocol (MCP) server that integrates with Zep's Graphiti
for persistent memory and context continuity across AI agents.
"""
import asyncio
import json
import os
import sys
from typing import Any, Dict, List
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # python-dotenv is optional
try:
    from mcp.server import Server
    from mcp.server.stdio import stdio_server
    from mcp.types import Tool, TextContent
except ImportError as e:
print(f"Error: Failed to import mcp package: {e}")
print("\nTroubleshooting:")
print("1. Verify mcp is installed: pip show mcp")
print("2. Try reinstalling: pip install --upgrade mcp")
print("3. Check Python path: python -c 'import sys; print(sys.path)'")
print("4. If using virtual environment, make sure it's activated")
sys.exit(1)
try:
from neo4j import GraphDatabase
from openai import OpenAI
except ImportError:
print("Error: Required packages not found. Install with: uv sync or pip install neo4j openai")
sys.exit(1)
class GraphitiMCP:
"""Graphiti MCP Server implementation."""
def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str,
openrouter_api_key: str, model_name: str = "openai/gpt-4o-mini", use_openrouter: bool = True):
"""Initialize Graphiti MCP server.
Args:
neo4j_uri: Neo4j database URI
neo4j_user: Neo4j username
neo4j_password: Neo4j password
            openrouter_api_key: API key (an OpenRouter key, or an OpenAI key
                when use_openrouter is False)
            model_name: Model name to use (OpenRouter format, e.g., "openai/gpt-4o-mini")
            use_openrouter: Whether to route through OpenRouter (True) or call
                OpenAI directly (False)
"""
self.neo4j_uri = neo4j_uri
self.neo4j_user = neo4j_user
self.neo4j_password = neo4j_password
self.model_name = model_name
self.use_openrouter = use_openrouter
        # Initialize the Neo4j driver. Aura typically uses neo4j+s://; the
        # error hints below suggest bolt+s:// as a fallback protocol.
        try:
            print(f"Connecting to Neo4j at {neo4j_uri}...")
self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
            # Fail fast if the database is unreachable or credentials are wrong
            self.driver.verify_connectivity()
print("[SUCCESS] Connected to Neo4j successfully!")
except Exception as e:
print(f"[ERROR] Error connecting to Neo4j: {e}")
print("\nTroubleshooting tips:")
print("1. Free tier doesn't support IP filtering - connections should work from any IP")
print("2. Verify your database is running (not paused)")
print("3. Check URI format - try bolt+s:// if neo4j+s:// doesn't work")
print("4. Get fresh connection string from Aura console → Connect section")
print("5. Check firewall settings (port 7687)")
print("6. Run: .\fix-neo4j-connection.ps1 for detailed help")
# Suggest trying bolt+s:// if using neo4j+s://
if "neo4j+s://" in neo4j_uri:
print("\n💡 Tip: Try using bolt+s:// instead of neo4j+s://")
print(f" Change: {neo4j_uri}")
print(f" To: {neo4j_uri.replace('neo4j+s://', 'bolt+s://')}")
raise
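        # A stricter variant (hypothetical tuning) would bound the connect wait
        # via the driver's connection_timeout setting (seconds):
        #   GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password),
        #                        connection_timeout=10)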
# Initialize OpenAI-compatible client (OpenRouter uses OpenAI-compatible API)
if use_openrouter:
self.openai_client = OpenAI(
api_key=openrouter_api_key,
base_url="https://openrouter.ai/api/v1"
)
print(f"[SUCCESS] Initialized OpenRouter client with model: {model_name}")
else:
self.openai_client = OpenAI(api_key=openrouter_api_key)
print(f"[SUCCESS] Initialized OpenAI client with model: {model_name}")
# Initialize MCP server
self.server = Server("graphiti-mcp")
self._register_handlers()
def _register_handlers(self):
"""Register MCP tool handlers."""
@self.server.list_tools()
async def list_tools() -> List[Tool]:
"""List available tools."""
return [
Tool(
name="store_memory",
description="Store a memory or context in the graph database for future retrieval",
inputSchema={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "The content or memory to store"
},
"metadata": {
"type": "object",
"description": "Optional metadata associated with this memory",
"additionalProperties": True
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Optional tags for categorizing the memory"
}
},
"required": ["content"]
}
),
Tool(
name="retrieve_memories",
description="Retrieve relevant memories from the graph database based on a query",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query to find relevant memories"
},
"limit": {
"type": "integer",
"description": "Maximum number of memories to retrieve (default: 10)",
"default": 10
}
},
"required": ["query"]
}
),
Tool(
name="create_relationship",
description="Create a relationship between two memories or entities in the graph",
inputSchema={
"type": "object",
"properties": {
"source_id": {
"type": "string",
"description": "ID of the source memory/entity"
},
"target_id": {
"type": "string",
"description": "ID of the target memory/entity"
},
"relationship_type": {
"type": "string",
"description": "Type of relationship (e.g., 'relates_to', 'follows', 'references')"
},
"properties": {
"type": "object",
"description": "Optional properties for the relationship",
"additionalProperties": True
}
},
"required": ["source_id", "target_id", "relationship_type"]
}
),
Tool(
name="get_context",
description="Get contextual information for a given query by retrieving and synthesizing relevant memories",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to get context for"
},
"max_memories": {
"type": "integer",
"description": "Maximum number of memories to consider (default: 20)",
"default": 20
}
},
"required": ["query"]
}
),
Tool(
name="search_graph",
description="Search the graph database using Cypher query",
inputSchema={
"type": "object",
"properties": {
"cypher_query": {
"type": "string",
"description": "Cypher query to execute"
},
"parameters": {
"type": "object",
"description": "Optional parameters for the Cypher query",
"additionalProperties": True
}
},
"required": ["cypher_query"]
}
)
]
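        # Illustrative request an MCP client might send for store_memory
        # (shape follows the inputSchema above; values are hypothetical):
        # {
        #   "name": "store_memory",
        #   "arguments": {
        #     "content": "User prefers dark mode in the editor.",
        #     "metadata": {"source": "settings-chat"},
        #     "tags": ["preferences", "ui"]
        #   }
        # }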
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls."""
try:
if name == "store_memory":
result = await self._store_memory(arguments)
elif name == "retrieve_memories":
result = await self._retrieve_memories(arguments)
elif name == "create_relationship":
result = await self._create_relationship(arguments)
elif name == "get_context":
result = await self._get_context(arguments)
elif name == "search_graph":
result = await self._search_graph(arguments)
else:
result = {"error": f"Unknown tool: {name}"}
                # default=str covers non-JSON-serializable values (e.g. neo4j DateTime)
                return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
error_msg = {"error": str(e), "tool": name}
return [TextContent(type="text", text=json.dumps(error_msg, indent=2))]
async def _store_memory(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Store a memory in Neo4j."""
content = args.get("content")
metadata = args.get("metadata", {})
tags = args.get("tags", [])
# Convert metadata to JSON string (Neo4j doesn't support nested maps as properties)
metadata_json = json.dumps(metadata) if metadata else "{}"
        # Generate an embedding via the OpenAI-compatible embeddings API
        try:
            # Model IDs differ by provider: OpenRouter routes carry a provider
            # prefix ("openai/..."), direct OpenAI calls do not. NOTE: embeddings
            # availability through OpenRouter is not guaranteed; if the route is
            # rejected, the error is returned to the caller below.
            if self.use_openrouter:
                embedding_model = "openai/text-embedding-3-small"
            else:
                embedding_model = "text-embedding-3-small"
embedding_response = self.openai_client.embeddings.create(
model=embedding_model,
input=content
)
embedding = embedding_response.data[0].embedding
except Exception as e:
return {"error": f"Failed to generate embedding: {e}"}
# Create memory node in Neo4j
with self.driver.session() as session:
query = """
CREATE (m:Memory {
id: randomUUID(),
content: $content,
embedding: $embedding,
metadata: $metadata,
tags: $tags,
created_at: datetime()
})
RETURN m.id as id, m.content as content, m.created_at as created_at
"""
result = session.run(
query,
content=content,
embedding=embedding,
metadata=metadata_json,
tags=tags
)
record = result.single()
if record:
return {
"success": True,
"id": record["id"],
"content": record["content"],
"created_at": str(record["created_at"])
}
else:
return {"error": "Failed to create memory"}
async def _retrieve_memories(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Retrieve relevant memories using vector similarity."""
query = args.get("query")
limit = args.get("limit", 10)
        # Embed the query with the same model used at store time so the two
        # vectors live in the same embedding space
        try:
            if self.use_openrouter:
                embedding_model = "openai/text-embedding-3-small"
            else:
                embedding_model = "text-embedding-3-small"
embedding_response = self.openai_client.embeddings.create(
model=embedding_model,
input=query
)
query_embedding = embedding_response.data[0].embedding
except Exception as e:
return {"error": f"Failed to generate embedding: {e}"}
        # Search for similar memories using cosine similarity
        with self.driver.session() as session:
            # Simplified approach: gds.similarity.cosine requires the Graph Data
            # Science plugin. For production, prefer a native vector index and
            # db.index.vector.queryNodes (see the sketch after this query).
cypher_query = """
MATCH (m:Memory)
WITH m,
gds.similarity.cosine(m.embedding, $query_embedding) as similarity
WHERE similarity > 0.5
RETURN m.id as id, m.content as content, m.metadata as metadata,
m.tags as tags, m.created_at as created_at, similarity
ORDER BY similarity DESC
LIMIT $limit
"""
            # session.run() is lazy: an unknown function (e.g. missing GDS plugin)
            # may not raise until the result is consumed, so force evaluation
            # with list() inside the try block to make the fallback reachable.
            try:
                result = list(session.run(cypher_query, query_embedding=query_embedding, limit=limit))
            except Exception:
                # Fall back to plain substring matching on content and tags
                cypher_query = """
                MATCH (m:Memory)
                WHERE m.content CONTAINS $query OR ANY(tag IN m.tags WHERE tag CONTAINS $query)
                RETURN m.id as id, m.content as content, m.metadata as metadata,
                       m.tags as tags, m.created_at as created_at
                ORDER BY m.created_at DESC
                LIMIT $limit
                """
                result = list(session.run(cypher_query, query=query, limit=limit))
memories = []
for record in result:
# Parse metadata from JSON string back to dict
metadata_str = record.get("metadata", "{}")
try:
metadata_dict = json.loads(metadata_str) if isinstance(metadata_str, str) else (metadata_str or {})
except (json.JSONDecodeError, TypeError):
metadata_dict = {}
memories.append({
"id": record.get("id"),
"content": record.get("content"),
"metadata": metadata_dict,
"tags": record.get("tags", []),
"created_at": str(record.get("created_at", "")),
"similarity": record.get("similarity")
})
return {
"query": query,
"count": len(memories),
"memories": memories
}
async def _create_relationship(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Create a relationship between two memories."""
source_id = args.get("source_id")
target_id = args.get("target_id")
relationship_type = args.get("relationship_type")
properties = args.get("properties", {})
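        # Example arguments (IDs are hypothetical UUIDs):
        # {"source_id": "9b2f...", "target_id": "4a1c...",
        #  "relationship_type": "references", "properties": {"weight": 0.8}}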
        # Relationship types cannot be parameterized in Cypher, so the type is
        # interpolated into the query string; restrict it to identifier-safe
        # characters to prevent Cypher injection.
        if not relationship_type or not relationship_type.replace("_", "").isalnum():
            return {"error": f"Invalid relationship_type: {relationship_type!r}"}
        with self.driver.session() as session:
            query = f"""
            MATCH (source:Memory {{id: $source_id}})
            MATCH (target:Memory {{id: $target_id}})
            CREATE (source)-[r:{relationship_type} $properties]->(target)
            RETURN r, source.id as source_id, target.id as target_id
            """
            result = session.run(query, source_id=source_id, target_id=target_id, properties=properties)
record = result.single()
if record:
return {
"success": True,
"source_id": record["source_id"],
"target_id": record["target_id"],
"relationship_type": relationship_type
}
else:
return {"error": "Failed to create relationship. Check if both nodes exist."}
async def _get_context(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Get contextual information by retrieving and synthesizing memories."""
query = args.get("query")
max_memories = args.get("max_memories", 20)
# Retrieve relevant memories
memories_result = await self._retrieve_memories({"query": query, "limit": max_memories})
if "error" in memories_result:
return memories_result
memories = memories_result.get("memories", [])
if not memories:
return {
"query": query,
"context": "No relevant memories found.",
"memories_used": 0
}
# Synthesize context using OpenAI-compatible API (OpenRouter or OpenAI)
memory_texts = "\n\n".join([
f"Memory {i+1}: {mem['content']}"
for i, mem in enumerate(memories)
])
prompt = f"""Based on the following memories, provide contextual information for the query: "{query}"
Memories:
{memory_texts}
Provide a concise summary that synthesizes the relevant information from these memories to answer the query."""
try:
# OpenRouter requires HTTP headers for some features, but basic API calls work the same
response = self.openai_client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": "You are a helpful assistant that synthesizes information from memories to provide contextual answers."},
{"role": "user", "content": prompt}
],
temperature=0.7
)
synthesized_context = response.choices[0].message.content
return {
"query": query,
"context": synthesized_context,
"memories_used": len(memories),
"source_memories": [{"id": m["id"], "content": m["content"][:100]} for m in memories]
}
except Exception as e:
return {
"error": f"Failed to synthesize context: {e}",
"memories": memories
}
async def _search_graph(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a Cypher query on the graph."""
cypher_query = args.get("cypher_query")
parameters = args.get("parameters", {})
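        # Example arguments (illustrative):
        # {"cypher_query": "MATCH (m:Memory)-[r]->(n:Memory) RETURN m.id, type(r), n.id LIMIT $n",
        #  "parameters": {"n": 25}}
        # Note: this tool executes arbitrary Cypher; expose it only to trusted clients.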
try:
            with self.driver.session() as session:
                # Pass parameters as a dict (not **kwargs) so user-supplied keys
                # cannot collide with session.run()'s own keyword arguments
                result = session.run(cypher_query, parameters)
                records = [dict(record) for record in result]
return {
"success": True,
"count": len(records),
"results": records
}
except Exception as e:
return {"error": f"Cypher query failed: {e}"}
async def run_sse(self, host: str = "0.0.0.0", port: int = 8000):
"""Run the server with SSE transport."""
try:
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import uvicorn
except ImportError:
print("Error: fastapi, uvicorn, and sse-starlette are required for SSE transport")
print("Install with: pip install fastapi uvicorn sse-starlette")
sys.exit(1)
app = FastAPI()
base_url = f"http://{host}:{port}"
# Simple health check endpoint
@app.get("/")
async def root():
return {"status": "ok", "service": "graphiti-mcp-server"}
        # SSE endpoint - a minimal stub: it streams keep-alive pings but does not
        # yet carry MCP messages (use the stdio transport for real clients)
        @app.get("/sse")
        async def sse_endpoint():
            """Stub SSE endpoint: emits a ready event, then periodic pings."""
from starlette.responses import StreamingResponse
async def event_stream():
# Send initial connection message
yield f"data: {json.dumps({'type': 'connection', 'status': 'ready'})}\n\n"
# Keep connection alive
while True:
await asyncio.sleep(30)
yield f"data: {json.dumps({'type': 'ping'})}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
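        # Quick manual check once the server is running (port is illustrative):
        #   curl -N http://localhost:8000/sse
        # should print the ready event followed by a ping every 30 seconds.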
print(f"Starting SSE server on {base_url}/sse")
config = uvicorn.Config(app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
await server.serve()
async def run_stdio(self):
"""Run the server with stdio transport."""
        async with stdio_server() as (read_stream, write_stream):
            # Server.run() expects initialization options as its third argument
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )
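# Example client registration (paths and env values are illustrative). For
# Claude Desktop, a claude_desktop_config.json entry might look like:
# {
#   "mcpServers": {
#     "graphiti": {
#       "command": "python",
#       "args": ["/path/to/graphiti_mcp_server.py", "--transport", "stdio"],
#       "env": {
#         "NEO4J_URI": "neo4j+s://<instance>.databases.neo4j.io",
#         "NEO4J_USER": "neo4j",
#         "NEO4J_PASSWORD": "<password>",
#         "OPENROUTER_API_KEY": "<key>"
#       }
#     }
#   }
# }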
async def main():
"""Main entry point."""
import argparse
parser = argparse.ArgumentParser(description="Graphiti MCP Server")
parser.add_argument("--transport", choices=["sse", "stdio"], default="stdio",
help="Transport protocol (sse or stdio)")
parser.add_argument("--model", default="openai/gpt-4o-mini",
help="Model name (OpenRouter format, e.g., 'openai/gpt-4o-mini')")
parser.add_argument("--host", default="0.0.0.0",
help="Host for SSE transport")
parser.add_argument("--port", type=int, default=8000,
help="Port for SSE transport")
args = parser.parse_args()
# Load environment variables
neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
neo4j_user = os.getenv("NEO4J_USER", "neo4j")
neo4j_password = os.getenv("NEO4J_PASSWORD", "demodemo")
# Support both OpenRouter and OpenAI (OpenRouter takes precedence)
openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
use_openrouter = bool(openrouter_api_key)
if not openrouter_api_key and not openai_api_key:
print("Error: Either OPENROUTER_API_KEY or OPENAI_API_KEY environment variable is required")
sys.exit(1)
api_key = openrouter_api_key if use_openrouter else openai_api_key
    # MODEL_NAME in the environment takes precedence over the --model flag
    model_name = os.getenv("MODEL_NAME", args.model)
# Initialize and run server
server = GraphitiMCP(
neo4j_uri=neo4j_uri,
neo4j_user=neo4j_user,
neo4j_password=neo4j_password,
openrouter_api_key=api_key,
model_name=model_name,
use_openrouter=use_openrouter
)
print(f"Starting Graphiti MCP Server with {args.transport} transport...")
if args.transport == "sse":
await server.run_sse(host=args.host, port=args.port)
else:
await server.run_stdio()
if __name__ == "__main__":
asyncio.run(main())