test_document_handling_e2e.py

"""
E2E Tests for Document Handling System

Complete tests for document upload, storage, parsing and analysis.
"""
import pytest
import pytest_asyncio
import asyncio
from pathlib import Path
import tempfile

from src.file_handler.storage import DocumentStorage
from src.file_handler.parsers import DocumentParserFactory, PDFParser, TextParser
from src.file_handler.manager import DocumentManager


@pytest_asyncio.fixture
async def document_storage():
    """DocumentStorage fixture with cleanup."""
    from src.config.settings import settings

    storage = DocumentStorage(
        redis_url=settings.redis_url,
        storage_path="./test_data/documents",
        retention_days=7
    )
    await storage.connect()
    yield storage
    await storage.close()


@pytest_asyncio.fixture
async def document_manager():
    """DocumentManager fixture."""
    from src.config.settings import settings

    manager = DocumentManager(
        redis_url=settings.redis_url,
        storage_path="./test_data/documents",
        retention_days=7
    )
    yield manager
    await manager.storage.close()


@pytest.fixture
def sample_text_file():
    """Create a sample text file."""
    content = """IRIS - Intelligent Routine Integration System

This is a test document used to verify the document management system.

Main features:
1. Document upload
2. Automatic parsing
3. Analysis with an LLM
4. Storage with deduplication

The system supports PDF, TXT, CSV, JSON and other formats.
"""
    return content.encode('utf-8')


@pytest.fixture
def sample_pdf_content():
    """Create simple (mock) PDF content."""
    # A real test would use an actual PDF here.
    # For now we only use a marker.
    return b"%PDF-1.4\nMock PDF content for testing"


class TestDocumentStorage:
    """Tests for DocumentStorage."""

    @pytest.mark.asyncio
    async def test_store_and_retrieve_document(self, document_storage, sample_text_file):
        """Test document storage and retrieval."""
        user_id = "test_user_001"
        filename = "test_document.txt"

        # Store the document
        result = await document_storage.store_document(
            user_id=user_id,
            file_data=sample_text_file,
            filename=filename,
            mime_type="text/plain"
        )

        assert 'doc_id' in result
        assert 'file_hash' in result
        assert result['filename'] == filename
        assert result['user_id'] == user_id

        doc_id = result['doc_id']

        # Retrieve metadata
        metadata = await document_storage.get_document(doc_id)
        assert metadata is not None
        assert metadata['filename'] == filename

        # Retrieve content
        content = await document_storage.get_document_content(doc_id)
        assert content == sample_text_file

        print(f"✅ Store and retrieve test passed - doc_id: {doc_id}")

    @pytest.mark.asyncio
    async def test_document_deduplication(self, document_storage, sample_text_file):
        """Test deduplication of identical documents."""
        user_id = "test_user_002"

        # Upload the same file twice
        result1 = await document_storage.store_document(
            user_id=user_id,
            file_data=sample_text_file,
            filename="doc1.txt",
            mime_type="text/plain"
        )

        result2 = await document_storage.store_document(
            user_id=user_id,
            file_data=sample_text_file,
            filename="doc2.txt",  # Different name but identical content
            mime_type="text/plain"
        )

        # Same hash, same doc_id (deduplication active)
        assert result1['file_hash'] == result2['file_hash']
        assert result1['doc_id'] == result2['doc_id']  # DEDUPLICATION: same content = same doc

        print(f"✅ Deduplication test passed - same doc_id for same content: {result1['doc_id'][:20]}")

    @pytest.mark.asyncio
    async def test_list_user_documents(self, document_storage, sample_text_file):
        """Test listing a user's documents."""
        import uuid
        user_id = f"test_user_list_{uuid.uuid4().hex[:8]}"  # Unique user ID to avoid conflicts

        # Upload multiple documents with DIFFERENT content
        doc_ids = []
        for i in range(3):
            # Change the content to avoid deduplication
            modified_content = sample_text_file + f"\n\nDocument version {i}".encode('utf-8')
            result = await document_storage.store_document(
                user_id=user_id,
                file_data=modified_content,
                filename=f"document_{i}.txt",
                mime_type="text/plain"
            )
            doc_ids.append(result['doc_id'])

        # List documents
        documents = await document_storage.list_user_documents(user_id)

        assert len(documents) == 3, f"Expected 3 documents, got {len(documents)}"
        assert all(doc['user_id'] == user_id for doc in documents)

        # Verify ordering (most recent first)
        timestamps = [doc['uploaded_at'] for doc in documents]
        assert timestamps == sorted(timestamps, reverse=True)

        print(f"✅ List documents test passed - {len(documents)} documents for user {user_id}")

    @pytest.mark.asyncio
    async def test_delete_document(self, document_storage, sample_text_file):
        """Test document deletion."""
        user_id = "test_user_004"

        # Upload a document
        result = await document_storage.store_document(
            user_id=user_id,
            file_data=sample_text_file,
            filename="to_delete.txt",
            mime_type="text/plain"
        )
        doc_id = result['doc_id']

        # Verify it exists
        metadata = await document_storage.get_document(doc_id)
        assert metadata is not None

        # Delete
        deleted = await document_storage.delete_document(doc_id)
        assert deleted is True

        # Verify deletion
        metadata_after = await document_storage.get_document(doc_id)
        assert metadata_after is None

        print("✅ Delete document test passed")


