"""
E2E Tests for Document Handling System
Test completi per upload, storage, parsing e analisi documenti.
"""
import pytest
import pytest_asyncio
import asyncio
from pathlib import Path
import tempfile
from src.file_handler.storage import DocumentStorage
from src.file_handler.parsers import DocumentParserFactory, PDFParser, TextParser
from src.file_handler.manager import DocumentManager
@pytest_asyncio.fixture
async def document_storage():
    """Yield a connected DocumentStorage and close it after the test.

    The storage is built with the Redis URL taken from the project settings,
    the ``./test_data/documents`` storage path and a retention of 7 days.
    """
    from src.config.settings import settings

    storage_options = {
        "redis_url": settings.redis_url,
        "storage_path": "./test_data/documents",
        "retention_days": 7,
    }
    backend = DocumentStorage(**storage_options)
    await backend.connect()

    yield backend

    # Teardown: executed once the requesting test has finished.
    await backend.close()
@pytest_asyncio.fixture
async def document_manager():
    """Yield a DocumentManager and close its underlying storage afterwards.

    The manager receives the same Redis URL, storage path and retention
    values that the ``document_storage`` fixture uses.
    """
    from src.config.settings import settings

    manager_options = dict(
        redis_url=settings.redis_url,
        storage_path="./test_data/documents",
        retention_days=7,
    )
    doc_manager = DocumentManager(**manager_options)

    yield doc_manager

    # Teardown: only the manager's storage is closed here.
    await doc_manager.storage.close()
@pytest.fixture
def sample_text_file():
    """Return the UTF-8 encoded bytes of a small sample text document."""
    # The document text is runtime data and intentionally left untranslated.
    lines = (
        "IRIS - Intelligent Routine Integration System",
        "Questo è un documento di test per verificare il sistema di gestione documenti.",
        "Funzionalità principali:",
        "1. Upload documenti",
        "2. Parsing automatico",
        "3. Analisi con LLM",
        "4. Storage con deduplicazione",
        "Il sistema supporta PDF, TXT, CSV, JSON e altri formati.",
    )
    # Every line, including the last one, is terminated by a newline.
    text = "\n".join(lines) + "\n"
    return text.encode('utf-8')
@pytest.fixture
def sample_pdf_content():
    """Return placeholder bytes that begin with a ``%PDF-1.4`` marker.

    This is a mock, not a real PDF document: a real test would use an
    actual PDF file here.
    """
    header = b"%PDF-1.4\n"
    payload = b"Mock PDF content for testing"
    return header + payload
