Skip to main content
Glama
connectimtiazh

Bi-Temporal Knowledge Graph MCP Server

Pasted--Bi-Temporal-Knowledge-Graph-MCP-Server-with-Dynamic-To_1766142030933.txt • 31.6 kB
"""
Bi-Temporal Knowledge Graph MCP Server with Dynamic Tool Generator
===================================================================

This is the main orchestrator that:
1. Initializes FastMCP server
2. Registers core memory tools from memory.py
3. Dynamically discovers and registers tools from tools.py
4. Generates MCP tool code from PostgreSQL configurations
5. Manages the complete lifecycle of the server

Architecture:
- memory.py: Bi-temporal Graphiti memory with FalkorDB
- tools.py: Container for user-generated automation tools
- main.py: This file - orchestrates everything
"""

import os
import sys
import logging
import re
import importlib
import inspect
from typing import Dict, Any, List, Optional
from datetime import datetime

import psycopg2
from psycopg2.extras import RealDictCursor
from fastmcp import FastMCP

# Import memory components
from memory import (
    FalkorDBDriver,
    SessionStore,
    GraphitiMemory,
    CleanupManager,
    GROUP_ID,
    SESSION_TTL_MINUTES
)

# Configure logging (root logger at INFO; this module logs under its own name)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# =============================================================================
# CONFIGURATION
# =============================================================================

# PostgreSQL configuration for webhook/tool storage.
# All values come from the environment; the defaults target a local instance.
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
POSTGRES_DB = os.getenv("POSTGRES_DB", "automation_db")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
# An empty password is treated by startup() as "PostgreSQL not configured".
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")

# Server configuration (read from the PORT / HOST environment variables)
SERVER_PORT = int(os.getenv("PORT", "8080"))
SERVER_HOST = os.getenv("HOST", "0.0.0.0")

# =============================================================================
# INITIALIZE FASTMCP SERVER
# =============================================================================

# Single server instance; the @mcp.tool() decorators below register against it.
mcp = FastMCP("Bi-Temporal Knowledge Graph MCP Server")
# =============================================================================
# POSTGRESQL DATABASE INTERFACE
# =============================================================================

