run_e2e_tests.py • 18.4 kB
#!/usr/bin/env python3
"""
End-to-End Integration Tests for IRIS System

Comprehensive test suite that validates all core components:
- Redis connection and data persistence
- LLM providers (OpenAI and Anthropic)
- Conversation management
- Prompt system with Jinja2 templates
- Complete chat workflow
"""

import asyncio
import sys
import os
from pathlib import Path

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.llm_core.client import IRISClient, get_iris_client, quick_chat
from src.llm_core.prompts.types import PromptType
from src.llm_core.providers.base import MessageRole
from src.redis_client.client import RedisClient


class E2ETestSuite:
    """End-to-End Test Suite"""

    def __init__(self):
        self.client = None
        self.test_sessions = []
        self.passed_tests = 0
        self.failed_tests = 0
        self.total_tests = 0

    async def setup(self):
        """Setup test environment"""
        print("🔧 Setting up test environment...")
        self.client = get_iris_client()
        await self.client._ensure_redis_connected()
        print("✅ Test environment ready")

    async def cleanup(self):
        """Cleanup test data"""
        print("🧹 Cleaning up test data...")
        for session_id in self.test_sessions:
            try:
                await self.client.end_conversation(session_id)
            except Exception:
                pass  # Ignore cleanup errors
        print("✅ Cleanup completed")

    def log_test_result(self, test_name: str, success: bool, error: str = None):
        """Log test result"""
        self.total_tests += 1
        if success:
            self.passed_tests += 1
            print(f"✅ {test_name}")
        else:
            self.failed_tests += 1
            print(f"❌ {test_name}")
            if error:
                print(f"   Error: {error}")

    async def test_redis_connection_and_persistence(self):
        """Test Redis connection and data persistence"""
        try:
            # Test Redis connection
            redis_client = RedisClient()
            await redis_client.connect()

            # Test basic operations
            test_key = "test:e2e:redis"
            test_value = "test_value_123"

            # Set and get
            await redis_client.set(test_key, test_value, ttl=60)
            retrieved = await redis_client.get(test_key)
            assert retrieved == test_value, f"Expected {test_value}, got {retrieved}"

            # Hash operations
            hash_key = "test:e2e:hash"
            hash_data = {"field1": "value1", "field2": "value2"}
            await redis_client.hmset(hash_key, hash_data)
            retrieved_hash = await redis_client.hgetall(hash_key)
            assert retrieved_hash == hash_data, "Hash data mismatch"

            # Cleanup
            await redis_client.delete(test_key)
            await redis_client.delete(hash_key)
            await redis_client.disconnect()

            self.log_test_result("Redis Connection and Persistence", True)
        except Exception as e:
            self.log_test_result("Redis Connection and Persistence", False, str(e))

    async def test_llm_providers_health(self):
        """Test all LLM providers by making simple requests"""
        try:
            # Test basic functionality with both providers
            response_openai = await quick_chat(
                user_id="health_openai",
                user_name="Health Test",
                message="Hello",
                prompt_type=PromptType.GENERAL_CHAT,
                provider_name="openai"
            )
            assert response_openai, "No response from OpenAI"

            response_anthropic = await quick_chat(
                user_id="health_anthropic",
                user_name="Health Test",
                message="Hello",
                prompt_type=PromptType.GENERAL_CHAT,
                provider_name="anthropic"
            )
            assert response_anthropic, "No response from Anthropic"

            self.log_test_result("LLM Providers Health Check", True)
        except Exception as e:
            self.log_test_result("LLM Providers Health Check", False, str(e))

    async def test_conversation_lifecycle(self):
        """Test complete conversation lifecycle"""
        try:
            # Start conversation
            session = await self.client.start_conversation(
                user_id="test_user_e2e",
                user_name="E2E Test User",
                prompt_type=PromptType.GENERAL_CHAT,
                role="Developer",
                expertise_level="advanced",
                communication_style="professional"
            )
            self.test_sessions.append(session.session_id)

            assert session.user_id == "test_user_e2e"
            assert session.session_id is not None

            # Send first message
            response1 = await self.client.chat(
                session.session_id,
                "Hello! Can you help me with Python programming?"
            )
            assert response1.content, "No response content"
            assert response1.provider in ["openai", "anthropic"], f"Invalid provider: {response1.provider}"

            # Send follow-up message
            response2 = await self.client.chat(
                session.session_id,
                "What are the best practices for async programming?"
            )
            assert response2.content, "No response content for second message"

            # Check conversation history
            messages = await self.client.get_conversation_history(session.session_id)
            assert len(messages) >= 4, f"Expected at least 4 messages, got {len(messages)}"

            # Verify message order and content
            user_messages = [msg for msg in messages if msg.role == MessageRole.USER]
            assistant_messages = [msg for msg in messages if msg.role == MessageRole.ASSISTANT]

            assert len(user_messages) == 2, f"Expected 2 user messages, got {len(user_messages)}"
            assert len(assistant_messages) == 2, f"Expected 2 assistant messages, got {len(assistant_messages)}"

            assert "Python programming" in user_messages[0].content
            assert "async programming" in user_messages[1].content

            self.log_test_result("Conversation Lifecycle", True)
        except Exception as e:
            self.log_test_result("Conversation Lifecycle", False, str(e))

    async def test_prompt_system_integration(self):
        """Test prompt system with different templates"""
        try:
            # Test different prompt types
            prompt_types = [
                PromptType.GENERAL_CHAT,
                PromptType.BUSINESS_ASSISTANT,
                PromptType.EMAIL_ANALYSIS,
                PromptType.EMAIL_COMPOSE
            ]

            for prompt_type in prompt_types:
                session = await self.client.start_conversation(
                    user_id=f"test_user_{prompt_type.value}",
                    user_name="Prompt Test User",
                    prompt_type=prompt_type,
                    role="Tester",
                    expertise_level="intermediate"
                )
                self.test_sessions.append(session.session_id)

                # Send a message to trigger prompt generation
                response = await self.client.chat(
                    session.session_id,
                    f"Test message for {prompt_type.value} prompt type"
                )
                assert response.content, f"No response for {prompt_type.value}"
                assert len(response.content) > 10, f"Response too short for {prompt_type.value}"

            self.log_test_result("Prompt System Integration", True)
        except Exception as e:
            self.log_test_result("Prompt System Integration", False, str(e))

    async def test_session_metadata_and_stats(self):
        """Test session metadata and statistics"""
        try:
            # Create session with metadata
            session = await self.client.start_conversation(
                user_id="test_metadata_user",
                user_name="Metadata Test User",
                prompt_type=PromptType.GENERAL_CHAT,
                role="QA Engineer",
                expertise_level="expert",
                communication_style="technical"
            )
            self.test_sessions.append(session.session_id)

            # Send some messages to generate stats
            await self.client.chat(session.session_id, "First test message")
            await self.client.chat(session.session_id, "Second test message")
            await self.client.chat(session.session_id, "Third test message")

            # Check session stats
            stats = await self.client.get_session_stats(session.session_id)
            assert stats is not None, "No session stats returned"
            assert "total_messages" in stats, "No total_messages in stats"
            assert stats["total_messages"] >= 6, f"Expected at least 6 messages, got {stats['total_messages']}"

            self.log_test_result("Session Metadata and Stats", True)
        except Exception as e:
            self.log_test_result("Session Metadata and Stats", False, str(e))

    async def test_provider_switching(self):
        """Test switching between different LLM providers"""
        try:
            session = await self.client.start_conversation(
                user_id="test_provider_switch",
                user_name="Provider Switch User",
                prompt_type=PromptType.GENERAL_CHAT
            )
            self.test_sessions.append(session.session_id)

            # Test with OpenAI
            response_openai = await self.client.chat(
                session.session_id,
                "Hello from OpenAI!",
                provider_name="openai"
            )
            assert response_openai.provider == "openai", f"Expected openai, got {response_openai.provider}"

            # Test with Anthropic
            response_anthropic = await self.client.chat(
                session.session_id,
                "Hello from Anthropic!",
                provider_name="anthropic"
            )
            assert response_anthropic.provider == "anthropic", f"Expected anthropic, got {response_anthropic.provider}"

            # Verify both responses in conversation history
            messages = await self.client.get_conversation_history(session.session_id)
            assistant_messages = [msg for msg in messages if msg.role == MessageRole.ASSISTANT]
            assert len(assistant_messages) >= 2, f"Expected at least 2 assistant messages, got {len(assistant_messages)}"

            self.log_test_result("Provider Switching", True)
        except Exception as e:
            self.log_test_result("Provider Switching", False, str(e))

    async def test_quick_chat_convenience_function(self):
        """Test the quick_chat convenience function"""
        try:
            response = await quick_chat(
                user_id="quick_chat_user",
                user_name="Quick Chat User",
                message="This is a quick chat test message",
                prompt_type=PromptType.GENERAL_CHAT,
                role="User",
                expertise_level="beginner"
            )

            assert response, "No response from quick_chat"
            assert len(response) > 10, f"Response too short: {len(response)} chars"
            assert isinstance(response, str), f"Expected str, got {type(response)}"

            self.log_test_result("Quick Chat Convenience Function", True)
        except Exception as e:
            self.log_test_result("Quick Chat Convenience Function", False, str(e))

    async def test_error_handling_and_recovery(self):
        """Test error handling and system recovery"""
        try:
            # Test with invalid session ID
            try:
                await self.client.chat("invalid_session_id", "Test message")
                assert False, "Should have raised an exception for invalid session"
            except Exception as e:
                assert "not found" in str(e).lower() or "invalid" in str(e).lower(), f"Unexpected error: {e}"

            # Test with invalid provider
            session = await self.client.start_conversation(
                user_id="error_test_user",
                user_name="Error Test User",
                prompt_type=PromptType.GENERAL_CHAT
            )
            self.test_sessions.append(session.session_id)

            try:
                await self.client.chat(
                    session.session_id,
                    "Test message",
                    provider_name="invalid_provider"
                )
                assert False, "Should have raised an exception for invalid provider"
            except Exception as e:
                assert "provider" in str(e).lower(), f"Unexpected error: {e}"

            self.log_test_result("Error Handling and Recovery", True)
        except Exception as e:
            self.log_test_result("Error Handling and Recovery", False, str(e))

    async def test_concurrent_conversations(self):
        """Test handling multiple concurrent conversations"""
        try:
            # Create multiple sessions
            sessions = []
            for i in range(3):
                session = await self.client.start_conversation(
                    user_id=f"concurrent_user_{i}",
                    user_name=f"Concurrent User {i}",
                    prompt_type=PromptType.GENERAL_CHAT
                )
                sessions.append(session)
                self.test_sessions.append(session.session_id)

            # Send messages concurrently
            tasks = []
            for i, session in enumerate(sessions):
                task = self.client.chat(
                    session.session_id,
                    f"Concurrent message from user {i}"
                )
                tasks.append(task)

            # Wait for all responses
            responses = await asyncio.gather(*tasks)

            # Verify all responses
            assert len(responses) == 3, f"Expected 3 responses, got {len(responses)}"
            for i, response in enumerate(responses):
                assert response.content, f"No content in response {i}"
                assert len(response.content) > 10, f"Response {i} too short"

            self.log_test_result("Concurrent Conversations", True)
        except Exception as e:
            self.log_test_result("Concurrent Conversations", False, str(e))

    async def test_system_health_check(self):
        """Test overall system health"""
        try:
            # Check Redis connection
            await self.client._ensure_redis_connected()

            # Test basic functionality with both providers
            response_openai = await quick_chat(
                user_id="health_check_openai",
                user_name="Health Check User",
                message="System health check message",
                prompt_type=PromptType.GENERAL_CHAT,
                provider_name="openai"
            )
            assert response_openai, "No response from OpenAI health check"
            assert len(response_openai) > 5, f"OpenAI health check response too short: {len(response_openai)} chars"

            response_anthropic = await quick_chat(
                user_id="health_check_anthropic",
                user_name="Health Check User",
                message="System health check message",
                prompt_type=PromptType.GENERAL_CHAT,
                provider_name="anthropic"
            )
            assert response_anthropic, "No response from Anthropic health check"
            assert len(response_anthropic) > 5, f"Anthropic health check response too short: {len(response_anthropic)} chars"

            self.log_test_result("System Health Check", True)
        except Exception as e:
            self.log_test_result("System Health Check", False, str(e))

    async def run_all_tests(self):
        """Run all E2E tests"""
        print("🚀 Starting IRIS E2E Integration Tests")
        print("=" * 60)

        await self.setup()

        # Run all tests
        test_methods = [
            self.test_redis_connection_and_persistence,
            self.test_llm_providers_health,
            self.test_conversation_lifecycle,
            self.test_prompt_system_integration,
            self.test_session_metadata_and_stats,
            self.test_provider_switching,
            self.test_quick_chat_convenience_function,
            self.test_error_handling_and_recovery,
            self.test_concurrent_conversations,
            self.test_system_health_check
        ]

        for test_method in test_methods:
            await test_method()

        await self.cleanup()

        # Print summary
        print("=" * 60)
        print("📊 Test Results Summary:")
        print(f"   Total Tests: {self.total_tests}")
        print(f"   Passed: {self.passed_tests}")
        print(f"   Failed: {self.failed_tests}")
        print(f"   Success Rate: {(self.passed_tests / self.total_tests) * 100:.1f}%")

        if self.failed_tests == 0:
            print("🎉 All tests passed! IRIS system is fully operational.")
            return True
        else:
            print(f"⚠️ {self.failed_tests} test(s) failed. Please review the errors above.")
            return False


async def main():
    """Main test runner"""
    test_suite = E2ETestSuite()
    success = await test_suite.run_all_tests()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    asyncio.run(main())
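
The script is intended to be run as a standalone entry point (its main() exits 0 only when every test passes), against a live Redis instance and valid OpenAI/Anthropic credentials. As a minimal sketch, a single check from the suite could also be exercised in isolation; note that the import path, the environment-variable names, and the smoke_check helper below are assumptions for illustration, not part of the repository:

# Minimal sketch: run one test from the suite in isolation.
# Assumes run_e2e_tests.py is importable from the current directory,
# Redis is reachable, and OPENAI_API_KEY / ANTHROPIC_API_KEY are set
# (variable names assumed, not confirmed by the repository).
import asyncio

from run_e2e_tests import E2ETestSuite


async def smoke_check() -> bool:
    suite = E2ETestSuite()
    await suite.setup()                                  # connects the IRIS client to Redis
    await suite.test_redis_connection_and_persistence()  # one self-contained check
    await suite.cleanup()                                # ends any sessions opened by tests
    return suite.failed_tests == 0


if __name__ == "__main__":
    ok = asyncio.run(smoke_check())
    raise SystemExit(0 if ok else 1)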
