Knowledge Graph Memory Server
by AgentWong
- optimized_memory_mcp_server
"""Main entry point for the memory MCP server."""
# Standard library imports
import asyncio
import json
import logging
import argparse
import os
import time
from typing import List, Dict, Any
from urllib.parse import urlparse
from asyncio import Semaphore
from functools import wraps
# Third-party imports
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
# Local imports
from .optimized_sqlite_manager import OptimizedSQLiteManager
from .exceptions import (
KnowledgeGraphError,
EntityNotFoundError,
EntityAlreadyExistsError,
RelationValidationError,
)
# Configure logging: emit INFO and above, prefixing every record with its
# timestamp, logger name and level.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger shared by every function in this module.
logger = logging.getLogger("knowledge-graph-server")
def parse_database_config() -> Dict[str, Any]:
    """Build the database settings from the process environment.

    Returns:
        A dict with two keys:

        * ``"database_url"`` - the value of ``DATABASE_URL``, or
          ``"sqlite:///memory.db"`` when the variable is unset.
        * ``"echo"`` - ``True`` only when ``SQL_ECHO`` equals ``"true"``
          (compared case-insensitively); ``False`` otherwise.
    """
    env = os.environ
    echo_enabled = env.get("SQL_ECHO", "").lower() == "true"
    database_url = env.get("DATABASE_URL", "sqlite:///memory.db")
    return {"database_url": database_url, "echo": echo_enabled}
def validate_database_url(url: str) -> None:
    """Reject any database URL whose scheme is not exactly ``sqlite``.

    Only the scheme is inspected; the rest of the URL is not checked.

    Args:
        url: Database URL to check, e.g. ``sqlite:///memory.db``.

    Raises:
        ValueError: If the parsed scheme is anything other than ``sqlite``.
    """
    scheme = urlparse(url).scheme
    if scheme == "sqlite":
        return
    raise ValueError(
        "Invalid database URL scheme. Must be 'sqlite'"
    )