class TestDocumentStorage:
    """Tests that exercise DocumentStorage directly."""

    @pytest.mark.asyncio
    async def test_store_and_retrieve_document(self, document_storage, sample_text_file):
        """Store a document, then read back its metadata and its content."""
        owner = "test_user_001"
        name = "test_document.txt"

        # Store the document and check the returned record.
        stored = await document_storage.store_document(
            user_id=owner,
            file_data=sample_text_file,
            filename=name,
            mime_type="text/plain",
        )
        for required_key in ('doc_id', 'file_hash'):
            assert required_key in stored
        assert stored['filename'] == name
        assert stored['user_id'] == owner

        stored_id = stored['doc_id']

        # Metadata lookup by document id.
        record = await document_storage.get_document(stored_id)
        assert record is not None
        assert record['filename'] == name

        # The raw content must round-trip unchanged.
        raw_bytes = await document_storage.get_document_content(stored_id)
        assert raw_bytes == sample_text_file

        print(f"✅ Store and retrieve test passed - doc_id: {stored_id}")

    @pytest.mark.asyncio
    async def test_document_deduplication(self, document_storage, sample_text_file):
        """Identical content uploaded twice must map to a single document."""
        owner = "test_user_002"

        # Same bytes uploaded under two different file names.
        uploads = []
        for name in ("doc1.txt", "doc2.txt"):
            uploads.append(
                await document_storage.store_document(
                    user_id=owner,
                    file_data=sample_text_file,
                    filename=name,
                    mime_type="text/plain",
                )
            )
        first, second = uploads

        # Deduplication: same hash and same doc_id for the same content.
        assert first['file_hash'] == second['file_hash']
        assert first['doc_id'] == second['doc_id']

        print(f"✅ Deduplication test passed - same doc_id for same content: {first['doc_id'][:20]}")

    @pytest.mark.asyncio
    async def test_list_user_documents(self, document_storage, sample_text_file):
        """List the documents of a user after uploading three distinct files."""
        import uuid

        # A unique user id avoids clashes with documents from other runs.
        unique_user = f"test_user_list_{uuid.uuid4().hex[:8]}"

        # The ids are collected but not otherwise checked by this test.
        uploaded_ids = []
        for version in range(3):
            # A per-document suffix makes each content different, so the
            # uploads are not deduplicated into one document.
            suffix = f"\n\nDocument version {version}".encode('utf-8')
            stored = await document_storage.store_document(
                user_id=unique_user,
                file_data=sample_text_file + suffix,
                filename=f"document_{version}.txt",
                mime_type="text/plain",
            )
            uploaded_ids.append(stored['doc_id'])

        listed = await document_storage.list_user_documents(unique_user)
        assert len(listed) == 3, f"Expected 3 documents, got {len(listed)}"
        assert all(entry['user_id'] == unique_user for entry in listed)

        # The listing is expected to be ordered newest first.
        upload_times = [entry['uploaded_at'] for entry in listed]
        assert upload_times == sorted(upload_times, reverse=True)

        print(f"✅ List documents test passed - {len(listed)} documents for user {unique_user}")

    @pytest.mark.asyncio
    async def test_delete_document(self, document_storage, sample_text_file):
        """Delete a stored document and verify it can no longer be fetched."""
        stored = await document_storage.store_document(
            user_id="test_user_004",
            file_data=sample_text_file,
            filename="to_delete.txt",
            mime_type="text/plain",
        )
        target_id = stored['doc_id']

        # The document must exist before the deletion.
        before = await document_storage.get_document(target_id)
        assert before is not None

        was_deleted = await document_storage.delete_document(target_id)
        assert was_deleted is True

        # After the deletion the metadata lookup must return None.
        after = await document_storage.get_document(target_id)
        assert after is None

        print("✅ Delete document test passed")
class TestDocumentParsers:
    """Tests for the document parsers."""

    def test_text_parser(self, sample_text_file):
        """TextParser returns the text together with encoding and char count."""
        parsed = TextParser.parse(sample_text_file, "test.txt")

        for expected_key in ('text', 'encoding', 'char_count'):
            assert expected_key in parsed
        extracted = parsed['text']
        assert len(extracted) > 0
        assert 'IRIS' in extracted

        print(f"✅ TextParser test passed - {parsed['char_count']} chars")

    def test_parser_factory(self, sample_text_file):
        """The factory dispatches ``text/plain`` input to TextParser."""
        request = {
            "file_data": sample_text_file,
            "mime_type": "text/plain",
            "filename": "test.txt",
        }
        parsed = DocumentParserFactory.parse_document(**request)

        assert parsed['supported'] is True
        assert 'text' in parsed
        assert parsed['parser'] == 'TextParser'

        print("✅ ParserFactory test passed")

    def test_unsupported_format(self):
        """An unsupported MIME type is reported as unsupported with an error."""
        request = {
            "file_data": b"random binary data",
            "mime_type": "application/octet-stream",
            "filename": "unknown.bin",
        }
        parsed = DocumentParserFactory.parse_document(**request)

        assert parsed['supported'] is False
        assert 'error' in parsed

        print("✅ Unsupported format test passed")