class PostgresDB:
    """PostgreSQL database interface for webhook/tool configurations.

    Every method opens a short-lived connection via ``get_connection`` and
    closes it in a ``finally`` clause, so a failing query cannot leak the
    connection or its cursor.
    """

    @staticmethod
    def get_connection():
        """Open a new PostgreSQL connection from the POSTGRES_* settings.

        Returns:
            A psycopg2 connection. The caller is responsible for closing it.

        Raises:
            Exception: Any connection error is logged and re-raised.
        """
        try:
            return psycopg2.connect(
                host=POSTGRES_HOST,
                port=POSTGRES_PORT,
                database=POSTGRES_DB,
                user=POSTGRES_USER,
                password=POSTGRES_PASSWORD
            )
        except Exception as e:
            logger.error(f"PostgreSQL connection error: {e}")
            raise

    @staticmethod
    def _fetch(query: str, params: tuple, *, one: bool = False):
        """Run a read-only query and return its rows as plain dicts.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Values bound to the placeholders.
            one: When True return the first row (or None); otherwise a list.

        Raises:
            Exception: Connection or query errors propagate to the caller.
        """
        conn = PostgresDB.get_connection()
        try:
            # The cursor context manager closes the cursor even on error.
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, params)
                if one:
                    row = cur.fetchone()
                    return dict(row) if row else None
                return [dict(row) for row in cur.fetchall()]
        finally:
            conn.close()

    @staticmethod
    def setup_schema():
        """Create the ``webhooks`` and ``webhook_templates`` tables if missing.

        Raises:
            Exception: Any connection or DDL error is logged and re-raised.
        """
        try:
            conn = PostgresDB.get_connection()
            try:
                with conn.cursor() as cur:
                    # Webhooks table
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS webhooks (
                            id SERIAL PRIMARY KEY,
                            user_id VARCHAR(255) NOT NULL,
                            name VARCHAR(255) NOT NULL,
                            description TEXT,
                            url TEXT NOT NULL,
                            method VARCHAR(10) DEFAULT 'POST',
                            headers JSONB,
                            template_fields JSONB,
                            item_type VARCHAR(50) DEFAULT 'single',
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            UNIQUE(user_id, name)
                        )
                    """)

                    # Templates table (for multi-webhook configurations)
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS webhook_templates (
                            id SERIAL PRIMARY KEY,
                            user_id VARCHAR(255) NOT NULL,
                            name VARCHAR(255) NOT NULL,
                            description TEXT,
                            webhook_ids INTEGER[] NOT NULL,
                            item_type VARCHAR(50) DEFAULT 'multi',
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            UNIQUE(user_id, name)
                        )
                    """)
                conn.commit()
            finally:
                conn.close()

            logger.info("✅ PostgreSQL schema initialized")
        except Exception as e:
            logger.error(f"Schema setup error: {e}")
            raise

    @staticmethod
    def get_webhook(user_id: str, name: str) -> Optional[Dict[str, Any]]:
        """Get a webhook configuration by user_id and name.

        Returns:
            The row as a dict, or None when it does not exist or the lookup
            fails (the failure is logged, not raised).
        """
        try:
            return PostgresDB._fetch(
                """
                SELECT * FROM webhooks
                WHERE user_id = %s AND name = %s
                """,
                (user_id, name),
                one=True,
            )
        except Exception as e:
            logger.error(f"Error fetching webhook: {e}")
            return None

    @staticmethod
    def get_webhook_template(user_id: str, name: str) -> Optional[Dict[str, Any]]:
        """Get a webhook template (multi-webhook) configuration.

        Returns:
            The template row as a dict with an extra ``'webhooks'`` key holding
            the referenced webhook rows, or None when the template does not
            exist or the lookup fails (the failure is logged, not raised).
        """
        try:
            conn = PostgresDB.get_connection()
            try:
                # Both queries share one connection.
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    cur.execute("""
                        SELECT * FROM webhook_templates
                        WHERE user_id = %s AND name = %s
                    """, (user_id, name))
                    template = cur.fetchone()
                    if not template:
                        return None

                    # Fetch associated webhooks
                    cur.execute("""
                        SELECT * FROM webhooks
                        WHERE id = ANY(%s)
                    """, (template['webhook_ids'],))
                    result = dict(template)
                    result['webhooks'] = [dict(row) for row in cur.fetchall()]
                    return result
            finally:
                conn.close()
        except Exception as e:
            logger.error(f"Error fetching webhook template: {e}")
            return None

    @staticmethod
    def get_all_webhooks(user_id: str) -> List[Dict[str, Any]]:
        """Get all webhooks for a user, newest first.

        Returns:
            A list of row dicts; an empty list when the lookup fails.
        """
        try:
            return PostgresDB._fetch(
                """
                SELECT * FROM webhooks
                WHERE user_id = %s
                ORDER BY created_at DESC
                """,
                (user_id,),
            )
        except Exception as e:
            logger.error(f"Error fetching webhooks: {e}")
            return []

    @staticmethod
    def get_all_templates(user_id: str) -> List[Dict[str, Any]]:
        """Get all webhook templates for a user, newest first.

        Returns:
            A list of row dicts; an empty list when the lookup fails.
        """
        try:
            return PostgresDB._fetch(
                """
                SELECT * FROM webhook_templates
                WHERE user_id = %s
                ORDER BY created_at DESC
                """,
                (user_id,),
            )
        except Exception as e:
            logger.error(f"Error fetching templates: {e}")
            return []


# =============================================================================
# TOOL
# CODE GENERATOR
# =============================================================================