class TestDocumentParsers:
    """Tests for the document parsers."""

    def test_text_parser(self, sample_text_file):
        """Test TextParser."""
        result = TextParser.parse(sample_text_file, "test.txt")

        assert 'text' in result
        assert 'encoding' in result
        assert 'char_count' in result
        assert len(result['text']) > 0
        assert 'IRIS' in result['text']

        print(f"✅ TextParser test passed - {result['char_count']} chars")

    def test_parser_factory(self, sample_text_file):
        """Test DocumentParserFactory."""
        result = DocumentParserFactory.parse_document(
            file_data=sample_text_file,
            mime_type="text/plain",
            filename="test.txt"
        )

        assert result['supported'] is True
        assert 'text' in result
        assert result['parser'] == 'TextParser'

        print("✅ ParserFactory test passed")

    def test_unsupported_format(self):
        """Test an unsupported format."""
        result = DocumentParserFactory.parse_document(
            file_data=b"random binary data",
            mime_type="application/octet-stream",
            filename="unknown.bin"
        )

        assert result['supported'] is False
        assert 'error' in result

        print("✅ Unsupported format test passed")


class TestDocumentManager:
    """Tests for DocumentManager."""

    @pytest.mark.asyncio
    async def test_document_upload_and_parse(self, document_manager, sample_text_file):
        """Test the full upload flow with parsing."""
        user_id = "test_user_005"

        result = await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="test_upload.txt",
            user_id=user_id,
            mime_type="text/plain",
            auto_parse=True
        )

        assert result['success'] is True
        assert 'document' in result
        assert 'extracted_text' in result
        assert len(result['extracted_text']) > 0

        doc_id = result['document']['doc_id']
        print(f"✅ Upload and parse test passed - doc_id: {doc_id}")

    @pytest.mark.asyncio
    async def test_list_documents(self, document_manager, sample_text_file):
        """Test listing via the manager."""
        user_id = "test_user_006"

        # Upload a document
        await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="list_test.txt",
            user_id=user_id,
            mime_type="text/plain"
        )

        # List
        result = await document_manager.list_documents(user_id)

        assert result['success'] is True
        assert result['count'] >= 1
        assert len(result['documents']) >= 1

        print(f"✅ List via manager test passed - {result['count']} docs")

    @pytest.mark.asyncio
    async def test_document_analysis_without_llm(self, document_manager, sample_text_file):
        """Test document analysis (without a real LLM)."""
        user_id = "test_user_007"

        # Upload a document
        upload_result = await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="analyze_test.txt",
            user_id=user_id,
            mime_type="text/plain"
        )

        assert upload_result['success'] is True
        doc_id = upload_result['document']['doc_id']

        # Note: this test requires a configured LLM API.
        # If it is not available, the test skips gracefully.
        try:
            analysis_result = await document_manager.analyze_document(
                doc_id=doc_id,
                analysis_request="Summarize this document",
                user_id=user_id
            )

            # If the LLM is available
            if analysis_result['success']:
                assert 'analysis' in analysis_result
                print("✅ Document analysis test passed (with LLM)")
            else:
                print("⚠️ Document analysis skipped (LLM not available)")

        except Exception as e:
            print(f"⚠️ Document analysis test skipped: {e}")


class TestDocumentWorkflow:
    """Complete end-to-end workflow tests."""

    @pytest.mark.asyncio
    async def test_complete_document_lifecycle(self, document_manager, sample_text_file):
        """Test the complete lifecycle: upload → list → delete."""
        user_id = "test_user_008"

        # 1. Upload
        upload_result = await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="lifecycle_test.txt",
            user_id=user_id,
            mime_type="text/plain"
        )
        assert upload_result['success'] is True
        doc_id = upload_result['document']['doc_id']
        print(f" 1. Upload: ✅ doc_id={doc_id[:16]}...")

        # 2. List
        list_result = await document_manager.list_documents(user_id)
        assert list_result['success'] is True
        assert list_result['count'] >= 1
        print(f" 2. List: ✅ {list_result['count']} document(s)")

        # 3. Delete
        delete_result = await document_manager.delete_document(doc_id, user_id)
        assert delete_result['success'] is True
        print(" 3. Delete: ✅")

        # 4. Verify deletion
        list_after = await document_manager.list_documents(user_id)
        # The document must be gone
        doc_ids_after = [d['doc_id'] for d in list_after['documents']]
        assert doc_id not in doc_ids_after
        print(" 4. Verify: ✅")

        print("✅ Complete lifecycle test passed")


# Runner for manual test execution
if __name__ == "__main__":
    import sys

    print("🧪 IRIS Document Handling - E2E Tests")
    print("=" * 60)

    # Check Redis
    try:
        import redis
        r = redis.Redis(host='localhost', port=6379, db=1)
        r.ping()
        print("✅ Redis connection OK")
    except Exception as e:
        print(f"❌ Redis connection failed: {e}")
        print(" Make sure Redis is running: redis-server")
        sys.exit(1)

    # Run tests
    print("\n📋 Running tests...\n")
    pytest.main([__file__, "-v", "-s"])