class TestDocumentManager:
    """Tests for DocumentManager."""

    @pytest.mark.asyncio
    async def test_document_upload_and_parse(self, document_manager, sample_text_file):
        """Upload a text document with automatic parsing enabled."""
        user_id = "test_user_005"

        result = await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="test_upload.txt",
            user_id=user_id,
            mime_type="text/plain",
            auto_parse=True,
        )

        assert result['success'] is True
        assert 'document' in result
        assert 'extracted_text' in result
        assert len(result['extracted_text']) > 0

        doc_id = result['document']['doc_id']
        print(f"✅ Upload and parse test passed - doc_id: {doc_id}")

    @pytest.mark.asyncio
    async def test_list_documents(self, document_manager, sample_text_file):
        """List a user's documents through the manager after one upload."""
        user_id = "test_user_006"

        # Upload one document so the listing cannot be empty.
        await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="list_test.txt",
            user_id=user_id,
            mime_type="text/plain",
        )

        result = await document_manager.list_documents(user_id)

        assert result['success'] is True
        assert result['count'] >= 1
        assert len(result['documents']) >= 1

        print(f"✅ List via manager test passed - {result['count']} docs")

    @pytest.mark.asyncio
    async def test_document_analysis_without_llm(self, document_manager, sample_text_file):
        """Analyze an uploaded document; skip when the analysis is unavailable.

        The analysis needs a configured LLM API. If the call raises, or the
        manager reports ``success`` as falsy, the test is reported as
        *skipped* instead of silently passing. The assertion on the result
        runs outside the ``try`` so a failing assertion is never swallowed.
        """
        user_id = "test_user_007"

        upload_result = await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="analyze_test.txt",
            user_id=user_id,
            mime_type="text/plain",
        )
        assert upload_result['success'] is True
        doc_id = upload_result['document']['doc_id']

        # Only the call itself is guarded. The exception type raised when the
        # LLM is not configured is not visible from this file, hence the
        # broad catch; it is converted into an explicit skip with the reason.
        try:
            analysis_result = await document_manager.analyze_document(
                doc_id=doc_id,
                analysis_request="Riassumi questo documento",
                user_id=user_id,
            )
        except Exception as e:
            pytest.skip(f"Document analysis test skipped: {e}")

        if not analysis_result['success']:
            pytest.skip("Document analysis skipped (LLM not available)")

        # LLM available: the result must carry the analysis.
        assert 'analysis' in analysis_result
        print("✅ Document analysis test passed (with LLM)")
class TestDocumentWorkflow:
    """End-to-end test of the complete document workflow."""

    @pytest.mark.asyncio
    async def test_complete_document_lifecycle(self, document_manager, sample_text_file):
        """Run the full lifecycle: upload, list, delete, verify deletion."""
        owner = "test_user_008"

        # Step 1: upload.
        uploaded = await document_manager.handle_document_upload(
            file_data=sample_text_file,
            filename="lifecycle_test.txt",
            user_id=owner,
            mime_type="text/plain",
        )
        assert uploaded['success'] is True
        new_id = uploaded['document']['doc_id']
        print(f" 1. Upload: ✅ doc_id={new_id[:16]}...")

        # Step 2: the user's listing must contain at least one document.
        listing = await document_manager.list_documents(owner)
        assert listing['success'] is True
        assert listing['count'] >= 1
        print(f" 2. List: ✅ {listing['count']} document(s)")

        # Step 3: delete the uploaded document.
        removal = await document_manager.delete_document(new_id, owner)
        assert removal['success'] is True
        print(" 3. Delete: ✅")

        # Step 4: the deleted document must be absent from a fresh listing.
        listing_after = await document_manager.list_documents(owner)
        remaining_ids = {entry['doc_id'] for entry in listing_after['documents']}
        assert new_id not in remaining_ids
        print(" 4. Verify: ✅")

        print("✅ Complete lifecycle test passed")
# Manual test runner: checks Redis, then runs this file through pytest.
if __name__ == "__main__":
    import sys

    print("🧪 IRIS Document Handling - E2E Tests")
    print("=" * 60)

    # Preflight check against settings.redis_url, the same URL the fixtures
    # pass to DocumentStorage / DocumentManager, so the check cannot succeed
    # against a different Redis instance than the one the tests use.
    try:
        import redis
        from src.config.settings import settings

        redis.Redis.from_url(settings.redis_url).ping()
        print("✅ Redis connection OK")
    except Exception as e:
        print(f"❌ Redis connection failed: {e}")
        print(" Assicurati che Redis sia attivo: redis-server")
        sys.exit(1)

    # Run the tests and propagate pytest's exit code, so a failing run makes
    # the script exit with a non-zero status.
    print("\n📋 Running tests...\n")
    sys.exit(pytest.main([__file__, "-v", "-s"]))