class ToolCodeGenerator:
    """Generate Python code for MCP tools from database configurations.

    The generated source is appended to tools.py and executed, so every value
    that originates from the database is either validated (identifiers, type
    annotations) or emitted through ``repr`` (URLs, methods, headers, defaults)
    instead of being pasted into the source verbatim.
    """

    # Annotation text may only contain identifier characters, brackets, commas
    # and spaces: no parentheses, dots or quotes, hence no calls or strings.
    _ANNOTATION_RE = re.compile(r'[A-Za-z_][A-Za-z0-9_\[\], ]*')

    @staticmethod
    def sanitize_function_name(name: str) -> str:
        """
        Sanitize function name: lowercase, replace spaces/hyphens with
        underscores, remove special characters.

        Names that end up empty become ``unnamed_tool``; names that do not
        start with a letter or underscore get a ``tool_`` prefix; Python
        keywords get a ``_tool`` suffix so the result can follow ``def``.
        """
        import keyword

        # Lowercase, then collapse runs of spaces and hyphens into underscores
        name = re.sub(r'[\s\-]+', '_', name.lower())

        # Remove any character that's not alphanumeric or underscore
        name = re.sub(r'[^\w]', '', name)

        # Ensure it's not empty
        if not name:
            return 'unnamed_tool'

        # Ensure it starts with a letter or underscore
        if not name[0].isalpha() and name[0] != '_':
            name = f'tool_{name}'

        # "class", "import", ... are not usable as a function name
        if keyword.iskeyword(name):
            name = f'{name}_tool'

        return name

    @staticmethod
    def _docstring_safe(text: Any) -> str:
        """Escape text so it cannot terminate a generated triple-quoted docstring."""
        return str(text).replace('\\', '\\\\').replace('"""', '\\"\\"\\"')

    @staticmethod
    def _safe_annotation(field_type: Any) -> str:
        """Return field_type if it is plain annotation text, else fall back to 'str'."""
        text = str(field_type).strip() if field_type else 'str'
        if ToolCodeGenerator._ANNOTATION_RE.fullmatch(text):
            return text
        logger.warning(f"Unsupported field type {field_type!r}; using 'str'")
        return 'str'

    @staticmethod
    def _validated_fields(template_fields: Any) -> Dict[str, Dict[str, Any]]:
        """Normalize template_fields to ``{field_name: config_dict}``.

        Raises:
            ValueError: If a field name cannot be used as a Python parameter.
        """
        import keyword

        if not template_fields or not isinstance(template_fields, dict):
            return {}

        fields: Dict[str, Dict[str, Any]] = {}
        for field_name, field_config in template_fields.items():
            is_identifier = isinstance(field_name, str) and field_name.isidentifier()
            if not is_identifier or keyword.iskeyword(field_name):
                raise ValueError(f"Invalid template field name: {field_name!r}")
            fields[field_name] = field_config if isinstance(field_config, dict) else {}
        return fields

    @staticmethod
    def _build_signature(fields: Dict[str, Dict[str, Any]]):
        """Build the parameter list and the ``Args:`` section for a tool.

        Returns:
            ``(params_str, params_doc_str)``. Required parameters are emitted
            before optional ones, because a parameter without a default may not
            follow one with a default.
        """
        required_params = []
        optional_params = []
        param_docs = []

        for field_name, field_config in fields.items():
            field_type = ToolCodeGenerator._safe_annotation(field_config.get('type', 'str'))
            field_desc = field_config.get('description') or f'{field_name} field'

            if field_config.get('required', False):
                required_params.append(f"{field_name}: {field_type}")
            else:
                # repr() renders str/number/bool/None defaults as valid literals
                default = field_config.get('default')
                optional_params.append(
                    f"{field_name}: Optional[{field_type}] = {default!r}"
                )

            param_docs.append(
                f"        {field_name}: {ToolCodeGenerator._docstring_safe(field_desc)}"
            )

        params_str = ',\n    '.join(required_params + optional_params)
        params_doc_str = '\n'.join(param_docs) if param_docs else '        No parameters'
        return params_str, params_doc_str

    @staticmethod
    def generate_single_webhook_tool(
        user_id: str,
        item_name: str,
        config: Dict[str, Any]
    ) -> str:
        """
        Generate code for a single webhook tool.

        Args:
            user_id: User identifier (not used in the generated code)
            item_name: Webhook name; sanitized into the function name
            config: Webhook row (description, url, method, headers,
                template_fields)

        Returns:
            Python code as a string

        Raises:
            ValueError: If a template field name is not a valid identifier
        """
        func_name = ToolCodeGenerator.sanitize_function_name(item_name)
        # `or` rather than a .get default: the columns are nullable
        description = ToolCodeGenerator._docstring_safe(
            config.get('description') or f'Execute {item_name} webhook'
        )
        url = config.get('url') or ''
        method = config.get('method') or 'POST'
        headers = config.get('headers')

        fields = ToolCodeGenerator._validated_fields(config.get('template_fields'))
        params_str, params_doc_str = ToolCodeGenerator._build_signature(fields)

        # Build data dictionary: payload key -> parameter of the same name
        if fields:
            data_items = [f"        {k!r}: {k}" for k in fields]
            data_dict = "{\n" + ',\n'.join(data_items) + "\n    }"
        else:
            data_dict = "{}"

        # Build headers
        headers_str = repr(headers) if headers else "None"

        code = f'''
@mcp.tool()
async def {func_name}(
    {params_str}
) -> Dict[str, Any]:
    """
    {description}

    Args:
{params_doc_str}

    Returns:
        Dict with execution result
    """
    from tools import execute_webhook

    data = {data_dict}

    result = await execute_webhook(
        url={url!r},
        data=data,
        method={method!r},
        headers={headers_str}
    )

    logger.info(f"Executed {func_name}: {{result.get('success')}}")
    return result
'''
        return code

    @staticmethod
    def generate_multi_webhook_tool(
        user_id: str,
        item_name: str,
        config: Dict[str, Any]
    ) -> str:
        """
        Generate code for a multi-webhook tool that fires webhooks in parallel.

        Args:
            user_id: User identifier (not used in the generated code)
            item_name: Template name; sanitized into the function name
            config: Template row including its resolved ``'webhooks'`` list

        Returns:
            Python code as a string

        Raises:
            ValueError: If a template field name is not a valid identifier
        """
        func_name = ToolCodeGenerator.sanitize_function_name(item_name)
        description = ToolCodeGenerator._docstring_safe(
            config.get('description') or f'Execute {item_name} multi-webhook'
        )
        webhooks = config.get('webhooks') or []

        # Validate each webhook's fields once, then collect the unique fields
        # across all webhooks (first definition of a name wins).
        per_webhook_fields = [
            ToolCodeGenerator._validated_fields(webhook.get('template_fields'))
            for webhook in webhooks
        ]
        all_fields: Dict[str, Dict[str, Any]] = {}
        for fields in per_webhook_fields:
            for field_name, field_config in fields.items():
                all_fields.setdefault(field_name, field_config)

        params_str, params_doc_str = ToolCodeGenerator._build_signature(all_fields)

        # Build webhook configurations
        webhook_configs = []
        for webhook, fields in zip(webhooks, per_webhook_fields):
            url = webhook.get('url') or ''
            method = webhook.get('method') or 'POST'
            headers = webhook.get('headers')

            # Build data dict for this webhook
            data_dict = "{" + ', '.join(f"{k!r}: {k}" for k in fields) + "}"
            headers_str = repr(headers) if headers else "None"

            webhook_configs.append(f'''{{
            'url': {url!r},
            'data': {data_dict},
            'method': {method!r},
            'headers': {headers_str}
        }}''')

        webhooks_list = ',\n        '.join(webhook_configs)

        code = f'''
@mcp.tool()
async def {func_name}(
    {params_str}
) -> Dict[str, Any]:
    """
    {description}

    Executes multiple webhooks in parallel.

    Args:
{params_doc_str}

    Returns:
        Dict with results from all webhooks
    """
    from tools import execute_parallel_webhooks

    webhooks = [
        {webhooks_list}
    ]

    result = await execute_parallel_webhooks(webhooks)

    logger.info(f"Executed {func_name}: {{result.get('successful')}}/{{result.get('total')}} successful")
    return result
'''
        return code


