Skip to main content
Glama
connectimtiazh

Bi-Temporal Knowledge Graph MCP Server

main.py • 46 kB
"""
Database-Blind Bi-Temporal Knowledge Graph MCP Server
======================================================

A complete MCP server with:
- Bi-temporal knowledge graph (FalkorDB)
- Session-aware episodic memory
- Smart conflict resolution (location/employment changes)
- Entity extraction via OpenAI
- Custom automation tools section

STRUCTURE:
1. Configuration
2. FalkorDB Driver
3. Session Store
4. Entity Extractor
5. Graphiti Memory Core
6. Cleanup Manager
7. MCP Server & Core Tools
8. CUSTOM AUTOMATION TOOLS (add your tools here!)
9. Server Startup
"""

import os
import sys
import logging
import asyncio
import threading
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple

import httpx
from falkordb import FalkorDB
from openai import OpenAI
from fastmcp import FastMCP

# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# =============================================================================
# CONFIGURATION
# =============================================================================
# Every setting is read once from the environment at import time; the second
# argument of each os.getenv call is the default used when the variable is unset.

# Address the MCP server binds to.
SERVER_PORT = int(os.getenv("PORT", "5000"))
SERVER_HOST = os.getenv("HOST", "0.0.0.0")

# FalkorDB connection settings.
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
FALKORDB_USERNAME = os.getenv("FALKORDB_USERNAME", "")  # Leave empty if not needed
FALKORDB_PASSWORD = os.getenv("FALKORDB_PASSWORD", "")  # Leave empty if not needed
FALKORDB_DATABASE = os.getenv("FALKORDB_DATABASE", "graphiti_memory")

# No default: when unset, automatic entity extraction is skipped.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Group used whenever a caller does not pass an explicit group_id.
GROUP_ID = os.getenv("DEFAULT_GROUP_ID", "default")

# In-memory session store limits.
SESSION_TTL_MINUTES = int(os.getenv("SESSION_TTL_MINUTES", "30"))
MAX_SESSIONS = int(os.getenv("MAX_SESSIONS", "1000"))
# Background maintenance settings (seconds).
CLEANUP_INTERVAL_SECONDS = int(os.getenv("CLEANUP_INTERVAL_SECONDS", "300"))
CONNECTION_IDLE_TIMEOUT = int(os.getenv("CONNECTION_IDLE_TIMEOUT", "600"))

# OpenAI chat-completion settings used by the entity extractor.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
OPENAI_MAX_TOKENS = int(os.getenv("OPENAI_MAX_TOKENS", "1000"))


# =============================================================================
# FALKORDB DRIVER (Singleton with Connection Pooling)
# =============================================================================

class FalkorDBDriver:
    """Class-level FalkorDB connection holder with lazy (re)connection.

    All state is stored on the class, so every caller shares one client/graph
    pair.  ``connect`` and ``disconnect`` are serialized by ``_lock``;
    ``query`` and ``write`` reconnect on demand and refresh the idle timer.
    """

    _instance = None  # not read anywhere in this file; kept for compatibility
    _lock = threading.Lock()
    _client = None
    _graph = None
    _last_activity = None  # naive UTC datetime of the last connect/query

    @classmethod
    def connect(cls):
        """Open the FalkorDB connection if none is open yet.

        Raises:
            Exception: whatever the client raises while connecting or while
                running the ``RETURN 1`` probe (logged, then re-raised).
        """
        with cls._lock:
            if cls._client is not None:
                return

            conn_params = {'host': FALKORDB_HOST, 'port': FALKORDB_PORT}
            # Credentials are optional; only pass them when configured.
            if FALKORDB_USERNAME:
                conn_params['username'] = FALKORDB_USERNAME
            if FALKORDB_PASSWORD:
                conn_params['password'] = FALKORDB_PASSWORD

            try:
                client = FalkorDB(**conn_params)
                graph = client.select_graph(FALKORDB_DATABASE)
                logger.info(
                    f"✅ Connected to FalkorDB at "
                    f"{FALKORDB_HOST}:{FALKORDB_PORT}/{FALKORDB_DATABASE}"
                )
                graph.query("RETURN 1")
                logger.info("✅ FalkorDB connection test successful")
            except Exception as e:
                logger.error(f"❌ Failed to connect to FalkorDB: {e}")
                raise

            # Publish the connection only after the probe succeeded, so a
            # failed probe never leaves the driver looking "connected".
            cls._client = client
            cls._graph = graph
            cls._last_activity = datetime.utcnow()

    @classmethod
    def disconnect(cls):
        """Drop the FalkorDB connection (no-op when not connected)."""
        with cls._lock:
            if cls._client is None:
                return

            client = cls._client
            cls._client = None
            cls._graph = None
            cls._last_activity = None
            try:
                # NOTE(review): assumes the falkordb client exposes its
                # underlying redis client as ``connection`` - confirm against
                # the installed version.  The getattr guards make this a
                # harmless no-op when the attribute is absent.
                connection = getattr(client, 'connection', None)
                close = getattr(connection, 'close', None)
                if callable(close):
                    close()
                logger.info("🔌 Connection to FalkorDB closed")
            except Exception as e:
                logger.error(f"Error closing FalkorDB connection: {e}")

    @classmethod
    def _should_disconnect(cls) -> bool:
        """Return True when the connection has been idle past the timeout."""
        if cls._last_activity is None:
            return False
        idle_time = (datetime.utcnow() - cls._last_activity).total_seconds()
        return idle_time > CONNECTION_IDLE_TIMEOUT

    @classmethod
    def _ensure_connected(cls):
        """Reconnect if necessary and refresh the idle timer."""
        if cls._client is None or cls._graph is None:
            cls.connect()
        cls._last_activity = datetime.utcnow()

    @classmethod
    def is_connected(cls) -> bool:
        """Return True when both the client and the graph handle are set."""
        return cls._client is not None and cls._graph is not None

    @staticmethod
    def _rows_to_dicts(result: Any) -> List[Dict]:
        """Convert a query result into one ``{column: value}`` dict per row.

        Rows in ``result.result_set`` are positional, so column names are
        taken from ``result.header``.
        """
        rows = result.result_set
        if not rows:
            return []

        columns = []
        for column in result.header:
            # NOTE(review): header entries are expected to be
            # ``[column_type, column_name]`` pairs - confirm against the
            # installed falkordb version.  Bare names are accepted as well.
            name = column[-1] if isinstance(column, (list, tuple)) else column
            if isinstance(name, bytes):
                name = name.decode()
            columns.append(name)
        return [dict(zip(columns, row)) for row in rows]

    @classmethod
    def query(cls, cypher: str, params: Optional[Dict] = None) -> List[Dict]:
        """Execute a read query and return its rows as dicts keyed by column.

        Raises:
            Exception: any error raised by the client (logged, re-raised).
        """
        cls._ensure_connected()
        try:
            result = cls._graph.query(cypher, params or {})
            return cls._rows_to_dicts(result)
        except Exception as e:
            logger.error(f"Query error: {e}")
            raise

    @classmethod
    def write(cls, cypher: str, params: Optional[Dict] = None) -> Any:
        """Execute a write query and return the raw client result.

        Raises:
            Exception: any error raised by the client (logged, re-raised).
        """
        cls._ensure_connected()
        try:
            return cls._graph.query(cypher, params or {})
        except Exception as e:
            logger.error(f"Write error: {e}")
            raise


