#!/usr/bin/env python3
"""
MongoDB Data Verification Script
Check if data is being stored properly in all collections
"""
import asyncio
import sys
from pathlib import Path
from datetime import datetime
# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
async def verify_mongodb_data():
"""Verify data in MongoDB collections."""
print("π MONGODB DATA VERIFICATION")
print("=" * 60)
try:
from database.mongodb_manager import mongodb_manager
# Connect to MongoDB
await mongodb_manager.connect()
# Get database stats
stats = await mongodb_manager.get_database_stats()
print("π DATABASE OVERVIEW:")
print(f" Database size: {stats.get('database', {}).get('size_mb', 0)} MB")
print(f" Storage size: {stats.get('database', {}).get('storage_mb', 0)} MB")
print(f" Total indexes: {stats.get('database', {}).get('indexes', 0)}")
print()
# Check each collection
collections_to_check = [
'conversations',
'agent_logs',
'extracted_data',
'query_cache',
'user_sessions',
'agent_performance'
]
for collection_name in collections_to_check:
if collection_name in stats:
count = stats[collection_name]['document_count']
print(f"π {collection_name}: {count} documents")
# Show recent documents if any exist
if count > 0:
collection = mongodb_manager.collections[collection_name]
recent_docs = list(collection.find().sort("timestamp", -1).limit(3))
print(f" π Recent entries:")
for i, doc in enumerate(recent_docs, 1):
timestamp = doc.get('timestamp', 'No timestamp')
if isinstance(timestamp, datetime):
timestamp = timestamp.strftime('%Y-%m-%d %H:%M:%S')
if collection_name == 'conversations':
query = doc.get('query', 'No query')[:50]
print(f" {i}. Query: {query}... ({timestamp})")
elif collection_name == 'agent_logs':
agent = doc.get('agent_id', 'Unknown')
status = doc.get('status', 'Unknown')
print(f" {i}. Agent: {agent}, Status: {status} ({timestamp})")
elif collection_name == 'extracted_data':
source = doc.get('source_file', 'Unknown')
text_len = doc.get('text_length', 0)
print(f" {i}. Source: {source}, Text: {text_len} chars ({timestamp})")
else:
print(f" {i}. Document exists ({timestamp})")
print()
# Test search functionality
print("π TESTING SEARCH FUNCTIONALITY:")
# Search conversations
conv_results = await mongodb_manager.search_conversations("weather", limit=3)
print(f" Weather conversations: {len(conv_results)} found")
# Search extracted data
data_results = await mongodb_manager.search_extracted_data("dave matthews", limit=3)
print(f" Dave Matthews extracts: {len(data_results)} found")
print()
# Test agent performance
print("π AGENT PERFORMANCE METRICS:")
performance = await mongodb_manager.get_agent_performance(hours=24)
if performance:
for agent_id, metrics in performance.items():
success_rate = metrics.get('success_rate', 0) * 100
avg_time = metrics.get('avg_execution_time', 0)
total_exec = metrics.get('total_executions', 0)
print(f" π€ {agent_id}:")
print(f" Executions: {total_exec}")
print(f" Success rate: {success_rate:.1f}%")
print(f" Avg time: {avg_time:.2f}s")
else:
print(" No performance data available yet")
print("\nβ
MongoDB verification completed!")
return True
except Exception as e:
print(f"β MongoDB verification failed: {e}")
return False
async def test_conversation_engine():
"""Test conversation engine functionality."""
print("\nπ£οΈ CONVERSATION ENGINE TEST")
print("=" * 60)
try:
from core.conversation_engine import conversation_engine
# Test query processing
test_query = "What is 10% of 500?"
user_id = "test_user"
session_id = "test_session"
print(f"π Testing query: {test_query}")
result = await conversation_engine.process_query(user_id, session_id, test_query)
print(f"π Result status: {result.get('status', 'unknown')}")
print(f"π¬ Response: {result.get('message', 'No message')[:100]}...")
print(f"π€ Agent used: {result.get('agent_used', 'unknown')}")
# Test context retrieval
context = await conversation_engine.get_conversation_context(user_id, session_id)
print(f"π Context entries: {len(context)}")
return result.get('status') == 'success'
except Exception as e:
print(f"β Conversation engine test failed: {e}")
return False
async def test_inter_agent_coordinator():
"""Test inter-agent coordinator."""
print("\nπ€ INTER-AGENT COORDINATOR TEST")
print("=" * 60)
try:
from core.inter_agent_coordinator import inter_agent_coordinator
# Get coordination stats
stats = await inter_agent_coordinator.get_coordination_stats()
print(f"π Registered agents: {stats.get('registered_agents', 0)}")
print(f"π Available workflows: {stats.get('available_workflows', 0)}")
print(f"β‘ Workflow patterns: {', '.join(stats.get('workflow_patterns', []))}")
# Test multi-agent coordination
test_task = "Check weather in Delhi and calculate 20% of 1000"
user_id = "test_user"
session_id = "test_session"
print(f"\nπ Testing coordination: {test_task}")
result = await inter_agent_coordinator.coordinate_multi_agent_task(
test_task, user_id, session_id
)
print(f"π Coordination status: {result.get('status', 'unknown')}")
print(f"π¬ Response: {result.get('message', 'No message')[:100]}...")
return result.get('status') == 'success'
except Exception as e:
print(f"β Inter-agent coordinator test failed: {e}")
return False
async def main():
"""Main verification function."""
print("π§ͺ COMPREHENSIVE SYSTEM VERIFICATION")
print("=" * 80)
print("π― Verifying MongoDB data storage and system functionality")
print("=" * 80)
tests = [
("MongoDB Data", verify_mongodb_data),
("Conversation Engine", test_conversation_engine),
("Inter-Agent Coordinator", test_inter_agent_coordinator)
]
passed_tests = 0
for test_name, test_function in tests:
print(f"\nπ Running {test_name} test...")
try:
result = await test_function()
if result:
print(f"β
{test_name} test PASSED")
passed_tests += 1
else:
print(f"β {test_name} test FAILED")
except Exception as e:
print(f"β {test_name} test ERROR: {e}")
print("\n" + "=" * 80)
print("π VERIFICATION RESULTS")
print("=" * 80)
print(f"β
Passed tests: {passed_tests}/{len(tests)}")
print(f"π Success rate: {(passed_tests/len(tests))*100:.1f}%")
if passed_tests == len(tests):
print("\nπ ALL TESTS PASSED!")
print("β
Your MCP system is working perfectly!")
print("πΎ MongoDB integration confirmed")
print("π£οΈ Conversational AI operational")
print("π€ Inter-agent coordination ready")
else:
print("\nβ οΈ SOME TESTS FAILED")
print("π§ Check the error messages above")
return passed_tests == len(tests)
if __name__ == "__main__":
try:
success = asyncio.run(main())
if success:
print("\nπ System verification completed successfully!")
else:
print("\nπ§ System verification found issues.")
except Exception as e:
print(f"\nβ Verification failed: {e}")
import traceback
traceback.print_exc()