# =============================================================================
# MAIN TOOL GENERATOR FUNCTION
# =============================================================================

def generate_mcp_tool_code_from_db(
    user_id: str,
    item_name: str,
    item_type: str = 'single'
) -> str:
    """
    Generate MCP tool code from database configuration.

    This is the main entry point for tool generation.

    Args:
        user_id: User identifier
        item_name: Name of the webhook or template
        item_type: 'single' or 'multi'

    Returns:
        Python code as a string, ready to be appended to tools.py. On any
        failure the returned text starts with ``# Error`` instead of raising.
    """
    try:
        if item_type == 'single':
            config = PostgresDB.get_webhook(user_id, item_name)
            if not config:
                logger.error(f"Webhook not found: {user_id}/{item_name}")
                return f"# Error: Webhook '{item_name}' not found for user '{user_id}'\n"

            return ToolCodeGenerator.generate_single_webhook_tool(
                user_id, item_name, config
            )

        if item_type == 'multi':
            config = PostgresDB.get_webhook_template(user_id, item_name)
            if not config:
                logger.error(f"Template not found: {user_id}/{item_name}")
                return f"# Error: Template '{item_name}' not found for user '{user_id}'\n"

            return ToolCodeGenerator.generate_multi_webhook_tool(
                user_id, item_name, config
            )

        logger.error(f"Unknown item_type: {item_type}")
        return f"# Error: Unknown item_type '{item_type}'\n"

    except Exception as e:
        logger.error(f"Error generating tool code: {e}")
        return f"# Error generating tool code: {e}\n"


