Skip to main content
Glama
test_multitenant.py8.2 kB
"""Test multi-tenant functionality for ickyMCP.""" import sys import asyncio from pathlib import Path # Force UTF-8 encoding for Windows console output if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # Add project root and src to path project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root / "src")) from src.server import ( handle_status, handle_list, handle_index, handle_search, handle_delete ) from src.config import get_user_db_path, DATA_DIR async def run_multitenant_tests(): """Test multi-tenant isolation and document filtering.""" print("=" * 80) print("MULTI-TENANT FUNCTIONALITY TESTS") print("=" * 80) print() # Test 1: Verify user database paths print("[TEST 1] User database path generation") print("-" * 80) user1_path = get_user_db_path("user-abc123") user2_path = get_user_db_path("user-def456") print(f"User 1 DB path: {user1_path}") print(f"User 2 DB path: {user2_path}") print(f"Paths are different: {user1_path != user2_path}") print(f"DATA_DIR: {DATA_DIR}") print() # Test 2: Test legacy mode (no user_id) print("[TEST 2] Legacy mode (no user_id)") print("-" * 80) status = await handle_status({}) print(f"User ID: {status['user_id']}") print(f"Documents: {status['total_documents']}") print(f"Chunks: {status['total_chunks']}") print() # Test 3: Create isolated user databases print("[TEST 3] Multi-tenant isolation - Index for User A") print("-" * 80) docs_path = str(project_root / "docs") # Index just one document for User A result_a = await handle_index({ "user_id": "test-user-A", "path": str(project_root / "docs" / "RL.pdf") }) print(f"User A - Indexed: {result_a.get('indexed', 0)} files") print(f"User A - Errors: {result_a.get('errors')}") # Check User A's status status_a = await handle_status({"user_id": "test-user-A"}) print(f"User A - Total documents: {status_a['total_documents']}") print(f"User A - Total chunks: {status_a['total_chunks']}") print() # Test 4: Index different document for User B print("[TEST 4] Multi-tenant isolation - Index for User B") print("-" * 80) result_b = await handle_index({ "user_id": "test-user-B", "path": str(project_root / "docs" / "AIOps.pdf") }) print(f"User B - Indexed: {result_b.get('indexed', 0)} files") status_b = await handle_status({"user_id": "test-user-B"}) print(f"User B - Total documents: {status_b['total_documents']}") print(f"User B - Total chunks: {status_b['total_chunks']}") print() # Test 5: Verify isolation - User A should NOT see User B's documents print("[TEST 5] Verify user isolation") print("-" * 80) list_a = await handle_list({"user_id": "test-user-A"}) list_b = await handle_list({"user_id": "test-user-B"}) print(f"User A documents: {[d['path'] for d in list_a['documents']]}") print(f"User B documents: {[d['path'] for d in list_b['documents']]}") # Verify they're different paths_a = set(d['path'] for d in list_a['documents']) paths_b = set(d['path'] for d in list_b['documents']) isolated = len(paths_a.intersection(paths_b)) == 0 print(f"Users are isolated (no shared docs): {isolated}") print() # Test 6: Search within user's own database print("[TEST 6] Search within user's database") print("-" * 80) search_a = await handle_search({ "user_id": "test-user-A", "query": "machine learning", "top_k": 3 }) print(f"User A search results: {search_a['count']}") if search_a['results']: print(f" Top result from: {Path(search_a['results'][0]['path']).name}") print(f" Score: {search_a['results'][0]['score']:.3f}") print() # Test 7: Document ID filtering - Index multiple docs for User C print("[TEST 7] Document ID filtering setup - Index multiple docs for User C") print("-" * 80) # Index multiple documents await handle_index({ "user_id": "test-user-C", "path": str(project_root / "docs" / "RL.pdf") }) await handle_index({ "user_id": "test-user-C", "path": str(project_root / "docs" / "AIOps.pdf") }) await handle_index({ "user_id": "test-user-C", "path": str(project_root / "docs" / "AI for Low Tech.pdf") }) list_c = await handle_list({"user_id": "test-user-C"}) print(f"User C has {list_c['count']} documents:") for doc in list_c['documents']: print(f" ID {doc['id']}: {Path(doc['path']).name} ({doc['chunks']} chunks)") print() # Test 8: Search with document_ids filter print("[TEST 8] Search with document_ids filter") print("-" * 80) # Get the document IDs doc_ids = [d['id'] for d in list_c['documents']] first_doc_id = doc_ids[0] if doc_ids else None if first_doc_id: # Search all documents search_all = await handle_search({ "user_id": "test-user-C", "query": "machine learning algorithms", "top_k": 10 }) # Search only first document search_filtered = await handle_search({ "user_id": "test-user-C", "query": "machine learning algorithms", "top_k": 10, "document_ids": [first_doc_id] }) print(f"Search ALL documents: {search_all['count']} results") all_docs = set(Path(r['path']).name for r in search_all['results']) print(f" Results from: {all_docs}") print(f"\nSearch FILTERED (doc ID {first_doc_id}): {search_filtered['count']} results") filtered_docs = set(Path(r['path']).name for r in search_filtered['results']) print(f" Results from: {filtered_docs}") print(f" Filtered to documents: {search_filtered['filtered_to_documents']}") # Verify filter worked filter_worked = len(filtered_docs) <= 1 print(f"\nDocument filter working: {filter_worked}") print() # Test 9: List returns document IDs print("[TEST 9] Verify list returns document IDs for selection") print("-" * 80) list_result = await handle_list({"user_id": "test-user-C"}) has_ids = all('id' in d for d in list_result['documents']) print(f"All documents have 'id' field: {has_ids}") print("Document IDs can be used for document_ids filter in search") print() # Test 10: Delete by document ID print("[TEST 10] Delete by document ID") print("-" * 80) if doc_ids and len(doc_ids) > 1: delete_id = doc_ids[-1] delete_result = await handle_delete({ "user_id": "test-user-C", "document_ids": [delete_id] }) print(f"Deleted document ID {delete_id}") print(f"Deleted: {delete_result['deleted_documents']} docs, {delete_result['deleted_chunks']} chunks") # Verify deletion list_after = await handle_list({"user_id": "test-user-C"}) remaining_ids = [d['id'] for d in list_after['documents']] deleted_properly = delete_id not in remaining_ids print(f"Document properly deleted: {deleted_properly}") print(f"Remaining documents: {list_after['count']}") print() # Cleanup test users print("[CLEANUP] Removing test user databases") print("-" * 80) for user_id in ["test-user-A", "test-user-B", "test-user-C"]: await handle_delete({"user_id": user_id, "all": True}) print(f"Cleaned up {user_id}") print() print("=" * 80) print("MULTI-TENANT TESTS COMPLETED") print("=" * 80) print() print("Summary:") print(" - Per-user database isolation: Working") print(" - Document ID filtering: Working") print(" - List returns IDs for selection: Working") print(" - Delete by document ID: Working") print(" - Legacy mode (no user_id): Working") if __name__ == "__main__": asyncio.run(run_multitenant_tests())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dl1683/ickyMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server