# =============================================================================
# SESSION STORE (Memory-Bounded with TTL)
# =============================================================================

class SessionStore:
    """In-memory session registry guarded by a lock, with TTL and a size cap."""

    _sessions: Dict[str, Dict[str, Any]] = {}
    _lock = threading.Lock()

    @classmethod
    def get_or_create(cls, session_id: str, group_id: str) -> Dict[str, Any]:
        """Return the session for ``session_id``, creating it when missing.

        An existing session only has its ``last_activity`` refreshed.  Room
        is made (oldest sessions evicted) only when a *new* session has to be
        inserted, so touching a known session never evicts anything.
        """
        with cls._lock:
            now = datetime.utcnow()

            session = cls._sessions.get(session_id)
            if session is not None:
                session['last_activity'] = now
                return session

            if len(cls._sessions) >= MAX_SESSIONS:
                cls._cleanup_oldest()

            session = {
                'session_id': session_id,
                'group_id': group_id,
                'created_at': now,
                'last_activity': now,
                'message_count': 0,
                'entities': set()
            }
            cls._sessions[session_id] = session
            return session

    @classmethod
    def update_activity(cls, session_id: str):
        """Refresh ``last_activity`` of a known session (unknown ids ignored)."""
        with cls._lock:
            session = cls._sessions.get(session_id)
            if session is not None:
                session['last_activity'] = datetime.utcnow()

    @classmethod
    def increment_messages(cls, session_id: str):
        """Add one to a known session's message counter."""
        with cls._lock:
            session = cls._sessions.get(session_id)
            if session is not None:
                session['message_count'] += 1

    @classmethod
    def add_entity(cls, session_id: str, entity_name: str):
        """Record ``entity_name`` in a known session's entity set."""
        with cls._lock:
            session = cls._sessions.get(session_id)
            if session is not None:
                session['entities'].add(entity_name)

    @classmethod
    def cleanup_expired(cls) -> int:
        """Delete sessions idle for longer than the TTL; return how many."""
        with cls._lock:
            now = datetime.utcnow()
            ttl_delta = timedelta(minutes=SESSION_TTL_MINUTES)
            expired = [
                sid for sid, sess in cls._sessions.items()
                if now - sess['last_activity'] > ttl_delta
            ]
            for sid in expired:
                del cls._sessions[sid]
            if expired:
                logger.info(f"🧹 Cleaned up {len(expired)} expired sessions")
            return len(expired)

    @classmethod
    def _cleanup_oldest(cls):
        """Evict the least recently active 10% of sessions (at least one).

        Does not take ``_lock`` itself: the caller must already hold it.
        """
        if not cls._sessions:
            return
        sorted_sessions = sorted(
            cls._sessions.items(),
            key=lambda x: x[1]['last_activity']
        )
        to_remove = max(1, len(sorted_sessions) // 10)
        for sid, _ in sorted_sessions[:to_remove]:
            del cls._sessions[sid]
        logger.info(f"Removed {to_remove} oldest sessions (limit reached)")

    @classmethod
    def get_stats(cls) -> Dict[str, Any]:
        """Return total / active / expired session counts and the size cap."""
        with cls._lock:
            now = datetime.utcnow()
            ttl_delta = timedelta(minutes=SESSION_TTL_MINUTES)
            active = sum(
                1 for sess in cls._sessions.values()
                if now - sess['last_activity'] <= ttl_delta
            )
            return {
                'total_sessions': len(cls._sessions),
                'active_sessions': active,
                'expired_sessions': len(cls._sessions) - active,
                'max_sessions': MAX_SESSIONS
            }


# =============================================================================
# OPENAI ENTITY EXTRACTOR
# =============================================================================

class EntityExtractor:
    """Extract entities and relationships from text using OpenAI."""

    _client = None

    @classmethod
    def _get_client(cls):
        """Return the cached OpenAI client, or None when no API key is set."""
        if cls._client is None and OPENAI_API_KEY:
            cls._client = OpenAI(api_key=OPENAI_API_KEY)
        return cls._client

    @staticmethod
    def _parse_relationships(content: str) -> List[Tuple[str, str, str]]:
        """Parse the model reply into ``(source, relation, target)`` triples.

        Accepts a bare JSON array or one wrapped in a Markdown code fence.
        Items that are not objects, lack one of the three keys, or have an
        empty field are skipped individually rather than failing the batch.
        """
        import json

        if content.startswith("```"):
            # Keep only the text between the first pair of fences and drop
            # an optional "json" language tag.
            content = content.split("```")[1]
            if content.startswith("json"):
                content = content[4:]

        try:
            entities = json.loads(content.strip())
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse OpenAI response as JSON: {e}")
            return []

        if not isinstance(entities, list):
            logger.warning("OpenAI response is not a list")
            return []

        result = []
        for item in entities:
            # A non-dict item (e.g. a plain string) would make item['source']
            # raise; skip it instead of losing every valid item.
            if not isinstance(item, dict):
                continue
            if not all(k in item for k in ('source', 'relation', 'target')):
                continue
            triple = (
                str(item['source']).strip(),
                str(item['relation']).strip().lower(),
                str(item['target']).strip()
            )
            if all(triple):
                result.append(triple)

        logger.info(f"Extracted {len(result)} relationships from text")
        return result

    @classmethod
    def extract(cls, text: str) -> List[Tuple[str, str, str]]:
        """Extract ``(source, relation, target)`` triples from ``text``.

        Returns an empty list when OpenAI is not configured, when the API
        call fails, or when the reply cannot be parsed; errors are logged and
        never raised to the caller.
        """
        client = cls._get_client()
        if not client:
            logger.warning("OpenAI not configured, skipping entity extraction")
            return []

        prompt = f"""Extract entities and their relationships from the following text.
Return ONLY a JSON array of objects, each with: "source", "relation", "target".

Focus on concrete facts about people, places, organizations, and their relationships.
Pay special attention to location and employment changes.

Text: {text}

Example output:
[
  {{"source": "John", "relation": "works at", "target": "Google"}},
  {{"source": "Sarah", "relation": "lives in", "target": "New York"}}
]"""

        try:
            response = client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=[
                    {"role": "system", "content": "You are an entity extraction assistant. Return only valid JSON arrays."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=OPENAI_MAX_TOKENS,
                temperature=0.3
            )
            # ``content`` can be None; treat that as an empty reply.
            content = (response.choices[0].message.content or "").strip()
            return cls._parse_relationships(content)
        except Exception as e:
            logger.error(f"Entity extraction error: {e}")
            return []


# =============================================================================
# GRAPHITI MEMORY CORE
# =============================================================================

class GraphitiMemory:
    """Core memory operations with bi-temporal tracking.

    Facts are ``RELATES_TO`` edges between ``EntityNode`` nodes.  Each edge
    carries ``created_at`` (when it was recorded), ``valid_at`` (when the
    fact became true) and ``invalid_at`` (when it stopped being true, or
    null).  Public methods catch every exception and report it as
    ``{'success': False, 'error': ...}``.
    """

    @staticmethod
    def setup_schema():
        """Create the indexes used by the graph queries.

        Raises:
            Exception: any database error (logged, then re-raised).
        """
        index_statements = (
            "CREATE INDEX IF NOT EXISTS FOR (n:EntityNode) ON (n.group_id)",
            "CREATE INDEX IF NOT EXISTS FOR (n:EntityNode) ON (n.name)",
            "CREATE INDEX IF NOT EXISTS FOR (n:EpisodicNode) ON (n.group_id)",
            "CREATE INDEX IF NOT EXISTS FOR (n:EpisodicNode) ON (n.session_id)",
        )
        try:
            for statement in index_statements:
                FalkorDBDriver.write(statement)
            logger.info("✅ Graph schema initialized")
        except Exception as e:
            logger.error(f"Schema setup error: {e}")
            raise

    @staticmethod
    def _get_current_timestamp() -> str:
        """Get current UTC timestamp in ISO format with a trailing 'Z'."""
        return datetime.utcnow().isoformat() + 'Z'

    @staticmethod
    def _normalize_entity_name(name: str) -> str:
        """Normalize entity names: trim whitespace and title-case."""
        return name.strip().title()

    @staticmethod
    def _should_invalidate_previous(relation: str) -> bool:
        """Return True when ``relation`` contains an exclusive-fact keyword.

        Such relations (location, employment, partner) replace the earlier
        fact of the same kind instead of accumulating.
        """
        invalidation_keywords = [
            'location', 'lives in', 'moved to', 'relocated',
            'works at', 'employed by', 'job', 'position', 'title',
            'married to', 'spouse', 'partner'
        ]
        relation_lower = relation.lower()
        return any(keyword in relation_lower for keyword in invalidation_keywords)

    @staticmethod
    def add_fact(
        source_entity: str,
        relation: str,
        target_entity: str,
        group_id: Optional[str] = None,
        session_id: Optional[str] = None,
        valid_at: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Add a new fact to the knowledge graph with bi-temporal tracking.

        Args:
            source_entity: Subject of the fact (normalized before storing).
            relation: Relation text stored on the edge as ``fact``.
            target_entity: Object of the fact (normalized before storing).
            group_id: Graph partition; defaults to ``GROUP_ID``.
            session_id: When given, both entities are linked to that
                session's episodic node.
            valid_at: When the fact became true; defaults to now.
            metadata: Extra data, stored as ``str(metadata)``.

        Returns:
            ``{'success': True, 'source', 'relation', 'target', 'valid_at'}``
            or ``{'success': False, 'error': ...}``.
        """
        try:
            effective_group = group_id or GROUP_ID
            # Keep the two time axes apart: ``recorded_at`` is when the fact
            # entered the graph, ``valid_from`` is when it became true.  A
            # back-dated ``valid_at`` must not rewrite the recording time.
            recorded_at = GraphitiMemory._get_current_timestamp()
            valid_from = valid_at or recorded_at

            source_entity = GraphitiMemory._normalize_entity_name(source_entity)
            target_entity = GraphitiMemory._normalize_entity_name(target_entity)
            if not source_entity or not target_entity or not relation.strip():
                return {
                    'success': False,
                    'error': 'source_entity, relation and target_entity must be non-empty'
                }

            if GraphitiMemory._should_invalidate_previous(relation):
                GraphitiMemory._invalidate_previous_facts(
                    source_entity, relation, effective_group, valid_from
                )

            FalkorDBDriver.write("""
                MERGE (s:EntityNode {name: $source, group_id: $group})
                ON CREATE SET s.created_at = $timestamp, s.updated_at = $timestamp
                ON MATCH SET s.updated_at = $timestamp
                MERGE (t:EntityNode {name: $target, group_id: $group})
                ON CREATE SET t.created_at = $timestamp, t.updated_at = $timestamp
                ON MATCH SET t.updated_at = $timestamp
            """, {
                'source': source_entity,
                'target': target_entity,
                'group': effective_group,
                'timestamp': recorded_at
            })

            rel_metadata = metadata or {}
            FalkorDBDriver.write("""
                MATCH (s:EntityNode {name: $source, group_id: $group})
                MATCH (t:EntityNode {name: $target, group_id: $group})
                CREATE (s)-[r:RELATES_TO {
                    fact: $fact,
                    created_at: $created_at,
                    valid_at: $valid_at,
                    invalid_at: null,
                    expired_at: null,
                    session_id: $session,
                    metadata: $metadata
                }]->(t)
            """, {
                'source': source_entity,
                'target': target_entity,
                'group': effective_group,
                'fact': relation,
                'created_at': recorded_at,
                'valid_at': valid_from,
                'session': session_id,
                'metadata': str(rel_metadata)
            })

            if session_id:
                GraphitiMemory._link_to_episode(
                    session_id, effective_group, [source_entity, target_entity]
                )

            logger.info(f"✅ Added fact: {source_entity} -{relation}-> {target_entity}")
            return {
                'success': True,
                'source': source_entity,
                'relation': relation,
                'target': target_entity,
                'valid_at': valid_from
            }
        except Exception as e:
            logger.error(f"Error adding fact: {e}")
            return {'success': False, 'error': str(e)}

    @staticmethod
    def _invalidate_previous_facts(
        entity: str,
        relation: str,
        group_id: str,
        invalid_at: str
    ):
        """Mark the entity's still-valid facts of the same kind as invalid.

        "Same kind" means the stored fact text contains the first word of
        ``relation`` (lower-cased).  Best effort: database errors are logged
        and swallowed so the new fact is still written.
        """
        words = relation.lower().split()
        if not words:
            return  # nothing to match on
        try:
            FalkorDBDriver.write("""
                MATCH (s:EntityNode {name: $entity, group_id: $group})-[r:RELATES_TO]->(t:EntityNode)
                WHERE r.invalid_at IS NULL
                  AND toLower(r.fact) CONTAINS $relation_keyword
                SET r.invalid_at = $timestamp
            """, {
                'entity': entity,
                'group': group_id,
                'relation_keyword': words[0],
                'timestamp': invalid_at
            })
            logger.info(f"⚠️ Invalidated previous facts for {entity} ({relation})")
        except Exception as e:
            logger.error(f"Error invalidating facts: {e}")

    @staticmethod
    def _link_to_episode(session_id: str, group_id: str, entities: List[str]):
        """Upsert the session's episodic node and link it to ``entities``.

        Best effort: database errors are logged and swallowed.
        """
        try:
            current_time = GraphitiMemory._get_current_timestamp()
            FalkorDBDriver.write("""
                MERGE (e:EpisodicNode {session_id: $session, group_id: $group})
                ON CREATE SET e.uuid = randomUUID(),
                              e.name = $session,
                              e.created_at = $timestamp,
                              e.updated_at = $timestamp,
                              e.message_count = 1,
                              e.status = 'active',
                              e.source = 'mcp_server'
                ON MATCH SET e.updated_at = $timestamp,
                             e.message_count = e.message_count + 1
            """, {
                'session': session_id,
                'group': group_id,
                'timestamp': current_time
            })

            for entity in entities:
                FalkorDBDriver.write("""
                    MATCH (e:EpisodicNode {session_id: $session, group_id: $group})
                    MATCH (en:EntityNode {name: $entity, group_id: $group})
                    MERGE (e)-[:MENTIONS]->(en)
                """, {
                    'session': session_id,
                    'group': group_id,
                    'entity': entity
                })
        except Exception as e:
            logger.error(f"Error linking to episode: {e}")

    @staticmethod
    def add_message_with_extraction(
        content: str,
        session_id: str,
        group_id: Optional[str] = None,
        extract_entities: bool = True
    ) -> Dict[str, Any]:
        """Register a message in its session and store the facts it contains.

        Extraction only runs when ``extract_entities`` is true and an OpenAI
        key is configured; otherwise the message is merely counted.

        Returns:
            ``{'success': True, 'session_id', 'facts_added', 'facts'}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            effective_group = group_id or GROUP_ID
            SessionStore.get_or_create(session_id, effective_group)
            SessionStore.increment_messages(session_id)

            facts_added = []
            if extract_entities and OPENAI_API_KEY:
                for source, relation, target in EntityExtractor.extract(content):
                    result = GraphitiMemory.add_fact(
                        source_entity=source,
                        relation=relation,
                        target_entity=target,
                        group_id=effective_group,
                        session_id=session_id
                    )
                    if result.get('success'):
                        facts_added.append(result)
                        # Track the normalized names actually stored in the
                        # graph, so the session view matches the graph view.
                        SessionStore.add_entity(session_id, result['source'])
                        SessionStore.add_entity(session_id, result['target'])

            return {
                'success': True,
                'session_id': session_id,
                'facts_added': len(facts_added),
                'facts': facts_added
            }
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            return {'success': False, 'error': str(e)}

    @staticmethod
    def _run_fact_query(
        conditions: List[str],
        params: Dict[str, Any],
        invalid_alias: str,
        order_field: str
    ) -> List[Dict]:
        """Run the shared fact-listing query with the given WHERE conditions.

        ``invalid_alias`` and ``order_field`` are spliced into the query text
        and must be fixed identifiers chosen by the calling method, never
        user input; all user-supplied values travel in ``params``.
        """
        where_clause = " AND ".join(conditions)
        cypher = f"""
            MATCH (s:EntityNode)-[r:RELATES_TO]->(t:EntityNode)
            WHERE {where_clause}
            RETURN s.name as source, r.fact as fact, t.name as target,
                   r.valid_at as valid_from, r.invalid_at as {invalid_alias},
                   r.created_at as created_at
            ORDER BY r.{order_field} DESC
            LIMIT $limit
        """
        return FalkorDBDriver.query(cypher, params)

    @staticmethod
    def query_facts(
        entity_name: Optional[str] = None,
        group_id: Optional[str] = None,
        include_invalid: bool = False,
        max_facts: int = 20
    ) -> Dict[str, Any]:
        """Query facts from the knowledge graph, newest recording first.

        Args:
            entity_name: When given, only facts where this (normalized) name
                is the source or the target.
            group_id: Graph partition; defaults to ``GROUP_ID``.
            include_invalid: When false, facts whose ``invalid_at`` is set
                and not later than now are filtered out.
            max_facts: Maximum number of rows returned.
        """
        try:
            effective_group = group_id or GROUP_ID
            conditions = ["s.group_id = $group"]
            params: Dict[str, Any] = {'group': effective_group, 'limit': max_facts}

            if entity_name:
                entity_name = GraphitiMemory._normalize_entity_name(entity_name)
                conditions.append("(s.name = $entity OR t.name = $entity)")
                params['entity'] = entity_name

            if not include_invalid:
                conditions.append("(r.invalid_at IS NULL OR r.invalid_at > $timestamp)")
                params['timestamp'] = GraphitiMemory._get_current_timestamp()

            results = GraphitiMemory._run_fact_query(
                conditions, params, invalid_alias='invalid_at', order_field='created_at'
            )
            return {
                'success': True,
                'facts': results,
                'count': len(results),
                'entity': entity_name,
                'include_invalid': include_invalid
            }
        except Exception as e:
            logger.error(f"Error querying facts: {e}")
            return {'success': False, 'error': str(e)}

    @staticmethod
    def query_at_time(
        timestamp: str,
        entity_name: Optional[str] = None,
        group_id: Optional[str] = None,
        max_facts: int = 20
    ) -> Dict[str, Any]:
        """Query what facts were valid at a specific point in time.

        A fact qualifies when ``valid_at <= timestamp`` and its
        ``invalid_at`` is null or later than ``timestamp``.  Timestamps are
        compared by the database as passed in.
        """
        try:
            effective_group = group_id or GROUP_ID
            conditions = ["s.group_id = $group"]
            params: Dict[str, Any] = {
                'group': effective_group,
                'timestamp': timestamp,
                'limit': max_facts
            }

            if entity_name:
                entity_name = GraphitiMemory._normalize_entity_name(entity_name)
                conditions.append("(s.name = $entity OR t.name = $entity)")
                params['entity'] = entity_name

            conditions.append("r.valid_at <= $timestamp")
            conditions.append("(r.invalid_at IS NULL OR r.invalid_at > $timestamp)")

            results = GraphitiMemory._run_fact_query(
                conditions, params, invalid_alias='valid_until', order_field='valid_at'
            )
            return {
                'success': True,
                'timestamp': timestamp,
                'facts': results,
                'count': len(results),
                'message': f'Found {len(results)} facts valid at {timestamp}'
            }
        except Exception as e:
            logger.error(f"Error querying facts at time: {e}")
            return {'success': False, 'error': str(e)}

    @staticmethod
    def get_episodes(
        group_ids: Optional[List[str]] = None,
        max_episodes: int = 10
    ) -> Dict[str, Any]:
        """Get recent episodes (conversation sessions) from the graph.

        Each episode row includes the distinct entity names it mentions;
        rows are ordered by ``updated_at``, newest first.
        """
        try:
            effective_groups = group_ids or [GROUP_ID]
            results = FalkorDBDriver.query("""
                MATCH (e:EpisodicNode)
                WHERE e.group_id IN $groups
                OPTIONAL MATCH (e)-[:MENTIONS]->(en:EntityNode)
                WITH e, collect(DISTINCT en.name) as entities
                RETURN e.uuid as uuid, e.session_id as session_id, e.name as name,
                       e.message_count as message_count, e.source as source,
                       e.created_at as created_at, e.updated_at as updated_at,
                       e.status as status, e.group_id as group_id, entities
                ORDER BY e.updated_at DESC
                LIMIT $limit
            """, {
                'groups': effective_groups,
                'limit': max_episodes
            })

            if not results:
                return {'success': True, 'message': 'No episodes found', 'episodes': []}
            return {
                'success': True,
                'message': f'Found {len(results)} episodes',
                'episodes': results
            }
        except Exception as e:
            logger.error(f"Error getting episodes: {e}")
            return {'success': False, 'error': str(e)}

    @staticmethod
    def clear_graph(group_ids: Optional[List[str]] = None) -> Dict[str, Any]:
        """Delete every node of the given groups and their in-memory sessions.

        Defaults to ``[GROUP_ID]`` when no groups are passed.
        """
        try:
            effective_groups = group_ids or [GROUP_ID]

            count_result = FalkorDBDriver.query("""
                MATCH (n)
                WHERE n.group_id IN $groups
                RETURN count(n) as count
            """, {'groups': effective_groups})
            count = count_result[0]['count'] if count_result else 0

            FalkorDBDriver.write("""
                MATCH (n)
                WHERE n.group_id IN $groups
                DETACH DELETE n
            """, {'groups': effective_groups})

            # Drop the matching in-memory sessions too, so they do not
            # outlive the graph data they describe.
            with SessionStore._lock:
                sessions_to_remove = [
                    sid for sid, sess in SessionStore._sessions.items()
                    if sess['group_id'] in effective_groups
                ]
                for sid in sessions_to_remove:
                    del SessionStore._sessions[sid]

            return {
                'success': True,
                'message': f'Deleted {count} nodes from groups: {", ".join(effective_groups)}'
            }
        except Exception as e:
            logger.error(f"Error clearing graph: {e}")
            return {'success': False, 'error': str(e)}


# =============================================================================
# CLEANUP MANAGER
# =============================================================================

class CleanupManager:
    """Background task manager for session and connection cleanup."""

    _task = None
    _running = False

    @classmethod
    async def start(cls):
        """Start the cleanup background task (no-op when already running).

        Must be awaited inside a running event loop, because it schedules
        the loop with ``asyncio.create_task``.
        """
        if cls._running:
            return
        cls._running = True
        cls._task = asyncio.create_task(cls._cleanup_loop())
        logger.info(f"🔄 Cleanup manager started (interval: {CLEANUP_INTERVAL_SECONDS}s)")

    @classmethod
    async def stop(cls):
        """Cancel the cleanup task and wait for it to finish."""
        cls._running = False
        if cls._task:
            cls._task.cancel()
            try:
                await cls._task
            except asyncio.CancelledError:
                pass
        logger.info("🛑 Cleanup manager stopped")

    @classmethod
    async def _cleanup_loop(cls):
        """Every interval: purge expired sessions, close an idle connection."""
        while cls._running:
            try:
                await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)
                SessionStore.cleanup_expired()
                if FalkorDBDriver._should_disconnect():
                    FalkorDBDriver.disconnect()
                    logger.info("💤 Closed idle database connection")
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the loop alive; one failed pass must not end cleanup.
                logger.error(f"Cleanup error: {e}")


# =============================================================================
# MCP SERVER INITIALIZATION
# =============================================================================

mcp = FastMCP("Bi-Temporal Knowledge Graph MCP Server")


# =============================================================================
# CORE MEMORY TOOLS
# =============================================================================
# The tool docstrings below are the descriptions exposed to MCP clients.

@mcp.tool()
async def add_fact(
    source_entity: str,
    relation: str,
    target_entity: str,
    group_id: Optional[str] = None,
    session_id: Optional[str] = None,
    valid_at: Optional[str] = None
) -> Dict[str, Any]:
    """
    Add a new fact to the knowledge graph with bi-temporal tracking.
    Automatically invalidates previous facts for location/employment changes.
    """
    return GraphitiMemory.add_fact(
        source_entity=source_entity,
        relation=relation,
        target_entity=target_entity,
        group_id=group_id,
        session_id=session_id,
        valid_at=valid_at
    )


@mcp.tool()
async def add_message(
    content: str,
    session_id: str,
    group_id: Optional[str] = None,
    extract_entities: bool = True
) -> Dict[str, Any]:
    """
    Add a message and automatically extract entities if enabled.
    Uses OpenAI to extract entities and relationships from natural language.
    """
    return GraphitiMemory.add_message_with_extraction(
        content=content,
        session_id=session_id,
        group_id=group_id,
        extract_entities=extract_entities
    )


@mcp.tool()
async def query_facts(
    entity_name: Optional[str] = None,
    group_id: Optional[str] = None,
    include_invalid: bool = False,
    max_facts: int = 20
) -> Dict[str, Any]:
    """Query facts from the knowledge graph."""
    return GraphitiMemory.query_facts(
        entity_name=entity_name,
        group_id=group_id,
        include_invalid=include_invalid,
        max_facts=max_facts
    )


@mcp.tool()
async def query_at_time(
    timestamp: str,
    entity_name: Optional[str] = None,
    group_id: Optional[str] = None,
    max_facts: int = 20
) -> Dict[str, Any]:
    """Query what facts were valid at a specific point in time."""
    return GraphitiMemory.query_at_time(
        timestamp=timestamp,
        entity_name=entity_name,
        group_id=group_id,
        max_facts=max_facts
    )


@mcp.tool()
async def get_episodes(
    group_ids: Optional[List[str]] = None,
    max_episodes: int = 10
) -> Dict[str, Any]:
    """Get recent episodes (conversation sessions) from the graph."""
    return GraphitiMemory.get_episodes(
        group_ids=group_ids,
        max_episodes=max_episodes
    )


@mcp.tool()
async def clear_graph(
    group_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Clear all data for specified group IDs. WARNING: Permanently deletes data!"""
    return GraphitiMemory.clear_graph(group_ids=group_ids)


@mcp.tool()
async def get_status() -> Dict[str, Any]:
    """Get server status and database connection info."""
    try:
        result = FalkorDBDriver.query("MATCH (n) RETURN count(n) as count")
        node_count = result[0]['count'] if result else 0

        stats = FalkorDBDriver.query("""
            MATCH (n)
            WITH labels(n)[0] as label, count(*) as count
            RETURN label, count
            ORDER BY count DESC
        """)

        rel_stats = FalkorDBDriver.query("""
            MATCH ()-[r]->()
            WITH type(r) as rel_type, count(*) as count
            RETURN rel_type, count
            ORDER BY count DESC
        """)

        fact_validity = FalkorDBDriver.query("""
            MATCH ()-[r:RELATES_TO]->()
            RETURN count(CASE WHEN r.invalid_at IS NULL THEN 1 END) as valid_facts,
                   count(CASE WHEN r.invalid_at IS NOT NULL THEN 1 END) as invalid_facts
        """)

        session_stats = SessionStore.get_stats()

        return {
            'success': True,
            'status': 'ok',
            'architecture': 'database-blind',
            'database': 'FalkorDB',
            'host': f'{FALKORDB_HOST}:{FALKORDB_PORT}',
            'graph': FALKORDB_DATABASE,
            'total_nodes': node_count,
            'node_types': {s['label']: s['count'] for s in stats} if stats else {},
            'relationship_types': {s['rel_type']: s['count'] for s in rel_stats} if rel_stats else {},
            'fact_validity': fact_validity[0] if fact_validity else {},
            'session_store': session_stats,
            'db_connected': FalkorDBDriver.is_connected(),
            # Truthiness, not "is not None": an empty key disables extraction
            # everywhere else in this file, so report it as disabled here too.
            'openai_enabled': bool(OPENAI_API_KEY),
            'bi_temporal': True,
            'smart_conflict_resolution': True,
            'default_group': GROUP_ID,
            'resource_management': {
                'session_ttl_minutes': SESSION_TTL_MINUTES,
                'max_sessions': MAX_SESSIONS,
                'cleanup_interval_seconds': CLEANUP_INTERVAL_SECONDS,
                'connection_idle_timeout': CONNECTION_IDLE_TIMEOUT
            }
        }
    except Exception as e:
        logger.error(f"Status check failed: {e}")
        return {'success': False, 'status': 'error', 'error': str(e)}


@mcp.tool()
async def force_cleanup() -> Dict[str, Any]:
    """Manually trigger cleanup of expired sessions and idle connections."""
    try:
        expired_count = SessionStore.cleanup_expired()

        db_was_connected = FalkorDBDriver.is_connected()
        if FalkorDBDriver._should_disconnect():
            FalkorDBDriver.disconnect()

        return {
            'success': True,
            'message': 'Cleanup completed',
            'expired_sessions_removed': expired_count,
            'db_connection_closed': db_was_connected and not FalkorDBDriver.is_connected()
        }
    except Exception as e:
        logger.error(f"Force cleanup failed: {e}")
        return {'success': False, 'error': str(e)}


# =============================================================================
# CUSTOM AUTOMATION TOOLS
#
# ADD YOUR CUSTOM AUTOMATION TOOLS BELOW
# Each tool should be decorated with @mcp.tool()
# The Automation Engine App can generate and paste tools here
#
# Example:
# @mcp.tool()
# async def my_custom_tool(param1: str, param2: int = 10) -> str:
#     """Tool description that the AI will see."""
#     import httpx
#     payload = {"param1": param1, "param2": param2}
#     url = "https://your-webhook-url.com/endpoint"
#     async with httpx.AsyncClient() as client:
#         try:
#             resp = await client.post(url, json=payload)
#             return f"Success: returned {resp.status_code}"
#         except Exception as e:
#             return f"Error: {str(e)}"
# =============================================================================

async def _post_to_webhook(url: str, payload: Dict[str, Any], label: str) -> str:
    """POST ``payload`` as JSON to ``url`` and describe the outcome.

    Returns ``"Success: <label> returned <status>"`` for a successful status
    code and ``"Error: <details>"`` for transport failures as well as 4xx/5xx
    responses, so a rejected post is never reported as a success.
    """
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(url, json=payload)
            resp.raise_for_status()
            return f"Success: {label} returned {resp.status_code}"
        except Exception as e:
            return f"Error: {str(e)}"


# ============ Template: LinkedIn Poster - Automation Engine Template ============
# Independent tools for each webhook in this template
# Total webhooks: 3

@mcp.tool()
async def linkedin_poster___automation_engine_template___linkedin__template1_image(caption: str, imageurl: str) -> str:
    """
    Posts an image with a caption to your LinkedIn page.

    Args:
        caption: The text content that will appear with the image. (maps to 'caption')
        imageurl: A direct, public URL to the image file (e.g., ending in .jpg, .png). (maps to 'imageUrl')
    """
    return await _post_to_webhook(
        "https://webhook.latenode.com/48975/prod/linkedin_template1_image",
        {"caption": caption, "imageUrl": imageurl},
        "LinkedIn Poster - Automation Engine Template - Linkedin _Template1_Image"
    )


@mcp.tool()
async def linkedin_poster___automation_engine_template___linkedin__template1_text(content: str) -> str:
    """
    Posts a text-only update to your LinkedIn page.

    Args:
        content: The full text content of the LinkedIn post. (maps to 'content')
    """
    return await _post_to_webhook(
        "https://webhook.latenode.com/48975/prod/linkedin_template1_text",
        {"content": content},
        "LinkedIn Poster - Automation Engine Template - Linkedin _Template1_Text"
    )


@mcp.tool()
async def linkedin_poster___automation_engine_template___linkedin__template1_video(caption: str, videourl: str) -> str:
    """
    Posts a video with a title and caption to your LinkedIn page.

    Args:
        caption: The text content that will appear with the video. (maps to 'caption')
        videourl: A direct, public URL to the video file (e.g., ending in .mp4). (maps to 'videoUrl')
    """
    return await _post_to_webhook(
        "https://webhook.latenode.com/48975/prod/linkedin_template1_video",
        {"caption": caption, "videoUrl": videourl},
        "LinkedIn Poster - Automation Engine Template - Linkedin _Template1_Video"
    )


# =============================================================================
# END CUSTOM TOOLS SECTION
# =============================================================================


# =============================================================================
# SERVER STARTUP
# =============================================================================

async def startup():
    """Initialize all components on server startup.

    Connects to FalkorDB and creates the indexes (exiting the process with
    status 1 on failure), then starts the cleanup task.  A cleanup-manager
    failure is logged but does not abort startup.
    """
    logger.info("Starting Bi-Temporal Knowledge Graph MCP Server...")

    try:
        FalkorDBDriver.connect()
        GraphitiMemory.setup_schema()
        logger.info("✅ FalkorDB initialized successfully")
    except Exception as e:
        logger.error(f"❌ FalkorDB initialization failed: {e}")
        sys.exit(1)

    try:
        await CleanupManager.start()
    except Exception as e:
        logger.error(f"Cleanup manager failed to start: {e}")

    if OPENAI_API_KEY:
        logger.info("✅ OpenAI configured - Auto entity extraction enabled")
    else:
        logger.warning("⚠️ OPENAI_API_KEY not set - Manual fact addition only")

    logger.info(f"📊 Default group: {GROUP_ID}")
    logger.info(f"⏰ Session TTL: {SESSION_TTL_MINUTES} minutes")
    logger.info("✅ Server initialization complete")


async def shutdown():
    """Clean up resources on server shutdown (each step is best effort)."""
    logger.info("🛑 Shutting down server...")

    try:
        await CleanupManager.stop()
    except Exception as e:
        logger.error(f"Error stopping cleanup manager: {e}")

    try:
        FalkorDBDriver.disconnect()
    except Exception as e:
        logger.error(f"Error closing FalkorDB connection: {e}")

    logger.info("✅ Server shutdown complete")


def _banner() -> str:
    """Build the boxed start-up banner printed by the script entry point."""
    lines = [
        "Database-Blind Bi-Temporal Knowledge Graph MCP Server",
        "",
        "Features:",
        "- Session-aware episodes with fact invalidation",
        "- Full bi-temporal tracking (valid_at + invalid_at)",
        "- Smart conflict resolution (location/employment changes)",
        "- Auto session cleanup (TTL-based)",
        "- FalkorDB + OpenAI integration",
        "",
        "Custom Tools: Add @mcp.tool() functions in the",
        "CUSTOM AUTOMATION TOOLS section above",
    ]
    width = 63
    border = "+" + "-" * width + "+"
    body = ["|" + ("  " + line).ljust(width) + "|" for line in lines]
    return "\n".join([border, *body, border])


async def main():
    """Run the SSE server with ``startup``/``shutdown`` wrapped around it.

    The server is awaited inside the same event loop as ``startup`` so the
    cleanup task created there keeps running while requests are served.
    """
    await startup()
    try:
        await mcp.run_async(
            transport="sse",
            host=SERVER_HOST,
            port=SERVER_PORT
        )
    finally:
        await shutdown()


if __name__ == "__main__":
    print(_banner())
    print(f"🚀 Starting server on {SERVER_HOST}:{SERVER_PORT}")
    print("📡 MCP endpoint: /sse\n")
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/connectimtiazh/Graphiti-Knowledge-MCP-Server-Starter'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.