# =============================================================================
# DYNAMIC TOOL LOADER
# =============================================================================

def load_dynamic_tools():
    """
    Dynamically import and register tools from tools.py.

    Importing (or reloading) tools.py runs its @mcp.tool decorators, which is
    what registers the tools; this function then logs how many public
    functions the module defines. Errors are logged, not raised.
    """
    try:
        # Reload only when the module was imported before. A first import
        # already executes tools.py, and reloading right after would run every
        # decorator a second time.
        already_imported = 'tools' in sys.modules
        import tools
        if already_imported:
            importlib.reload(tools)

        # Count public functions defined in tools.py itself, not helpers it
        # merely imported.
        # NOTE(review): depending on the FastMCP version, decorated names may
        # be bound to tool objects rather than plain functions, in which case
        # they are not counted here - confirm against the installed version.
        tool_count = sum(
            1
            for name, obj in inspect.getmembers(tools, inspect.isfunction)
            if not name.startswith('_') and obj.__module__ == tools.__name__
        )

        logger.info(f"✅ Loaded {tool_count} dynamic tools from tools.py")

    except Exception as e:
        logger.error(f"Error loading dynamic tools: {e}")


# =============================================================================
# CORE MEMORY TOOLS
# =============================================================================

@mcp.tool()
async def add_fact(
    source_entity: str,
    relation: str,
    target_entity: str,
    group_id: Optional[str] = None,
    session_id: Optional[str] = None,
    valid_at: Optional[str] = None
) -> Dict[str, Any]:
    """
    Add a new fact to the knowledge graph with bi-temporal tracking.

    Automatically invalidates previous facts for location/employment changes.

    Args:
        source_entity: Source entity name
        relation: Relationship type (e.g., "works at", "lives in")
        target_entity: Target entity name
        group_id: Optional group identifier (defaults to DEFAULT_GROUP_ID)
        session_id: Optional session identifier
        valid_at: Optional timestamp (ISO format) when fact became valid

    Returns:
        Dict with success status and fact details
    """
    return GraphitiMemory.add_fact(
        source_entity=source_entity,
        relation=relation,
        target_entity=target_entity,
        group_id=group_id,
        session_id=session_id,
        valid_at=valid_at
    )


