#!/usr/bin/env python3
"""
End-to-End Integration Tests for IRIS System
Comprehensive test suite that validates all core components:
- Redis connection and data persistence
- LLM providers (OpenAI and Anthropic)
- Conversation management
- Prompt system with Jinja2 templates
- Complete chat workflow
"""
import asyncio
import sys
import os
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
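# (This lets the "src.*" imports below resolve regardless of the directory the
# script is launched from.)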
from src.llm_core.client import IRISClient, get_iris_client, quick_chat
from src.llm_core.prompts.types import PromptType
from src.llm_core.providers.base import MessageRole
from src.redis_client.client import RedisClient
class E2ETestSuite:
"""End-to-End Test Suite"""
def __init__(self):
self.client = None
self.test_sessions = []
self.passed_tests = 0
self.failed_tests = 0
self.total_tests = 0
async def setup(self):
"""Setup test environment"""
        print("🔧 Setting up test environment...")
self.client = get_iris_client()
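        # _ensure_redis_connected is a private IRISClient helper; it is assumed to
        # open (or reuse) the client's Redis connection before any tests run.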
await self.client._ensure_redis_connected()
        print("✅ Test environment ready")
async def cleanup(self):
"""Cleanup test data"""
        print("🧹 Cleaning up test data...")
for session_id in self.test_sessions:
try:
await self.client.end_conversation(session_id)
except Exception:
pass # Ignore cleanup errors
        print("✅ Cleanup completed")
def log_test_result(self, test_name: str, success: bool, error: str = None):
"""Log test result"""
self.total_tests += 1
if success:
self.passed_tests += 1
print(f"β
{test_name}")
else:
self.failed_tests += 1
print(f"β {test_name}")
if error:
print(f" Error: {error}")
async def test_redis_connection_and_persistence(self):
"""Test Redis connection and data persistence"""
try:
# Test Redis connection
redis_client = RedisClient()
await redis_client.connect()
# Test basic operations
test_key = "test:e2e:redis"
test_value = "test_value_123"
# Set and get
await redis_client.set(test_key, test_value, ttl=60)
retrieved = await redis_client.get(test_key)
assert retrieved == test_value, f"Expected {test_value}, got {retrieved}"
# Hash operations
hash_key = "test:e2e:hash"
hash_data = {"field1": "value1", "field2": "value2"}
await redis_client.hmset(hash_key, hash_data)
retrieved_hash = await redis_client.hgetall(hash_key)
            assert retrieved_hash == hash_data, f"Hash data mismatch: expected {hash_data}, got {retrieved_hash}"
# Cleanup
await redis_client.delete(test_key)
await redis_client.delete(hash_key)
await redis_client.disconnect()
self.log_test_result("Redis Connection and Persistence", True)
except Exception as e:
self.log_test_result("Redis Connection and Persistence", False, str(e))
async def test_llm_providers_health(self):
"""Test all LLM providers by making simple requests"""
try:
# Test basic functionality with both providers
response_openai = await quick_chat(
user_id="health_openai",
user_name="Health Test",
message="Hello",
prompt_type=PromptType.GENERAL_CHAT,
provider_name="openai"
)
assert response_openai, "No response from OpenAI"
response_anthropic = await quick_chat(
user_id="health_anthropic",
user_name="Health Test",
message="Hello",
prompt_type=PromptType.GENERAL_CHAT,
provider_name="anthropic"
)
assert response_anthropic, "No response from Anthropic"
self.log_test_result("LLM Providers Health Check", True)
except Exception as e:
self.log_test_result("LLM Providers Health Check", False, str(e))
async def test_conversation_lifecycle(self):
"""Test complete conversation lifecycle"""
try:
# Start conversation
session = await self.client.start_conversation(
user_id="test_user_e2e",
user_name="E2E Test User",
prompt_type=PromptType.GENERAL_CHAT,
role="Developer",
expertise_level="advanced",
communication_style="professional"
)
self.test_sessions.append(session.session_id)
assert session.user_id == "test_user_e2e"
assert session.session_id is not None
# Send first message
response1 = await self.client.chat(
session.session_id,
"Hello! Can you help me with Python programming?"
)
assert response1.content, "No response content"
assert response1.provider in ["openai", "anthropic"], f"Invalid provider: {response1.provider}"
# Send follow-up message
response2 = await self.client.chat(
session.session_id,
"What are the best practices for async programming?"
)
assert response2.content, "No response content for second message"
# Check conversation history
messages = await self.client.get_conversation_history(session.session_id)
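            # Two user turns and two assistant replies are expected; "at least 4"
            # leaves room for any extra system/context messages the client may store.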
assert len(messages) >= 4, f"Expected at least 4 messages, got {len(messages)}"
# Verify message order and content
user_messages = [msg for msg in messages if msg.role == MessageRole.USER]
assistant_messages = [msg for msg in messages if msg.role == MessageRole.ASSISTANT]
assert len(user_messages) == 2, f"Expected 2 user messages, got {len(user_messages)}"
assert len(assistant_messages) == 2, f"Expected 2 assistant messages, got {len(assistant_messages)}"
assert "Python programming" in user_messages[0].content
assert "async programming" in user_messages[1].content
self.log_test_result("Conversation Lifecycle", True)
except Exception as e:
self.log_test_result("Conversation Lifecycle", False, str(e))
async def test_prompt_system_integration(self):
"""Test prompt system with different templates"""
try:
# Test different prompt types
prompt_types = [
PromptType.GENERAL_CHAT,
PromptType.BUSINESS_ASSISTANT,
PromptType.EMAIL_ANALYSIS,
PromptType.EMAIL_COMPOSE
]
for prompt_type in prompt_types:
session = await self.client.start_conversation(
user_id=f"test_user_{prompt_type.value}",
user_name="Prompt Test User",
prompt_type=prompt_type,
role="Tester",
expertise_level="intermediate"
)
self.test_sessions.append(session.session_id)
# Send a message to trigger prompt generation
response = await self.client.chat(
session.session_id,
f"Test message for {prompt_type.value} prompt type"
)
assert response.content, f"No response for {prompt_type.value}"
assert len(response.content) > 10, f"Response too short for {prompt_type.value}"
self.log_test_result("Prompt System Integration", True)
except Exception as e:
self.log_test_result("Prompt System Integration", False, str(e))
async def test_session_metadata_and_stats(self):
"""Test session metadata and statistics"""
try:
# Create session with metadata
session = await self.client.start_conversation(
user_id="test_metadata_user",
user_name="Metadata Test User",
prompt_type=PromptType.GENERAL_CHAT,
role="QA Engineer",
expertise_level="expert",
communication_style="technical"
)
self.test_sessions.append(session.session_id)
# Send some messages to generate stats
await self.client.chat(session.session_id, "First test message")
await self.client.chat(session.session_id, "Second test message")
await self.client.chat(session.session_id, "Third test message")
# Check session stats
stats = await self.client.get_session_stats(session.session_id)
assert stats is not None, "No session stats returned"
assert "total_messages" in stats, "No total_messages in stats"
assert stats["total_messages"] >= 6, f"Expected at least 6 messages, got {stats['total_messages']}"
self.log_test_result("Session Metadata and Stats", True)
except Exception as e:
self.log_test_result("Session Metadata and Stats", False, str(e))
async def test_provider_switching(self):
"""Test switching between different LLM providers"""
try:
session = await self.client.start_conversation(
user_id="test_provider_switch",
user_name="Provider Switch User",
prompt_type=PromptType.GENERAL_CHAT
)
self.test_sessions.append(session.session_id)
# Test with OpenAI
response_openai = await self.client.chat(
session.session_id,
"Hello from OpenAI!",
provider_name="openai"
)
assert response_openai.provider == "openai", f"Expected openai, got {response_openai.provider}"
# Test with Anthropic
response_anthropic = await self.client.chat(
session.session_id,
"Hello from Anthropic!",
provider_name="anthropic"
)
assert response_anthropic.provider == "anthropic", f"Expected anthropic, got {response_anthropic.provider}"
# Verify both responses in conversation history
messages = await self.client.get_conversation_history(session.session_id)
assistant_messages = [msg for msg in messages if msg.role == MessageRole.ASSISTANT]
assert len(assistant_messages) >= 2, f"Expected at least 2 assistant messages, got {len(assistant_messages)}"
self.log_test_result("Provider Switching", True)
except Exception as e:
self.log_test_result("Provider Switching", False, str(e))
async def test_quick_chat_convenience_function(self):
"""Test the quick_chat convenience function"""
try:
response = await quick_chat(
user_id="quick_chat_user",
user_name="Quick Chat User",
message="This is a quick chat test message",
prompt_type=PromptType.GENERAL_CHAT,
role="User",
expertise_level="beginner"
)
assert response, "No response from quick_chat"
assert len(response) > 10, f"Response too short: {len(response)} chars"
assert isinstance(response, str), f"Expected str, got {type(response)}"
self.log_test_result("Quick Chat Convenience Function", True)
except Exception as e:
self.log_test_result("Quick Chat Convenience Function", False, str(e))
async def test_error_handling_and_recovery(self):
"""Test error handling and system recovery"""
try:
            # Test with an invalid session ID. A flag is used instead of a bare
            # "assert False" inside the try block, so the assertion message cannot
            # itself be caught and mistaken for the expected error.
            raised = False
            try:
                await self.client.chat("invalid_session_id", "Test message")
            except Exception as e:
                raised = True
                assert "not found" in str(e).lower() or "invalid" in str(e).lower(), f"Unexpected error: {e}"
            assert raised, "Expected an exception for an invalid session ID"
# Test with invalid provider
session = await self.client.start_conversation(
user_id="error_test_user",
user_name="Error Test User",
prompt_type=PromptType.GENERAL_CHAT
)
self.test_sessions.append(session.session_id)
            raised = False
            try:
                await self.client.chat(
                    session.session_id,
                    "Test message",
                    provider_name="invalid_provider"
                )
            except Exception as e:
                raised = True
                assert "provider" in str(e).lower(), f"Unexpected error: {e}"
            assert raised, "Expected an exception for an invalid provider name"
self.log_test_result("Error Handling and Recovery", True)
except Exception as e:
self.log_test_result("Error Handling and Recovery", False, str(e))
async def test_concurrent_conversations(self):
"""Test handling multiple concurrent conversations"""
try:
# Create multiple sessions
sessions = []
for i in range(3):
session = await self.client.start_conversation(
user_id=f"concurrent_user_{i}",
user_name=f"Concurrent User {i}",
prompt_type=PromptType.GENERAL_CHAT
)
sessions.append(session)
self.test_sessions.append(session.session_id)
# Send messages concurrently
tasks = []
for i, session in enumerate(sessions):
task = self.client.chat(
session.session_id,
f"Concurrent message from user {i}"
)
tasks.append(task)
# Wait for all responses
responses = await asyncio.gather(*tasks)
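            # asyncio.gather preserves argument order, so responses[i] corresponds to sessions[i].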
# Verify all responses
assert len(responses) == 3, f"Expected 3 responses, got {len(responses)}"
for i, response in enumerate(responses):
assert response.content, f"No content in response {i}"
assert len(response.content) > 10, f"Response {i} too short"
self.log_test_result("Concurrent Conversations", True)
except Exception as e:
self.log_test_result("Concurrent Conversations", False, str(e))
async def test_system_health_check(self):
"""Test overall system health"""
try:
# Check Redis connection
await self.client._ensure_redis_connected()
# Test basic functionality with both providers
response_openai = await quick_chat(
user_id="health_check_openai",
user_name="Health Check User",
message="System health check message",
prompt_type=PromptType.GENERAL_CHAT,
provider_name="openai"
)
assert response_openai, "No response from OpenAI health check"
assert len(response_openai) > 5, f"OpenAI health check response too short: {len(response_openai)} chars"
response_anthropic = await quick_chat(
user_id="health_check_anthropic",
user_name="Health Check User",
message="System health check message",
prompt_type=PromptType.GENERAL_CHAT,
provider_name="anthropic"
)
assert response_anthropic, "No response from Anthropic health check"
assert len(response_anthropic) > 5, f"Anthropic health check response too short: {len(response_anthropic)} chars"
self.log_test_result("System Health Check", True)
except Exception as e:
self.log_test_result("System Health Check", False, str(e))
async def run_all_tests(self):
"""Run all E2E tests"""
        print("🚀 Starting IRIS E2E Integration Tests")
print("=" * 60)
await self.setup()
# Run all tests
test_methods = [
self.test_redis_connection_and_persistence,
self.test_llm_providers_health,
self.test_conversation_lifecycle,
self.test_prompt_system_integration,
self.test_session_metadata_and_stats,
self.test_provider_switching,
self.test_quick_chat_convenience_function,
self.test_error_handling_and_recovery,
self.test_concurrent_conversations,
self.test_system_health_check
]
for test_method in test_methods:
await test_method()
await self.cleanup()
# Print summary
print("=" * 60)
print(f"π Test Results Summary:")
print(f" Total Tests: {self.total_tests}")
print(f" Passed: {self.passed_tests}")
print(f" Failed: {self.failed_tests}")
print(f" Success Rate: {(self.passed_tests/self.total_tests)*100:.1f}%")
if self.failed_tests == 0:
            print("🎉 All tests passed! IRIS system is fully operational.")
return True
else:
print(f"β οΈ {self.failed_tests} test(s) failed. Please review the errors above.")
return False
async def main():
"""Main test runner"""
test_suite = E2ETestSuite()
success = await test_suite.run_all_tests()
sys.exit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())