"""
Database-Blind Bi-Temporal Knowledge Graph MCP Server
======================================================
A complete MCP server with:
- Bi-temporal knowledge graph (FalkorDB)
- Session-aware episodic memory
- Smart conflict resolution (location/employment changes)
- Entity extraction via OpenAI
- Custom automation tools section
STRUCTURE:
1. Configuration
2. FalkorDB Driver
3. Session Store
4. Entity Extractor
5. Graphiti Memory Core
6. Cleanup Manager
7. MCP Server & Core Tools
8. CUSTOM AUTOMATION TOOLS (add your tools here!)
9. Server Startup
"""
import os
import sys
import json
import logging
import asyncio
import threading
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
import httpx
from falkordb import FalkorDB
from openai import OpenAI
from fastmcp import FastMCP
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# =============================================================================
# CONFIGURATION
# =============================================================================
SERVER_PORT = int(os.getenv("PORT", "5000"))
SERVER_HOST = os.getenv("HOST", "0.0.0.0")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
FALKORDB_USERNAME = os.getenv("FALKORDB_USERNAME", "") # Leave empty if not needed
FALKORDB_PASSWORD = os.getenv("FALKORDB_PASSWORD", "") # Leave empty if not needed
FALKORDB_DATABASE = os.getenv("FALKORDB_DATABASE", "graphiti_memory")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GROUP_ID = os.getenv("DEFAULT_GROUP_ID", "default")
SESSION_TTL_MINUTES = int(os.getenv("SESSION_TTL_MINUTES", "30"))
MAX_SESSIONS = int(os.getenv("MAX_SESSIONS", "1000"))
CLEANUP_INTERVAL_SECONDS = int(os.getenv("CLEANUP_INTERVAL_SECONDS", "300"))
CONNECTION_IDLE_TIMEOUT = int(os.getenv("CONNECTION_IDLE_TIMEOUT", "600"))
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
OPENAI_MAX_TOKENS = int(os.getenv("OPENAI_MAX_TOKENS", "1000"))
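# Example environment configuration for local development (values are
# illustrative, not required defaults):
#
#   export FALKORDB_HOST=localhost
#   export FALKORDB_PORT=6379
#   export OPENAI_API_KEY=sk-...
#   export DEFAULT_GROUP_ID=my_project
#   export SESSION_TTL_MINUTES=30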
# =============================================================================
# FALKORDB DRIVER (Singleton with Connection Pooling)
# =============================================================================
class FalkorDBDriver:
"""Singleton FalkorDB driver with automatic connection management."""
_instance = None
_lock = threading.Lock()
_client = None
_graph = None
_last_activity = None
@classmethod
def connect(cls):
"""Establish connection to FalkorDB."""
with cls._lock:
if cls._client is None:
try:
# Build connection parameters
conn_params = {
'host': FALKORDB_HOST,
'port': FALKORDB_PORT
}
# Only add username/password if they are set
if FALKORDB_USERNAME:
conn_params['username'] = FALKORDB_USERNAME
if FALKORDB_PASSWORD:
conn_params['password'] = FALKORDB_PASSWORD
                    cls._client = FalkorDB(**conn_params)
                    cls._graph = cls._client.select_graph(FALKORDB_DATABASE)
                    cls._last_activity = datetime.utcnow()
                    # Verify the connection with a trivial query before reporting success
                    cls._graph.query("RETURN 1")
                    logger.info(f"Connected to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}/{FALKORDB_DATABASE}")
except Exception as e:
logger.error(f"β Failed to connect to FalkorDB: {e}")
raise
@classmethod
def disconnect(cls):
"""Close FalkorDB connection."""
with cls._lock:
if cls._client is not None:
try:
cls._client = None
cls._graph = None
cls._last_activity = None
logger.info("π Connection to FalkorDB closed")
except Exception as e:
logger.error(f"Error closing FalkorDB connection: {e}")
@classmethod
def _should_disconnect(cls) -> bool:
"""Check if connection should be closed due to inactivity."""
if cls._last_activity is None:
return False
idle_time = (datetime.utcnow() - cls._last_activity).total_seconds()
return idle_time > CONNECTION_IDLE_TIMEOUT
@classmethod
def _ensure_connected(cls):
"""Ensure connection is active, reconnect if needed."""
if cls._client is None or cls._graph is None:
cls.connect()
cls._last_activity = datetime.utcnow()
@classmethod
def is_connected(cls) -> bool:
"""Check if connected to FalkorDB."""
return cls._client is not None and cls._graph is not None
@classmethod
def query(cls, cypher: str, params: Optional[Dict] = None) -> List[Dict]:
"""Execute a read query."""
cls._ensure_connected()
try:
            result = cls._graph.query(cypher, params or {})
            if not result.result_set:
                return []
            # result_set rows are positional lists; pair them with the header
            # column names (header entries are (type, name) pairs) to build dicts
            columns = [col[1] for col in result.header]
            return [dict(zip(columns, row)) for row in result.result_set]
except Exception as e:
logger.error(f"Query error: {e}")
raise
@classmethod
def write(cls, cypher: str, params: Optional[Dict] = None) -> Any:
"""Execute a write query."""
cls._ensure_connected()
try:
result = cls._graph.query(cypher, params or {})
return result
except Exception as e:
logger.error(f"Write error: {e}")
raise
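# Usage sketch for the driver, assuming a reachable FalkorDB instance with the
# configured host/port (entity values are illustrative):
#
#   FalkorDBDriver.write("CREATE (:EntityNode {name: 'Alice', group_id: 'default'})")
#   rows = FalkorDBDriver.query(
#       "MATCH (n:EntityNode {group_id: $group}) RETURN n.name AS name",
#       {'group': 'default'}
#   )
#   # rows -> [{'name': 'Alice'}]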
# =============================================================================
# SESSION STORE (Memory-Bounded with TTL)
# =============================================================================
class SessionStore:
"""Thread-safe session store with TTL and memory limits."""
_sessions: Dict[str, Dict[str, Any]] = {}
_lock = threading.Lock()
@classmethod
def get_or_create(cls, session_id: str, group_id: str) -> Dict[str, Any]:
"""Get existing session or create new one."""
with cls._lock:
now = datetime.utcnow()
if len(cls._sessions) >= MAX_SESSIONS:
cls._cleanup_oldest()
if session_id not in cls._sessions:
cls._sessions[session_id] = {
'session_id': session_id,
'group_id': group_id,
'created_at': now,
'last_activity': now,
'message_count': 0,
'entities': set()
}
else:
cls._sessions[session_id]['last_activity'] = now
return cls._sessions[session_id]
@classmethod
def update_activity(cls, session_id: str):
"""Update last activity timestamp."""
with cls._lock:
if session_id in cls._sessions:
cls._sessions[session_id]['last_activity'] = datetime.utcnow()
@classmethod
def increment_messages(cls, session_id: str):
"""Increment message count."""
with cls._lock:
if session_id in cls._sessions:
cls._sessions[session_id]['message_count'] += 1
@classmethod
def add_entity(cls, session_id: str, entity_name: str):
"""Add entity to session."""
with cls._lock:
if session_id in cls._sessions:
cls._sessions[session_id]['entities'].add(entity_name)
@classmethod
def cleanup_expired(cls) -> int:
"""Remove expired sessions."""
with cls._lock:
now = datetime.utcnow()
ttl_delta = timedelta(minutes=SESSION_TTL_MINUTES)
expired = [
sid for sid, sess in cls._sessions.items()
if now - sess['last_activity'] > ttl_delta
]
for sid in expired:
del cls._sessions[sid]
if expired:
logger.info(f"π§Ή Cleaned up {len(expired)} expired sessions")
return len(expired)
@classmethod
def _cleanup_oldest(cls):
"""Remove oldest sessions when limit is reached."""
if not cls._sessions:
return
sorted_sessions = sorted(
cls._sessions.items(),
key=lambda x: x[1]['last_activity']
)
to_remove = max(1, len(sorted_sessions) // 10)
for sid, _ in sorted_sessions[:to_remove]:
del cls._sessions[sid]
logger.info(f"Removed {to_remove} oldest sessions (limit reached)")
@classmethod
def get_stats(cls) -> Dict[str, Any]:
"""Get session store statistics."""
with cls._lock:
now = datetime.utcnow()
ttl_delta = timedelta(minutes=SESSION_TTL_MINUTES)
active = sum(
1 for sess in cls._sessions.values()
if now - sess['last_activity'] <= ttl_delta
)
return {
'total_sessions': len(cls._sessions),
'active_sessions': active,
'expired_sessions': len(cls._sessions) - active,
'max_sessions': MAX_SESSIONS
}
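# Usage sketch (session id is illustrative): sessions are created lazily on
# first use and expire after SESSION_TTL_MINUTES of inactivity:
#
#   SessionStore.get_or_create('session-123', 'default')
#   SessionStore.increment_messages('session-123')
#   SessionStore.get_stats()
#   # -> {'total_sessions': 1, 'active_sessions': 1, 'expired_sessions': 0, ...}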
# =============================================================================
# OPENAI ENTITY EXTRACTOR
# =============================================================================
class EntityExtractor:
"""Extract entities and relationships from text using OpenAI."""
_client = None
@classmethod
def _get_client(cls):
"""Get or create OpenAI client."""
if cls._client is None and OPENAI_API_KEY:
cls._client = OpenAI(api_key=OPENAI_API_KEY)
return cls._client
@classmethod
def extract(cls, text: str) -> List[Tuple[str, str, str]]:
"""Extract entities and relationships from text."""
client = cls._get_client()
if not client:
logger.warning("OpenAI not configured, skipping entity extraction")
return []
try:
prompt = f"""Extract entities and their relationships from the following text.
Return ONLY a JSON array of objects, each with: "source", "relation", "target".
Focus on concrete facts about people, places, organizations, and their relationships.
Pay special attention to location and employment changes.
Text: {text}
Example output:
[
{{"source": "John", "relation": "works at", "target": "Google"}},
{{"source": "Sarah", "relation": "lives in", "target": "New York"}}
]"""
response = client.chat.completions.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": "You are an entity extraction assistant. Return only valid JSON arrays."},
{"role": "user", "content": prompt}
],
max_tokens=OPENAI_MAX_TOKENS,
temperature=0.3
)
content = response.choices[0].message.content.strip()
try:
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
entities = json.loads(content.strip())
if not isinstance(entities, list):
logger.warning("OpenAI response is not a list")
return []
result = []
for item in entities:
if all(k in item for k in ['source', 'relation', 'target']):
result.append((
str(item['source']).strip(),
str(item['relation']).strip().lower(),
str(item['target']).strip()
))
logger.info(f"Extracted {len(result)} relationships from text")
return result
except json.JSONDecodeError as e:
logger.error(f"Failed to parse OpenAI response as JSON: {e}")
return []
except Exception as e:
logger.error(f"Entity extraction error: {e}")
return []
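# Illustrative input/output for the extractor (actual results depend on the
# model's response):
#
#   EntityExtractor.extract("John moved to Berlin and now works at Siemens.")
#   # -> [('John', 'moved to', 'Berlin'), ('John', 'works at', 'Siemens')]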
# =============================================================================
# GRAPHITI MEMORY CORE
# =============================================================================
class GraphitiMemory:
"""Core memory operations with bi-temporal tracking."""
@staticmethod
def setup_schema():
"""Create indexes and constraints for the graph schema."""
try:
FalkorDBDriver.write("CREATE INDEX IF NOT EXISTS FOR (n:EntityNode) ON (n.group_id)")
FalkorDBDriver.write("CREATE INDEX IF NOT EXISTS FOR (n:EntityNode) ON (n.name)")
FalkorDBDriver.write("CREATE INDEX IF NOT EXISTS FOR (n:EpisodicNode) ON (n.group_id)")
FalkorDBDriver.write("CREATE INDEX IF NOT EXISTS FOR (n:EpisodicNode) ON (n.session_id)")
logger.info("β
Graph schema initialized")
except Exception as e:
logger.error(f"Schema setup error: {e}")
raise
@staticmethod
def _get_current_timestamp() -> str:
"""Get current UTC timestamp in ISO format."""
return datetime.utcnow().isoformat() + 'Z'
@staticmethod
def _normalize_entity_name(name: str) -> str:
"""Normalize entity names for consistency."""
return name.strip().title()
@staticmethod
def _should_invalidate_previous(relation: str) -> bool:
"""Determine if this relation type should invalidate previous facts."""
invalidation_keywords = [
'location', 'lives in', 'moved to', 'relocated',
'works at', 'employed by', 'job', 'position', 'title',
'married to', 'spouse', 'partner'
]
relation_lower = relation.lower()
return any(keyword in relation_lower for keyword in invalidation_keywords)
@staticmethod
def add_fact(
source_entity: str,
relation: str,
target_entity: str,
        group_id: Optional[str] = None,
        session_id: Optional[str] = None,
        valid_at: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Add a new fact to the knowledge graph with bi-temporal tracking."""
try:
effective_group = group_id or GROUP_ID
current_time = valid_at or GraphitiMemory._get_current_timestamp()
source_entity = GraphitiMemory._normalize_entity_name(source_entity)
target_entity = GraphitiMemory._normalize_entity_name(target_entity)
if GraphitiMemory._should_invalidate_previous(relation):
GraphitiMemory._invalidate_previous_facts(
source_entity, relation, effective_group, current_time
)
FalkorDBDriver.write("""
MERGE (s:EntityNode {name: $source, group_id: $group})
ON CREATE SET s.created_at = $timestamp, s.updated_at = $timestamp
ON MATCH SET s.updated_at = $timestamp
MERGE (t:EntityNode {name: $target, group_id: $group})
ON CREATE SET t.created_at = $timestamp, t.updated_at = $timestamp
ON MATCH SET t.updated_at = $timestamp
""", {
'source': source_entity,
'target': target_entity,
'group': effective_group,
'timestamp': current_time
})
rel_metadata = metadata or {}
FalkorDBDriver.write("""
MATCH (s:EntityNode {name: $source, group_id: $group})
MATCH (t:EntityNode {name: $target, group_id: $group})
CREATE (s)-[r:RELATES_TO {
fact: $fact,
created_at: $timestamp,
valid_at: $timestamp,
invalid_at: null,
expired_at: null,
session_id: $session,
metadata: $metadata
}]->(t)
""", {
'source': source_entity,
'target': target_entity,
'group': effective_group,
'fact': relation,
'timestamp': current_time,
'session': session_id,
'metadata': str(rel_metadata)
})
if session_id:
GraphitiMemory._link_to_episode(
session_id, effective_group, [source_entity, target_entity]
)
logger.info(f"β
Added fact: {source_entity} -{relation}-> {target_entity}")
return {
'success': True,
'source': source_entity,
'relation': relation,
'target': target_entity,
'valid_at': current_time
}
except Exception as e:
logger.error(f"Error adding fact: {e}")
return {'success': False, 'error': str(e)}
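    # Illustrative bi-temporal behavior: "lives in" matches an invalidation
    # keyword, so the second call below stamps invalid_at on the Paris edge
    # instead of leaving two "current" locations:
    #
    #   GraphitiMemory.add_fact('Alice', 'lives in', 'Paris')
    #   GraphitiMemory.add_fact('Alice', 'lives in', 'Berlin')
    #   GraphitiMemory.query_facts('Alice')                        # Berlin only
    #   GraphitiMemory.query_facts('Alice', include_invalid=True)  # both facts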
@staticmethod
def _invalidate_previous_facts(
entity: str,
relation: str,
group_id: str,
invalid_at: str
):
"""Invalidate previous facts for the same entity and relation type."""
try:
FalkorDBDriver.write("""
MATCH (s:EntityNode {name: $entity, group_id: $group})-[r:RELATES_TO]->(t:EntityNode)
WHERE r.invalid_at IS NULL
AND toLower(r.fact) CONTAINS $relation_keyword
SET r.invalid_at = $timestamp
""", {
'entity': entity,
'group': group_id,
'relation_keyword': relation.lower().split()[0],
'timestamp': invalid_at
})
logger.info(f"β οΈ Invalidated previous facts for {entity} ({relation})")
except Exception as e:
logger.error(f"Error invalidating facts: {e}")
@staticmethod
def _link_to_episode(session_id: str, group_id: str, entities: List[str]):
"""Link entities to an episodic node."""
try:
current_time = GraphitiMemory._get_current_timestamp()
FalkorDBDriver.write("""
MERGE (e:EpisodicNode {session_id: $session, group_id: $group})
ON CREATE SET
e.uuid = randomUUID(),
e.name = $session,
e.created_at = $timestamp,
e.updated_at = $timestamp,
e.message_count = 1,
e.status = 'active',
e.source = 'mcp_server'
ON MATCH SET
e.updated_at = $timestamp,
e.message_count = e.message_count + 1
""", {
'session': session_id,
'group': group_id,
'timestamp': current_time
})
for entity in entities:
FalkorDBDriver.write("""
MATCH (e:EpisodicNode {session_id: $session, group_id: $group})
MATCH (en:EntityNode {name: $entity, group_id: $group})
MERGE (e)-[:MENTIONS]->(en)
""", {
'session': session_id,
'group': group_id,
'entity': entity
})
except Exception as e:
logger.error(f"Error linking to episode: {e}")
@staticmethod
def add_message_with_extraction(
content: str,
session_id: str,
        group_id: Optional[str] = None,
extract_entities: bool = True
) -> Dict[str, Any]:
"""Add a message and automatically extract entities if enabled."""
try:
effective_group = group_id or GROUP_ID
SessionStore.get_or_create(session_id, effective_group)
SessionStore.increment_messages(session_id)
facts_added = []
if extract_entities and OPENAI_API_KEY:
relationships = EntityExtractor.extract(content)
for source, relation, target in relationships:
result = GraphitiMemory.add_fact(
source_entity=source,
relation=relation,
target_entity=target,
group_id=effective_group,
session_id=session_id
)
if result.get('success'):
facts_added.append(result)
SessionStore.add_entity(session_id, source)
SessionStore.add_entity(session_id, target)
return {
'success': True,
'session_id': session_id,
'facts_added': len(facts_added),
'facts': facts_added
}
except Exception as e:
logger.error(f"Error processing message: {e}")
return {'success': False, 'error': str(e)}
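    # End-to-end sketch (requires OPENAI_API_KEY; session id is illustrative):
    #
    #   GraphitiMemory.add_message_with_extraction(
    #       "Sarah moved to New York", session_id='s1')
    #   # -> {'success': True, 'session_id': 's1', 'facts_added': 1, 'facts': [...]}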
@staticmethod
def query_facts(
        entity_name: Optional[str] = None,
        group_id: Optional[str] = None,
include_invalid: bool = False,
max_facts: int = 20
) -> Dict[str, Any]:
"""Query facts from the knowledge graph."""
try:
effective_group = group_id or GROUP_ID
current_time = GraphitiMemory._get_current_timestamp()
if entity_name:
entity_name = GraphitiMemory._normalize_entity_name(entity_name)
if include_invalid:
results = FalkorDBDriver.query("""
MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
WHERE s.group_id = $group
AND (s.name = $entity OR t.name = $entity)
RETURN s.name as source, r.fact as fact, t.name as target,
r.valid_at as valid_from, r.invalid_at as invalid_at,
r.created_at as created_at
ORDER BY r.created_at DESC
LIMIT $limit
""", {
'group': effective_group,
'entity': entity_name,
'limit': max_facts
})
else:
results = FalkorDBDriver.query("""
MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
WHERE s.group_id = $group
AND (s.name = $entity OR t.name = $entity)
AND (r.invalid_at IS NULL OR r.invalid_at > $timestamp)
RETURN s.name as source, r.fact as fact, t.name as target,
r.valid_at as valid_from, r.invalid_at as invalid_at,
r.created_at as created_at
ORDER BY r.created_at DESC
LIMIT $limit
""", {
'group': effective_group,
'entity': entity_name,
'timestamp': current_time,
'limit': max_facts
})
else:
if include_invalid:
results = FalkorDBDriver.query("""
MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
WHERE s.group_id = $group
RETURN s.name as source, r.fact as fact, t.name as target,
r.valid_at as valid_from, r.invalid_at as invalid_at,
r.created_at as created_at
ORDER BY r.created_at DESC
LIMIT $limit
""", {
'group': effective_group,
'limit': max_facts
})
else:
results = FalkorDBDriver.query("""
MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
WHERE s.group_id = $group
AND (r.invalid_at IS NULL OR r.invalid_at > $timestamp)
RETURN s.name as source, r.fact as fact, t.name as target,
r.valid_at as valid_from, r.invalid_at as invalid_at,
r.created_at as created_at
ORDER BY r.created_at DESC
LIMIT $limit
""", {
'group': effective_group,
'timestamp': current_time,
'limit': max_facts
})
return {
'success': True,
'facts': results,
'count': len(results),
'entity': entity_name,
'include_invalid': include_invalid
}
except Exception as e:
logger.error(f"Error querying facts: {e}")
return {'success': False, 'error': str(e)}
@staticmethod
def query_at_time(
timestamp: str,
        entity_name: Optional[str] = None,
        group_id: Optional[str] = None,
max_facts: int = 20
) -> Dict[str, Any]:
"""Query what facts were valid at a specific point in time."""
try:
effective_group = group_id or GROUP_ID
if entity_name:
entity_name = GraphitiMemory._normalize_entity_name(entity_name)
results = FalkorDBDriver.query("""
MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
WHERE s.group_id = $group
AND (s.name = $entity OR t.name = $entity)
AND r.valid_at <= $timestamp
AND (r.invalid_at IS NULL OR r.invalid_at > $timestamp)
RETURN s.name as source, r.fact as fact, t.name as target,
r.valid_at as valid_from, r.invalid_at as valid_until,
r.created_at as created_at
ORDER BY r.valid_at DESC
LIMIT $limit
""", {
'group': effective_group,
'entity': entity_name,
'timestamp': timestamp,
'limit': max_facts
})
else:
results = FalkorDBDriver.query("""
MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
WHERE s.group_id = $group
AND r.valid_at <= $timestamp
AND (r.invalid_at IS NULL OR r.invalid_at > $timestamp)
RETURN s.name as source, r.fact as fact, t.name as target,
r.valid_at as valid_from, r.invalid_at as valid_until,
r.created_at as created_at
ORDER BY r.valid_at DESC
LIMIT $limit
""", {
'group': effective_group,
'timestamp': timestamp,
'limit': max_facts
})
return {
'success': True,
'timestamp': timestamp,
'facts': results,
'count': len(results),
'message': f'Found {len(results)} facts valid at {timestamp}'
}
except Exception as e:
logger.error(f"Error querying facts at time: {e}")
return {'success': False, 'error': str(e)}
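    # Usage sketch: valid_at/invalid_at are ISO-8601 strings compared
    # lexicographically, so pass the same format _get_current_timestamp()
    # produces (timestamp below is illustrative):
    #
    #   GraphitiMemory.query_at_time('2024-06-01T00:00:00Z', entity_name='Alice')
    #   # -> facts with valid_at <= timestamp and invalid_at null or later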
@staticmethod
def get_episodes(
        group_ids: Optional[List[str]] = None,
max_episodes: int = 10
) -> Dict[str, Any]:
"""Get recent episodes (conversation sessions) from the graph."""
try:
effective_groups = group_ids or [GROUP_ID]
results = FalkorDBDriver.query("""
MATCH (e:EpisodicNode)
WHERE e.group_id IN $groups
OPTIONAL MATCH (e)-[:MENTIONS]->(en:EntityNode)
WITH e, collect(DISTINCT en.name) as entities
RETURN e.uuid as uuid, e.session_id as session_id, e.name as name,
e.message_count as message_count, e.source as source,
e.created_at as created_at, e.updated_at as updated_at,
e.status as status, e.group_id as group_id, entities
ORDER BY e.updated_at DESC
LIMIT $limit
""", {
'groups': effective_groups,
'limit': max_episodes
})
if not results:
return {'success': True, 'message': 'No episodes found', 'episodes': []}
return {
'success': True,
'message': f'Found {len(results)} episodes',
'episodes': results
}
except Exception as e:
logger.error(f"Error getting episodes: {e}")
return {'success': False, 'error': str(e)}
@staticmethod
    def clear_graph(group_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""Clear all data for specified group IDs."""
try:
effective_groups = group_ids or [GROUP_ID]
count_result = FalkorDBDriver.query("""
MATCH (n)
WHERE n.group_id IN $groups
RETURN count(n) as count
""", {'groups': effective_groups})
count = count_result[0]['count'] if count_result else 0
FalkorDBDriver.write("""
MATCH (n)
WHERE n.group_id IN $groups
DETACH DELETE n
""", {'groups': effective_groups})
with SessionStore._lock:
sessions_to_remove = [
sid for sid, sess in SessionStore._sessions.items()
if sess['group_id'] in effective_groups
]
for sid in sessions_to_remove:
del SessionStore._sessions[sid]
return {
'success': True,
'message': f'Deleted {count} nodes from groups: {", ".join(effective_groups)}'
}
except Exception as e:
logger.error(f"Error clearing graph: {e}")
return {'success': False, 'error': str(e)}
# =============================================================================
# CLEANUP MANAGER
# =============================================================================
class CleanupManager:
"""Background task manager for session and connection cleanup."""
_task = None
_running = False
@classmethod
async def start(cls):
"""Start the cleanup background task."""
if cls._running:
return
cls._running = True
cls._task = asyncio.create_task(cls._cleanup_loop())
logger.info(f"π Cleanup manager started (interval: {CLEANUP_INTERVAL_SECONDS}s)")
@classmethod
async def stop(cls):
"""Stop the cleanup background task."""
cls._running = False
if cls._task:
cls._task.cancel()
try:
await cls._task
except asyncio.CancelledError:
pass
logger.info("π Cleanup manager stopped")
@classmethod
async def _cleanup_loop(cls):
"""Main cleanup loop."""
while cls._running:
try:
await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)
                SessionStore.cleanup_expired()
if FalkorDBDriver._should_disconnect():
FalkorDBDriver.disconnect()
logger.info("π€ Closed idle database connection")
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Cleanup error: {e}")
# =============================================================================
# MCP SERVER INITIALIZATION
# =============================================================================
mcp = FastMCP("Bi-Temporal Knowledge Graph MCP Server")
# =============================================================================
# CORE MEMORY TOOLS
# =============================================================================
@mcp.tool()
async def add_fact(
source_entity: str,
relation: str,
target_entity: str,
group_id: Optional[str] = None,
session_id: Optional[str] = None,
valid_at: Optional[str] = None
) -> Dict[str, Any]:
"""
Add a new fact to the knowledge graph with bi-temporal tracking.
Automatically invalidates previous facts for location/employment changes.
"""
return GraphitiMemory.add_fact(
source_entity=source_entity,
relation=relation,
target_entity=target_entity,
group_id=group_id,
session_id=session_id,
valid_at=valid_at
)
@mcp.tool()
async def add_message(
content: str,
session_id: str,
group_id: Optional[str] = None,
extract_entities: bool = True
) -> Dict[str, Any]:
"""
Add a message and automatically extract entities if enabled.
Uses OpenAI to extract entities and relationships from natural language.
"""
return GraphitiMemory.add_message_with_extraction(
content=content,
session_id=session_id,
group_id=group_id,
extract_entities=extract_entities
)
@mcp.tool()
async def query_facts(
entity_name: Optional[str] = None,
group_id: Optional[str] = None,
include_invalid: bool = False,
max_facts: int = 20
) -> Dict[str, Any]:
"""Query facts from the knowledge graph."""
return GraphitiMemory.query_facts(
entity_name=entity_name,
group_id=group_id,
include_invalid=include_invalid,
max_facts=max_facts
)
@mcp.tool()
async def query_at_time(
timestamp: str,
entity_name: Optional[str] = None,
group_id: Optional[str] = None,
max_facts: int = 20
) -> Dict[str, Any]:
"""Query what facts were valid at a specific point in time."""
return GraphitiMemory.query_at_time(
timestamp=timestamp,
entity_name=entity_name,
group_id=group_id,
max_facts=max_facts
)
@mcp.tool()
async def get_episodes(
group_ids: Optional[List[str]] = None,
max_episodes: int = 10
) -> Dict[str, Any]:
"""Get recent episodes (conversation sessions) from the graph."""
return GraphitiMemory.get_episodes(
group_ids=group_ids,
max_episodes=max_episodes
)
@mcp.tool()
async def clear_graph(
group_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Clear all data for specified group IDs. WARNING: Permanently deletes data!"""
return GraphitiMemory.clear_graph(group_ids=group_ids)
@mcp.tool()
async def get_status() -> Dict[str, Any]:
"""Get server status and database connection info."""
try:
result = FalkorDBDriver.query("MATCH (n) RETURN count(n) as count")
node_count = result[0]['count'] if result else 0
stats = FalkorDBDriver.query("""
MATCH (n)
WITH labels(n)[0] as label, count(*) as count
RETURN label, count
ORDER BY count DESC
""")
rel_stats = FalkorDBDriver.query("""
MATCH ()-[r]->()
WITH type(r) as rel_type, count(*) as count
RETURN rel_type, count
ORDER BY count DESC
""")
fact_validity = FalkorDBDriver.query("""
MATCH ()-[r:RELATES_TO]->()
RETURN
count(CASE WHEN r.invalid_at IS NULL THEN 1 END) as valid_facts,
count(CASE WHEN r.invalid_at IS NOT NULL THEN 1 END) as invalid_facts
""")
session_stats = SessionStore.get_stats()
return {
'success': True,
'status': 'ok',
'architecture': 'database-blind',
'database': 'FalkorDB',
'host': f'{FALKORDB_HOST}:{FALKORDB_PORT}',
'graph': FALKORDB_DATABASE,
'total_nodes': node_count,
'node_types': {s['label']: s['count'] for s in stats} if stats else {},
'relationship_types': {s['rel_type']: s['count'] for s in rel_stats} if rel_stats else {},
'fact_validity': fact_validity[0] if fact_validity else {},
'session_store': session_stats,
'db_connected': FalkorDBDriver.is_connected(),
'openai_enabled': OPENAI_API_KEY is not None,
'bi_temporal': True,
'smart_conflict_resolution': True,
'default_group': GROUP_ID,
'resource_management': {
'session_ttl_minutes': SESSION_TTL_MINUTES,
'max_sessions': MAX_SESSIONS,
'cleanup_interval_seconds': CLEANUP_INTERVAL_SECONDS,
'connection_idle_timeout': CONNECTION_IDLE_TIMEOUT
}
}
except Exception as e:
logger.error(f"Status check failed: {e}")
return {'success': False, 'status': 'error', 'error': str(e)}
@mcp.tool()
async def force_cleanup() -> Dict[str, Any]:
"""Manually trigger cleanup of expired sessions and idle connections."""
try:
expired_count = SessionStore.cleanup_expired()
db_was_connected = FalkorDBDriver.is_connected()
if FalkorDBDriver._should_disconnect():
FalkorDBDriver.disconnect()
return {
'success': True,
'message': 'Cleanup completed',
'expired_sessions_removed': expired_count,
'db_connection_closed': db_was_connected and not FalkorDBDriver.is_connected()
}
except Exception as e:
logger.error(f"Force cleanup failed: {e}")
return {'success': False, 'error': str(e)}
# =============================================================================
# CUSTOM AUTOMATION TOOLS
# =============================================================================
#
# ADD YOUR CUSTOM AUTOMATION TOOLS BELOW
# Each tool should be decorated with @mcp.tool()
# The Automation Engine App can generate and paste tools here
#
# Example:
# @mcp.tool()
# async def my_custom_tool(param1: str, param2: int = 10) -> str:
# """Tool description that the AI will see."""
# import httpx
# payload = {"param1": param1, "param2": param2}
# url = "https://your-webhook-url.com/endpoint"
# async with httpx.AsyncClient() as client:
# try:
# resp = await client.post(url, json=payload)
# return f"Success: returned {resp.status_code}"
# except Exception as e:
# return f"Error: {str(e)}"
# =============================================================================
# ============ Template: LinkedIn Poster - Automation Engine Template ============
# Independent tools for each webhook in this template
# Total webhooks: 3
@mcp.tool()
async def linkedin_poster___automation_engine_template___linkedin__template1_image(caption: str, imageurl: str) -> str:
"""
Posts an image with a caption to your LinkedIn page.
Args:
caption: The text content that will appear with the image. (maps to 'caption')
imageurl: A direct, public URL to the image file (e.g., ending in .jpg, .png). (maps to 'imageUrl')
"""
import httpx
payload = {"caption": caption, "imageUrl": imageurl}
url = "https://webhook.latenode.com/48975/prod/linkedin_template1_image"
async with httpx.AsyncClient() as client:
try:
resp = await client.post(url, json=payload)
return f"Success: LinkedIn Poster - Automation Engine Template - Linkedin _Template1_Image returned {resp.status_code}"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
async def linkedin_poster___automation_engine_template___linkedin__template1_text(content: str) -> str:
"""
Posts a text-only update to your LinkedIn page.
Args:
content: The full text content of the LinkedIn post. (maps to 'content')
"""
import httpx
payload = {"content": content}
url = "https://webhook.latenode.com/48975/prod/linkedin_template1_text"
async with httpx.AsyncClient() as client:
try:
resp = await client.post(url, json=payload)
return f"Success: LinkedIn Poster - Automation Engine Template - Linkedin _Template1_Text returned {resp.status_code}"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
async def linkedin_poster___automation_engine_template___linkedin__template1_video(caption: str, videourl: str) -> str:
"""
    Posts a video with a caption to your LinkedIn page.
Args:
caption: The text content that will appear with the video. (maps to 'caption')
videourl: A direct, public URL to the video file (e.g., ending in .mp4). (maps to 'videoUrl')
"""
import httpx
payload = {"caption": caption, "videoUrl": videourl}
url = "https://webhook.latenode.com/48975/prod/linkedin_template1_video"
async with httpx.AsyncClient() as client:
try:
resp = await client.post(url, json=payload)
return f"Success: LinkedIn Poster - Automation Engine Template - Linkedin _Template1_Video returned {resp.status_code}"
except Exception as e:
return f"Error: {str(e)}"
# =============================================================================
# END CUSTOM TOOLS SECTION
# =============================================================================
# =============================================================================
# SERVER STARTUP
# =============================================================================
async def startup():
"""Initialize all components on server startup."""
logger.info("Starting Bi-Temporal Knowledge Graph MCP Server...")
try:
FalkorDBDriver.connect()
GraphitiMemory.setup_schema()
logger.info("β
FalkorDB initialized successfully")
except Exception as e:
logger.error(f"β FalkorDB initialization failed: {e}")
sys.exit(1)
try:
await CleanupManager.start()
except Exception as e:
logger.error(f"Cleanup manager failed to start: {e}")
if OPENAI_API_KEY:
logger.info("β
OpenAI configured - Auto entity extraction enabled")
else:
logger.warning("β οΈ OPENAI_API_KEY not set - Manual fact addition only")
logger.info(f"π Default group: {GROUP_ID}")
logger.info(f"β° Session TTL: {SESSION_TTL_MINUTES} minutes")
logger.info("β
Server initialization complete")
async def shutdown():
"""Clean up resources on server shutdown."""
logger.info("π Shutting down server...")
try:
await CleanupManager.stop()
except Exception as e:
logger.error(f"Error stopping cleanup manager: {e}")
try:
FalkorDBDriver.disconnect()
except Exception as e:
logger.error(f"Error closing FalkorDB connection: {e}")
logger.info("β
Server shutdown complete")
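# NOTE: startup() and shutdown() are not invoked anywhere by default. A minimal
# wiring sketch, assuming the installed FastMCP version accepts a `lifespan`
# async context manager (as in FastMCP 2.x); if yours does not, running
# `asyncio.run(startup())` before `mcp.run()` is a cruder alternative:
#
#   from contextlib import asynccontextmanager
#
#   @asynccontextmanager
#   async def lifespan(server):
#       await startup()
#       try:
#           yield
#       finally:
#           await shutdown()
#
#   mcp = FastMCP("Bi-Temporal Knowledge Graph MCP Server", lifespan=lifespan)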
if __name__ == "__main__":
print("""
+---------------------------------------------------------------+
| Database-Blind Bi-Temporal Knowledge Graph MCP Server |
| |
| Features: |
| - Session-aware episodes with fact invalidation |
| - Full bi-temporal tracking (valid_at + invalid_at) |
| - Smart conflict resolution (location/employment changes) |
| - Auto session cleanup (TTL-based) |
| - FalkorDB + OpenAI integration |
| |
| Custom Tools: Add @mcp.tool() functions in the |
| CUSTOM AUTOMATION TOOLS section above |
+---------------------------------------------------------------+
""")
print(f"π Starting server on {SERVER_HOST}:{SERVER_PORT}")
print(f"π‘ MCP endpoint: /sse\n")
mcp.run(
transport="sse",
host=SERVER_HOST,
port=SERVER_PORT
)