@mcp.tool()
async def add_message(
    content: str,
    session_id: str,
    group_id: Optional[str] = None,
    extract_entities: bool = True
) -> Dict[str, Any]:
    """
    Add a message and automatically extract entities if enabled.

    Uses OpenAI to extract entities and relationships from natural language.

    Args:
        content: Message content
        session_id: Session identifier
        group_id: Optional group identifier
        extract_entities: Whether to extract entities using AI (requires OPENAI_API_KEY)

    Returns:
        Dict with success status and extracted facts
    """
    return GraphitiMemory.add_message_with_extraction(
        content=content,
        session_id=session_id,
        group_id=group_id,
        extract_entities=extract_entities
    )


@mcp.tool()
async def query_facts(
    entity_name: Optional[str] = None,
    group_id: Optional[str] = None,
    include_invalid: bool = False,
    max_facts: int = 20
) -> Dict[str, Any]:
    """
    Query facts from the knowledge graph.

    Args:
        entity_name: Optional entity to filter by
        group_id: Optional group identifier
        include_invalid: Whether to include invalidated facts
        max_facts: Maximum number of facts to return

    Returns:
        Dict with facts and metadata
    """
    return GraphitiMemory.query_facts(
        entity_name=entity_name,
        group_id=group_id,
        include_invalid=include_invalid,
        max_facts=max_facts
    )


@mcp.tool()
async def query_at_time(
    timestamp: str,
    entity_name: Optional[str] = None,
    group_id: Optional[str] = None,
    max_facts: int = 20
) -> Dict[str, Any]:
    """
    Query what facts were valid at a specific point in time.

    Args:
        timestamp: ISO format timestamp (e.g., "2024-01-15T10:30:00Z")
        entity_name: Optional entity to filter by
        group_id: Optional group identifier
        max_facts: Maximum number of facts to return

    Returns:
        Dict with facts valid at the specified time
    """
    return GraphitiMemory.query_at_time(
        timestamp=timestamp,
        entity_name=entity_name,
        group_id=group_id,
        max_facts=max_facts
    )


@mcp.tool()
async def get_episodes(
    group_ids: Optional[List[str]] = None,
    max_episodes: int = 10
) -> Dict[str, Any]:
    """
    Get recent episodes (conversation sessions) from the graph.

    Args:
        group_ids: Optional list of group identifiers
        max_episodes: Maximum number of episodes to return

    Returns:
        Dict with episode information
    """
    return GraphitiMemory.get_episodes(
        group_ids=group_ids,
        max_episodes=max_episodes
    )