async def async_main() -> None:
    """Configure and run the knowledge graph MCP server over stdio.

    Steps, in order:

    1. Read the database settings from the environment, letting the
       ``--database-url`` command-line option override the URL.
    2. Validate the URL and build the ``OptimizedSQLiteManager``.
    3. Create the MCP ``Server`` and register the tool listing and the
       tool-call handler.
    4. Initialize the manager, serve requests on stdin/stdout, and always
       call ``manager.cleanup()`` afterwards.

    Raises:
        ValueError: If the database URL does not use the ``sqlite`` scheme.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--database-url",
        type=str,
        # The example uses the plain ``sqlite`` scheme because that is the
        # only scheme validate_database_url() accepts.
        help="SQLite database URL (e.g., sqlite:///path/to/db)"
    )
    args = parser.parse_args()

    # Get database configuration; the command line wins over the environment.
    config = parse_database_config()
    if args.database_url:
        config["database_url"] = args.database_url

    # Validate database URL
    validate_database_url(config["database_url"])

    # Create the optimized SQLite manager (initialized further below).
    manager = OptimizedSQLiteManager(
        database_url=config["database_url"],
        echo=config["echo"]
    )

    app = Server("knowledge-graph-server")

    # Tool catalogue advertised to clients. Built once so the same
    # definitions drive both the listing and the call-time allow-list.
    tools = [
        types.Tool(
            name="create_entities",
            description="Create multiple new entities in the knowledge graph",
            inputSchema={
                "type": "object",
                "properties": {
                    "entities": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "entityType": {"type": "string"},
                                "observations": {
                                    "type": "array",
                                    "items": {"type": "string"}
                                }
                            },
                            "required": ["name", "entityType", "observations"],
                            "additionalProperties": False
                        }
                    }
                },
                "required": ["entities"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="create_relations",
            description="Create multiple new relations between entities",
            inputSchema={
                "type": "object",
                "properties": {
                    "relations": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "from": {"type": "string"},
                                "to": {"type": "string"},
                                "relationType": {"type": "string"}
                            },
                            "required": ["from", "to", "relationType"],
                            "additionalProperties": False
                        }
                    }
                },
                "required": ["relations"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="read_graph",
            description="Read the entire knowledge graph",
            inputSchema={
                "type": "object",
                "properties": {},
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="search_nodes",
            description="Search for nodes based on query",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="add_observations",
            description="Add new observations to existing entities",
            inputSchema={
                "type": "object",
                "properties": {
                    "observations": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "entityName": {"type": "string"},
                                "contents": {
                                    "type": "array",
                                    "items": {"type": "string"}
                                }
                            },
                            "required": ["entityName", "contents"],
                            "additionalProperties": False
                        }
                    }
                },
                "required": ["observations"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="delete_entities",
            description="Remove entities and their relations",
            inputSchema={
                "type": "object",
                "properties": {
                    "entityNames": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                },
                "required": ["entityNames"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="delete_observations",
            description="Remove specific observations from entities",
            inputSchema={
                "type": "object",
                "properties": {
                    "deletions": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "entityName": {"type": "string"},
                                "observations": {
                                    "type": "array",
                                    "items": {"type": "string"}
                                }
                            },
                            "required": ["entityName", "observations"],
                            "additionalProperties": False
                        }
                    }
                },
                "required": ["deletions"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="delete_relations",
            description="Remove specific relations from the graph",
            inputSchema={
                "type": "object",
                "properties": {
                    "relations": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "from": {"type": "string"},
                                "to": {"type": "string"},
                                "relationType": {"type": "string"}
                            },
                            "required": ["from", "to", "relationType"],
                            "additionalProperties": False
                        }
                    }
                },
                "required": ["relations"],
                "additionalProperties": False
            }
        ),
        types.Tool(
            name="open_nodes",
            description="Retrieve specific nodes by name",
            inputSchema={
                "type": "object",
                "properties": {
                    "names": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                },
                "required": ["names"],
                "additionalProperties": False
            }
        )
    ]

    # Only these names may be dispatched to the manager by call_tool().
    tool_names = frozenset(tool.name for tool in tools)

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Return the tools this server exposes."""
        # Hand out a fresh list so callers cannot mutate the catalogue.
        return list(tools)

    def rate_limit(max_requests: int = 100, window_seconds: int = 60):
        """Rate limiting decorator for API endpoints.

        Allows at most ``max_requests`` calls per ``window_seconds`` window
        (measured with the monotonic clock) and, through a semaphore, at
        most ``max_requests`` calls in flight at once.

        Raises (from the decorated coroutine):
            RuntimeError: When the current window's quota is used up.
        """
        semaphore = Semaphore(max_requests)
        window_start = time.monotonic()
        request_count = 0
        window_lock = asyncio.Lock()

        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                nonlocal window_start, request_count
                async with window_lock:
                    current_time = time.monotonic()
                    if current_time - window_start >= window_seconds:
                        # The window has elapsed: start a new one.
                        window_start = current_time
                        request_count = 0
                    if request_count >= max_requests:
                        raise RuntimeError("Rate limit exceeded")
                    # Count the request under the same lock as the check so
                    # the test-and-increment is a single atomic step.
                    request_count += 1
                try:
                    async with semaphore:
                        return await func(*args, **kwargs)
                except Exception as e:
                    # Anything caught here came from the wrapped call, not
                    # from the limiter itself (that raise is above).
                    logger.error("Error in rate-limited call: %s", e)
                    raise
            return wrapper
        return decorator

    async def health_check() -> Dict[str, Any]:
        """Check the health of the server and database.

        Probes the database by reading the graph. Returns a dict with
        ``"status": "healthy"`` and a wall-clock ``"timestamp"`` on success,
        or ``"status": "unhealthy"`` and the ``"error"`` text on failure.

        NOTE(review): nothing in this function registers or calls
        health_check; it is kept for callers that may be added later.
        """
        try:
            await manager.read_graph()
            return {"status": "healthy", "timestamp": time.time()}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    @app.call_tool()
    @rate_limit()
    async def call_tool(
        name: str,
        arguments: Dict[str, Any]
    ) -> List[types.TextContent]:
        """Run the named tool against the manager and return its result.

        The result is JSON-encoded; a ``None`` result yields a fixed success
        message. Any failure, including an unknown tool name, is logged and
        returned to the client as an ``"Error in <name>: ..."`` text item.
        """
        try:
            # The name comes from the client: dispatch only to advertised
            # tools, never to arbitrary manager attributes such as
            # ``cleanup`` or ``initialize``.
            if name not in tool_names:
                raise ValueError(f"Unknown tool: {name}")
            result = await getattr(manager, name)(**(arguments or {}))
            return [types.TextContent(
                type="text",
                text=json.dumps(result) if result is not None else "Operation completed successfully"
            )]
        except Exception as e:
            error_message = f"Error in {name}: {str(e)}"
            logger.error(error_message, exc_info=True)
            return [types.TextContent(type="text", text=error_message)]

    # Initialize database schema and indices
    await manager.initialize()
    try:
        async with stdio_server() as (read_stream, write_stream):
            logger.info(
                "Knowledge Graph MCP Server running on stdio (database: %s)",
                config["database_url"]
            )
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options()
            )
    finally:
        # Runs even when the stdio transport fails to open, so an
        # initialized manager is always cleaned up.
        await manager.cleanup()
def main():
    """Synchronous entry point: run the server and map failures to exit codes.

    Runs ``async_main()`` on a new event loop. A ``KeyboardInterrupt`` is
    logged as a normal shutdown; any other exception is logged with its
    traceback and the process exits with status 1.
    """
    # Local import: sys.exit is always available, whereas the bare ``exit``
    # builtin is injected by the ``site`` module and is missing under
    # ``python -S`` or in frozen/embedded interpreters.
    import sys

    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        logger.info("Server shutting down...")
    except Exception as e:
        logger.error("Fatal error: %s", e, exc_info=True)
        sys.exit(1)