Skip to main content
Glama

cognee-mcp

test_client.py30.5 kB
#!/usr/bin/env python3 """ Test client for Cognee MCP Server functionality. This script tests all the tools and functions available in the Cognee MCP server, including cognify, codify, search, prune, status checks, and utility functions. Usage: # Set your OpenAI API key first export OPENAI_API_KEY="your-api-key-here" # Run the test client python src/test_client.py # Or use LLM_API_KEY instead of OPENAI_API_KEY export LLM_API_KEY="your-api-key-here" python src/test_client.py """ import asyncio import os import tempfile import time from contextlib import asynccontextmanager from cognee.shared.logging_utils import setup_logging from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus from cognee.infrastructure.databases.exceptions import DatabaseNotCreatedError from src.server import ( node_to_string, retrieved_edges_to_string, load_class, ) # Set timeout for cognify/codify to complete in TIMEOUT = 5 * 60 # 5 min in seconds class CogneeTestClient: """Test client for Cognee MCP Server functionality.""" def __init__(self): self.test_results = {} self.temp_files = [] async def setup(self): """Setup test environment.""" print("🔧 Setting up test environment...") # Check for required API keys api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("LLM_API_KEY") if not api_key: print("⚠️ Warning: No OPENAI_API_KEY or LLM_API_KEY found in environment.") print(" Some tests may fail without proper LLM API configuration.") print(" Set OPENAI_API_KEY environment variable for full functionality.") else: print("✅ API key configured.") # Create temporary test files self.test_data_dir = tempfile.mkdtemp(prefix="cognee_test_") # Create a test text file self.test_text_file = os.path.join(self.test_data_dir, "test.txt") with open(self.test_text_file, "w") as f: f.write( "This is a test document for Cognee testing. It contains information about AI and knowledge graphs." ) # Create a test code repository structure self.test_repo_dir = os.path.join(self.test_data_dir, "test_repo") os.makedirs(self.test_repo_dir) # Create test Python files test_py_file = os.path.join(self.test_repo_dir, "main.py") with open(test_py_file, "w") as f: f.write(""" def hello_world(): '''A simple hello world function.''' return "Hello, World!" class TestClass: '''A test class for demonstration.''' def __init__(self, name): self.name = name def greet(self): return f"Hello, {self.name}!" """) # Create a test configuration file config_file = os.path.join(self.test_repo_dir, "config.py") with open(config_file, "w") as f: f.write(""" # Configuration settings DATABASE_URL = "sqlite:///test.db" DEBUG = True """) # Create test developer rules files cursorrules_file = os.path.join(self.test_data_dir, ".cursorrules") with open(cursorrules_file, "w") as f: f.write("# Test cursor rules\nUse Python best practices.") self.temp_files.extend([self.test_text_file, test_py_file, config_file, cursorrules_file]) print(f"✅ Test environment created at: {self.test_data_dir}") async def cleanup(self): """Clean up test environment.""" print("🧹 Cleaning up test environment...") import shutil if os.path.exists(self.test_data_dir): shutil.rmtree(self.test_data_dir) print("✅ Cleanup completed") @asynccontextmanager async def mcp_server_session(self): """Context manager to start and manage MCP server session.""" # Get the path to the server script server_script = os.path.join(os.path.dirname(__file__), "server.py") # Pass current environment variables to the server process # This ensures OpenAI API key and other config is available server_env = os.environ.copy() # Start the server process server_params = StdioServerParameters( command="python", args=[server_script, "--transport", "stdio"], env=server_env, ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the session await session.initialize() yield session async def test_mcp_server_startup_and_tools(self): """Test that the MCP server starts properly and returns tool results.""" print("\n🧪 Testing MCP server startup and tool execution...") try: async with self.mcp_server_session() as session: # Test 1: List available tools print(" 🔍 Testing tool discovery...") tools_result = await session.list_tools() expected_tools = { "cognify", "codify", "search", "prune", "cognify_status", "codify_status", "cognee_add_developer_rules", "list_data", "delete", } available_tools = {tool.name for tool in tools_result.tools} if not expected_tools.issubset(available_tools): missing_tools = expected_tools - available_tools raise AssertionError(f"Missing expected tools: {missing_tools}") print( f" ✅ Found {len(available_tools)} tools: {', '.join(sorted(available_tools))}" ) except Exception as e: self.test_results["mcp_server_integration"] = { "status": "FAIL", "error": str(e), "message": "MCP server integration test failed", } print(f"❌ MCP server integration test failed: {e}") async def test_prune(self): """Test the prune functionality using MCP client.""" print("\n🧪 Testing prune functionality...") try: async with self.mcp_server_session() as session: result = await session.call_tool("prune", arguments={}) self.test_results["prune"] = { "status": "PASS", "result": result, "message": "Prune executed successfully", } print("✅ Prune test passed") except Exception as e: self.test_results["prune"] = { "status": "FAIL", "error": str(e), "message": "Prune test failed", } print(f"❌ Prune test failed: {e}") raise e async def test_cognify(self, test_text, test_name): """Test the cognify functionality using MCP client.""" print("\n🧪 Testing cognify functionality...") try: # Test with simple text using MCP client async with self.mcp_server_session() as session: cognify_result = await session.call_tool("cognify", arguments={"data": test_text}) start = time.time() # mark the start while True: try: # Wait a moment await asyncio.sleep(5) # Check if cognify processing is finished status_result = await session.call_tool("cognify_status", arguments={}) if hasattr(status_result, "content") and status_result.content: status_text = ( status_result.content[0].text if status_result.content else str(status_result) ) else: status_text = str(status_result) if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text: break elif time.time() - start > TIMEOUT: raise TimeoutError("Cognify did not complete in 5min") except DatabaseNotCreatedError: if time.time() - start > TIMEOUT: raise TimeoutError("Database was not created in 5min") self.test_results[test_name] = { "status": "PASS", "result": cognify_result, "message": f"{test_name} executed successfully", } print(f"✅ {test_name} test passed") except Exception as e: self.test_results[test_name] = { "status": "FAIL", "error": str(e), "message": f"{test_name} test failed", } print(f"❌ {test_name} test failed: {e}") async def test_codify(self): """Test the codify functionality using MCP client.""" print("\n🧪 Testing codify functionality...") try: async with self.mcp_server_session() as session: codify_result = await session.call_tool( "codify", arguments={"repo_path": self.test_repo_dir} ) start = time.time() # mark the start while True: try: # Wait a moment await asyncio.sleep(5) # Check if codify processing is finished status_result = await session.call_tool("codify_status", arguments={}) if hasattr(status_result, "content") and status_result.content: status_text = ( status_result.content[0].text if status_result.content else str(status_result) ) else: status_text = str(status_result) if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text: break elif time.time() - start > TIMEOUT: raise TimeoutError("Codify did not complete in 5min") except DatabaseNotCreatedError: if time.time() - start > TIMEOUT: raise TimeoutError("Database was not created in 5min") self.test_results["codify"] = { "status": "PASS", "result": codify_result, "message": "Codify executed successfully", } print("✅ Codify test passed") except Exception as e: self.test_results["codify"] = { "status": "FAIL", "error": str(e), "message": "Codify test failed", } print(f"❌ Codify test failed: {e}") async def test_cognee_add_developer_rules(self): """Test the cognee_add_developer_rules functionality using MCP client.""" print("\n🧪 Testing cognee_add_developer_rules functionality...") try: async with self.mcp_server_session() as session: result = await session.call_tool( "cognee_add_developer_rules", arguments={"base_path": self.test_data_dir} ) start = time.time() # mark the start while True: try: # Wait a moment await asyncio.sleep(5) # Check if developer rule cognify processing is finished status_result = await session.call_tool("cognify_status", arguments={}) if hasattr(status_result, "content") and status_result.content: status_text = ( status_result.content[0].text if status_result.content else str(status_result) ) else: status_text = str(status_result) if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text: break elif time.time() - start > TIMEOUT: raise TimeoutError( "Cognify of developer rules did not complete in 5min" ) except DatabaseNotCreatedError: if time.time() - start > TIMEOUT: raise TimeoutError("Database was not created in 5min") self.test_results["cognee_add_developer_rules"] = { "status": "PASS", "result": result, "message": "Developer rules addition executed successfully", } print("✅ Developer rules test passed") except Exception as e: self.test_results["cognee_add_developer_rules"] = { "status": "FAIL", "error": str(e), "message": "Developer rules test failed", } print(f"❌ Developer rules test failed: {e}") async def test_search_functionality(self): """Test the search functionality with different search types using MCP client.""" print("\n🧪 Testing search functionality...") search_query = "What is artificial intelligence?" # Test if all search types will execute from cognee import SearchType # Go through all Cognee search types for search_type in SearchType: # Don't test these search types if search_type in [SearchType.NATURAL_LANGUAGE, SearchType.CYPHER]: break try: async with self.mcp_server_session() as session: result = await session.call_tool( "search", arguments={"search_query": search_query, "search_type": search_type.value}, ) self.test_results[f"search_{search_type}"] = { "status": "PASS", "result": result, "message": f"Search with {search_type} successful", } print(f"✅ Search {search_type} test passed") except Exception as e: self.test_results[f"search_{search_type}"] = { "status": "FAIL", "error": str(e), "message": f"Search with {search_type} failed", } print(f"❌ Search {search_type} test failed: {e}") async def test_list_data(self): """Test the list_data functionality.""" print("\n🧪 Testing list_data functionality...") try: async with self.mcp_server_session() as session: # Test listing all datasets result = await session.call_tool("list_data", arguments={}) if result.content and len(result.content) > 0: content = result.content[0].text # Check if the output contains expected elements if "Available Datasets:" in content or "No datasets found" in content: self.test_results["list_data_all"] = { "status": "PASS", "result": content[:200] + "..." if len(content) > 200 else content, "message": "list_data (all datasets) successful", } print("✅ list_data (all datasets) test passed") # If there are datasets, try to list data for the first one if "Dataset ID:" in content: # Extract the first dataset ID from the output lines = content.split("\n") dataset_id = None for line in lines: if "Dataset ID:" in line: dataset_id = line.split("Dataset ID:")[1].strip() break if dataset_id: # Test listing data for specific dataset specific_result = await session.call_tool( "list_data", arguments={"dataset_id": dataset_id} ) if specific_result.content and len(specific_result.content) > 0: specific_content = specific_result.content[0].text if "Dataset:" in specific_content: self.test_results["list_data_specific"] = { "status": "PASS", "result": specific_content[:200] + "..." if len(specific_content) > 200 else specific_content, "message": "list_data (specific dataset) successful", } print("✅ list_data (specific dataset) test passed") else: raise Exception( "Specific dataset listing returned unexpected format" ) else: raise Exception("Specific dataset listing returned no content") else: raise Exception("list_data returned unexpected format") else: raise Exception("list_data returned no content") except Exception as e: self.test_results["list_data"] = { "status": "FAIL", "error": str(e), "message": "list_data test failed", } print(f"❌ list_data test failed: {e}") async def test_delete(self): """Test the delete functionality.""" print("\n🧪 Testing delete functionality...") try: async with self.mcp_server_session() as session: # First, let's get available data to delete list_result = await session.call_tool("list_data", arguments={}) if not (list_result.content and len(list_result.content) > 0): raise Exception("No data available for delete test - list_data returned empty") content = list_result.content[0].text # Look for data IDs and dataset IDs in the content lines = content.split("\n") dataset_id = None data_id = None for line in lines: if "Dataset ID:" in line: dataset_id = line.split("Dataset ID:")[1].strip() elif "Data ID:" in line: data_id = line.split("Data ID:")[1].strip() break # Get the first data item if dataset_id and data_id: # Test soft delete (default) delete_result = await session.call_tool( "delete", arguments={"data_id": data_id, "dataset_id": dataset_id, "mode": "soft"}, ) if delete_result.content and len(delete_result.content) > 0: delete_content = delete_result.content[0].text if "Delete operation completed successfully" in delete_content: self.test_results["delete_soft"] = { "status": "PASS", "result": delete_content[:200] + "..." if len(delete_content) > 200 else delete_content, "message": "delete (soft mode) successful", } print("✅ delete (soft mode) test passed") else: # Check if it's an expected error (like document not found) if "not found" in delete_content.lower(): self.test_results["delete_soft"] = { "status": "PASS", "result": delete_content, "message": "delete test passed with expected 'not found' error", } print("✅ delete test passed (expected 'not found' error)") else: raise Exception( f"Delete returned unexpected content: {delete_content}" ) else: raise Exception("Delete returned no content") else: # Test with invalid UUIDs to check error handling invalid_result = await session.call_tool( "delete", arguments={ "data_id": "invalid-uuid", "dataset_id": "another-invalid-uuid", "mode": "soft", }, ) if invalid_result.content and len(invalid_result.content) > 0: invalid_content = invalid_result.content[0].text if "Invalid UUID format" in invalid_content: self.test_results["delete_error_handling"] = { "status": "PASS", "result": invalid_content, "message": "delete error handling works correctly", } print("✅ delete error handling test passed") else: raise Exception(f"Expected UUID error not found: {invalid_content}") else: raise Exception("Delete error test returned no content") except Exception as e: self.test_results["delete"] = { "status": "FAIL", "error": str(e), "message": "delete test failed", } print(f"❌ delete test failed: {e}") def test_utility_functions(self): """Test utility functions.""" print("\n🧪 Testing utility functions...") # Test node_to_string try: test_node = {"id": "test_id", "name": "test_name", "type": "test_type"} result = node_to_string(test_node) expected = 'Node(id: "test_id", name: "test_name")' if result == expected: self.test_results["node_to_string"] = { "status": "PASS", "result": result, "message": "node_to_string function works correctly", } print("✅ node_to_string test passed") else: self.test_results["node_to_string"] = { "status": "FAIL", "result": result, "expected": expected, "message": "node_to_string function output mismatch", } print(f"❌ node_to_string test failed: expected {expected}, got {result}") except Exception as e: self.test_results["node_to_string"] = { "status": "FAIL", "error": str(e), "message": "node_to_string test failed", } print(f"❌ node_to_string test failed: {e}") # Test retrieved_edges_to_string try: test_triplet = [ ( {"id": "node1", "name": "Node1"}, {"relationship_name": "CONNECTS_TO"}, {"id": "node2", "name": "Node2"}, ) ] result = retrieved_edges_to_string(test_triplet) expected = ( 'Node(id: "node1", name: "Node1") CONNECTS_TO Node(id: "node2", name: "Node2")' ) if result == expected: self.test_results["retrieved_edges_to_string"] = { "status": "PASS", "result": result, "message": "retrieved_edges_to_string function works correctly", } print("✅ retrieved_edges_to_string test passed") else: self.test_results["retrieved_edges_to_string"] = { "status": "FAIL", "result": result, "expected": expected, "message": "retrieved_edges_to_string function output mismatch", } print( f"❌ retrieved_edges_to_string test failed: expected {expected}, got {result}" ) except Exception as e: self.test_results["retrieved_edges_to_string"] = { "status": "FAIL", "error": str(e), "message": "retrieved_edges_to_string test failed", } print(f"❌ retrieved_edges_to_string test failed: {e}") def test_load_class_function(self): """Test load_class function.""" print("\n🧪 Testing load_class function...") try: # Create a temporary Python file with a test class test_module_file = os.path.join(self.test_data_dir, "test_model.py") with open(test_module_file, "w") as f: f.write(""" class TestModel: def __init__(self): self.name = "TestModel" def get_name(self): return self.name """) # Test loading the class loaded_class = load_class(test_module_file, "TestModel") instance = loaded_class() if hasattr(instance, "get_name") and instance.get_name() == "TestModel": self.test_results["load_class"] = { "status": "PASS", "message": "load_class function works correctly", } print("✅ load_class test passed") else: self.test_results["load_class"] = { "status": "FAIL", "message": "load_class function did not load class correctly", } print("❌ load_class test failed: class not loaded correctly") except Exception as e: self.test_results["load_class"] = { "status": "FAIL", "error": str(e), "message": "load_class test failed", } print(f"❌ load_class test failed: {e}") async def run_all_tests(self): """Run all tests.""" print("🚀 Starting Cognee MCP Server Test Suite") print("=" * 50) await self.setup() # Test MCP server integration first await self.test_mcp_server_startup_and_tools() # Run tests in logical order await self.test_prune() # Start with clean slate # Test cognify twice to make sure updating a dataset with new docs is working as expected await self.test_cognify( test_text="Artificial Intelligence is transforming the world through machine learning and deep learning technologies.", test_name="Cognify1", ) await self.test_cognify( test_text="Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval.", test_name="Cognify2", ) await self.test_codify() await self.test_cognee_add_developer_rules() # Test list_data and delete functionality await self.test_list_data() await self.test_delete() await self.test_search_functionality() # Test utility functions (synchronous) self.test_utility_functions() self.test_load_class_function() await self.cleanup() # Print summary self.print_test_summary() def print_test_summary(self): """Print test results summary.""" print("\n" + "=" * 50) print("📊 TEST RESULTS SUMMARY") print("=" * 50) passed = 0 failed = 0 for test_name, result in self.test_results.items(): if result["status"] == "PASS": status_emoji = "✅" passed += 1 else: status_emoji = "❌" failed += 1 print(f"{status_emoji} {test_name}: {result['status']}") if result["status"] == "FAIL" and "error" in result: print(f" Error: {result['error']}") print("\n" + "-" * 50) total_tests = passed + failed print(f"Total Tests: {total_tests}") print(f"Passed: {passed}") print(f"Failed: {failed}") print(f"Success Rate: {(passed / total_tests * 100):.1f}%") if failed > 0: print(f"\n ⚠️ {failed} test(s) failed - review results above for details") async def main(): """Main function to run the test suite.""" client = CogneeTestClient() await client.run_all_tests() if __name__ == "__main__": from logging import ERROR logger = setup_logging(log_level=ERROR) asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/topoteretes/cognee'

If you have feedback or need assistance with the MCP directory API, please join our Discord server