@mcp.tool()
async def clear_graph(
    group_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Clear all data for specified group IDs.

    WARNING: This permanently deletes data!

    Args:
        group_ids: Optional list of group identifiers (defaults to DEFAULT_GROUP_ID)

    Returns:
        Dict with deletion count
    """
    return GraphitiMemory.clear_graph(group_ids=group_ids)


@mcp.tool()
async def get_status() -> Dict[str, Any]:
    """
    Get server status and database connection info.

    Returns:
        Dict with comprehensive server statistics
    """
    try:
        result = FalkorDBDriver.query("MATCH (n) RETURN count(n) as count")
        node_count = result[0]['count'] if result else 0

        stats = FalkorDBDriver.query("""
            MATCH (n)
            WITH labels(n)[0] as label, count(*) as count
            RETURN label, count
            ORDER BY count DESC
        """)

        rel_stats = FalkorDBDriver.query("""
            MATCH ()-[r]->()
            WITH type(r) as rel_type, count(*) as count
            RETURN rel_type, count
            ORDER BY count DESC
        """)

        fact_validity = FalkorDBDriver.query("""
            MATCH ()-[r:RELATES_TO]->()
            RETURN
                count(CASE WHEN r.invalid_at IS NULL THEN 1 END) as valid_facts,
                count(CASE WHEN r.invalid_at IS NOT NULL THEN 1 END) as invalid_facts
        """)

        session_stats = SessionStore.get_stats()

        return {
            'success': True,
            'status': 'ok',
            'database': 'FalkorDB',
            'host': f'{os.getenv("FALKORDB_HOST")}:{os.getenv("FALKORDB_PORT")}',
            'graph': os.getenv("FALKORDB_DATABASE"),
            'total_nodes': node_count,
            'node_types': {s['label']: s['count'] for s in stats} if stats else {},
            'relationship_types': {s['rel_type']: s['count'] for s in rel_stats} if rel_stats else {},
            'fact_validity': fact_validity[0] if fact_validity else {},
            'session_store': session_stats,
            'db_connected': FalkorDBDriver.is_connected(),
            # Truthiness, matching startup(): an empty value counts as "not set"
            'openai_enabled': bool(os.getenv('OPENAI_API_KEY')),
            'postgres_configured': bool(POSTGRES_PASSWORD),
            'bi_temporal': True,
            'smart_conflict_resolution': True,
            'default_group': GROUP_ID,
            'resource_management': {
                'session_ttl_minutes': SESSION_TTL_MINUTES,
                'max_sessions': os.getenv('MAX_SESSIONS'),
                'cleanup_interval_seconds': os.getenv('CLEANUP_INTERVAL_SECONDS'),
                'connection_idle_timeout': os.getenv('CONNECTION_IDLE_TIMEOUT')
            }
        }
    except Exception as e:
        logger.error(f"Status check failed: {e}")
        return {
            'success': False,
            'status': 'error',
            'error': str(e)
        }


@mcp.tool()
async def force_cleanup() -> Dict[str, Any]:
    """
    Manually trigger cleanup of expired sessions and idle connections.

    Returns:
        Dict with cleanup statistics
    """
    try:
        expired_count = SessionStore.cleanup_expired()

        # Remember the state first so the result can report whether this call
        # is what closed the connection.
        db_was_connected = FalkorDBDriver.is_connected()
        if FalkorDBDriver._should_disconnect():
            FalkorDBDriver.disconnect()

        return {
            'success': True,
            'message': 'Cleanup completed',
            'expired_sessions_removed': expired_count,
            'db_connection_closed': db_was_connected and not FalkorDBDriver.is_connected()
        }
    except Exception as e:
        logger.error(f"Force cleanup failed: {e}")
        return {'success': False, 'error': str(e)}


@mcp.tool()
async def generate_tool_from_db(
    user_id: str,
    item_name: str,
    item_type: str = 'single'
) -> Dict[str, Any]:
    """
    Generate and append a new MCP tool from database configuration.

    This tool reads webhook/template data from PostgreSQL and generates
    Python code that is appended to tools.py, then dynamically loaded.

    Args:
        user_id: User identifier
        item_name: Name of the webhook or template
        item_type: 'single' or 'multi'

    Returns:
        Dict with generation status and generated code
    """
    try:
        # Generate code
        code = generate_mcp_tool_code_from_db(user_id, item_name, item_type)

        # The generator reports failures as a leading "# Error" comment
        if code.startswith('# Error'):
            return {
                'success': False,
                'error': 'Tool generation failed',
                'code': code
            }

        # Append to tools.py (explicit encoding: descriptions may be non-ASCII)
        tools_file = os.path.join(os.path.dirname(__file__), 'tools.py')
        with open(tools_file, 'a', encoding='utf-8') as f:
            f.write('\n\n')
            f.write(code)

        # Reload dynamic tools
        load_dynamic_tools()

        return {
            'success': True,
            'message': f'Generated and loaded tool for {item_name}',
            'item_type': item_type,
            'code_preview': code[:500] + '...' if len(code) > 500 else code
        }

    except Exception as e:
        logger.error(f"Error generating tool: {e}")
        return {
            'success': False,
            'error': str(e)
        }


# =============================================================================
# INITIALIZATION AND STARTUP
# =============================================================================

async def startup():
    """Initialize all components on server startup.

    A FalkorDB failure is fatal (the process exits with status 1); PostgreSQL,
    dynamic tool loading and the cleanup manager are best-effort and only log.
    """
    logger.info("🚀 Starting Bi-Temporal Knowledge Graph MCP Server...")

    # Initialize FalkorDB
    try:
        FalkorDBDriver.connect()
        GraphitiMemory.setup_schema()
        logger.info("✅ FalkorDB initialized")
    except Exception as e:
        logger.error(f"❌ FalkorDB initialization failed: {e}")
        sys.exit(1)

    # Initialize PostgreSQL (optional)
    try:
        if POSTGRES_PASSWORD:
            PostgresDB.setup_schema()
            logger.info("✅ PostgreSQL initialized")
        else:
            logger.warning("⚠️ PostgreSQL not configured - Tool generation disabled")
    except Exception as e:
        logger.warning(f"⚠️ PostgreSQL initialization failed: {e}")

    # Load dynamic tools
    try:
        load_dynamic_tools()
    except Exception as e:
        logger.warning(f"⚠️ Dynamic tool loading failed: {e}")

    # Start cleanup manager
    try:
        await CleanupManager.start()
    except Exception as e:
        logger.error(f"⚠️ Cleanup manager failed to start: {e}")

    # Log configuration
    if os.getenv('OPENAI_API_KEY'):
        logger.info("✅ OpenAI configured - Auto entity extraction enabled")
    else:
        logger.warning("⚠️ OPENAI_API_KEY not set - Manual fact addition only")

    logger.info(f"📊 Default group: {GROUP_ID}")
    logger.info(f"⏰ Session TTL: {SESSION_TTL_MINUTES} minutes")
    logger.info("✅ Server initialization complete")


async def shutdown():
    """Clean up resources on server shutdown.

    Each step is attempted independently so one failure does not skip the next.
    """
    logger.info("🛑 Shutting down server...")

    try:
        await CleanupManager.stop()
    except Exception as e:
        logger.error(f"Error stopping cleanup manager: {e}")

    try:
        FalkorDBDriver.disconnect()
    except Exception as e:
        logger.error(f"Error closing FalkorDB connection: {e}")

    logger.info("👋 Server shutdown complete")


# =============================================================================
# MAIN ENTRY POINT
# =============================================================================

if __name__ == "__main__":
    import asyncio

    print("""
╔═══════════════════════════════════════════════════════════════╗
║   Bi-Temporal Knowledge Graph MCP Server                      ║
║   with Dynamic Automation Tool Generator                      ║
║                                                               ║
║   Features:                                                   ║
║   ✨ Session-aware episodes with fact invalidation            ║
║   ⏰ Full bi-temporal tracking (valid_at + invalid_at)        ║
║   🧠 Smart conflict resolution (location/employment changes)  ║
║   🔄 Auto session cleanup (TTL-based)                         ║
║   🛠️ Dynamic tool generation from PostgreSQL                  ║
║   🚀 FalkorDB + OpenAI + PostgreSQL integration               ║
╚═══════════════════════════════════════════════════════════════╝
    """)

    print(f"📡 Starting server on {SERVER_HOST}:{SERVER_PORT}")
    print("🔗 MCP endpoint: sse\n")

    async def _serve() -> None:
        """Run the server between startup() and shutdown().

        Everything runs on one event loop so background work started by
        startup() (the cleanup manager) stays alive while the server runs.
        """
        await startup()
        try:
            await mcp.run_async(
                transport="sse",
                host=SERVER_HOST,
                port=SERVER_PORT
            )
        finally:
            await shutdown()

    # Run server with lifecycle hooks
    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; shutdown() already ran in _serve()
        pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/connectimtiazh/Graphiti-Knowledge-MCP-Server-Starter'

If you have feedback or need assistance with the MCP directory API, please join our Discord server