Skip to main content
Glama
connectimtiazh

Bi-Temporal Knowledge Graph MCP Server

examples.py14.2 kB
""" Example Usage and Test Scenarios ================================= This file contains practical examples and test scenarios for the Bi-Temporal Knowledge Graph MCP Server. Run with: python examples.py """ import asyncio import sys from datetime import datetime, timedelta # Import the MCP tools (assumes server is running) # In practice, these would be called via MCP protocol class ExampleScenarios: """Collection of example usage scenarios.""" @staticmethod async def scenario_1_basic_facts(): """ Scenario 1: Adding Basic Facts Demonstrates: Simple fact addition and querying """ print("\n" + "="*60) print("SCENARIO 1: Basic Facts") print("="*60) # Add some basic facts facts = [ ("Alice", "knows", "Bob"), ("Alice", "works at", "TechCorp"), ("Bob", "lives in", "New York"), ("Bob", "works at", "DataCorp") ] print("\n1. Adding facts to the graph...") for source, relation, target in facts: print(f" Adding: {source} -{relation}-> {target}") # In practice: await add_fact(source, relation, target) print("\n2. Querying facts for Alice...") # In practice: result = await query_facts(entity_name="Alice") print(" Expected results:") print(" - Alice knows Bob") print(" - Alice works at TechCorp") print("\n✅ Scenario 1 complete!") @staticmethod async def scenario_2_employment_change(): """ Scenario 2: Employment Change with Automatic Invalidation Demonstrates: Smart conflict resolution """ print("\n" + "="*60) print("SCENARIO 2: Employment Change") print("="*60) print("\n1. Initial employment...") # await add_fact("John", "works at", "Startup Inc") print(" John works at Startup Inc") # Wait a bit (simulated) print("\n2. Job change after 6 months...") # await add_fact("John", "works at", "BigTech Corp") print(" John works at BigTech Corp") print(" ⚠️ Previous employment automatically invalidated!") print("\n3. Querying current employment...") # result = await query_facts(entity_name="John", include_invalid=False) print(" Result: John works at BigTech Corp") print("\n4. Querying employment history...") # result = await query_facts(entity_name="John", include_invalid=True) print(" Results:") print(" - John works at BigTech Corp (valid)") print(" - John works at Startup Inc (invalidated)") print("\n✅ Scenario 2 complete!") @staticmethod async def scenario_3_temporal_query(): """ Scenario 3: Temporal Queries Demonstrates: Querying facts at specific points in time """ print("\n" + "="*60) print("SCENARIO 3: Temporal Queries") print("="*60) # Create a timeline t1 = "2024-01-01T00:00:00Z" t2 = "2024-06-01T00:00:00Z" t3 = "2024-12-01T00:00:00Z" print("\n1. Building timeline...") print(f" {t1}: Sarah lives in Boston") print(f" {t2}: Sarah moves to Austin") print(f" {t3}: Sarah moves to Seattle") # Add facts with specific valid_at times # await add_fact("Sarah", "lives in", "Boston", valid_at=t1) # await add_fact("Sarah", "lives in", "Austin", valid_at=t2) # await add_fact("Sarah", "lives in", "Seattle", valid_at=t3) print("\n2. Query: Where did Sarah live in March 2024?") # result = await query_at_time("2024-03-15T00:00:00Z", entity_name="Sarah") print(" Result: Boston (between Jan and Jun)") print("\n3. Query: Where did Sarah live in August 2024?") # result = await query_at_time("2024-08-15T00:00:00Z", entity_name="Sarah") print(" Result: Austin (between Jun and Dec)") print("\n4. Query: Where does Sarah live now?") # result = await query_facts(entity_name="Sarah", include_invalid=False) print(" Result: Seattle (current)") print("\n✅ Scenario 3 complete!") @staticmethod async def scenario_4_ai_extraction(): """ Scenario 4: AI-Powered Entity Extraction Demonstrates: Using OpenAI to extract entities from natural language """ print("\n" + "="*60) print("SCENARIO 4: AI Entity Extraction") print("="*60) messages = [ "Michael joined Google as a software engineer last month.", "Jennifer moved from Chicago to San Francisco last week.", "The team celebrated Tom's promotion to senior architect.", "Emma started her new role at Microsoft's cloud division." ] print("\n1. Processing natural language messages...") for i, msg in enumerate(messages, 1): print(f"\n Message {i}: '{msg}'") # result = await add_message(content=msg, session_id="demo", extract_entities=True) print(" Extracted facts:") # Expected extractions if "Michael" in msg: print(" - Michael works at Google") print(" - Michael has role software engineer") elif "Jennifer" in msg: print(" - Jennifer lives in San Francisco") print(" - Jennifer previously lived in Chicago") elif "Tom" in msg: print(" - Tom has position senior architect") elif "Emma" in msg: print(" - Emma works at Microsoft") print("\n2. Querying the extracted knowledge...") # result = await query_facts() print(" All extracted relationships are now queryable!") print("\n✅ Scenario 4 complete!") @staticmethod async def scenario_5_session_tracking(): """ Scenario 5: Session-Aware Episodes Demonstrates: Tracking conversation sessions """ print("\n" + "="*60) print("SCENARIO 5: Session Tracking") print("="*60) session_id = "user_conversation_123" print(f"\n1. Starting session: {session_id}") print(" Adding facts across multiple messages...") # Simulate a conversation messages = [ "I met David at the conference", "David works at CloudTech", "We discussed the new AI framework" ] for msg in messages: print(f" - {msg}") # await add_message(content=msg, session_id=session_id) print("\n2. Querying episode information...") # result = await get_episodes(max_episodes=5) print(" Episode details:") print(f" - Session ID: {session_id}") print(" - Message count: 3") print(" - Entities mentioned: David, CloudTech") print(" - Status: active") print("\n3. Session expires after 30 minutes of inactivity...") print(" Cleanup manager automatically removes expired sessions") print("\n✅ Scenario 5 complete!") @staticmethod async def scenario_6_dynamic_tools(): """ Scenario 6: Dynamic Tool Generation Demonstrates: Creating webhook tools from database configs """ print("\n" + "="*60) print("SCENARIO 6: Dynamic Tool Generation") print("="*60) print("\n1. Webhook configuration in PostgreSQL:") print(""" { "user_id": "demo_user", "name": "Slack Notification", "url": "https://hooks.slack.com/services/...", "template_fields": { "message": {"type": "str", "required": true}, "channel": {"type": "str", "default": "#general"} } } """) print("\n2. Generating MCP tool...") # result = await generate_tool_from_db("demo_user", "Slack Notification") print(" ✅ Tool generated: slack_notification()") print("\n3. Generated function signature:") print(""" async def slack_notification( message: str, channel: Optional[str] = "#general" ) -> Dict[str, Any]: '''Send a notification to Slack.''' """) print("\n4. Using the generated tool...") # result = await slack_notification( # message="Deployment completed!", # channel="#devops" # ) print(" ✅ Webhook executed successfully!") print("\n✅ Scenario 6 complete!") @staticmethod async def scenario_7_multi_webhook(): """ Scenario 7: Multi-Webhook Parallel Execution Demonstrates: Firing multiple webhooks simultaneously """ print("\n" + "="*60) print("SCENARIO 7: Multi-Webhook Execution") print("="*60) print("\n1. Template configuration:") print(" 'Broadcast Message' template includes:") print(" - Slack webhook") print(" - Discord webhook") print(" - Email webhook") print("\n2. Generating multi-webhook tool...") # result = await generate_tool_from_db("demo_user", "Broadcast Message", "multi") print(" ✅ Tool generated: broadcast_message()") print("\n3. Using the tool (executes all webhooks in parallel)...") # result = await broadcast_message( # message="Important announcement!" # ) print(" Results:") print(" - Slack: ✅ Success") print(" - Discord: ✅ Success") print(" - Email: ✅ Success") print(" Total time: 1.2s (parallel execution)") print(" vs ~3.6s if sequential") print("\n✅ Scenario 7 complete!") @staticmethod async def scenario_8_complex_workflow(): """ Scenario 8: Complex Workflow Demonstrates: Combining knowledge graph with automation """ print("\n" + "="*60) print("SCENARIO 8: Complex Workflow") print("="*60) print("\n1. Business scenario:") print(" When a customer's status changes to 'premium',") print(" notify sales team and update CRM") print("\n2. Adding customer fact...") # await add_fact("CustomerA", "status", "premium") print(" CustomerA status -> premium") print("\n3. Triggering automation...") print(" a) Query recent status changes") # recent_changes = await query_facts(entity_name="CustomerA") print(" b) Detected: CustomerA upgraded to premium") print(" c) Execute notification workflow...") # await slack_notification( # message="CustomerA upgraded to premium!", # channel="#sales" # ) print(" ✅ Slack notification sent") # await update_crm_webhook( # customer_id="CustomerA", # status="premium" # ) print(" ✅ CRM updated") print("\n4. Recording the workflow execution...") # await add_fact( # "AutomationSystem", # "processed_upgrade", # "CustomerA" # ) print(" Workflow execution recorded in graph") print("\n✅ Scenario 8 complete!") @staticmethod async def run_all_scenarios(): """Run all example scenarios.""" print("\n") print("╔" + "="*58 + "╗") print("║" + " "*58 + "║") print("║" + " Bi-Temporal Knowledge Graph MCP Server".center(58) + "║") print("║" + " Example Scenarios".center(58) + "║") print("║" + " "*58 + "║") print("╚" + "="*58 + "╝") scenarios = [ ("Basic Facts", ExampleScenarios.scenario_1_basic_facts), ("Employment Change", ExampleScenarios.scenario_2_employment_change), ("Temporal Queries", ExampleScenarios.scenario_3_temporal_query), ("AI Extraction", ExampleScenarios.scenario_4_ai_extraction), ("Session Tracking", ExampleScenarios.scenario_5_session_tracking), ("Dynamic Tools", ExampleScenarios.scenario_6_dynamic_tools), ("Multi-Webhook", ExampleScenarios.scenario_7_multi_webhook), ("Complex Workflow", ExampleScenarios.scenario_8_complex_workflow) ] for i, (name, scenario_func) in enumerate(scenarios, 1): await scenario_func() if i < len(scenarios): input("\nPress Enter to continue to next scenario...") print("\n" + "="*60) print("ALL SCENARIOS COMPLETE!") print("="*60) print("\nThese examples demonstrate the key features:") print(" ✅ Bi-temporal fact tracking") print(" ✅ Smart conflict resolution") print(" ✅ Temporal queries") print(" ✅ AI-powered entity extraction") print(" ✅ Session management") print(" ✅ Dynamic tool generation") print(" ✅ Parallel webhook execution") print(" ✅ Complex workflow automation") print("\nReady to deploy? See DEPLOYMENT.md for instructions.") print() if __name__ == "__main__": print("\n⚠️ Note: This is a demonstration script.") print("The actual MCP tools should be called via the MCP protocol.") print("This script shows the expected behavior and usage patterns.\n") try: asyncio.run(ExampleScenarios.run_all_scenarios()) except KeyboardInterrupt: print("\n\n👋 Examples interrupted. Goodbye!") sys.exit(0)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/connectimtiazh/Graphiti-Knowledge-MCP-Server-Starter'

If you have feedback or need assistance with the MCP directory API, please join our Discord server