"""
Bi-Temporal Knowledge Graph MCP Server with Dynamic Tool Generator
===================================================================
This is the main orchestrator that:
1. Initializes FastMCP server
2. Registers core memory tools from memory.py
3. Dynamically discovers and registers tools from tools.py
4. Generates MCP tool code from PostgreSQL configurations
5. Manages the complete lifecycle of the server
Architecture:
- memory.py: Bi-temporal Graphiti memory with FalkorDB
- tools.py: Container for user-generated automation tools
- main.py: This file - orchestrates everything
"""
import os
import sys
import logging
import re
import importlib
import inspect
from typing import Dict, Any, List, Optional
from datetime import datetime
from contextlib import asynccontextmanager
import psycopg2
from psycopg2.extras import RealDictCursor
from fastmcp import FastMCP
# Import memory components
from memory import (
FalkorDBDriver,
SessionStore,
GraphitiMemory,
CleanupManager,
GROUP_ID,
SESSION_TTL_MINUTES
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# =============================================================================
# CONFIGURATION
# =============================================================================
# PostgreSQL configuration for webhook/tool storage
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
POSTGRES_DB = os.getenv("POSTGRES_DB", "automation_db")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
# Server configuration
SERVER_PORT = int(os.getenv("PORT", "8080"))
SERVER_HOST = os.getenv("HOST", "0.0.0.0")
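# Illustrative shell setup (placeholder values, not real credentials):
#   export POSTGRES_HOST=localhost POSTGRES_DB=automation_db \
#          POSTGRES_USER=postgres POSTGRES_PASSWORD=changeme
#   export HOST=0.0.0.0 PORT=8080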
# =============================================================================
# INITIALIZE FASTMCP SERVER
# =============================================================================
@asynccontextmanager
async def _lifespan(server):
    """
    Run startup() and shutdown() (defined at the bottom of this file) on the
    server's own event loop. This sketch assumes the installed FastMCP
    accepts a `lifespan` async context manager, as the MCP Python SDK's
    FastMCP does.
    """
    await startup()
    try:
        yield
    finally:
        await shutdown()

mcp = FastMCP("Bi-Temporal Knowledge Graph MCP Server", lifespan=_lifespan)
# =============================================================================
# POSTGRESQL DATABASE INTERFACE
# =============================================================================
class PostgresDB:
"""PostgreSQL database interface for webhook/tool configurations."""
@staticmethod
def get_connection():
"""Get a PostgreSQL database connection."""
try:
conn = psycopg2.connect(
host=POSTGRES_HOST,
port=POSTGRES_PORT,
database=POSTGRES_DB,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD
)
return conn
except Exception as e:
logger.error(f"PostgreSQL connection error: {e}")
raise
@staticmethod
def setup_schema():
"""Create necessary tables if they don't exist."""
        conn = None
        try:
            conn = PostgresDB.get_connection()
            cur = conn.cursor()
# Webhooks table
cur.execute("""
CREATE TABLE IF NOT EXISTS webhooks (
id SERIAL PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
url TEXT NOT NULL,
method VARCHAR(10) DEFAULT 'POST',
headers JSONB,
template_fields JSONB,
item_type VARCHAR(50) DEFAULT 'single',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, name)
)
""")
# Templates table (for multi-webhook configurations)
cur.execute("""
CREATE TABLE IF NOT EXISTS webhook_templates (
id SERIAL PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
webhook_ids INTEGER[] NOT NULL,
item_type VARCHAR(50) DEFAULT 'multi',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, name)
)
""")
            conn.commit()
            cur.close()
            logger.info("✅ PostgreSQL schema initialized")
        except Exception as e:
            logger.error(f"Schema setup error: {e}")
            raise
        finally:
            if conn:
                conn.close()
@staticmethod
def get_webhook(user_id: str, name: str) -> Optional[Dict[str, Any]]:
"""Get a webhook configuration by user_id and name."""
        conn = None
        try:
            conn = PostgresDB.get_connection()
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("""
                SELECT * FROM webhooks
                WHERE user_id = %s AND name = %s
            """, (user_id, name))
            result = cur.fetchone()
            cur.close()
            return dict(result) if result else None
        except Exception as e:
            logger.error(f"Error fetching webhook: {e}")
            return None
        finally:
            if conn:
                conn.close()
@staticmethod
def get_webhook_template(user_id: str, name: str) -> Optional[Dict[str, Any]]:
"""Get a webhook template (multi-webhook) configuration."""
        conn = None
        try:
            conn = PostgresDB.get_connection()
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("""
                SELECT * FROM webhook_templates
                WHERE user_id = %s AND name = %s
            """, (user_id, name))
            result = cur.fetchone()
            if result:
                # Fetch the webhooks referenced by this template's webhook_ids
                cur.execute("""
                    SELECT * FROM webhooks
                    WHERE id = ANY(%s)
                """, (result['webhook_ids'],))
                webhooks = [dict(row) for row in cur.fetchall()]
                result = dict(result)
                result['webhooks'] = webhooks
            cur.close()
            return result
        except Exception as e:
            logger.error(f"Error fetching webhook template: {e}")
            return None
        finally:
            if conn:
                conn.close()
@staticmethod
def get_all_webhooks(user_id: str) -> List[Dict[str, Any]]:
"""Get all webhooks for a user."""
        conn = None
        try:
            conn = PostgresDB.get_connection()
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("""
                SELECT * FROM webhooks
                WHERE user_id = %s
                ORDER BY created_at DESC
            """, (user_id,))
            results = [dict(row) for row in cur.fetchall()]
            cur.close()
            return results
        except Exception as e:
            logger.error(f"Error fetching webhooks: {e}")
            return []
        finally:
            if conn:
                conn.close()
@staticmethod
def get_all_templates(user_id: str) -> List[Dict[str, Any]]:
"""Get all webhook templates for a user."""
        conn = None
        try:
            conn = PostgresDB.get_connection()
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute("""
                SELECT * FROM webhook_templates
                WHERE user_id = %s
                ORDER BY created_at DESC
            """, (user_id,))
            results = [dict(row) for row in cur.fetchall()]
            cur.close()
            return results
        except Exception as e:
            logger.error(f"Error fetching templates: {e}")
            return []
        finally:
            if conn:
                conn.close()
# =============================================================================
# TOOL CODE GENERATOR
# =============================================================================
class ToolCodeGenerator:
"""Generate Python code for MCP tools from database configurations."""
@staticmethod
def sanitize_function_name(name: str) -> str:
"""
Sanitize function name: lowercase, replace spaces/hyphens with underscores,
remove special characters.
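        Examples (illustrative):
            "My Webhook-2" -> "my_webhook_2"
            "42 alerts"    -> "tool_42_alerts"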
"""
# Convert to lowercase
name = name.lower()
# Replace spaces and hyphens with underscores
name = re.sub(r'[\s\-]+', '_', name)
# Remove any character that's not alphanumeric or underscore
name = re.sub(r'[^\w]', '', name)
# Ensure it starts with a letter or underscore
if name and not name[0].isalpha() and name[0] != '_':
name = f'tool_{name}'
# Ensure it's not empty
if not name:
name = 'unnamed_tool'
return name
@staticmethod
def generate_single_webhook_tool(
user_id: str,
item_name: str,
config: Dict[str, Any]
) -> str:
"""
Generate code for a single webhook tool.
Returns:
Python code as a string
"""
func_name = ToolCodeGenerator.sanitize_function_name(item_name)
description = config.get('description', f'Execute {item_name} webhook')
url = config.get('url', '')
method = config.get('method', 'POST')
headers = config.get('headers', {})
template_fields = config.get('template_fields', {})
# Build parameter list
params = []
param_docs = []
if template_fields and isinstance(template_fields, dict):
for field_name, field_config in template_fields.items():
field_type = field_config.get('type', 'str')
field_desc = field_config.get('description', f'{field_name} field')
required = field_config.get('required', False)
default = field_config.get('default')
# Build parameter
if required:
params.append(f"{field_name}: {field_type}")
else:
default_val = f'"{default}"' if isinstance(default, str) else default
default_val = default_val if default_val is not None else 'None'
params.append(f"{field_name}: Optional[{field_type}] = {default_val}")
param_docs.append(f" {field_name}: {field_desc}")
params_str = ',\n '.join(params) if params else ''
params_doc_str = '\n'.join(param_docs) if param_docs else ' No parameters'
# Build data dictionary
if template_fields and isinstance(template_fields, dict):
data_items = [f" '{k}': {k}" for k in template_fields.keys()]
data_dict = "{\n" + ',\n'.join(data_items) + "\n }"
else:
data_dict = "{}"
# Build headers
headers_str = f"{headers}" if headers else "None"
code = f'''
@mcp.tool()
async def {func_name}(
{params_str}
) -> Dict[str, Any]:
"""
{description}
Args:
{params_doc_str}
Returns:
Dict with execution result
"""
from tools import execute_webhook
data = {data_dict}
result = await execute_webhook(
url="{url}",
data=data,
method="{method}",
headers={headers_str}
)
logger.info(f"Executed {func_name}: {{result.get('success')}}")
return result
'''
return code
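    # For a webhook named "Send Alert" with one required `message` field, the
    # generated source resembles (abridged; the URL is hypothetical):
    #
    #   @mcp.tool()
    #   async def send_alert(message: str) -> Dict[str, Any]:
    #       from tools import execute_webhook
    #       data = {'message': message}
    #       result = await execute_webhook(url="https://example.com/hook",
    #                                      data=data, method="POST", headers=None)
    #       return result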
@staticmethod
def generate_multi_webhook_tool(
user_id: str,
item_name: str,
config: Dict[str, Any]
) -> str:
"""
Generate code for a multi-webhook tool that fires webhooks in parallel.
Returns:
Python code as a string
"""
func_name = ToolCodeGenerator.sanitize_function_name(item_name)
description = config.get('description', f'Execute {item_name} multi-webhook')
webhooks = config.get('webhooks', [])
# Collect all unique fields across webhooks
all_fields = {}
for webhook in webhooks:
template_fields = webhook.get('template_fields', {})
if isinstance(template_fields, dict):
for field_name, field_config in template_fields.items():
if field_name not in all_fields:
all_fields[field_name] = field_config
# Build parameter list
params = []
param_docs = []
for field_name, field_config in all_fields.items():
field_type = field_config.get('type', 'str')
field_desc = field_config.get('description', f'{field_name} field')
required = field_config.get('required', False)
default = field_config.get('default')
if required:
params.append(f"{field_name}: {field_type}")
else:
default_val = f'"{default}"' if isinstance(default, str) else default
default_val = default_val if default_val is not None else 'None'
params.append(f"{field_name}: Optional[{field_type}] = {default_val}")
param_docs.append(f" {field_name}: {field_desc}")
params_str = ',\n '.join(params) if params else ''
params_doc_str = '\n'.join(param_docs) if param_docs else ' No parameters'
# Build webhook configurations
webhook_configs = []
for webhook in webhooks:
url = webhook.get('url', '')
method = webhook.get('method', 'POST')
headers = webhook.get('headers', {})
template_fields = webhook.get('template_fields', {})
# Build data dict for this webhook
if template_fields and isinstance(template_fields, dict):
data_items = [f"'{k}': {k}" for k in template_fields.keys()]
data_dict = "{" + ', '.join(data_items) + "}"
else:
data_dict = "{}"
headers_str = f"{headers}" if headers else "None"
webhook_config = f'''{{
'url': "{url}",
'data': {data_dict},
'method': "{method}",
'headers': {headers_str}
}}'''
webhook_configs.append(webhook_config)
webhooks_list = ',\n '.join(webhook_configs)
code = f'''
@mcp.tool()
async def {func_name}(
{params_str}
) -> Dict[str, Any]:
"""
{description}
Executes multiple webhooks in parallel.
Args:
{params_doc_str}
Returns:
Dict with results from all webhooks
"""
from tools import execute_parallel_webhooks
webhooks = [
{webhooks_list}
]
result = await execute_parallel_webhooks(webhooks)
logger.info(f"Executed {func_name}: {{result.get('successful')}}/{{result.get('total')}} successful")
return result
'''
return code
# =============================================================================
# MAIN TOOL GENERATOR FUNCTION
# =============================================================================
def generate_mcp_tool_code_from_db(
user_id: str,
item_name: str,
item_type: str = 'single'
) -> str:
"""
Generate MCP tool code from database configuration.
This is the main entry point for tool generation.
Args:
user_id: User identifier
item_name: Name of the webhook or template
item_type: 'single' or 'multi'
Returns:
Python code as a string, ready to be appended to tools.py
"""
try:
if item_type == 'single':
config = PostgresDB.get_webhook(user_id, item_name)
if not config:
logger.error(f"Webhook not found: {user_id}/{item_name}")
return f"# Error: Webhook '{item_name}' not found for user '{user_id}'\n"
return ToolCodeGenerator.generate_single_webhook_tool(
user_id, item_name, config
)
elif item_type == 'multi':
config = PostgresDB.get_webhook_template(user_id, item_name)
if not config:
logger.error(f"Template not found: {user_id}/{item_name}")
return f"# Error: Template '{item_name}' not found for user '{user_id}'\n"
return ToolCodeGenerator.generate_multi_webhook_tool(
user_id, item_name, config
)
else:
logger.error(f"Unknown item_type: {item_type}")
return f"# Error: Unknown item_type '{item_type}'\n"
except Exception as e:
logger.error(f"Error generating tool code: {e}")
return f"# Error generating tool code: {e}\n"
# =============================================================================
# DYNAMIC TOOL LOADER
# =============================================================================
def load_dynamic_tools():
    """
    Dynamically import and load tools from tools.py.
    Functions decorated with @mcp.tool register themselves with the FastMCP
    server when the module is imported, so reloading the module is what picks
    up newly appended tools. The count logged here covers all public
    functions in the module (helpers included), so treat it as an upper bound.
    """
    try:
        # Re-import tools.py so decorators on newly appended tools execute.
        import tools
        importlib.reload(tools)
        # Count public functions as a rough proxy for the number of tools.
        tool_count = sum(
            1 for name, obj in inspect.getmembers(tools, inspect.isfunction)
            if not name.startswith('_')
        )
        logger.info(f"✅ Loaded {tool_count} dynamic tools from tools.py")
    except Exception as e:
        logger.error(f"Error loading dynamic tools: {e}")
# =============================================================================
# CORE MEMORY TOOLS
# =============================================================================
@mcp.tool()
async def add_fact(
source_entity: str,
relation: str,
target_entity: str,
group_id: Optional[str] = None,
session_id: Optional[str] = None,
valid_at: Optional[str] = None
) -> Dict[str, Any]:
"""
Add a new fact to the knowledge graph with bi-temporal tracking.
Automatically invalidates previous facts for location/employment changes.
Args:
source_entity: Source entity name
relation: Relationship type (e.g., "works at", "lives in")
target_entity: Target entity name
        group_id: Optional group identifier (defaults to GROUP_ID)
session_id: Optional session identifier
valid_at: Optional timestamp (ISO format) when fact became valid
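    Example:
        add_fact("Alice", "works at", "Acme Corp", valid_at="2024-01-15T00:00:00Z")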
Returns:
Dict with success status and fact details
"""
return GraphitiMemory.add_fact(
source_entity=source_entity,
relation=relation,
target_entity=target_entity,
group_id=group_id,
session_id=session_id,
valid_at=valid_at
)
@mcp.tool()
async def add_message(
content: str,
session_id: str,
group_id: Optional[str] = None,
extract_entities: bool = True
) -> Dict[str, Any]:
"""
Add a message and automatically extract entities if enabled.
Uses OpenAI to extract entities and relationships from natural language.
Args:
content: Message content
session_id: Session identifier
group_id: Optional group identifier
extract_entities: Whether to extract entities using AI (requires OPENAI_API_KEY)
Returns:
Dict with success status and extracted facts
"""
return GraphitiMemory.add_message_with_extraction(
content=content,
session_id=session_id,
group_id=group_id,
extract_entities=extract_entities
)
@mcp.tool()
async def query_facts(
entity_name: Optional[str] = None,
group_id: Optional[str] = None,
include_invalid: bool = False,
max_facts: int = 20
) -> Dict[str, Any]:
"""
Query facts from the knowledge graph.
Args:
entity_name: Optional entity to filter by
group_id: Optional group identifier
include_invalid: Whether to include invalidated facts
max_facts: Maximum number of facts to return
Returns:
Dict with facts and metadata
"""
return GraphitiMemory.query_facts(
entity_name=entity_name,
group_id=group_id,
include_invalid=include_invalid,
max_facts=max_facts
)
@mcp.tool()
async def query_at_time(
timestamp: str,
entity_name: Optional[str] = None,
group_id: Optional[str] = None,
max_facts: int = 20
) -> Dict[str, Any]:
"""
Query what facts were valid at a specific point in time.
Args:
timestamp: ISO format timestamp (e.g., "2024-01-15T10:30:00Z")
entity_name: Optional entity to filter by
group_id: Optional group identifier
max_facts: Maximum number of facts to return
Returns:
Dict with facts valid at the specified time
"""
return GraphitiMemory.query_at_time(
timestamp=timestamp,
entity_name=entity_name,
group_id=group_id,
max_facts=max_facts
)
@mcp.tool()
async def get_episodes(
group_ids: Optional[List[str]] = None,
max_episodes: int = 10
) -> Dict[str, Any]:
"""
Get recent episodes (conversation sessions) from the graph.
Args:
group_ids: Optional list of group identifiers
max_episodes: Maximum number of episodes to return
Returns:
Dict with episode information
"""
return GraphitiMemory.get_episodes(
group_ids=group_ids,
max_episodes=max_episodes
)
@mcp.tool()
async def clear_graph(
group_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Clear all data for specified group IDs.
WARNING: This permanently deletes data!
Args:
        group_ids: Optional list of group identifiers (defaults to GROUP_ID)
Returns:
Dict with deletion count
"""
return GraphitiMemory.clear_graph(group_ids=group_ids)
@mcp.tool()
async def get_status() -> Dict[str, Any]:
"""
Get server status and database connection info.
Returns:
Dict with comprehensive server statistics
"""
try:
result = FalkorDBDriver.query("MATCH (n) RETURN count(n) as count")
node_count = result[0]['count'] if result else 0
stats = FalkorDBDriver.query("""
MATCH (n)
WITH labels(n)[0] as label, count(*) as count
RETURN label, count
ORDER BY count DESC
""")
rel_stats = FalkorDBDriver.query("""
MATCH ()-[r]->()
WITH type(r) as rel_type, count(*) as count
RETURN rel_type, count
ORDER BY count DESC
""")
fact_validity = FalkorDBDriver.query("""
MATCH ()-[r:RELATES_TO]->()
RETURN
count(CASE WHEN r.invalid_at IS NULL THEN 1 END) as valid_facts,
count(CASE WHEN r.invalid_at IS NOT NULL THEN 1 END) as invalid_facts
""")
session_stats = SessionStore.get_stats()
return {
'success': True,
'status': 'ok',
'database': 'FalkorDB',
'host': f'{os.getenv("FALKORDB_HOST")}:{os.getenv("FALKORDB_PORT")}',
'graph': os.getenv("FALKORDB_DATABASE"),
'total_nodes': node_count,
'node_types': {s['label']: s['count'] for s in stats} if stats else {},
'relationship_types': {s['rel_type']: s['count'] for s in rel_stats} if rel_stats else {},
'fact_validity': fact_validity[0] if fact_validity else {},
'session_store': session_stats,
'db_connected': FalkorDBDriver.is_connected(),
'openai_enabled': os.getenv('OPENAI_API_KEY') is not None,
'postgres_configured': os.getenv('POSTGRES_PASSWORD') is not None,
'bi_temporal': True,
'smart_conflict_resolution': True,
'default_group': GROUP_ID,
'resource_management': {
'session_ttl_minutes': SESSION_TTL_MINUTES,
'max_sessions': os.getenv('MAX_SESSIONS'),
'cleanup_interval_seconds': os.getenv('CLEANUP_INTERVAL_SECONDS'),
'connection_idle_timeout': os.getenv('CONNECTION_IDLE_TIMEOUT')
}
}
except Exception as e:
logger.error(f"Status check failed: {e}")
return {
'success': False,
'status': 'error',
'error': str(e)
}
@mcp.tool()
async def force_cleanup() -> Dict[str, Any]:
"""
Manually trigger cleanup of expired sessions and idle connections.
Returns:
Dict with cleanup statistics
"""
try:
expired_count = SessionStore.cleanup_expired()
db_was_connected = FalkorDBDriver.is_connected()
if FalkorDBDriver._should_disconnect():
FalkorDBDriver.disconnect()
return {
'success': True,
'message': 'Cleanup completed',
'expired_sessions_removed': expired_count,
'db_connection_closed': db_was_connected and not FalkorDBDriver.is_connected()
}
except Exception as e:
logger.error(f"Force cleanup failed: {e}")
return {'success': False, 'error': str(e)}
@mcp.tool()
async def generate_tool_from_db(
user_id: str,
item_name: str,
item_type: str = 'single'
) -> Dict[str, Any]:
"""
Generate and append a new MCP tool from database configuration.
This tool reads webhook/template data from PostgreSQL and generates
Python code that is appended to tools.py, then dynamically loaded.
Args:
user_id: User identifier
item_name: Name of the webhook or template
item_type: 'single' or 'multi'
Returns:
Dict with generation status and generated code
"""
try:
# Generate code
code = generate_mcp_tool_code_from_db(user_id, item_name, item_type)
if code.startswith('# Error'):
return {
'success': False,
'error': 'Tool generation failed',
'code': code
}
# Append to tools.py
tools_file = os.path.join(os.path.dirname(__file__), 'tools.py')
with open(tools_file, 'a') as f:
f.write('\n\n')
f.write(code)
# Reload dynamic tools
load_dynamic_tools()
return {
'success': True,
'message': f'Generated and loaded tool for {item_name}',
'item_type': item_type,
'code_preview': code[:500] + '...' if len(code) > 500 else code
}
except Exception as e:
logger.error(f"Error generating tool: {e}")
return {
'success': False,
'error': str(e)
}
# =============================================================================
# INITIALIZATION AND STARTUP
# =============================================================================
async def startup():
"""Initialize all components on server startup."""
logger.info("π Starting Bi-Temporal Knowledge Graph MCP Server...")
# Initialize FalkorDB
try:
FalkorDBDriver.connect()
GraphitiMemory.setup_schema()
logger.info("β
FalkorDB initialized")
except Exception as e:
logger.error(f"β FalkorDB initialization failed: {e}")
sys.exit(1)
# Initialize PostgreSQL (optional)
try:
if POSTGRES_PASSWORD:
PostgresDB.setup_schema()
logger.info("β
PostgreSQL initialized")
else:
logger.warning("β οΈ PostgreSQL not configured - Tool generation disabled")
except Exception as e:
logger.warning(f"β οΈ PostgreSQL initialization failed: {e}")
# Load dynamic tools
try:
load_dynamic_tools()
except Exception as e:
logger.warning(f"β οΈ Dynamic tool loading failed: {e}")
# Start cleanup manager
try:
await CleanupManager.start()
except Exception as e:
logger.error(f"β οΈ Cleanup manager failed to start: {e}")
# Log configuration
if os.getenv('OPENAI_API_KEY'):
logger.info("β
OpenAI configured - Auto entity extraction enabled")
else:
logger.warning("β οΈ OPENAI_API_KEY not set - Manual fact addition only")
logger.info(f"π Default group: {GROUP_ID}")
logger.info(f"β° Session TTL: {SESSION_TTL_MINUTES} minutes")
logger.info("β
Server initialization complete")
async def shutdown():
"""Clean up resources on server shutdown."""
logger.info("π Shutting down server...")
try:
await CleanupManager.stop()
except Exception as e:
logger.error(f"Error stopping cleanup manager: {e}")
try:
FalkorDBDriver.disconnect()
except Exception as e:
logger.error(f"Error closing FalkorDB connection: {e}")
logger.info("π Server shutdown complete")
# =============================================================================
# MAIN ENTRY POINT
# =============================================================================
if __name__ == "__main__":
print("""
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Bi-Temporal Knowledge Graph MCP Server β
β with Dynamic Automation Tool Generator β
β β
β Features: β
β β¨ Session-aware episodes with fact invalidation β
β β° Full bi-temporal tracking (valid_at + invalid_at) β
β π§ Smart conflict resolution (location/employment changes) β
β π Auto session cleanup (TTL-based) β
β π οΈ Dynamic tool generation from PostgreSQL β
β π FalkorDB + OpenAI + PostgreSQL integration β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
""")
print(f"π‘ Starting server on {SERVER_HOST}:{SERVER_PORT}")
print(f"π MCP endpoint: sse\n")
    # Run the server; startup/shutdown execute via the _lifespan hook
    # registered on the FastMCP instance above.
    mcp.run(
        transport="sse",
        host=SERVER_HOST,
        port=SERVER_PORT
    )