"""
Example Usage and Test Scenarios
=================================
This file contains practical examples and test scenarios for the
Bi-Temporal Knowledge Graph MCP Server.
Run with: python examples.py
"""
import asyncio
import sys
from datetime import datetime, timedelta
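# NOTE: The MCP tools are intentionally not imported here. Each tool call in
# the scenarios below is shown as a comment because, in practice, the tools
# are invoked via the MCP protocol against a running server.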
class ExampleScenarios:
    """Collection of example usage scenarios.

    Every scenario is a ``@staticmethod`` coroutine that takes no arguments,
    returns ``None`` and only prints a walkthrough to stdout. The MCP tool
    calls being illustrated are kept as comments next to the output they are
    expected to produce.
    """

    @staticmethod
    def _print_banner(title):
        """Print *title* framed by two 60-character ``=`` rules."""
        print("\n" + "=" * 60)
        print(title)
        print("=" * 60)

    @staticmethod
    async def scenario_1_basic_facts():
        """
        Scenario 1: Adding Basic Facts
        Demonstrates: Simple fact addition and querying
        """
        ExampleScenarios._print_banner("SCENARIO 1: Basic Facts")

        # (source, relation, target) triples used for the walkthrough.
        facts = [
            ("Alice", "knows", "Bob"),
            ("Alice", "works at", "TechCorp"),
            ("Bob", "lives in", "New York"),
            ("Bob", "works at", "DataCorp"),
        ]

        print("\n1. Adding facts to the graph...")
        for source, relation, target in facts:
            print(f" Adding: {source} -{relation}-> {target}")
            # In practice: await add_fact(source, relation, target)

        print("\n2. Querying facts for Alice...")
        # In practice: result = await query_facts(entity_name="Alice")
        print(" Expected results:")
        print(" - Alice knows Bob")
        print(" - Alice works at TechCorp")

        print("\n✅ Scenario 1 complete!")

    @staticmethod
    async def scenario_2_employment_change():
        """
        Scenario 2: Employment Change with Automatic Invalidation
        Demonstrates: Smart conflict resolution
        """
        ExampleScenarios._print_banner("SCENARIO 2: Employment Change")

        print("\n1. Initial employment...")
        # await add_fact("John", "works at", "Startup Inc")
        print(" John works at Startup Inc")

        # Wait a bit (simulated)
        print("\n2. Job change after 6 months...")
        # await add_fact("John", "works at", "BigTech Corp")
        print(" John works at BigTech Corp")
        print(" ⚠️ Previous employment automatically invalidated!")

        print("\n3. Querying current employment...")
        # result = await query_facts(entity_name="John", include_invalid=False)
        print(" Result: John works at BigTech Corp")

        print("\n4. Querying employment history...")
        # result = await query_facts(entity_name="John", include_invalid=True)
        print(" Results:")
        print(" - John works at BigTech Corp (valid)")
        print(" - John works at Startup Inc (invalidated)")

        print("\n✅ Scenario 2 complete!")

    @staticmethod
    async def scenario_3_temporal_query():
        """
        Scenario 3: Temporal Queries
        Demonstrates: Querying facts at specific points in time
        """
        ExampleScenarios._print_banner("SCENARIO 3: Temporal Queries")

        # Create a timeline
        t1 = "2024-01-01T00:00:00Z"
        t2 = "2024-06-01T00:00:00Z"
        t3 = "2024-12-01T00:00:00Z"

        print("\n1. Building timeline...")
        print(f" {t1}: Sarah lives in Boston")
        print(f" {t2}: Sarah moves to Austin")
        print(f" {t3}: Sarah moves to Seattle")
        # Add facts with specific valid_at times
        # await add_fact("Sarah", "lives in", "Boston", valid_at=t1)
        # await add_fact("Sarah", "lives in", "Austin", valid_at=t2)
        # await add_fact("Sarah", "lives in", "Seattle", valid_at=t3)

        print("\n2. Query: Where did Sarah live in March 2024?")
        # result = await query_at_time("2024-03-15T00:00:00Z", entity_name="Sarah")
        print(" Result: Boston (between Jan and Jun)")

        print("\n3. Query: Where did Sarah live in August 2024?")
        # result = await query_at_time("2024-08-15T00:00:00Z", entity_name="Sarah")
        print(" Result: Austin (between Jun and Dec)")

        print("\n4. Query: Where does Sarah live now?")
        # result = await query_facts(entity_name="Sarah", include_invalid=False)
        print(" Result: Seattle (current)")

        print("\n✅ Scenario 3 complete!")

    @staticmethod
    async def scenario_4_ai_extraction():
        """
        Scenario 4: AI-Powered Entity Extraction
        Demonstrates: Using OpenAI to extract entities from natural language
        """
        ExampleScenarios._print_banner("SCENARIO 4: AI Entity Extraction")

        messages = [
            "Michael joined Google as a software engineer last month.",
            "Jennifer moved from Chicago to San Francisco last week.",
            "The team celebrated Tom's promotion to senior architect.",
            "Emma started her new role at Microsoft's cloud division.",
        ]

        # Expected extractions, keyed by the person named in the message.
        # Insertion order matters: the first key found in a message wins.
        expected_facts = {
            "Michael": (
                " - Michael works at Google",
                " - Michael has role software engineer",
            ),
            "Jennifer": (
                " - Jennifer lives in San Francisco",
                " - Jennifer previously lived in Chicago",
            ),
            "Tom": (
                " - Tom has position senior architect",
            ),
            "Emma": (
                " - Emma works at Microsoft",
            ),
        }

        print("\n1. Processing natural language messages...")
        for i, msg in enumerate(messages, 1):
            print(f"\n Message {i}: '{msg}'")
            # result = await add_message(content=msg, session_id="demo", extract_entities=True)
            print(" Extracted facts:")
            for person, fact_lines in expected_facts.items():
                if person in msg:
                    for fact_line in fact_lines:
                        print(fact_line)
                    break  # at most one person's facts per message

        print("\n2. Querying the extracted knowledge...")
        # result = await query_facts()
        print(" All extracted relationships are now queryable!")

        print("\n✅ Scenario 4 complete!")

    @staticmethod
    async def scenario_5_session_tracking():
        """
        Scenario 5: Session-Aware Episodes
        Demonstrates: Tracking conversation sessions
        """
        ExampleScenarios._print_banner("SCENARIO 5: Session Tracking")

        session_id = "user_conversation_123"
        print(f"\n1. Starting session: {session_id}")
        print(" Adding facts across multiple messages...")

        # Simulate a conversation
        messages = [
            "I met David at the conference",
            "David works at CloudTech",
            "We discussed the new AI framework",
        ]
        for msg in messages:
            print(f" - {msg}")
            # await add_message(content=msg, session_id=session_id)

        print("\n2. Querying episode information...")
        # result = await get_episodes(max_episodes=5)
        print(" Episode details:")
        print(f" - Session ID: {session_id}")
        # Derived from the list above so the count cannot drift out of sync.
        print(f" - Message count: {len(messages)}")
        print(" - Entities mentioned: David, CloudTech")
        print(" - Status: active")

        print("\n3. Session expires after 30 minutes of inactivity...")
        print(" Cleanup manager automatically removes expired sessions")

        print("\n✅ Scenario 5 complete!")

    @staticmethod
    async def scenario_6_dynamic_tools():
        """
        Scenario 6: Dynamic Tool Generation
        Demonstrates: Creating webhook tools from database configs
        """
        ExampleScenarios._print_banner("SCENARIO 6: Dynamic Tool Generation")

        print("\n1. Webhook configuration in PostgreSQL:")
        # The literal's content is kept flush-left on purpose: any leading
        # whitespace would become part of the printed text.
        print("""
{
"user_id": "demo_user",
"name": "Slack Notification",
"url": "https://hooks.slack.com/services/...",
"template_fields": {
"message": {"type": "str", "required": true},
"channel": {"type": "str", "default": "#general"}
}
}
""")

        print("\n2. Generating MCP tool...")
        # result = await generate_tool_from_db("demo_user", "Slack Notification")
        print(" ✅ Tool generated: slack_notification()")

        print("\n3. Generated function signature:")
        # Flush-left for the same reason as the configuration literal above.
        print("""
async def slack_notification(
message: str,
channel: Optional[str] = "#general"
) -> Dict[str, Any]:
'''Send a notification to Slack.'''
""")

        print("\n4. Using the generated tool...")
        # result = await slack_notification(
        #     message="Deployment completed!",
        #     channel="#devops"
        # )
        print(" ✅ Webhook executed successfully!")

        print("\n✅ Scenario 6 complete!")

    @staticmethod
    async def scenario_7_multi_webhook():
        """
        Scenario 7: Multi-Webhook Parallel Execution
        Demonstrates: Firing multiple webhooks simultaneously
        """
        ExampleScenarios._print_banner("SCENARIO 7: Multi-Webhook Execution")

        print("\n1. Template configuration:")
        print(" 'Broadcast Message' template includes:")
        print(" - Slack webhook")
        print(" - Discord webhook")
        print(" - Email webhook")

        print("\n2. Generating multi-webhook tool...")
        # result = await generate_tool_from_db("demo_user", "Broadcast Message", "multi")
        print(" ✅ Tool generated: broadcast_message()")

        print("\n3. Using the tool (executes all webhooks in parallel)...")
        # result = await broadcast_message(
        #     message="Important announcement!"
        # )
        print(" Results:")
        print(" - Slack: ✅ Success")
        print(" - Discord: ✅ Success")
        print(" - Email: ✅ Success")
        print(" Total time: 1.2s (parallel execution)")
        print(" vs ~3.6s if sequential")

        print("\n✅ Scenario 7 complete!")

    @staticmethod
    async def scenario_8_complex_workflow():
        """
        Scenario 8: Complex Workflow
        Demonstrates: Combining knowledge graph with automation
        """
        ExampleScenarios._print_banner("SCENARIO 8: Complex Workflow")

        print("\n1. Business scenario:")
        print(" When a customer's status changes to 'premium',")
        print(" notify sales team and update CRM")

        print("\n2. Adding customer fact...")
        # await add_fact("CustomerA", "status", "premium")
        print(" CustomerA status -> premium")

        print("\n3. Triggering automation...")
        print(" a) Query recent status changes")
        # recent_changes = await query_facts(entity_name="CustomerA")
        print(" b) Detected: CustomerA upgraded to premium")
        print(" c) Execute notification workflow...")
        # await slack_notification(
        #     message="CustomerA upgraded to premium!",
        #     channel="#sales"
        # )
        print(" ✅ Slack notification sent")
        # await update_crm_webhook(
        #     customer_id="CustomerA",
        #     status="premium"
        # )
        print(" ✅ CRM updated")

        print("\n4. Recording the workflow execution...")
        # await add_fact(
        #     "AutomationSystem",
        #     "processed_upgrade",
        #     "CustomerA"
        # )
        print(" Workflow execution recorded in graph")

        print("\n✅ Scenario 8 complete!")

    @staticmethod
    async def run_all_scenarios():
        """Run all example scenarios.

        Prints a title box, runs the eight scenarios in order and finishes
        with a feature summary. Between scenarios the user is asked to press
        Enter. If stdin reports end-of-file (closed or exhausted input, e.g.
        when run non-interactively), prompting stops and the remaining
        scenarios run back to back instead of aborting with ``EOFError``.
        """
        print("\n")
        print("╔" + "=" * 58 + "╗")
        print("║" + " " * 58 + "║")
        print("║" + " Bi-Temporal Knowledge Graph MCP Server".center(58) + "║")
        print("║" + " Example Scenarios".center(58) + "║")
        print("║" + " " * 58 + "║")
        print("╚" + "=" * 58 + "╝")

        scenarios = [
            ("Basic Facts", ExampleScenarios.scenario_1_basic_facts),
            ("Employment Change", ExampleScenarios.scenario_2_employment_change),
            ("Temporal Queries", ExampleScenarios.scenario_3_temporal_query),
            ("AI Extraction", ExampleScenarios.scenario_4_ai_extraction),
            ("Session Tracking", ExampleScenarios.scenario_5_session_tracking),
            ("Dynamic Tools", ExampleScenarios.scenario_6_dynamic_tools),
            ("Multi-Webhook", ExampleScenarios.scenario_7_multi_webhook),
            ("Complex Workflow", ExampleScenarios.scenario_8_complex_workflow),
        ]

        total = len(scenarios)
        pause_between = True
        for position, (_label, scenario_func) in enumerate(scenarios, 1):
            await scenario_func()
            # No pause after the last scenario.
            if pause_between and position < total:
                try:
                    # input() blocks the event loop; that is acceptable here
                    # because nothing else is scheduled on it.
                    input("\nPress Enter to continue to next scenario...")
                except EOFError:
                    # Nobody can answer the prompt: stop asking, keep going.
                    pause_between = False

        ExampleScenarios._print_banner("ALL SCENARIOS COMPLETE!")
        print("\nThese examples demonstrate the key features:")
        print(" ✅ Bi-temporal fact tracking")
        print(" ✅ Smart conflict resolution")
        print(" ✅ Temporal queries")
        print(" ✅ AI-powered entity extraction")
        print(" ✅ Session management")
        print(" ✅ Dynamic tool generation")
        print(" ✅ Parallel webhook execution")
        print(" ✅ Complex workflow automation")
        print("\nReady to deploy? See DEPLOYMENT.md for instructions.")
        print()
if __name__ == "__main__":
    # Preface the demo with a reminder that nothing here talks to a server.
    for notice in (
        "\n⚠️ Note: This is a demonstration script.",
        "The actual MCP tools should be called via the MCP protocol.",
        "This script shows the expected behavior and usage patterns.\n",
    ):
        print(notice)

    try:
        asyncio.run(ExampleScenarios.run_all_scenarios())
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal way to leave the demo: say goodbye
        # and exit with status 0.
        print("\n\n👋 Examples interrupted. Goodbye!")
        sys.exit(0)